Compare commits

...

116 Commits

Author SHA1 Message Date
YannC
3f552713a2 chore: upgrade to version 0.18.17 2025-03-31 18:32:37 +02:00
nKwiatkowski
91a0fafd61 fix(test): change EOL centOS docker image 2025-03-19 14:23:46 +01:00
YannC
ff975faa05 fix(runner-memory): delete MemorySchedulerTriggerState back due to cherry-pick 2025-03-06 15:30:10 +01:00
YannC
8113a0b8b2 fix(): align to EE 2025-03-06 11:14:20 +01:00
Florian Hussonnois
578f264214 chore: upgrade to version 0.18.16 2025-01-28 15:39:53 +01:00
Florian Hussonnois
d89ee01f12 fix(webserver): ensure queues are not closed in nioEventLoop 2025-01-27 14:45:11 +01:00
YannC
e83b9fe1c3 chore: upgrade to version v0.18.15 2025-01-24 10:08:48 +01:00
brian.mulier
427d682439 chore(version): version 0.18.14 2024-12-19 11:31:49 +01:00
Loïc Mathieu
b36cfb6f6b chore(version): version 0.18.13 2024-12-13 11:44:29 +01:00
Loïc Mathieu
58fc4a72ff feat(core,jdbc): small trigger / scheduler improvements 2024-12-13 11:41:13 +01:00
Loïc Mathieu
20f4a51aca fix(core): serialize default inputs 2024-12-13 11:38:50 +01:00
Loïc Mathieu
bdef59632f Revert "feat(core): remove the execution state from the scheduler (#1588)"
This reverts commit f7d3d0bcd4.
2024-11-28 14:40:28 +01:00
YannC
ac678b88e4 chore: upgrade to version 0.18.12 2024-11-05 21:47:10 +01:00
Loïc Mathieu
6683cb21b7 fix(webserver): don't load the flow too early so a user with only EXECUTION permission can access execution files
Fixes https://github.com/kestra-io/kestra/issues/4958
2024-11-05 12:56:39 +01:00
Loïc Mathieu
67ad4800e3 chore: version 0.18.11 2024-10-15 14:05:35 +02:00
Loïc Mathieu
2f56df4031 fix(jdbc): always include deleted in the logs and metrics queries
Even if not needed, to be sure we use the correct index.
2024-10-15 14:04:30 +02:00
YannC
f838f58068 chore: upgrade to v0.18.10 2024-10-04 09:47:11 +02:00
Yoann Vernageau
963b0ca0a5 feat(core): add label rendering when an execution is triggered (flow and schedule only) (#5025)
* feat: add label rendering when an execution is triggered (flow and schedule only)

* fix: handle IllegalVariableEvaluationException when rendering

* test: add unit tests

* fix: omit label if rendering fails
2024-10-04 09:46:45 +02:00
YannC
977fff5e7f chore: update version to v0.18.9 2024-09-24 16:26:39 +02:00
YannC
38d221e630 fix(core): Slash/backslash windows issue when listing all folders (#5065) 2024-09-24 16:01:01 +02:00
YannC
07a10f3797 fix(jdbc): compatible with Postgres 14 (#5019)
close #4825
2024-09-23 13:51:35 +02:00
brian.mulier
1f2c87a11b fix(ui): route was null in NamespaceDependenciesWrapper in production build
closes kestra-io/kestra-ee#1764
2024-09-20 18:04:03 +02:00
YannC
2cf9104a31 fix(webserver): missing params in file paths (#4966)
close #4958
2024-09-20 11:28:55 +02:00
Loïc Mathieu
75887213c0 chore: build the release branch on each push 2024-09-19 15:17:41 +02:00
Loïc Mathieu
e90f8aa4df chore: version 0.18.8 2024-09-19 15:12:16 +02:00
yuri1969
dd393f8e36 fix(core): make flow plugin defaults override global ones
This handles the OSS precedence.
2024-09-19 14:44:33 +02:00
Loïc Mathieu
d9d9bdd3a9 fix(core): always add the secret consumer
Without it, a WARNING shows up in the log in some cases, with the risk of a secret being leaked into the logs.

Fixes https://github.com/kestra-io/kestra-ee/issues/1741
2024-09-19 14:43:44 +02:00
Loïc Mathieu
6f638df047 chore(ui): improve labeling of namespaces in listing (#4957) 2024-09-19 14:43:24 +02:00
Loïc Mathieu
a185f44f01 chore(deps-dev): bump vite from 5.4.5 to 5.4.6 in /ui (#4963)
Bumps [vite](https://github.com/vitejs/vite/tree/HEAD/packages/vite) from 5.4.5 to 5.4.6.
- [Release notes](https://github.com/vitejs/vite/releases)
- [Changelog](https://github.com/vitejs/vite/blob/v5.4.6/packages/vite/CHANGELOG.md)
- [Commits](https://github.com/vitejs/vite/commits/v5.4.6/packages/vite)

---
updated-dependencies:
- dependency-name: vite
  dependency-type: direct:development
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-09-19 14:42:56 +02:00
Miloš Paunović
fac26d1dec fix(ui): allow sorting triggers table on flow name (#4949) 2024-09-19 14:41:55 +02:00
Miloš Paunović
351269a4c5 fix(ui): make sure the talk to us dialogs do not stack on each other (#4930) 2024-09-19 14:37:08 +02:00
Loïc Mathieu
330b26d920 fix(ui): filter system flows' executions tab for system data (#4929) 2024-09-19 14:36:48 +02:00
Loïc Mathieu
ad3ac7b4ab fix(ui): filter system namespace overview tab for system data (#4928) 2024-09-19 14:36:11 +02:00
Miloš Paunović
73735144aa chore(ui): remove blueprint browser tags on namespace section (#4926) 2024-09-19 14:35:19 +02:00
Florian Hussonnois
5669cf5c77 chore: update version to v0.18.7 2024-09-13 16:04:56 +02:00
Miloš Paunović
45b1c18b48 chore(ui): add whitespace changes highlighting into diff editor (#4917) 2024-09-13 14:27:34 +02:00
MilosPaunovic
7166addcec fix(ui): better handling of reflecting changes in code from topology 2024-09-13 14:27:23 +02:00
MilosPaunovic
ea72c20388 fix(ui): remove passing input prop to editor 2024-09-13 14:27:11 +02:00
Miloš Paunović
bfe17d0a66 fix(ui): enabled editor content mouse scroll (#4912) 2024-09-13 14:26:59 +02:00
YannC
a3430c1185 fix(core): use subgroup package infos when available (#4908)
close kestra-io/docs#1456
2024-09-13 14:26:48 +02:00
Florian Hussonnois
cc6ccc1fff chore: update version to v0.18.6 2024-09-12 17:36:29 +02:00
Florian Hussonnois
b4999c7cea fix(core): fix compilation after commit merge 2024-09-12 17:34:48 +02:00
brian.mulier
31823ca6d8 fix(ui): fix duplicate import 2024-09-12 17:34:25 +02:00
Florian Hussonnois
1ba67db2dd fix(webserver): don't consume inputs on errors
Consuming inputs on error seems to be the right thing to do ... but it causes a memory leak and can hang the webserver pretty quickly with the error: java.lang.OutOfMemoryError: Cannot reserve 65562 bytes of direct buffer memory.

Removing the consumption of inputs seems to fix the issue.

Co-authored-by: loicmathieu
2024-09-12 16:34:16 +02:00
Miloš Paunović
561a71e862 fix(ui): don't duplicate read-only warning if existing (#4907) 2024-09-12 16:08:41 +02:00
Miloš Paunović
7f39623930 chore(ui): bump ui-libs for colors fix to take place (#4903) 2024-09-12 16:05:52 +02:00
Miloš Paunović
bc5d2032ea fix(ui): reflect changes in code after making them in topology (#4901) 2024-09-12 16:05:37 +02:00
Miloš Paunović
7772aef729 fix(ui): amend trigger logs flickering (#4894) 2024-09-12 16:05:07 +02:00
Loïc Mathieu
44938e907a fix(core): handle array in outputs
Fixes #4769
2024-09-12 16:00:54 +02:00
Loïc Mathieu
86c4388ac0 fix(core, script): check default enabled is not FALSE
It's true via Lombok's Builder.Default, but if you set include or exclude without setting enabled: true, deserialization will set it to null.

Fixes #4867
2024-09-12 16:00:30 +02:00
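The Lombok issue described in the commit above, a Boolean that is `true` via `@Builder.Default` but left `null` by deserialization when only sibling fields are set, can be sketched without Lombok or Jackson (the class and field names below are illustrative, not the actual Kestra types):

```java
// Minimal sketch of a null-safe "enabled" check. Assumption: the
// deserializer bypasses the builder default, so `enabled` may be null
// even though the builder would have set it to true.
class FilterConfig {
    Boolean enabled; // may be null after deserialization

    // Treat only an explicit `false` as disabled; null keeps the default.
    boolean isEnabled() {
        return !Boolean.FALSE.equals(enabled);
    }
}

public class DefaultEnabledSketch {
    public static void main(String[] args) {
        FilterConfig omitted = new FilterConfig();   // user set only include/exclude
        FilterConfig disabled = new FilterConfig();
        disabled.enabled = false;

        System.out.println(omitted.isEnabled());   // true
        System.out.println(disabled.isEnabled());  // false
    }
}
```

The `!Boolean.FALSE.equals(x)` idiom is the usual way to make a tri-state Boolean behave as "default true" without risking a NullPointerException on unboxing.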
Loïc Mathieu
384a1b8b9b fix(cli): allow HTTP header up to 32k 2024-09-12 15:59:55 +02:00
Miloš Paunović
7f66d57eed chore(ui): add proper axios parameters to send cookies (#4888) 2024-09-12 15:59:37 +02:00
Loïc Mathieu
e38a04fc64 fix(jdbc): handle duplicate key on flow topology save
We usually include `onDuplicateKeyUpdate()` when we save an object in the database; it was missing for the batch insert of flow topologies.

Fixes #4825
2024-09-12 15:59:18 +02:00
Miloš Paunović
1d08bda2c9 fix(ui): pass namespace parameter to file creation (#4876) 2024-09-12 15:58:55 +02:00
Miloš Paunović
b78d04b359 feat(ui): double click on blueprint tag now resets filter (#4875) 2024-09-12 15:58:42 +02:00
Miloš Paunović
83d0705fe4 fix(ui): properly handle blueprints (#4874) 2024-09-12 15:58:12 +02:00
Miloš Paunović
d1c606b8e1 fix(ui): re-import date related component (#4839) 2024-09-12 15:54:31 +02:00
brian.mulier
3bca40a82d fix(core): add flow revision to failed execution in case of trigger evaluation failure 2024-09-12 15:54:15 +02:00
Miloš Paunović
45070bb9fb chore(ui): regular dependency update (#4860)
* chore(deps): move dependabot scan from sunday to monday morning

* chore(deps): regular dependency update
2024-09-12 15:53:14 +02:00
YannC
b0254f3997 chore: update version to v0.18.5 2024-09-05 13:59:14 +02:00
Loïc Mathieu
788dc1376a fix(webserver): webhook hangs indefinitely on a 404 status
When we send a 404 status by returning HttpResponse.notFound(), the webhook hangs forever for POST requests (but works for GET).

Throwing an HttpStatusException fixes the issue. It seems to be caused by a bug in Micronaut; I will try to create a reproducer and open an issue upstream, but meanwhile this fixes the issue.
2024-09-05 13:58:10 +02:00
Miloš Paunović
1b0e3cbbd6 chore(ui): add more padding between log lines (#4819) 2024-09-05 13:58:02 +02:00
Loïc Mathieu
5bbc235677 fix(core): ForEachItem NPE when no items
Fixes #4278
2024-09-05 13:57:54 +02:00
Miloš Paunović
e0f8fcbcde chore(ui): reorder namespace tabs (#4817) 2024-09-05 13:57:36 +02:00
Loïc Mathieu
87a667dc27 fix(core): flow validation can NPE 2024-09-05 13:57:32 +02:00
yuri
81d54ef423 feat(ui): make executions view more compact (#4728) 2024-09-05 13:55:03 +02:00
Loïc Mathieu
abe2086dbd fix(core): render output files
Fixes https://github.com/kestra-io/kestra/issues/4759
2024-09-05 13:54:44 +02:00
brian.mulier
7c0b0b14ec chore: version 0.18.4 2024-08-29 15:23:21 +02:00
brian.mulier
7061eef48c fix(ui): task runner can be filled through low code editor
closes kestra-io/kestra-ee#1477
2024-08-29 15:23:21 +02:00
brian.mulier
3c35c8a48f fix(ui): remove empty arrays overriding default values upon task validation in low code
closes #4758
2024-08-29 15:23:21 +02:00
Miloš Paunović
0d4e8f2c26 fix(ui): load blueprint from proper url (#4749) 2024-08-29 15:23:21 +02:00
yuri
e1b7f1ce16 chore(ui): improve copy as curl functionality (#4696) 2024-08-29 15:23:21 +02:00
Miloš Paunović
095e29ee18 fix(ui): add optional check if string exists 2024-08-29 15:23:21 +02:00
Loïc Mathieu
4ede173331 chore: publish latest-full tag 2024-08-22 15:13:53 +02:00
Loïc Mathieu
442306f82e chore: version 0.18.3 2024-08-22 14:56:40 +02:00
Loïc Mathieu
19fda0d1a4 fix(core): failed expression on a trigger condition crashes the scheduler
Fixes https://github.com/kestra-io/kestra/issues/4629
2024-08-22 14:49:46 +02:00
Loïc Mathieu
549e3d7bec feat(jdbc): avoid loading all executions in memory 2024-08-22 14:49:41 +02:00
Loïc Mathieu
880bc67756 feat(jdbc): avoid loading all executions in memory
Fixes https://github.com/kestra-io/kestra-ee/issues/1262
2024-08-22 14:49:34 +02:00
YannC
249bae8a01 fix(ui): fixed tab error when viewing blueprint details
close kestra-io/kestra-ee#1530
2024-08-22 14:48:46 +02:00
YannC
089b9ee0ca fix(ui): avoid empty navbar in blueprint details view 2024-08-22 14:48:37 +02:00
YannC
615d42db62 fix(core): In nested flowable, return 0 instead of null output for iterationCount in waitFor task (#4700)
close #4657
2024-08-22 14:47:47 +02:00
Florian Hussonnois
a70ca97912 fix(mysql): temporary fix to optimize fetch from queue table
This commit adds a temporary fix to replace FIND_IN_SET with
an IN clause in order to properly use the index on the consumers column
2024-08-22 14:47:37 +02:00
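The reason FIND_IN_SET defeats the index: it treats the column as a comma-separated list and computes a per-row position, which MySQL cannot answer from a B-tree index, whereas `consumers IN (...)` compares whole values and can. A plain-Java re-implementation of FIND_IN_SET's semantics (for illustration only, not the Kestra code) makes the per-row scan explicit:

```java
public class FindInSetSketch {
    // Mirrors MySQL's FIND_IN_SET: the 1-based position of needle in the
    // comma-separated list, or 0 when absent. Every row's full string must
    // be inspected, so no index can satisfy the predicate.
    static int findInSet(String needle, String csv) {
        String[] parts = csv.split(",");
        for (int i = 0; i < parts.length; i++) {
            if (parts[i].equals(needle)) {
                return i + 1;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println(findInSet("worker", "scheduler,worker")); // 2
        System.out.println(findInSet("worker", "scheduler"));        // 0
    }
}
```

When the column stores a single value per row, rewriting the predicate as `consumers IN ('worker')` is semantically equivalent and index-friendly.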
audunsol
f2718797d0 Update PurgeLogs.java
Fixed `WARNING` => `WARN`


Following this example gave me this error:

```
Illegal flow yaml: Cannot deserialize value of type org.slf4j.event.Level from String “WARNING”: not one of the values accepted for Enum class: [ERROR, DEBUG, TRACE, INFO, WARN]
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: io.kestra.core.models.flows.Flow[“tasks”]->java.util.ArrayList[0]->io.kestra.plugin.core.log.PurgeLogs[“logLevels”]->java.util.ArrayList[3])
```

See also: https://www.slf4j.org/api/org/slf4j/event/Level.html
2024-08-22 14:47:27 +02:00
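The error quoted above comes from strict enum deserialization: `org.slf4j.event.Level` has no `WARNING` constant, so mapping the string fails. A local enum mirroring slf4j's level names shows the behavior without pulling in slf4j itself:

```java
public class LevelNameSketch {
    // Mirrors the constants of org.slf4j.event.Level.
    enum Level { ERROR, WARN, INFO, DEBUG, TRACE }

    public static void main(String[] args) {
        System.out.println(Level.valueOf("WARN")); // WARN

        try {
            Level.valueOf("WARNING"); // not a constant -> throws
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: WARNING");
        }
    }
}
```

Jackson's enum handling surfaces the same `IllegalArgumentException` as the "not one of the values accepted for Enum class" deserialization error shown in the commit message.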
Loïc Mathieu
04ce98e0ef fix(jdbc-postgres): remove not-needed serialization in Postgres queue 2024-08-22 14:47:10 +02:00
Ludovic DEHON
d9e18d5e13 fix(webserver): create execution don't display error
close #4673
2024-08-22 14:46:48 +02:00
Miloš Paunović
b9aafb37d4 chore(ui): improve outputs debugging expression logic (#4682) 2024-08-22 14:46:16 +02:00
Miloš Paunović
6c9833936a feat(ui): add the ability to scroll the outputs error log and copy to clipboard (#4681) 2024-08-22 14:46:03 +02:00
Miloš Paunović
d48f5407e8 chore(ui): improve parameter passing to child component (#4678) 2024-08-22 14:45:49 +02:00
Miloš Paunović
6e3e4810f9 feat(ui): add restart button on execution logs page (#4672) 2024-08-22 14:45:36 +02:00
Miloš Paunović
8afaec885c feat(ui): made output of type uri clickable (#4671) 2024-08-22 14:45:17 +02:00
Miloš Paunović
afc76509f2 chore(test): add data-test-id attribute to global loader 2024-08-22 14:45:05 +02:00
Miloš Paunović
63a8b69221 chore(ui): sort plugins list alphabetically (#4670) 2024-08-22 14:44:56 +02:00
Miloš Paunović
92f5b3face chore(ui): editor sticky scroll to follow theme coloring (#4669) 2024-08-22 14:44:44 +02:00
Miloš Paunović
1a3343ff6c fix(ui): only set editor tabs to dirty if not on creation (#4668) 2024-08-22 14:44:34 +02:00
Milos Paunovic
73a82d1cd5 fix(ui): make sure blueprint base url is always present in component 2024-08-22 14:44:24 +02:00
Miloš Paunović
f1ce76ff9f fix(ui): improved the editor behaviour on switching views (#4667) 2024-08-22 14:44:11 +02:00
Florian Hussonnois
81817291ee chore: bump version to 0.18.2 2024-08-14 18:22:44 +02:00
brian.mulier
9646a42ea7 fix(tests): modify mocks for taskCommands to match the new implementation 2024-08-14 18:22:44 +02:00
Florian Hussonnois
1253d96c8a chore: make DefaultPluginRegistry extendable 2024-08-14 17:25:06 +02:00
YannC
3e40a56c1c fix(core): change property options to values for multiselect; the older one is deprecated and will be removed in 0.20 2024-08-14 17:13:34 +02:00
YannC
de2467012c fix(ui): prevent event propagate and provide blueprint source in query (#4651)
close #4487
2024-08-14 17:07:43 +02:00
Miloš Paunović
a38bf61c3b fix(ui): add check if property exists on blueprint details page 2024-08-14 17:02:21 +02:00
brian.mulier
92a323a36c fix(script): include directories in docker task runner volume 2024-08-14 17:01:39 +02:00
Loïc Mathieu
d102033a2f fix(core): if empty then
Fixes #4601
2024-08-14 17:01:10 +02:00
yuri
654d62118c feat(ui): add shortcut to Output Debugger (#4612)
Use Ctrl+Enter to run the debug expression.
2024-08-14 16:52:26 +02:00
Miloš Paunović
f9186b29b4 chore(ui): clean template by removing unused data properties (#4615) 2024-08-14 16:52:15 +02:00
brian.mulier
2ee9b9f069 chore: update version to v0.18.1 2024-08-08 17:57:29 +02:00
Miloš Paunović
d57d9dd3e8 fix(ui): gracefully handle situation with no tutorial flows loaded (#4609) 2024-08-08 17:56:45 +02:00
Loïc Mathieu
ae15cef7b7 fix(core,jdbc): PurgeLog with levels in postgres
Fixes #4604
2024-08-08 17:55:56 +02:00
eric
0e46055884 fix(ui): improve date handling per locale (#4600) 2024-08-08 17:55:15 +02:00
Miloš Paunović
f91b070dca chore(ui): only show delete logs button on flow logs tab (#4591) 2024-08-08 17:55:07 +02:00
Miloš Paunović
d964d49861 chore(deps): regular dependency updates (#4577) 2024-08-08 17:54:59 +02:00
Ludovic DEHON
dd45545202 fix(core): remove useless logger on jsonfilter 2024-08-08 17:54:51 +02:00
Florian Hussonnois
8d2589485f fix(core): fix NPE for K/V tasks and WorkingDirectory (#4592)
Fix: #4592
2024-08-08 17:54:42 +02:00
YannC
78069b45f8 chore: update version to v0.18.0 2024-08-06 19:41:13 +02:00
YannC
7a13ed79d8 chore(ci): disabled docker image build 2024-08-06 18:41:01 +02:00
131 changed files with 3254 additions and 1296 deletions

View File

@@ -22,12 +22,21 @@ updates:
- "dependency-upgrade"
open-pull-requests-limit: 50
# Maintain dependencies for Npm modules
# Maintain dependencies for NPM modules
- package-ecosystem: "npm"
directory: "/ui"
schedule:
# Check for updates to Npm modules every week
interval: "weekly"
labels:
- "dependency-upgrade"
day: "monday"
time: "08:00"
timezone: "Europe/Paris"
open-pull-requests-limit: 50
labels: ["dependency-upgrade"]
reviewers: ["MilosPaunovic"]
ignore:
# Ignore updates of version 1.x, as we're using beta of 2.x
- dependency-name: "vue-virtual-scroller"
versions: ["1.x"]
# Ignore major versions greater than 8, as it's still known to be flaky
- dependency-name: "eslint"
versions: [">8"]

View File

@@ -109,6 +109,11 @@ jobs:
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
- name: Retag latest to latest-full
if: ${{ github.event.inputs.retag-latest == 'true' && matrix.image.name == ''}}
run: |
regctl image copy kestra/kestra:latest kestra/kestra:latest-full
end:
runs-on: ubuntu-latest
needs:

View File

@@ -4,6 +4,7 @@ on:
push:
branches:
- develop
- releases/*
tags:
- v*
pull_request:
@@ -117,10 +118,12 @@ jobs:
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
uses: docker/setup-buildx-action@v3
# Docker Build
- name: Build & Export Docker Image
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
uses: docker/build-push-action@v6
with:
context: .
@@ -275,7 +278,7 @@ jobs:
release:
name: Github Release
runs-on: ubuntu-latest
needs: [ check, check-e2e ]
needs: build-artifacts
if: startsWith(github.ref, 'refs/tags/v')
steps:
# Download Exec
@@ -367,7 +370,7 @@ jobs:
maven:
name: Publish to Maven
runs-on: ubuntu-latest
needs: [check, check-e2e]
needs: [check]
if: github.ref == 'refs/heads/develop' || startsWith(github.ref, 'refs/tags/v')
steps:
- uses: actions/checkout@v4

View File

@@ -22,6 +22,7 @@ micronaut:
idle-timeout: 60m
netty:
max-chunk-size: 10MB
max-header-size: 32768 # increased from the default of 8k
responses:
file:
cache-seconds: 86400

View File

@@ -2,6 +2,7 @@ package io.kestra.core.docs;
import com.google.common.base.CaseFormat;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.models.tasks.runners.TaskRunner;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@@ -144,6 +145,10 @@ public abstract class AbstractClassDocumentation<T> {
if (AbstractRetry.class.isAssignableFrom(Class.forName(key))) {
return true;
}
if (TaskRunner.class.isAssignableFrom(Class.forName(key))) {
return true;
}
} catch (ClassNotFoundException ignored) {
log.debug(ignored.getMessage(), ignored);
}

View File

@@ -37,13 +37,16 @@ public class Plugin {
public static Plugin of(RegisteredPlugin registeredPlugin, @Nullable String subgroup) {
Plugin plugin = new Plugin();
plugin.name = registeredPlugin.name();
PluginSubGroup subGroupInfos = null;
if (subgroup == null) {
plugin.title = registeredPlugin.title();
} else {
plugin.title = subgroup.substring(subgroup.lastIndexOf('.') + 1);
subGroupInfos = registeredPlugin.allClass().stream().filter(c -> c.getName().contains(subgroup)).map(clazz -> clazz.getPackage().getDeclaredAnnotation(PluginSubGroup.class)).toList().getFirst();
plugin.title = !subGroupInfos.title().isEmpty() ? subGroupInfos.title() : subgroup.substring(subgroup.lastIndexOf('.') + 1);
}
plugin.group = registeredPlugin.group();
plugin.description = registeredPlugin.description();
plugin.description = subGroupInfos != null && !subGroupInfos.description().isEmpty() ? subGroupInfos.description() : registeredPlugin.description();
plugin.license = registeredPlugin.license();
plugin.longDescription = registeredPlugin.longDescription();
plugin.version = registeredPlugin.version();
@@ -59,7 +62,9 @@ public class Plugin {
e.getValue().toString()
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
plugin.categories = registeredPlugin
plugin.categories = subGroupInfos != null ?
Arrays.stream(subGroupInfos.categories()).toList() :
registeredPlugin
.allClass()
.stream()
.map(clazz -> clazz.getPackage().getDeclaredAnnotation(PluginSubGroup.class))

View File

@@ -61,14 +61,13 @@ public class LogEntry implements DeletedInterface, TenantInterface {
@Builder.Default
boolean deleted = false;
public static List<String> findLevelsByMin(Level minLevel) {
public static List<Level> findLevelsByMin(Level minLevel) {
if (minLevel == null) {
return Arrays.stream(Level.values()).map(Enum::name).toList();
return Arrays.asList(Level.values());
}
return Arrays.stream(Level.values())
.filter(level -> level.toInt() >= minLevel.toInt())
.map(Enum::name)
.toList();
}
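The patched `findLevelsByMin` above can be exercised standalone with an enum standing in for `org.slf4j.event.Level` (the `toInt` weights below are slf4j's actual values; the rest is a sketch):

```java
import java.util.Arrays;
import java.util.List;

public class FindLevelsSketch {
    // Stand-in for org.slf4j.event.Level with its integer weights.
    enum Level {
        ERROR(40), WARN(30), INFO(20), DEBUG(10), TRACE(0);
        private final int levelInt;
        Level(int levelInt) { this.levelInt = levelInt; }
        int toInt() { return levelInt; }
    }

    // Same logic as the patched method: all levels when no minimum is
    // given, otherwise every level at least as severe as the minimum.
    static List<Level> findLevelsByMin(Level minLevel) {
        if (minLevel == null) {
            return Arrays.asList(Level.values());
        }
        return Arrays.stream(Level.values())
            .filter(level -> level.toInt() >= minLevel.toInt())
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(findLevelsByMin(Level.WARN)); // [ERROR, WARN]
        System.out.println(findLevelsByMin(null).size()); // 5
    }
}
```

Returning `List<Level>` instead of `List<String>` pushes the string conversion to the callers that actually need it, which is what the diff does.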

View File

@@ -209,6 +209,7 @@ public class Flow extends AbstractFlow {
public List<String> allTriggerIds() {
return this.triggers != null ? this.triggers.stream()
.map(AbstractTrigger::getId)
.filter(id -> id != null) // this can happen when validating a flow under creation
.collect(Collectors.toList()) : Collections.emptyList();
}

View File

@@ -1,6 +1,5 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@@ -39,7 +38,6 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes.Type(value = URIInput.class, name = "URI"),
@JsonSubTypes.Type(value = MultiselectInput.class, name = "MULTISELECT")
})
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public abstract class Input<T> implements Data {
@NotNull
@NotBlank

View File

@@ -6,7 +6,6 @@ import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.validations.Regex;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -18,11 +17,19 @@ import java.util.List;
@Getter
@NoArgsConstructor
public class MultiselectInput extends Input<List<String>> implements ItemTypeInterface {
@Schema(
title = "Deprecated, please use `values` instead."
)
// @NotNull
@Deprecated
List<@Regex String> options;
@Schema(
title = "List of values available."
)
@NotNull
List<@Regex String> options;
// FIXME: REMOVE `options` in 0.20 and bring back the NotNull
// @NotNull
List<@Regex String> values;
@Schema(
title = "Type of the different values available.",
@@ -33,10 +40,21 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
@Override
public void validate(List<String> inputs) throws ConstraintViolationException {
if (values != null && options != null) {
throw ManualConstraintViolation.toConstraintViolationException(
"you can't define both `values` and `options`",
this,
MultiselectInput.class,
getId(),
""
);
}
for(String input : inputs){
if (!options.contains(input)) {
List<@Regex String> finalValues = this.values != null ? this.values : this.options;
if (!finalValues.contains(input)) {
throw ManualConstraintViolation.toConstraintViolationException(
"it must match the values `" + options + "`",
"it must match the values `" + finalValues + "`",
this,
MultiselectInput.class,
getId(),

View File

@@ -40,13 +40,17 @@ public interface TaskCommands {
TargetOS getTargetOS();
default List<Path> relativeWorkingDirectoryFilesPaths() throws IOException {
return this.relativeWorkingDirectoryFilesPaths(false);
}
default List<Path> relativeWorkingDirectoryFilesPaths(boolean includeDirectories) throws IOException {
Path workingDirectory = this.getWorkingDirectory();
if (workingDirectory == null) {
return Collections.emptyList();
}
try (Stream<Path> walk = Files.walk(workingDirectory)) {
Stream<Path> filtered = walk.filter(path -> !Files.isDirectory(path));
Stream<Path> filtered = includeDirectories ? walk : walk.filter(path -> !Files.isDirectory(path));
Path outputDirectory = this.getOutputDirectory();
if (outputDirectory != null) {
filtered = filtered.filter(Predicate.not(path -> path.startsWith(outputDirectory)));
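The `includeDirectories` toggle in the diff above can be demonstrated with `Files.walk` on a throwaway directory (a standalone sketch, not the Kestra interface):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

public class WalkSketch {
    // Lists paths under root; directories are filtered out unless
    // requested, mirroring relativeWorkingDirectoryFilesPaths(boolean).
    static List<Path> paths(Path root, boolean includeDirectories) throws IOException {
        try (Stream<Path> walk = Files.walk(root)) {
            Stream<Path> filtered = includeDirectories
                ? walk
                : walk.filter(path -> !Files.isDirectory(path));
            return filtered.toList();
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("walk-sketch");
        Files.createDirectory(root.resolve("sub"));
        Files.createFile(root.resolve("sub").resolve("file.txt"));

        System.out.println(paths(root, false).size()); // 1: just the file
        System.out.println(paths(root, true).size());  // 3: root, sub, file
    }
}
```

Note that `Files.walk` includes the root itself, which is why the directory-inclusive count is three here.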

View File

@@ -23,9 +23,6 @@ public class Trigger extends TriggerContext {
@Nullable
private String executionId;
@Nullable
private State.Type executionCurrentState;
@Nullable
private Instant updatedDate;
@@ -39,7 +36,6 @@ public class Trigger extends TriggerContext {
protected Trigger(TriggerBuilder<?, ?> b) {
super(b);
this.executionId = b.executionId;
this.executionCurrentState = b.executionCurrentState;
this.updatedDate = b.updatedDate;
this.evaluateRunningDate = b.evaluateRunningDate;
}
@@ -141,7 +137,6 @@ public class Trigger extends TriggerContext {
.date(trigger.getDate())
.nextExecutionDate(trigger.getNextExecutionDate())
.executionId(execution.getId())
.executionCurrentState(execution.getState().getCurrent())
.updatedDate(Instant.now())
.backfill(trigger.getBackfill())
.stopAfter(trigger.getStopAfter())

View File

@@ -18,7 +18,7 @@ import java.util.function.Predicate;
* @see io.kestra.core.plugins.serdes.PluginDeserializer
* @see PluginScanner
*/
public final class DefaultPluginRegistry implements PluginRegistry {
public class DefaultPluginRegistry implements PluginRegistry {
private static class LazyHolder {
static final DefaultPluginRegistry INSTANCE = new DefaultPluginRegistry();
@@ -43,7 +43,7 @@ public final class DefaultPluginRegistry implements PluginRegistry {
return instance;
}
private DefaultPluginRegistry() {
protected DefaultPluginRegistry() {
}
private boolean isInitialized() {
@@ -53,7 +53,7 @@ public final class DefaultPluginRegistry implements PluginRegistry {
/**
* Initializes the registry by loading all core plugins.
*/
private void init() {
protected void init() {
if (initialized.compareAndSet(false, true)) {
register(scanner.scan());
}

View File

@@ -32,7 +32,7 @@ public interface FlowRepositoryInterface {
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
Optional.of(execution.getFlowRevision())
Optional.ofNullable(execution.getFlowRevision())
);
if (find.isEmpty()) {
@@ -50,7 +50,7 @@ public interface FlowRepositoryInterface {
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
Optional.of(execution.getFlowRevision())
Optional.ofNullable(execution.getFlowRevision())
);
if (find.isEmpty()) {
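The `Optional.of` to `Optional.ofNullable` change above matters because `getFlowRevision()` can be null: `Optional.of(null)` throws a NullPointerException, while `ofNullable` yields an empty Optional that the `find.isEmpty()` check can handle:

```java
import java.util.Optional;

public class OptionalSketch {
    public static void main(String[] args) {
        Integer revision = null; // e.g. an execution without a flow revision

        System.out.println(Optional.ofNullable(revision).isEmpty()); // true

        try {
            Optional.of(revision); // throws NullPointerException
        } catch (NullPointerException e) {
            System.out.println("Optional.of rejected null");
        }
    }
}
```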

View File

@@ -144,18 +144,16 @@ public class DefaultRunContext extends RunContext {
@Override
public DefaultRunContext clone() {
DefaultRunContext runContext = new DefaultRunContext();
runContext.variableRenderer = this.variableRenderer;
runContext.applicationContext = this.applicationContext;
runContext.storageInterface = this.storageInterface;
runContext.storage = this.storage;
runContext.variables = new HashMap<>(this.variables);
runContext.metrics = new ArrayList<>();
runContext.meterRegistry = this.meterRegistry;
runContext.workingDir = this.workingDir;
runContext.logger = this.logger;
runContext.metrics = new ArrayList<>();
runContext.storage = this.storage;
runContext.pluginConfiguration = this.pluginConfiguration;
runContext.version = version;
runContext.isInitialized.set(this.isInitialized.get());
if (this.isInitialized.get()) {
//Inject all services
runContext.init(applicationContext);
}
return runContext;
}

View File

@@ -358,11 +358,11 @@ public class ExecutorService {
.withState(executor.getExecution().guessFinalState(flow));
if (flow.getOutputs() != null) {
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
try {
Map<String, Object> outputs = flow.getOutputs()
.stream()
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
outputs = runContext.render(outputs);
outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
newExecution = newExecution.withOutputs(outputs);
@@ -374,8 +374,8 @@ public class ExecutorService {
"Failed to render output values",
e
);
newExecution = newExecution
.withState(State.Type.FAILED);
runContext.logger().error("Failed to render output values: {}", e.getMessage(), e);
newExecution = newExecution.withState(State.Type.FAILED);
}
}

View File

@@ -58,15 +58,19 @@ public abstract class FilesService {
return inputFiles;
}
public static Map<String, URI> outputFiles(RunContext runContext, List<String> outputs) throws Exception {
List<Path> allFilesMatching = runContext.workingDir().findAllFilesMatching(outputs);
var outputFiles = allFilesMatching.stream()
.map(throwFunction(path -> new AbstractMap.SimpleEntry<>(
runContext.workingDir().path().relativize(path).toString(),
runContext.storage().putFile(path.toFile(), resolveUniqueNameForFile(path))
)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
runContext.logger().info("Captured {} output(s).", allFilesMatching.size());
public static Map<String, URI> outputFiles(RunContext runContext, List<String> outputs) throws Exception {
List<String> renderedOutputs = outputs != null ? runContext.render(outputs) : null;
List<Path> allFilesMatching = runContext.workingDir().findAllFilesMatching(renderedOutputs);
var outputFiles = allFilesMatching.stream()
.map(throwFunction(path -> new AbstractMap.SimpleEntry<>(
runContext.workingDir().path().relativize(path).toString(),
runContext.storage().putFile(path.toFile(), resolveUniqueNameForFile(path))
)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (runContext.logger().isTraceEnabled()) {
runContext.logger().trace("Captured {} output(s).", allFilesMatching.size());
}
return outputFiles;
}

View File

@@ -42,6 +42,10 @@ public interface FlowExecutorInterface {
* WARNING: this method will NOT check if the namespace is allowed, so it should not be used inside a task.
*/
default Optional<Flow> findByExecution(Execution execution) {
if (execution.getFlowRevision() == null) {
return Optional.empty();
}
return this.findById(
execution.getTenantId(),
execution.getNamespace(),

View File

@@ -290,10 +290,16 @@ public class FlowInputOutput {
}
}
case ARRAY, MULTISELECT -> {
List<?> asList;
if (current instanceof List<?> list) {
asList = list;
} else {
asList = JacksonMapper.toList(((String) current));
}
if (elementType != null) {
// recursively parse the elements only once
yield JacksonMapper.toList(((String) current))
.stream()
yield asList.stream()
.map(throwFunction(element -> {
try {
return parseType(execution, elementType, id, null, element);
@@ -303,7 +309,7 @@ public class FlowInputOutput {
}))
.toList();
} else {
yield JacksonMapper.toList(((String) current));
yield asList;
}
}
};
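The ARRAY/MULTISELECT change above accepts an input that already is a `List` instead of unconditionally JSON-parsing a `String`. That dispatch can be sketched with Java's pattern matching for `instanceof`; the JSON fallback below is a trivial stand-in for `JacksonMapper.toList`, handling only a flat `["a", "b"]` form:

```java
import java.util.Arrays;
import java.util.List;

public class ListCoercionSketch {
    // Pass a List through untouched; otherwise parse the string form.
    static List<?> asList(Object current) {
        if (current instanceof List<?> list) {
            return list;
        }
        return parseList((String) current);
    }

    // Stand-in for JacksonMapper.toList: flat string arrays only.
    static List<String> parseList(String json) {
        String body = json.substring(1, json.length() - 1);
        return Arrays.stream(body.split(","))
            .map(s -> s.trim().replace("\"", ""))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(asList(List.of("a", "b")));  // [a, b]
        System.out.println(asList("[\"a\", \"b\"]"));   // [a, b]
    }
}
```

Branching before the `(String)` cast is what prevents the ClassCastException the original code would raise when the caller already supplies a parsed list.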

View File

@@ -274,9 +274,10 @@ public final class RunVariables {
// adds any additional variables
if (variables != null) {
builder.putAll(variables);
if (logger != null && !variables.containsKey(RunVariables.SECRET_CONSUMER_VARIABLE_NAME)) {
builder.put(RunVariables.SECRET_CONSUMER_VARIABLE_NAME, (Consumer<String>) logger::usedSecret);
}
}
if (logger != null && (variables == null || !variables.containsKey(RunVariables.SECRET_CONSUMER_VARIABLE_NAME))) {
builder.put(RunVariables.SECRET_CONSUMER_VARIABLE_NAME, (Consumer<String>) logger::usedSecret);
}
return builder.build();

View File

@@ -61,6 +61,7 @@ public class Extension extends AbstractExtension {
return operators;
}
@SuppressWarnings("deprecation")
@Override
public Map<String, Filter> getFilters() {
Map<String, Filter> filters = new HashMap<>();
@@ -104,6 +105,7 @@ public class Extension extends AbstractExtension {
return tests;
}
@SuppressWarnings("deprecation")
@Override
public Map<String, Function> getFunctions() {
Map<String, Function> functions = new HashMap<>();

View File

@@ -10,11 +10,8 @@ import java.util.Map;
@Slf4j
@Deprecated
public class JsonFilter extends ToJsonFilter {
@Override
public Object apply(Object input, Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) throws PebbleException {
log.warn("The 'json' filter is deprecated, please use 'toJson' instead");
return super.apply(input, args, self, context, lineNumber);
}
}

View File

@@ -9,11 +9,8 @@ import java.util.Map;
@Slf4j
@Deprecated
public class JsonFunction extends FromJsonFunction {
@Override
public Object execute(Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) {
log.warn("The 'json' function is deprecated, please use 'fromJson' instead");
return super.execute(args, self, context, lineNumber);
}
}


@@ -73,6 +73,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private final PluginDefaultService pluginDefaultService;
private final WorkerGroupService workerGroupService;
private final LogService logService;
protected SchedulerExecutionStateInterface executionState;
// must be volatile as it's updated by the flow listener thread and read by the scheduleExecutor thread
private volatile Boolean isReady = false;
@@ -344,7 +345,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
logError(conditionContext, flow, abstractTrigger, e);
return null;
}
this.triggerState.save(triggerContext, scheduleContext);
this.triggerState.save(triggerContext, scheduleContext, "/kestra/services/scheduler/compute-schedulable/save/lastTrigger-nextDate-null");
} else {
triggerContext = lastTrigger;
}
@@ -433,11 +434,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
)
.build()
)
.peek(f -> {
if (f.getTriggerContext().getEvaluateRunningDate() != null || !isExecutionNotRunning(f)) {
this.triggerState.unlock(f.getTriggerContext());
}
})
.filter(f -> f.getTriggerContext().getEvaluateRunningDate() == null)
.filter(this::isExecutionNotRunning)
.map(FlowWithWorkerTriggerNextDate::of)
@@ -473,7 +469,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
Trigger triggerRunning = Trigger.of(f.getTriggerContext(), now);
var flowWithTrigger = f.toBuilder().triggerContext(triggerRunning).build();
try {
this.triggerState.save(triggerRunning, scheduleContext);
this.triggerState.save(triggerRunning, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/polling");
this.sendWorkerTriggerToWorker(flowWithTrigger);
} catch (InternalException e) {
logService.logTrigger(
@@ -498,7 +494,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
schedule.nextEvaluationDate(f.getConditionContext(), Optional.of(f.getTriggerContext()))
);
trigger = trigger.checkBackfill();
this.triggerState.save(trigger, scheduleContext);
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/schedule");
}
} else {
logService.logTrigger(
@@ -516,9 +512,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
logError(f, e);
}
var trigger = f.getTriggerContext().toBuilder().nextExecutionDate(nextExecutionDate).build().checkBackfill();
this.triggerState.save(trigger, scheduleContext);
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-false");
}
} catch (InternalException ie) {
} catch (Exception ie) {
// validate schedule condition can fail to render variables
// in this case, we send a failed execution so the trigger is not evaluated each second.
logger.error("Unable to evaluate the trigger '{}'", f.getAbstractTrigger().getId(), ie);
@@ -527,12 +523,13 @@ public abstract class AbstractScheduler implements Scheduler, Service {
.tenantId(f.getTriggerContext().getTenantId())
.namespace(f.getTriggerContext().getNamespace())
.flowId(f.getTriggerContext().getFlowId())
.flowRevision(f.getFlow().getRevision())
.labels(f.getFlow().getLabels())
.state(new State().withState(State.Type.FAILED))
.build();
ZonedDateTime nextExecutionDate = this.nextEvaluationDate(f.getAbstractTrigger());
var trigger = f.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
}
});
});
@@ -572,7 +569,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// Schedule triggers are being executed directly from the handle method within the context where triggers are locked.
// So we must save them by passing the scheduleContext.
this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handleEvaluateSchedulingTriggerResult/save"));
}
protected void saveLastTriggerAndEmitExecution(Execution execution, Trigger trigger, Consumer<Trigger> saveAction) {
@@ -593,8 +590,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
return true;
}
// The execution is not yet started, we skip
if (lastTrigger.getExecutionCurrentState() == null) {
Optional<Execution> execution = executionState.findById(lastTrigger.getTenantId(), lastTrigger.getExecutionId());
// executionState hasn't received the execution, we skip
if (execution.isEmpty()) {
if (lastTrigger.getUpdatedDate() != null) {
metricRegistry
.timer(MetricRegistry.SCHEDULER_EXECUTION_MISSING_DURATION, metricRegistry.tags(lastTrigger))
@@ -628,7 +627,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
Level.DEBUG,
"Execution '{}' is still '{}', updated at '{}'",
lastTrigger.getExecutionId(),
lastTrigger.getExecutionCurrentState(),
execution.get().getState().getCurrent(),
lastTrigger.getUpdatedDate()
);
}
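The scheduler now resolves the execution's current state through `executionState.findById(...)` instead of a denormalized `executionCurrentState` field on the trigger. A hedged sketch of the resulting skip logic, with a map standing in for the execution repository (all names hypothetical):

```java
import java.util.Map;
import java.util.Optional;

public class ExecutionStateSketch {
    public enum State { CREATED, RUNNING, SUCCESS }

    // Stand-in for SchedulerExecutionStateInterface.findById.
    public static Optional<State> findById(Map<String, State> store, String id) {
        return Optional.ofNullable(store.get(id));
    }

    // Mirrors the updated check: an execution the state store has not seen
    // yet is skipped, i.e. treated as "not yet started".
    public static boolean shouldSkip(Map<String, State> store, String executionId) {
        return findById(store, executionId).isEmpty();
    }

    public static void main(String[] args) {
        Map<String, State> store = Map.of("exec-1", State.RUNNING);
        System.out.println(shouldSkip(store, "exec-2")); // true: unknown execution
        System.out.println(shouldSkip(store, "exec-1")); // false
    }
}
```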


@@ -1,4 +1,14 @@
package io.kestra.core.schedulers;
import java.util.function.Consumer;
/**
* This context is used by the Scheduler to allow evaluating and updating triggers in a transaction from the main evaluation loop.
* See AbstractScheduler.handle().
*/
public interface ScheduleContextInterface {
/**
* Do trigger retrieval and updating in a single transaction.
*/
void doInTransaction(Consumer<ScheduleContextInterface> consumer);
}
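The interface above lets the scheduler evaluate and persist triggers atomically. A minimal in-memory sketch of an implementation, with a synchronized block standing in for the JDBC transaction (all names hypothetical; this is not the production implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class InMemoryScheduleContext {
    public interface ScheduleContextInterface {
        void doInTransaction(Consumer<ScheduleContextInterface> consumer);
    }

    // Toy implementation: serialize all "transactions" on a single lock so
    // trigger reads and writes inside the consumer are observed atomically.
    public static class LockingContext implements ScheduleContextInterface {
        private final Object lock = new Object();
        public final List<String> savedTriggers = new ArrayList<>();

        @Override
        public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
            synchronized (lock) {
                consumer.accept(this);
            }
        }
    }

    public static void main(String[] args) {
        LockingContext ctx = new LockingContext();
        // Evaluate and save in one unit, as AbstractScheduler.handle() does.
        ctx.doInTransaction(c -> ((LockingContext) c).savedTriggers.add("trigger-1"));
        System.out.println(ctx.savedTriggers); // [trigger-1]
    }
}
```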


@@ -0,0 +1,19 @@
package io.kestra.core.schedulers;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import java.util.Optional;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
public class SchedulerExecutionState implements SchedulerExecutionStateInterface {
@Inject
private ExecutionRepositoryInterface executionRepository;
@Override
public Optional<Execution> findById(String tenantId, String id) {
return executionRepository.findById(tenantId, id);
}
}


@@ -0,0 +1,9 @@
package io.kestra.core.schedulers;
import io.kestra.core.models.executions.Execution;
import java.util.Optional;
public interface SchedulerExecutionStateInterface {
Optional<Execution> findById(String tenantId, String id);
}


@@ -20,19 +20,22 @@ public interface SchedulerTriggerStateInterface {
Trigger create(Trigger trigger) throws ConstraintViolationException;
Trigger save(Trigger trigger, ScheduleContextInterface scheduleContext, String headerContent) throws ConstraintViolationException;
Trigger create(Trigger trigger, String headerContent) throws ConstraintViolationException;
Trigger update(Trigger trigger);
Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;
/**
* Used by the JDBC implementation: find triggers in all tenants.
*/
List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);
/**
* Required for Kafka
* Used by the Kafka implementation: find triggers in the scheduler assigned flow (as in Kafka partition assignment).
*/
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
/**
* Required for Kafka
*/
void unlock(Trigger trigger);
}


@@ -66,6 +66,10 @@ public class PluginDefaultService {
protected List<PluginDefault> mergeAllDefaults(Flow flow) {
List<PluginDefault> list = new ArrayList<>();
if (flow.getPluginDefaults() != null) {
list.addAll(flow.getPluginDefaults());
}
if (taskGlobalDefault != null && taskGlobalDefault.getDefaults() != null) {
if (warnOnce.compareAndSet(false, true)) {
log.warn("Global Task Defaults are deprecated, please use Global Plugin Defaults instead via the 'kestra.plugins.defaults' property.");
@@ -77,10 +81,6 @@ public class PluginDefaultService {
list.addAll(pluginGlobalDefault.getDefaults());
}
if (flow.getPluginDefaults() != null) {
list.addAll(flow.getPluginDefaults());
}
return list;
}
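The hunk above moves the flow-level defaults from the end of the merged list to the beginning. Assuming the downstream merge gives earlier entries precedence, this makes flow defaults override global ones rather than the reverse. A sketch of that first-wins merge (record and method names hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DefaultsOrderSketch {
    public record PluginDefault(String type, Map<String, Object> values) {}

    // Assumption: the first default seen for a key wins, so list order is
    // the precedence order (flow defaults before global defaults).
    public static Map<String, Object> merge(List<PluginDefault> defaults) {
        Map<String, Object> merged = new LinkedHashMap<>();
        for (PluginDefault d : defaults) {
            d.values().forEach(merged::putIfAbsent);
        }
        return merged;
    }

    public static void main(String[] args) {
        var flowDefault = new PluginDefault("io.kestra.plugin.scripts.shell.Commands", Map.of("timeout", "PT5M"));
        var globalDefault = new PluginDefault("io.kestra.plugin.scripts.shell.Commands", Map.of("timeout", "PT1M"));
        // Flow-level defaults added first, as in the reordered mergeAllDefaults().
        System.out.println(merge(List.of(flowDefault, globalDefault))); // {timeout=PT5M}
    }
}
```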


@@ -84,7 +84,7 @@ public class InternalNamespace implements Namespace {
**/
@Override
public List<NamespaceFile> all(final String prefix, final boolean includeDirectories) throws IOException {
URI namespacePrefix = URI.create(NamespaceFile.of(namespace, Optional.ofNullable(prefix).map(Path::of).orElse(null)).storagePath() + "/");
URI namespacePrefix = URI.create(NamespaceFile.of(namespace, Optional.ofNullable(prefix).map(Path::of).orElse(null)).storagePath().toString().replace("\\","/") + "/");
return storage.allByPrefix(tenant, namespacePrefix, includeDirectories)
.stream()
.map(uri -> new NamespaceFile(relativize(uri), uri, namespace))
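On Windows, `Path.toString()` yields backslash separators, which are illegal in a URI; the one-line change above normalizes them before building the prefix. A small sketch of the normalization (helper name hypothetical):

```java
import java.net.URI;

public class PathToUriSketch {
    // Mirrors the fix: normalize backslash separators before building a URI,
    // since a raw backslash is an illegal URI character and URI.create would throw.
    public static URI namespacePrefix(String storagePath) {
        return URI.create(storagePath.replace("\\", "/") + "/");
    }

    public static void main(String[] args) {
        // Simulated Windows-style storage path.
        System.out.println(namespacePrefix("kestra:///company/team/_files\\sub"));
        // kestra:///company/team/_files/sub/
    }
}
```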


@@ -561,6 +561,11 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
// get the list of splits from the outputs of the split task
String taskId = this.id.substring(0, this.id.lastIndexOf('_')) + ForEachItemExecutable.SUFFIX;
var taskOutput = extractOutput(runContext, taskId);
if (taskOutput == null) {
// there were no subflow executions
return null;
}
Integer iterations = (Integer) taskOutput.get(ExecutableUtils.TASK_VARIABLE_NUMBER_OF_BATCHES);
String subflowOutputsBaseUri = (String) taskOutput.get(ExecutableUtils.TASK_VARIABLE_SUBFLOW_OUTPUTS_BASE_URI);


@@ -18,6 +18,7 @@ import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.runners.FlowableUtils;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.TruthUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
@@ -158,7 +159,7 @@ public class If extends Task implements FlowableTask<VoidOutput> {
@Override
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
List<ResolvedTask> childTask = this.childTasks(runContext, parentTaskRun);
if (childTask == null) {
if (ListUtils.isEmpty(childTask)) {
// no next task to run, we guess the state from the parent task
return Optional.of(execution.guessFinalState(null, parentTaskRun, this.isAllowFailure()));
}
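`childTasks(...)` may return an empty list rather than null, which the old null check missed. A sketch of the null-safe emptiness helper, assuming Kestra's `ListUtils.isEmpty` has these semantics:

```java
import java.util.List;

public class ListUtilsSketch {
    // Treats both null and zero-length lists as "no child tasks to run".
    public static boolean isEmpty(List<?> list) {
        return list == null || list.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(isEmpty(null));       // true
        System.out.println(isEmpty(List.of()));  // true
        System.out.println(isEmpty(List.of(1))); // false
    }
}
```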


@@ -159,7 +159,9 @@ public class WaitFor extends Task implements FlowableTask<WaitFor.Output> {
return false;
}
Integer iterationCount = (Integer) parentTaskRun.getOutputs().get("iterationCount");
Integer iterationCount = Optional.ofNullable(parentTaskRun.getOutputs())
.map(outputs -> (Integer) outputs.get("iterationCount"))
.orElse(0);
if (this.checkFrequency.maxIterations != null && iterationCount != null && iterationCount > this.checkFrequency.maxIterations) {
if (printLog) {logger.warn("Max iterations reached");}
return true;
@@ -225,7 +227,9 @@ public class WaitFor extends Task implements FlowableTask<WaitFor.Output> {
public WaitFor.Output outputs(TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
String value = parentTaskRun != null ?
parentTaskRun.getOutputs().get("iterationCount").toString() : "0";
String.valueOf(Optional.ofNullable(parentTaskRun.getOutputs())
.map(outputs -> outputs.get("iterationCount"))
.orElse("0")) : "0";
return Output.builder()
.iterationCount(Integer.parseInt(value) + 1)
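Both hunks above replace direct `getOutputs().get(...)` calls, which throw a `NullPointerException` on the first iteration when the outputs map is still null, with an `Optional` chain that defaults to zero. A compact sketch of the pattern (method name hypothetical):

```java
import java.util.Map;
import java.util.Optional;

public class NullSafeOutputsSketch {
    // Mirrors the fix: tolerate a null outputs map and a missing key alike.
    public static int iterationCount(Map<String, Object> outputs) {
        return Optional.ofNullable(outputs)
            .map(o -> (Integer) o.get("iterationCount"))
            .orElse(0);
    }

    public static void main(String[] args) {
        System.out.println(iterationCount(null));                        // 0
        System.out.println(iterationCount(Map.of("iterationCount", 3))); // 3
    }
}
```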


@@ -28,11 +28,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.*;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -64,14 +60,14 @@ import jakarta.validation.constraints.NotNull;
full = true,
title = "Clone a Git repository into the Working Directory and run a Python script in a Docker container.",
code = """
id: gitPython
id: git_python
namespace: company.team
tasks:
- id: wdir
type: io.kestra.plugin.core.flow.WorkingDirectory
tasks:
- id: cloneRepository
- id: clone_repository
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/examples
branch: main
@@ -82,100 +78,103 @@ import jakarta.validation.constraints.NotNull;
type: io.kestra.plugin.scripts.runner.docker.Docker
containerImage: ghcr.io/kestra-io/pydata:latest
commands:
- python scripts/etl_script.py"""
- python scripts/etl_script.py
"""
),
@Example(
full = true,
title = "Add input and output files within a Working Directory to use them in a Python script.",
code = """
id: apiJSONtoMongoDB
id: api_json_to_mongodb
namespace: company.team
tasks:
- id: wdir
type: io.kestra.plugin.core.flow.WorkingDirectory
outputFiles:
- output.json
inputFiles:
query.sql: |
SELECT sum(total) as total, avg(quantity) as avg_quantity
FROM sales;
tasks:
- id: inlineScript
type: io.kestra.plugin.scripts.python.Script
taskRunner:
type: io.kestra.plugin.scripts.runner.docker.Docker
containerImage: python:3.11-slim
beforeCommands:
- pip install requests kestra > /dev/null
warningOnStdErr: false
script: |
import requests
import json
from kestra import Kestra
- id: wdir
type: io.kestra.plugin.core.flow.WorkingDirectory
outputFiles:
- output.json
inputFiles:
query.sql: |
SELECT sum(total) as total, avg(quantity) as avg_quantity
FROM sales;
tasks:
- id: inline_script
type: io.kestra.plugin.scripts.python.Script
taskRunner:
type: io.kestra.plugin.scripts.runner.docker.Docker
containerImage: python:3.11-slim
beforeCommands:
- pip install requests kestra > /dev/null
warningOnStdErr: false
script: |
import requests
import json
from kestra import Kestra
with open('query.sql', 'r') as input_file:
sql = input_file.read()
with open('query.sql', 'r') as input_file:
sql = input_file.read()
response = requests.get('https://api.github.com')
data = response.json()
response = requests.get('https://api.github.com')
data = response.json()
with open('output.json', 'w') as output_file:
json.dump(data, output_file)
with open('output.json', 'w') as output_file:
json.dump(data, output_file)
Kestra.outputs({'receivedSQL': sql, 'status': response.status_code})
Kestra.outputs({'receivedSQL': sql, 'status': response.status_code})
- id: loadToMongoDB
type: io.kestra.plugin.mongodb.Load
connection:
uri: mongodb://host.docker.internal:27017/
database: local
collection: github
from: "{{ outputs.wdir.uris['output.json'] }}"
- id: load_to_mongodb
type: io.kestra.plugin.mongodb.Load
connection:
uri: mongodb://host.docker.internal:27017/
database: local
collection: github
from: "{{ outputs.wdir.uris['output.json'] }}"
"""
),
@Example(
full = true,
code = {
"id: working-directory",
"namespace: company.team",
"",
"tasks:",
" - id: working-directory",
" type: io.kestra.plugin.core.flow.WorkingDirectory",
" tasks:",
" - id: first",
" type: io.kestra.plugin.scripts.shell.Commands",
" commands:",
" - 'echo \"{{ taskrun.id }}\" > {{ workingDir }}/stay.txt'",
" - id: second",
" type: io.kestra.plugin.scripts.shell.Commands",
" commands:",
" - |",
" echo '::{\"outputs\": {\"stay\":\"'$(cat {{ workingDir }}/stay.txt)'\"}}::'"
}
code = """
id: working_directory
namespace: company.team
tasks:
- id: working_directory
type: io.kestra.plugin.core.flow.WorkingDirectory
tasks:
- id: first
type: io.kestra.plugin.scripts.shell.Commands
commands:
- 'echo "{{ taskrun.id }}" > {{ workingDir }}/stay.txt'
- id: second
type: io.kestra.plugin.scripts.shell.Commands
commands:
- |
echo '::{"outputs": {"stay":"'$(cat {{ workingDir }}/stay.txt)'"}}::'
"""
),
@Example(
full = true,
title = "A working directory with a cache of the node_modules directory.",
code = """
id: node-with-cache
id: node_with_cache
namespace: company.team
tasks:
- id: working-dir
- id: working_dir
type: io.kestra.plugin.core.flow.WorkingDirectory
cache:
patterns:
- node_modules/**
ttl: PT1H
tasks:
- id: script
type: io.kestra.plugin.scripts.node.Script
beforeCommands:
- npm install colors
script: |
const colors = require("colors");
console.log(colors.red("Hello"));"""
- id: script
type: io.kestra.plugin.scripts.node.Script
beforeCommands:
- npm install colors
script: |
const colors = require("colors");
console.log(colors.red("Hello"));
"""
)
},
aliases = {"io.kestra.core.tasks.flows.WorkingDirectory", "io.kestra.core.tasks.flows.Worker"}
@@ -263,7 +262,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
}
}
if (this.namespaceFiles != null && Boolean.TRUE.equals(this.namespaceFiles.getEnabled())) {
if (this.namespaceFiles != null && !Boolean.FALSE.equals(this.namespaceFiles.getEnabled())) {
runContext.storage()
.namespace()
.findAllFilesMatching(this.namespaceFiles.getInclude(), this.namespaceFiles.getExclude())
@@ -333,9 +332,9 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
}
archive.finish();
File archiveFile = File.createTempFile("archive", ".zip");
Files.write(archiveFile.toPath(), bos.toByteArray());
URI uri = runContext.storage().putCacheFile(archiveFile, getId(), taskRun.getValue());
Path archiveFile = runContext.workingDir().createTempFile( ".zip");
Files.write(archiveFile, bos.toByteArray());
URI uri = runContext.storage().putCacheFile(archiveFile.toFile(), getId(), taskRun.getValue());
runContext.logger().debug("Caching in {}", uri);
}
} else {
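The one-line condition change in this file flips the default for an unset `namespaceFiles.enabled`: `Boolean.TRUE.equals(x)` treats `null` as disabled, while `!Boolean.FALSE.equals(x)` treats `null` as enabled. A truth-table sketch of the two checks:

```java
public class TriStateBooleanSketch {
    // Old behavior: only an explicit true enables the feature.
    public static boolean oldCheck(Boolean enabled) {
        return Boolean.TRUE.equals(enabled);
    }

    // New behavior: anything but an explicit false enables it,
    // so a null (unset) value now defaults to enabled.
    public static boolean newCheck(Boolean enabled) {
        return !Boolean.FALSE.equals(enabled);
    }

    public static void main(String[] args) {
        System.out.println(oldCheck(null) + " " + newCheck(null));   // false true
        System.out.println(oldCheck(true) + " " + newCheck(true));   // true true
        System.out.println(oldCheck(false) + " " + newCheck(false)); // false false
    }
}
```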


@@ -46,7 +46,7 @@ import java.util.List;
"- TRACE",
"- DEBUG",
"- INFO",
"- WARNING",
"- WARN",
}
)
}


@@ -1,7 +1,11 @@
package io.kestra.plugin.core.trigger;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.ArrayList;
import java.util.List;
import lombok.*;
import lombok.experimental.SuperBuilder;
import io.kestra.core.models.annotations.Example;
@@ -91,7 +95,7 @@ public class Flow extends AbstractTrigger implements TriggerOutput<Flow.Output>
.namespace(flow.getNamespace())
.flowId(flow.getId())
.flowRevision(flow.getRevision())
.labels(flow.getLabels())
.labels(generateLabels(runContext, flow))
.state(new State())
.trigger(ExecutionTrigger.of(
this,
@@ -128,6 +132,34 @@ public class Flow extends AbstractTrigger implements TriggerOutput<Flow.Output>
}
}
private List<Label> generateLabels(RunContext runContext, io.kestra.core.models.flows.Flow flow) {
final List<Label> labels = new ArrayList<>();
if (flow.getLabels() != null) {
labels.addAll(flow.getLabels()); // no need for rendering
}
if (this.getLabels() != null) {
for (Label label : this.getLabels()) {
final var value = renderLabelValue(runContext, label);
if (value != null) {
labels.add(new Label(label.key(), value));
}
}
}
return labels;
}
private String renderLabelValue(RunContext runContext, Label label) {
try {
return runContext.render(label.value());
} catch (IllegalVariableEvaluationException e) {
runContext.logger().warn("Failed to render label '{}', it will be omitted", label.key(), e);
return null;
}
}
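`generateLabels` above tolerates render failures per label, dropping only the failing entry instead of failing the whole trigger. A sketch of that drop-on-failure behavior with a toy renderer standing in for `runContext.render` (all names hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class LabelRenderSketch {
    public record Label(String key, String value) {}

    // Stand-in for the render loop: a renderer that may throw per label.
    public static List<Label> renderAll(List<Label> labels, Function<String, String> render) {
        List<Label> out = new ArrayList<>();
        for (Label label : labels) {
            try {
                out.add(new Label(label.key(), render.apply(label.value())));
            } catch (RuntimeException e) {
                // Mirrors the fix: log-and-skip instead of failing the trigger.
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> vars = Map.of("flow.id", "daily-sync");
        // Toy template renderer: resolves "{{ key }}" from vars or throws.
        Function<String, String> render = v -> {
            if (v.startsWith("{{")) {
                String key = v.substring(2, v.length() - 2).trim();
                if (!vars.containsKey(key)) throw new IllegalStateException("unknown var " + key);
                return vars.get(key);
            }
            return v;
        };
        List<Label> labels = List.of(new Label("env", "prod"),
            new Label("source", "{{ flow.id }}"),
            new Label("bad", "{{ missing }}"));
        System.out.println(renderAll(labels, render));
        // [Label[key=env, value=prod], Label[key=source, value=daily-sync]]
    }
}
```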
@Builder
@ToString
@EqualsAndHashCode


@@ -5,6 +5,7 @@ import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.Example;
@@ -361,14 +362,13 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
// validate schedule condition can fail to render variables
// in this case, we return a failed execution so the trigger is not evaluated each second
runContext.logger().error("Unable to evaluate the Schedule trigger '{}'", this.getId(), ie);
List<Label> labels = generateLabels(conditionContext, backfill);
Execution execution = Execution.builder()
.id(runContext.getTriggerExecutionId())
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())
.flowId(triggerContext.getFlowId())
.flowRevision(conditionContext.getFlow().getRevision())
.labels(labels)
.labels(generateLabels(runContext, conditionContext, backfill))
.state(new State().withState(State.Type.FAILED))
.build();
return Optional.of(execution);
@@ -402,7 +402,7 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
} else {
variables = scheduleDates.toMap();
}
List<Label> labels = generateLabels(conditionContext, backfill);
List<Label> labels = generateLabels(runContext, conditionContext, backfill);
ExecutionTrigger executionTrigger = ExecutionTrigger.of(this, variables);
@@ -442,19 +442,29 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
return parser.parse(this.cron);
}
private List<Label> generateLabels(ConditionContext conditionContext, Backfill backfill) {
private List<Label> generateLabels(RunContext runContext, ConditionContext conditionContext, Backfill backfill) throws IllegalVariableEvaluationException {
List<Label> labels = new ArrayList<>();
if (conditionContext.getFlow().getLabels() != null) {
labels.addAll(conditionContext.getFlow().getLabels());
labels.addAll(conditionContext.getFlow().getLabels()); // no need for rendering
}
if (backfill != null && backfill.getLabels() != null) {
labels.addAll(backfill.getLabels());
for (Label label : backfill.getLabels()) {
final var value = runContext.render(label.value());
if (value != null) {
labels.add(new Label(label.key(), value));
}
}
}
if (this.getLabels() != null) {
labels.addAll(this.getLabels());
for (Label label : this.getLabels()) {
final var value = runContext.render(label.value());
if (value != null) {
labels.add(new Label(label.key(), value));
}
}
}
return labels;


@@ -5,10 +5,8 @@ import com.google.common.collect.ImmutableList;
import io.kestra.core.Helpers;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueFactoryInterface;
@@ -55,6 +53,9 @@ public abstract class AbstractFlowRepositoryTest {
@Inject
protected FlowRepositoryInterface flowRepository;
@Inject
protected ExecutionRepositoryInterface executionRepository;
@Inject
private LocalFlowRepositoryLoader repositoryLoader;
@@ -546,6 +547,67 @@ public abstract class AbstractFlowRepositoryTest {
}
}
@Test
void findByExecution() {
Flow flow = builder()
.revision(1)
.build();
flowRepository.create(flow, flow.generateSource(), pluginDefaultService.injectDefaults(flow));
Execution execution = Execution.builder()
.id(IdUtils.create())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.flowRevision(flow.getRevision())
.state(new State())
.build();
execution = executionRepository.save(execution);
try {
Flow full = flowRepository.findByExecution(execution);
assertThat(full, notNullValue());
assertThat(full.getNamespace(), is(flow.getNamespace()));
assertThat(full.getId(), is(flow.getId()));
full = flowRepository.findByExecutionWithoutAcl(execution);
assertThat(full, notNullValue());
assertThat(full.getNamespace(), is(flow.getNamespace()));
assertThat(full.getId(), is(flow.getId()));
} finally {
deleteFlow(flow);
executionRepository.delete(execution);
}
}
@Test
void findByExecutionNoRevision() {
Flow flow = builder()
.revision(3)
.build();
flowRepository.create(flow, flow.generateSource(), pluginDefaultService.injectDefaults(flow));
Execution execution = Execution.builder()
.id(IdUtils.create())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.state(new State())
.build();
executionRepository.save(execution);
try {
Flow full = flowRepository.findByExecution(execution);
assertThat(full, notNullValue());
assertThat(full.getNamespace(), is(flow.getNamespace()));
assertThat(full.getId(), is(flow.getId()));
full = flowRepository.findByExecutionWithoutAcl(execution);
assertThat(full, notNullValue());
assertThat(full.getNamespace(), is(flow.getNamespace()));
assertThat(full.getId(), is(flow.getId()));
} finally {
deleteFlow(flow);
executionRepository.delete(execution);
}
}
private void deleteFlow(Flow flow) {
Integer revision = flowRepository.lastRevision(flow.getTenantId(), flow.getNamespace(), flow.getId());
flowRepository.delete(flow.toBuilder().revision(revision).build());


@@ -149,7 +149,7 @@ public abstract class AbstractLogRepositoryTest {
logRepository.save(log1);
logRepository.deleteByQuery(null, "io.kestra.unittest", "flowId", null, null, ZonedDateTime.now().plusMinutes(1));
logRepository.deleteByQuery(null, "io.kestra.unittest", "flowId", List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1));
find = logRepository.findByExecutionId(null, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size(), is(0));


@@ -6,6 +6,7 @@ import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.Test;
import java.net.URI;
import java.util.List;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -49,4 +50,13 @@ class FilesServiceTest {
Map<String, URI> outputs = FilesService.outputFiles(runContext, files.keySet().stream().toList());
assertThat(outputs.size(), is(1));
}
@Test
void renderOutputFiles() throws Exception {
RunContext runContext = runContextFactory.of(Map.of("extension", "txt"));
Map<String, String> files = FilesService.inputFiles(runContext, Map.of("file.txt", "content"));
Map<String, URI> outputs = FilesService.outputFiles(runContext, List.of("*.{{extension}}"));
assertThat(outputs.size(), is(1));
}
}


@@ -8,12 +8,20 @@ import org.junit.jupiter.api.Test;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
class RunVariablesTest {
@Test
@SuppressWarnings("unchecked")
void shouldGetEmptyVariables() {
Map<String, Object> variables = new RunVariables.DefaultBuilder().build(new RunContextLogger());
Assertions.assertEquals(Map.of("envs", Map.of(), "globals", Map.of()), variables);
assertThat(variables.size(), is(3));
assertThat((Map<String, Object>) variables.get("envs"), is(Map.of()));
assertThat((Map<String, Object>) variables.get("globals"), is(Map.of()));
assertThat(variables.get("addSecretConsumer"), notNullValue());
}
@Test


@@ -18,13 +18,13 @@ import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;
class SchedulerConditionTest extends AbstractSchedulerTest {
@Inject
@@ -33,6 +33,9 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;
@Inject
protected SchedulerExecutionStateInterface executionState;
private static Flow createScheduleFlow() {
Schedule schedule = Schedule.builder()
.id("hourly")
@@ -58,6 +61,7 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
void schedule() throws Exception {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(4);
Flow flow = createScheduleFlow();
@@ -74,6 +78,11 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
.when(flowListenersServiceSpy)
.flows();
// mock the backfill execution is ended
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
.when(executionRepositorySpy)
.findById(any(), any());
// scheduler
try (AbstractScheduler scheduler = new JdbcScheduler(
applicationContext,
@@ -94,7 +103,7 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
});
scheduler.run();
queueCount.await(30, TimeUnit.SECONDS);
queueCount.await(15, TimeUnit.SECONDS);
receive.blockLast();


@@ -39,6 +39,9 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
@Inject
private SchedulerTriggerStateInterface triggerState;
@Inject
private SchedulerExecutionState schedulerExecutionState;
@Inject
private FlowListeners flowListenersService;
@@ -169,6 +172,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
assertThat(queueCount.getCount(), is(0L));
assertThat(last.get(), notNullValue());
assertThat(last.get().getFlowRevision(), notNullValue());
assertThat(last.get().getState().getCurrent(), is(State.Type.FAILED));
}
}


@@ -38,6 +38,9 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;
@Inject
protected SchedulerExecutionStateInterface executionState;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
protected QueueInterface<LogEntry> logQueue;
@@ -65,7 +68,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
.truncatedTo(ChronoUnit.HOURS);
}
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy
@@ -77,6 +80,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
void schedule() throws Exception {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface executionStateSpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(6);
CountDownLatch invalidLogCount = new CountDownLatch(1);
Set<String> date = new HashSet<>();
@@ -111,7 +115,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(trigger.toBuilder().triggerId("schedule-invalid").flowId(invalid.getId()).build());
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionStateSpy)) {
// wait for execution
Flux<Execution> receiveExecutions = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
@@ -171,7 +175,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(trigger);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();
Await.until(() -> {
@@ -205,7 +209,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
@@ -250,7 +254,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
@@ -294,7 +298,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(lastTrigger);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();
Await.until(() -> scheduler.isReady(), Duration.ofMillis(100), Duration.ofSeconds(5));
@@ -325,7 +329,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
.build();
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();
Await.until(() -> {
@@ -390,7 +394,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(trigger);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();
// Wait 3s to see if things happen
@@ -428,7 +432,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(2);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
@@ -488,7 +492,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();

View File

@@ -19,20 +19,20 @@ import reactor.core.publisher.Flux;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;
public class SchedulerThreadTest extends AbstractSchedulerTest {
@Inject
protected FlowListeners flowListenersService;
@Inject
protected SchedulerTriggerStateInterface triggerState;
protected SchedulerExecutionStateInterface executionState;
@Test
void thread() throws Exception {
@@ -53,12 +53,17 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface schedulerExecutionStateSpy = spy(this.executionState);
doReturn(Collections.singletonList(flow))
.when(flowListenersServiceSpy)
.flows();
// mock the backfill execution is ended
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
.when(schedulerExecutionStateSpy)
.findById(any(), any());
// scheduler
try (
AbstractScheduler scheduler = new JdbcScheduler(

View File

@@ -123,10 +123,10 @@ class YamlFlowParserTest {
void inputs() {
Flow flow = this.parse("flows/valids/inputs.yaml");
assertThat(flow.getInputs().size(), is(27));
assertThat(flow.getInputs().stream().filter(Input::getRequired).count(), is(9L));
assertThat(flow.getInputs().size(), is(28));
assertThat(flow.getInputs().stream().filter(Input::getRequired).count(), is(10L));
assertThat(flow.getInputs().stream().filter(r -> !r.getRequired()).count(), is(18L));
assertThat(flow.getInputs().stream().filter(r -> r.getDefaults() != null).count(), is(1L));
assertThat(flow.getInputs().stream().filter(r -> r.getDefaults() != null).count(), is(2L));
assertThat(flow.getInputs().stream().filter(r -> r instanceof StringInput && ((StringInput)r).getValidator() != null).count(), is(1L));
}

View File

@@ -25,12 +25,16 @@ import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -99,6 +103,51 @@ class PluginDefaultServiceTest {
), result);
}
@ParameterizedTest
@MethodSource
void flowDefaultsOverrideGlobalDefaults(boolean flowDefaultForced, boolean globalDefaultForced, String fooValue, String barValue, String bazValue) {
final DefaultPrecedenceTester task = DefaultPrecedenceTester.builder()
.id("test")
.type(DefaultPrecedenceTester.class.getName())
.propBaz("taskValue")
.build();
final PluginDefault flowDefault = new PluginDefault(DefaultPrecedenceTester.class.getName(), flowDefaultForced, ImmutableMap.of(
"propBar", "flowValue",
"propBaz", "flowValue"
));
final PluginDefault globalDefault = new PluginDefault(DefaultPrecedenceTester.class.getName(), globalDefaultForced, ImmutableMap.of(
"propFoo", "globalValue",
"propBar", "globalValue",
"propBaz", "globalValue"
));
final Flow flowWithPluginDefault = Flow.builder()
.tasks(Collections.singletonList(task))
.pluginDefaults(List.of(flowDefault))
.build();
final PluginGlobalDefaultConfiguration pluginGlobalDefaultConfiguration = new PluginGlobalDefaultConfiguration();
pluginGlobalDefaultConfiguration.defaults = List.of(globalDefault);
pluginDefaultService.pluginGlobalDefault = pluginGlobalDefaultConfiguration;
final Flow injected = pluginDefaultService.injectDefaults(flowWithPluginDefault);
assertThat(((DefaultPrecedenceTester) injected.getTasks().getFirst()).getPropFoo(), is(fooValue));
assertThat(((DefaultPrecedenceTester) injected.getTasks().getFirst()).getPropBar(), is(barValue));
assertThat(((DefaultPrecedenceTester) injected.getTasks().getFirst()).getPropBaz(), is(bazValue));
}
private static Stream<Arguments> flowDefaultsOverrideGlobalDefaults() {
return Stream.of(
Arguments.of(false, false, "globalValue", "flowValue", "taskValue"),
Arguments.of(false, true, "globalValue", "globalValue", "globalValue"),
Arguments.of(true, false, "globalValue", "flowValue", "flowValue"),
Arguments.of(true, true, "globalValue", "flowValue", "flowValue")
);
}
@Test
public void injectFlowAndGlobals() {
DefaultTester task = DefaultTester.builder()
@@ -297,4 +346,23 @@ class PluginDefaultServiceTest {
private Map<String, String> val;
}
}
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Plugin(aliases = "io.kestra.core.services.DefaultPrecedenceTesterAlias")
public static class DefaultPrecedenceTester extends Task implements RunnableTask<VoidOutput> {
private String propFoo;
private String propBar;
private String propBaz;
@Override
public VoidOutput run(RunContext runContext) throws Exception {
return null;
}
}
}

View File

@@ -5,25 +5,33 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;
class FlowOutputTest extends AbstractMemoryRunnerTest {
static final String NAMESPACE = "io.kestra.tests";
@Test
void shouldGetSuccessExecutionForFlowFlowWithOutputs() throws TimeoutException {
void shouldGetSuccessExecutionForFlowWithOutputs() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, NAMESPACE, "flow-with-outputs", null, null);
assertThat(execution.getOutputs(), aMapWithSize(1));
assertThat(execution.getOutputs().get("key"), is("{\"value\":\"flow-with-outputs\"}"));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
}
@Test
@SuppressWarnings("unchecked")
void shouldGetSuccessExecutionForFlowWithArrayOutputs() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, NAMESPACE, "flow-with-array-outputs", null, null);
assertThat(execution.getOutputs(), aMapWithSize(1));
assertThat((List<String>) execution.getOutputs().get("myout"), hasItems("1rstValue", "2ndValue"));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
}
@Test
void shouldGetFailExecutionForFlowWithInvalidOutputs() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, NAMESPACE, "flow-with-outputs-failed", null, null);

View File

@@ -102,6 +102,21 @@ public class ForEachItemCaseTest {
assertThat(triggered.get().getTaskRunList(), hasSize(1));
}
@SuppressWarnings("unchecked")
public void forEachItemEmptyItems() throws TimeoutException, URISyntaxException, IOException {
URI file = emptyItems();
Map<String, Object> inputs = Map.of("file", file.toString());
Execution execution = runnerUtils.runOne(null, TEST_NAMESPACE, "for-each-item", null,
(flow, execution1) -> flowIO.typedInputs(flow, execution1, inputs),
Duration.ofSeconds(30));
// assert on the main flow execution
assertThat(execution.getTaskRunList(), hasSize(4));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
Map<String, Object> outputs = execution.getTaskRunList().get(2).getOutputs();
assertThat(outputs, nullValue());
}
@SuppressWarnings("unchecked")
public void forEachItemNoWait() throws TimeoutException, InterruptedException, URISyntaxException, IOException {
CountDownLatch countDownLatch = new CountDownLatch(26);
@@ -261,6 +276,16 @@ public class ForEachItemCaseTest {
);
}
private URI emptyItems() throws URISyntaxException, IOException {
File tempFile = File.createTempFile("file", ".txt");
return storageInterface.put(
null,
new URI("/file/storage/file.txt"),
new FileInputStream(tempFile)
);
}
private List<String> content() {
return IntStream
.range(0, 102)

View File

@@ -16,6 +16,7 @@ import java.util.Optional;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@KestraTest
@@ -109,4 +110,49 @@ class FlowTest {
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-1", "flow-label-1")));
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-2", "flow-label-2")));
}
@Test
void success_withLabels() {
var flow = io.kestra.core.models.flows.Flow.builder()
.id("flow-with-flow-trigger")
.namespace("io.kestra.unittest")
.revision(1)
.labels(List.of(
new Label("flow-label-1", "flow-label-1"),
new Label("flow-label-2", "flow-label-2")
))
.tasks(Collections.singletonList(Return.builder()
.id("test")
.type(Return.class.getName())
.format("test")
.build()))
.build();
var execution = Execution.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.flowId("flow-with-flow-trigger")
.flowRevision(1)
.state(State.of(State.Type.RUNNING, Collections.emptyList()))
.build();
var flowTrigger = Flow.builder()
.id("flow")
.type(Flow.class.getName())
.labels(List.of(
new Label("trigger-label-1", "trigger-label-1"),
new Label("trigger-label-2", "{{ 'trigger-label-2' }}"),
new Label("trigger-label-3", "{{ null }}"), // should return an empty string
new Label("trigger-label-4", "{{ foobar }}") // should fail
))
.build();
Optional<Execution> evaluate = flowTrigger.evaluate(runContextFactory.of(), flow, execution);
assertThat(evaluate.isPresent(), is(true));
assertThat(evaluate.get().getLabels(), hasSize(5));
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-1", "flow-label-1")));
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-2", "flow-label-2")));
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-1", "trigger-label-1")));
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-2", "trigger-label-2")));
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-3", "")));
}
}

View File

@@ -135,6 +135,35 @@ class ScheduleTest {
assertThat(inputs.get("input2"), is("default"));
}
@Test
void success_withLabels() throws Exception {
var scheduleTrigger = Schedule.builder()
.id("schedule")
.cron("0 0 1 * *")
.labels(List.of(
new Label("trigger-label-1", "trigger-label-1"),
new Label("trigger-label-2", "{{ 'trigger-label-2' }}"),
new Label("trigger-label-3", "{{ null }}")
))
.build();
var conditionContext = conditionContext(scheduleTrigger);
var date = ZonedDateTime.now()
.withDayOfMonth(1)
.withHour(0)
.withMinute(0)
.withSecond(0)
.truncatedTo(ChronoUnit.SECONDS)
.minusMonths(1);
var triggerContext = triggerContext(date, scheduleTrigger);
Optional<Execution> evaluate = scheduleTrigger.evaluate(conditionContext, triggerContext);
assertThat(evaluate.isPresent(), is(true));
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-1", "trigger-label-1")));
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-2", "trigger-label-2")));
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-3", "")));
}
@SuppressWarnings("unchecked")
@Test
void everyMinute() throws Exception {

View File

@@ -0,0 +1,16 @@
id: flow-with-array-outputs
namespace: io.kestra.tests
tasks:
- id: return
type: io.kestra.plugin.core.output.OutputValues
values:
one: "1rstValue"
two: "2ndValue"
outputs:
- id: myout
type: ARRAY
value:
- "{{ outputs.return.values.one }}"
- "{{ outputs.return.values.two }}"

View File

@@ -95,6 +95,11 @@ inputs:
- name: array
type: ARRAY
itemType: INT
# required: true combined with an empty default value only works when default values are correctly serialized, which is what this input is meant to test.
- name: empty
type: STRING
defaults: ''
required: true
tasks:
- id: string

View File

@@ -1,5 +1,5 @@
version=0.18.0-SNAPSHOT
version=0.18.17
org.gradle.parallel=true
org.gradle.caching=true
org.gradle.priority=low
org.gradle.priority=low

View File

@@ -2,12 +2,13 @@ package io.kestra.schedulers.h2;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;
class H2SchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy

View File

@@ -2,9 +2,13 @@ package io.kestra.repository.mysql;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.DMLQuery;
import org.jooq.DSLContext;
import org.jooq.Record;
@Singleton
@MysqlRepositoryEnabled
@@ -13,4 +17,13 @@ public class MysqlFlowTopologyRepository extends AbstractJdbcFlowTopologyReposit
public MysqlFlowTopologyRepository(@Named("flowtopologies") MysqlRepository<FlowTopology> repository) {
super(repository);
}
@Override
protected DMLQuery<Record> buildMergeStatement(DSLContext context, FlowTopology flowTopology) {
return context.insertInto(this.jdbcRepository.getTable())
.set(AbstractJdbcRepository.field("key"), this.jdbcRepository.key(flowTopology))
.set(this.jdbcRepository.persistFields(flowTopology))
.onDuplicateKeyUpdate()
.set(this.jdbcRepository.persistFields(flowTopology));
}
}

View File

@@ -7,9 +7,17 @@ import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class MysqlQueue<T> extends JdbcQueue<T> {
// TODO - remove once 'queue' table is re-designed
private static final MysqlQueueConsumers QUEUE_CONSUMERS = new MysqlQueueConsumers();
public MysqlQueue(Class<T> cls, ApplicationContext applicationContext) {
super(cls, applicationContext);
}
@@ -59,7 +67,7 @@ public class MysqlQueue<T> extends JdbcQueue<T> {
.where(AbstractJdbcRepository.field("type").eq(this.cls.getName()))
.and(DSL.or(List.of(
AbstractJdbcRepository.field("consumers").isNull(),
DSL.condition("NOT(FIND_IN_SET(?, consumers) > 0)", queueType)
AbstractJdbcRepository.field("consumers").in(QUEUE_CONSUMERS.allForConsumerNotIn(queueType))
)));
if (consumerGroup != null) {
@@ -101,4 +109,38 @@ public class MysqlQueue<T> extends JdbcQueue<T> {
update.execute();
}
private static final class MysqlQueueConsumers {
private static final Set<String> CONSUMERS;
static {
CONSUMERS = new HashSet<>();
String[] elements = {"indexer", "executor", "worker", "scheduler"};
List<String> results = new ArrayList<>();
// Generate every non-empty ordered permutation of the consumer names
generateCombinations(elements, new boolean[elements.length], new ArrayList<>(), results);
CONSUMERS.addAll(results);
}
public Set<String> allForConsumerNotIn(String consumer) {
return CONSUMERS.stream().filter(s -> !s.contains(consumer)).collect(Collectors.toSet());
}
private static void generateCombinations(String[] elements, boolean[] used, List<String> current, List<String> results) {
if (!current.isEmpty()) {
results.add(String.join(",", current));
}
for (int i = 0; i < elements.length; i++) {
if (!used[i]) {
used[i] = true;
current.add(elements[i]);
generateCombinations(elements, used, current, results);
current.removeLast();
used[i] = false;
}
}
}
}
}
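The hunk above swaps MySQL's `FIND_IN_SET` predicate for an `IN (...)` clause over a precomputed set of comma-joined consumer permutations. A standalone sketch of that generation (class and method names hypothetical, outside the Kestra codebase): for the 4 consumer names it yields 64 non-empty ordered permutations, of which 15 omit any given consumer.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical standalone recreation of the MysqlQueueConsumers idea: precompute
// every non-empty ordered permutation of the consumer names so the queue poll
// can use a plain IN (...) clause instead of MySQL's FIND_IN_SET().
public class ConsumerPermutations {
    static final String[] ELEMENTS = {"indexer", "executor", "worker", "scheduler"};

    public static Set<String> all() {
        List<String> results = new ArrayList<>();
        generate(ELEMENTS, new boolean[ELEMENTS.length], new ArrayList<>(), results);
        return new HashSet<>(results);
    }

    // Values already stored in the "consumers" column that do NOT mention the
    // given consumer, i.e. rows this consumer may still pick up.
    public static Set<String> allNotContaining(String consumer) {
        return all().stream().filter(s -> !s.contains(consumer)).collect(Collectors.toSet());
    }

    // Record the comma-joined prefix at every recursion step, so partial
    // permutations ("worker", "worker,executor", ...) are included too.
    private static void generate(String[] elements, boolean[] used, List<String> current, List<String> results) {
        if (!current.isEmpty()) {
            results.add(String.join(",", current));
        }
        for (int i = 0; i < elements.length; i++) {
            if (!used[i]) {
                used[i] = true;
                current.add(elements[i]);
                generate(elements, used, current, results);
                current.remove(current.size() - 1);
                used[i] = false;
            }
        }
    }
}
```

With 4 names the set size is 4 + 12 + 24 + 24 = 64, small enough for an `IN` clause; excluding one name leaves the 15 permutations of the other three.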

View File

@@ -2,12 +2,13 @@ package io.kestra.schedulers.mysql;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;
class MysqlSchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy

View File

@@ -2,9 +2,13 @@ package io.kestra.repository.postgres;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.DMLQuery;
import org.jooq.DSLContext;
import org.jooq.Record;
@Singleton
@PostgresRepositoryEnabled
@@ -13,4 +17,14 @@ public class PostgresFlowTopologyRepository extends AbstractJdbcFlowTopologyRepo
public PostgresFlowTopologyRepository(@Named("flowtopologies") PostgresRepository<FlowTopology> repository) {
super(repository);
}
@Override
protected DMLQuery<Record> buildMergeStatement(DSLContext context, FlowTopology flowTopology) {
return context.insertInto(this.jdbcRepository.getTable())
.set(AbstractJdbcRepository.field("key"), this.jdbcRepository.key(flowTopology))
.set(this.jdbcRepository.persistFields(flowTopology))
.onConflict(AbstractJdbcRepository.field("key"))
.doUpdate()
.set(this.jdbcRepository.persistFields(flowTopology));
}
}

View File

@@ -10,6 +10,7 @@ import org.jooq.impl.DSL;
import org.slf4j.event.Level;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -27,10 +28,9 @@ public class PostgresLogRepository extends AbstractJdbcLogRepository {
}
@Override
protected Condition minLevel(Level minLevel) {
protected Condition levelsCondition(List<Level> levels) {
return DSL.condition("level in (" +
LogEntry
.findLevelsByMin(minLevel)
levels
.stream()
.map(s -> "'" + s + "'::log_level")
.collect(Collectors.joining(", ")) +

View File

@@ -1,5 +1,6 @@
package io.kestra.repository.postgres;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.jdbc.JdbcMapper;
@@ -21,6 +22,7 @@ import org.jooq.Result;
import org.jooq.SelectConditionStep;
import org.jooq.impl.DSL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import jakarta.annotation.Nullable;
@@ -52,12 +54,10 @@ public class PostgresRepository<T> extends io.kestra.jdbc.AbstractJdbcRepository
@SneakyThrows
@Override
public Map<Field<Object>, Object> persistFields(T entity) {
Map<Field<Object>, Object> fields = super.persistFields(entity);
String json = JdbcMapper.of().writeValueAsString(entity);
fields.replace(AbstractJdbcRepository.field("value"), DSL.val(JSONB.valueOf(json)));
return fields;
return new HashMap<>(ImmutableMap
.of(io.kestra.jdbc.repository.AbstractJdbcRepository.field("value"), DSL.val(JSONB.valueOf(json)))
);
}
@SneakyThrows

View File

@@ -2,12 +2,13 @@ package io.kestra.schedulers.postgres;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;
class PostgresSchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy

View File

@@ -59,6 +59,8 @@ import java.util.function.Function;
import java.util.stream.Collectors;
public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcRepository implements ExecutionRepositoryInterface, JdbcIndexerInterface<Execution> {
private static final int FETCH_SIZE = 100;
protected final io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository;
private final ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
private final ApplicationContext applicationContext;
@@ -108,10 +110,13 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
.where(this.defaultFilter(tenantId))
.and(field("trigger_execution_id").eq(triggerExecutionId));
select.fetch()
.map(this.jdbcRepository::map)
.forEach(emitter::next);
emitter.complete();
// fetchSize will fetch rows 100 by 100, even for databases where the driver would otherwise load everything in memory
// using a stream fetches lazily; otherwise, all fetches would be done before we start emitting items
try (var stream = select.fetchSize(FETCH_SIZE).stream()) {
stream.map(this.jdbcRepository::map).forEach(emitter::next);
} finally {
emitter.complete();
}
}),
FluxSink.OverflowStrategy.BUFFER
);
@@ -213,11 +218,13 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
deleted
);
select.fetch()
.map(this.jdbcRepository::map)
.forEach(emitter::next);
emitter.complete();
// fetchSize will fetch rows 100 by 100, even for databases where the driver would otherwise load everything in memory
// using a stream fetches lazily; otherwise, all fetches would be done before we start emitting items
try (var stream = select.fetchSize(FETCH_SIZE).stream()) {
stream.map(this.jdbcRepository::map).forEach(emitter::next);
} finally {
emitter.complete();
}
}),
FluxSink.OverflowStrategy.BUFFER
);
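The two hunks above replace `select.fetch()` with `select.fetchSize(FETCH_SIZE).stream()` so rows reach the Flux emitter as they are read instead of after the whole result is materialized. A minimal illustration of that interleaving with plain `java.util.stream` (no jOOQ or Reactor involved; names are hypothetical):

```java
import java.util.stream.Stream;

// Minimal sketch of why lazy streaming matters: with a stream, each "fetched"
// row flows through the mapping step immediately, so fetches and emissions
// interleave instead of all fetches completing before the first emission.
public class LazyFetchDemo {
    public static String order() {
        StringBuilder order = new StringBuilder();
        // peek() stands in for the driver fetching a row; map() for emitting it.
        try (Stream<Integer> rows = Stream.of(1, 2, 3)
                .peek(r -> order.append("fetch").append(r).append(' '))) {
            rows.map(r -> { order.append("emit").append(r).append(' '); return r; })
                .forEach(r -> {});
        }
        return order.toString().trim();
    }

    public static void main(String[] args) {
        // Prints "fetch1 emit1 fetch2 emit2 fetch3 emit3": per-row interleaving.
        System.out.println(order());
    }
}
```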

View File

@@ -4,14 +4,13 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
import jakarta.inject.Singleton;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRepository implements FlowTopologyRepositoryInterface, JdbcIndexerInterface<FlowTopology> {
protected final io.kestra.jdbc.AbstractJdbcRepository<FlowTopology> jdbcRepository;
@@ -105,10 +104,7 @@ public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRep
context
.batch(flowTopologies
.stream()
.map(flowTopology -> context.insertInto(this.jdbcRepository.getTable())
.set(AbstractJdbcRepository.field("key"), this.jdbcRepository.key(flowTopology))
.set(this.jdbcRepository.persistFields(flowTopology))
)
.map(flowTopology -> buildMergeStatement(context, flowTopology))
.toList()
)
.execute();
@@ -116,6 +112,17 @@ public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRep
});
}
protected DMLQuery<Record> buildMergeStatement(DSLContext context, FlowTopology flowTopology) {
return context.mergeInto(this.jdbcRepository.getTable())
.using(context.selectOne())
.on(AbstractJdbcRepository.field("key").eq(this.jdbcRepository.key(flowTopology)))
.whenMatchedThenUpdate()
.set(this.jdbcRepository.persistFields(flowTopology))
.whenNotMatchedThenInsert()
.set(AbstractJdbcRepository.field("key"), this.jdbcRepository.key(flowTopology))
.set(this.jdbcRepository.persistFields(flowTopology));
}
@Override
public FlowTopology save(FlowTopology flowTopology) {
this.jdbcRepository.persist(flowTopology);

View File

@@ -350,7 +350,10 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
DSLContext context = DSL.using(configuration);
return context.delete(this.jdbcRepository.getTable())
.where(field("execution_id", String.class).eq(execution.getId()))
// The deleted field is not used, so it will always be false.
// We add it here to be sure to use the correct index.
.where(field("deleted", Boolean.class).eq(false))
.and(field("execution_id", String.class).eq(execution.getId()))
.execute();
});
}
@@ -441,7 +444,7 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
}
if (logLevels != null) {
delete = delete.and(field("level").in(logLevels));
delete = delete.and(levelsCondition(logLevels));
}
return delete.execute();
@@ -493,7 +496,11 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
});
}
protected Condition minLevel(Level minLevel) {
return field("level").in(LogEntry.findLevelsByMin(minLevel));
private Condition minLevel(Level minLevel) {
return levelsCondition(LogEntry.findLevelsByMin(minLevel));
}
protected Condition levelsCondition(List<Level> levels) {
return field("level").in(levels.stream().map(level -> level.name()).toList());
}
}

View File

@@ -150,7 +150,10 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
DSLContext context = DSL.using(configuration);
return context.delete(this.jdbcRepository.getTable())
.where(field("execution_id", String.class).eq(execution.getId()))
// The deleted field is not used, so it will always be false.
// We add it here to be sure to use the correct index.
.where(field("deleted", Boolean.class).eq(false))
.and(field("execution_id", String.class).eq(execution.getId()))
.execute();
});
}
@@ -168,8 +171,7 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
SelectConditionStep<Record1<Object>> select = context
.selectDistinct(field(field))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
@@ -185,8 +187,7 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));

View File

@@ -169,7 +169,6 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
Trigger current = optionalTrigger.get();
current = current.toBuilder()
.executionId(trigger.getExecutionId())
.executionCurrentState(trigger.getExecutionCurrentState())
.updatedDate(trigger.getUpdatedDate())
.build();
this.save(context, current);

View File

@@ -31,10 +31,10 @@ import java.util.function.BiConsumer;
public class JdbcScheduler extends AbstractScheduler {
private final QueueInterface<Execution> executionQueue;
private final TriggerRepositoryInterface triggerRepository;
private final ConditionService conditionService;
private final FlowRepositoryInterface flowRepository;
private final JooqDSLContextWrapper dslContextWrapper;
private final ConditionService conditionService;
@SuppressWarnings("unchecked")
@@ -48,6 +48,7 @@ public class JdbcScheduler extends AbstractScheduler {
executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
triggerRepository = applicationContext.getBean(AbstractJdbcTriggerRepository.class);
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
executionState = applicationContext.getBean(SchedulerExecutionState.class);
conditionService = applicationContext.getBean(ConditionService.class);
flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
@@ -75,14 +76,6 @@ public class JdbcScheduler extends AbstractScheduler {
.ifPresent(trigger -> {
this.triggerState.update(trigger.resetExecution(execution.getState().getCurrent()));
});
} else {
// update execution state on each state change so the scheduler knows the execution is running
triggerRepository
.findByExecution(execution)
.filter(trigger -> execution.getState().getCurrent() != trigger.getExecutionCurrentState())
.ifPresent(trigger -> {
((JdbcSchedulerTriggerState) this.triggerState).updateExecution(Trigger.of(execution, trigger));
});
}
}
}
@@ -105,7 +98,7 @@ public class JdbcScheduler extends AbstractScheduler {
public void handleNext(List<Flow> flows, ZonedDateTime now, BiConsumer<List<Trigger>, ScheduleContextInterface> consumer) {
JdbcSchedulerContext schedulerContext = new JdbcSchedulerContext(this.dslContextWrapper);
schedulerContext.startTransaction(scheduleContextInterface -> {
schedulerContext.doInTransaction(scheduleContextInterface -> {
List<Trigger> triggers = this.triggerState.findByNextExecutionDateReadyForAllTenants(now, scheduleContextInterface);
consumer.accept(triggers, scheduleContextInterface);

View File

@@ -18,17 +18,14 @@ public class JdbcSchedulerContext implements ScheduleContextInterface {
this.dslContextWrapper = dslContextWrapper;
}
public void startTransaction(Consumer<ScheduleContextInterface> consumer) {
@Override
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
this.dslContextWrapper.transaction(configuration -> {
this.context = DSL.using(configuration);
consumer.accept(this);
this.commit();
this.context.commit();
});
}
public void commit() {
this.context.commit();
}
}
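The rename from `startTransaction` to `doInTransaction` (with `commit` folded inside) follows the usual "execute-around" shape: the context owns the transaction lifecycle and the caller only supplies the work. A dependency-free sketch of that shape, with hypothetical names:

```java
import java.util.function.Consumer;

// Hypothetical, dependency-free sketch of the doInTransaction shape: the
// context begins the transaction, hands itself to the caller's work, then
// commits — callers never manage begin/commit themselves.
public class TxContext {
    private final StringBuilder log = new StringBuilder();

    public void doInTransaction(Consumer<TxContext> work) {
        log.append("begin;");
        work.accept(this);   // caller's work runs inside the transaction
        log.append("commit;");
    }

    public void save(String row) {
        log.append("save:").append(row).append(';');
    }

    public String log() {
        return log.toString();
    }
}
```

Usage: `new TxContext().doInTransaction(tx -> tx.save("trigger-1"))` records `begin;save:trigger-1;commit;`, mirroring how the scheduler's trigger reads and saves now happen inside one transaction boundary.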

View File

@@ -54,6 +54,18 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
return trigger;
}
@Override
public Trigger create(Trigger trigger, String headerContent) {
return this.triggerRepository.create(trigger);
}
@Override
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface, String headerContent) {
this.triggerRepository.save(trigger, scheduleContextInterface);
return trigger;
}
@Override
public Trigger create(Trigger trigger) {
@@ -84,7 +96,4 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
throw new NotImplementedException();
}
@Override
public void unlock(Trigger trigger) {}
}


@@ -276,6 +276,11 @@ public abstract class JdbcRunnerTest {
forEachItemCaseTest.forEachItem();
}
@Test
void forEachItemEmptyItems() throws URISyntaxException, IOException, TimeoutException {
forEachItemCaseTest.forEachItemEmptyItems();
}
@Test
void forEachItemNoWait() throws URISyntaxException, IOException, InterruptedException, TimeoutException {
forEachItemCaseTest.forEachItemNoWait();


@@ -132,7 +132,7 @@ public class CommandsWrapper implements TaskCommands {
}
public ScriptOutput run() throws Exception {
if (this.namespaceFiles != null && Boolean.TRUE.equals(this.namespaceFiles.getEnabled())) {
if (this.namespaceFiles != null && !Boolean.FALSE.equals(this.namespaceFiles.getEnabled())) {
List<NamespaceFile> matchedNamespaceFiles = runContext.storage()
.namespace()
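The one-line change in `CommandsWrapper` above flips the default for a tri-state flag: `Boolean.TRUE.equals(enabled)` required an explicit opt-in, whereas `!Boolean.FALSE.equals(enabled)` treats an unset (`null`) value as enabled. A minimal sketch of the same tri-state logic (TypeScript, names hypothetical):

```typescript
// Tri-state flag: true, false, or unset (null).
// Old behavior: enabled only when explicitly true.
const oldCheck = (enabled: boolean | null): boolean => enabled === true;

// New behavior: enabled unless explicitly false (unset defaults to enabled).
const newCheck = (enabled: boolean | null): boolean => enabled !== false;
```

With the new check, namespace files are loaded unless the user explicitly disables them.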


@@ -342,7 +342,7 @@ public class Docker extends TaskRunner {
CreateContainerResponse exec = container.exec();
logger.debug("Container created: {}", exec.getId());
List<Path> relativeWorkingDirectoryFilesPaths = taskCommands.relativeWorkingDirectoryFilesPaths();
List<Path> relativeWorkingDirectoryFilesPaths = taskCommands.relativeWorkingDirectoryFilesPaths(true);
boolean hasFilesToUpload = !ListUtils.isEmpty(relativeWorkingDirectoryFilesPaths);
boolean hasFilesToDownload = !ListUtils.isEmpty(filesToDownload);
boolean outputDirectoryEnabled = taskCommands.outputDirectoryEnabled();


@@ -7,6 +7,6 @@ import io.kestra.core.models.tasks.runners.TaskRunner;
class DockerTest extends AbstractTaskRunnerTest {
@Override
protected TaskRunner taskRunner() {
return Docker.builder().image("centos").build();
return Docker.builder().image("rockylinux:9.3-minimal").build();
}
}


@@ -201,7 +201,7 @@ public abstract class AbstractTaskRunnerTest {
Mockito.when(commands.getEnableOutputDirectory()).thenReturn(true);
Mockito.when(commands.outputDirectoryEnabled()).thenReturn(true);
Mockito.when(commands.getTimeout()).thenReturn(null);
Mockito.when(commands.relativeWorkingDirectoryFilesPaths()).thenCallRealMethod();
Mockito.when(commands.relativeWorkingDirectoryFilesPaths(true)).thenCallRealMethod();
return commands;
}


@@ -1,5 +1,2 @@
public/vscode/
public/vscode-web/
node/
node_modules/


@@ -30,8 +30,8 @@
<strong>We're sorry but Kestra doesn't work properly without JavaScript enabled. Please enable it to continue.</strong>
</noscript>
<div id="loader-wrapper">
<div id="loader"></div>
<div id="loader-wrapper" data-test-id="loader-wrapper">
<div id="loader" data-test-id="loader"></div>
</div>
<div id="app-container">

ui/package-lock.json generated

File diff suppressed because it is too large.


@@ -1,8 +1,9 @@
{
"name": "kestra",
"version": "0.1.0",
"version": "0.18.6",
"private": true,
"packageManager": "npm@9.8.1",
"type": "module",
"packageManager": "npm@9.9.3",
"scripts": {
"dev": "vite --host",
"build": "vite build --emptyOutDir",
@@ -12,25 +13,25 @@
"lint": "eslint . --ext .vue,.js,.jsx,.cjs,.mjs --fix"
},
"dependencies": {
"@kestra-io/ui-libs": "^0.0.53",
"@kestra-io/ui-libs": "^0.0.59",
"@vue-flow/background": "^1.3.0",
"@vue-flow/controls": "^1.1.2",
"@vue-flow/core": "^1.39.1",
"@vue-flow/core": "^1.41.1",
"ansi-to-html": "^0.7.2",
"axios": "^1.7.2",
"axios": "^1.7.7",
"bootstrap": "^5.3.3",
"buffer": "^6.0.3",
"chart.js": "^4.4.3",
"chart.js": "^4.4.4",
"chartjs-chart-treemap": "^2.3.1",
"core-js": "^3.37.1",
"core-js": "^3.38.1",
"cronstrue": "^2.50.0",
"dagre": "^0.8.5",
"element-plus": "^2.7.8",
"element-plus": "^2.8.2",
"humanize-duration": "^3.32.1",
"js-yaml": "^4.1.0",
"lodash": "^4.17.21",
"markdown-it": "^14.1.0",
"markdown-it-anchor": "^9.0.1",
"markdown-it-anchor": "^9.2.0",
"markdown-it-container": "^4.0.0",
"markdown-it-mark": "^4.0.0",
"markdown-it-meta": "0.0.1",
@@ -39,51 +40,51 @@
"moment": "^2.30.1",
"moment-range": "4.0.2",
"moment-timezone": "^0.5.45",
"node-modules-polyfill": "^0.1.4",
"nprogress": "^0.2.0",
"pdfjs-dist": "^4.5.136",
"posthog-js": "^1.150.1",
"pdfjs-dist": "^4.6.82",
"posthog-js": "^1.160.3",
"throttle-debounce": "^5.0.2",
"vite-plugin-eslint": "^1.8.1",
"vue": "^3.4.34",
"vue": "^3.5.3",
"vue-axios": "3.5.2",
"vue-chartjs": "^5.3.1",
"vue-gtag": "^2.0.1",
"vue-i18n": "^9.13.1",
"vue-i18n": "^9.14.0",
"vue-material-design-icons": "^5.3.0",
"vue-router": "^4.4.0",
"vue-sidebar-menu": "^5.4.0",
"vue-router": "^4.4.3",
"vue-sidebar-menu": "^5.4.1",
"vue-virtual-scroller": "^2.0.0-beta.8",
"vue3-popper": "^1.5.0",
"vue3-runtime-template": "^1.0.2",
"vue3-tour": "github:kestra-io/vue3-tour",
"vuex": "^4.1.0",
"xss": "^1.0.15",
"yaml": "^2.5.0"
"yaml": "^2.5.1"
},
"devDependencies": {
"@esbuild-plugins/node-modules-polyfill": "^0.2.2",
"@rushstack/eslint-patch": "^1.10.4",
"@shikijs/markdown-it": "^1.6.3",
"@typescript-eslint/parser": "^7.17.0",
"@vitejs/plugin-vue": "^5.1.1",
"@shikijs/markdown-it": "^1.16.2",
"@typescript-eslint/parser": "^8.4.0",
"@vitejs/plugin-vue": "^5.1.3",
"@vue/eslint-config-prettier": "^9.0.0",
"@vue/test-utils": "^2.4.6",
"decompress": "^4.2.1",
"eslint": "^8.57.0",
"eslint-plugin-vue": "^9.27.0",
"jsdom": "^24.1.1",
"monaco-editor": "^0.50.0",
"eslint-plugin-vue": "^9.28.0",
"jsdom": "^25.0.0",
"monaco-editor": "^0.51.0",
"monaco-yaml": "^5.2.2",
"prettier": "^3.3.3",
"rollup-plugin-copy": "^3.5.0",
"rollup-plugin-visualizer": "^5.12.0",
"sass": "^1.77.4",
"sass": "^1.78.0",
"typescript": "^5.5.4",
"vite": "^5.3.5",
"vitest": "^2.0.4"
"vite": "^5.4.6",
"vite-plugin-eslint": "^1.8.1",
"vitest": "^2.0.5"
},
"optionalDependencies": {
"@rollup/rollup-linux-x64-gnu": "^4.19.0"
"@rollup/rollup-linux-x64-gnu": "^4.21.2"
},
"overrides": {
"bootstrap": {

ui/plugins/commit.ts Normal file

@@ -0,0 +1,30 @@
import type {Plugin} from "vite";
import {execSync} from "child_process";
const getInfo = (formats: string[]): string[] => formats.map(format => execSync(`git log -1 --format=${format}`).toString().trim());
const comment = (message: string, author: string, date: string): string => `
<!--
Last Commit:
${message}
----------
Author: ${author}
Date: ${date}
-->`;
export const commit = (): Plugin => {
const [message, author, date] = getInfo(["%s", "%an", "%cd"]);
return {
name: "commit",
transformIndexHtml: {
order: "pre",
handler(html: string): string {
return comment(message, author, date) + html;
},
},
};
};
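The new `ui/plugins/commit.ts` above uses Vite's `transformIndexHtml` hook with `order: "pre"` to prepend a commit-info comment to `index.html`. A standalone sketch of the prepend step, with hypothetical values standing in for the `git log` calls:

```typescript
// Sketch: prepend a build-info HTML comment to a document.
// The message/author/date values here are hypothetical; the real
// plugin reads them from git via execSync("git log -1 --format=...").
const comment = (message: string, author: string, date: string): string => `
<!--
Last Commit:
${message}
----------
Author: ${author}
Date: ${date}
-->`;

const html = "<html><body>app</body></html>";
const transformed = comment("fix: a bug", "jane", "2024-09-09") + html;
```

Because the hook runs with `order: "pre"`, the comment is injected before other HTML transforms see the document.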


@@ -1,29 +0,0 @@
import type {Plugin} from "vite";
import {execSync} from "child_process";
const comment = (hash: string, date: string): string => {
return `
<!--
Last Commit:
URL: https://github.com/kestra-io/kestra/commit/${hash}
Date: ${date}
-->`;
};
export const details = (): Plugin => {
const hash: string = execSync("git rev-parse --short HEAD").toString().trim();
const date: string = execSync("git log -1 --format=%cd").toString().trim();
return {
name: "details",
transformIndexHtml: {
order: "pre",
handler(html: string): string {
return comment(hash, date) + html;
},
},
};
};


@@ -68,6 +68,7 @@
},
methods: {
changeVisibility(visible = true) {
if (visible) document.querySelector(".ee-tooltip")?.remove();
this.visible = visible
}
},


@@ -86,6 +86,7 @@
:label="$t('id')"
/>
<el-table-column
prop="flowId"
sortable="custom"
:sort-orders="['ascending', 'descending']"
:label="$t('flow')"
@@ -124,16 +125,6 @@
</router-link>
</template>
</el-table-column>
<el-table-column :label="$t('state')">
<template #default="scope">
<status
v-if="scope.row.executionCurrentState"
:status="scope.row.executionCurrentState"
size="small"
/>
</template>
</el-table-column>
<el-table-column prop="workerId" :label="$t('workerId')">
<template #default="scope">
<id
@@ -269,7 +260,6 @@
import RefreshButton from "../layout/RefreshButton.vue";
import DateAgo from "../layout/DateAgo.vue";
import Id from "../Id.vue";
import Status from "../Status.vue";
import {mapState} from "vuex";
import SelectTableActions from "../../mixins/selectTableActions";
import _merge from "lodash/merge";
@@ -284,7 +274,6 @@
SearchField,
NamespaceSelect,
DateAgo,
Status,
Id,
LogsWrapper
},


@@ -61,6 +61,14 @@
@update:filter-value="onDataTableValue"
/>
</el-form-item>
<el-form-item>
<scope-filter-buttons
:label="$t('executions')"
:value="$route.query.scope"
:system="namespace === 'system'"
@update:model-value="onDataTableValue('scope', $event)"
/>
</el-form-item>
<el-form-item>
<label-filter
:model-value="$route.query.labels"
@@ -106,6 +114,13 @@
/>
</el-select>
</el-form-item>
<el-form-item>
<el-switch
:model-value="showChart"
@update:model-value="onShowChartChange"
:active-text="$t('show chart')"
/>
</el-form-item>
<el-form-item>
<filters :storage-key="filterStorageKey" />
</el-form-item>
@@ -118,7 +133,7 @@
</el-form-item>
</template>
<template #top v-if="isDisplayedTop">
<template #top v-if="showStatChart()">
<state-global-chart
v-if="daily"
class="mb-4"
@@ -410,7 +425,6 @@
import StatusFilterButtons from "../layout/StatusFilterButtons.vue"
import StateGlobalChart from "../../components/stats/StateGlobalChart.vue";
import TriggerAvatar from "../../components/flows/TriggerAvatar.vue";
import DateAgo from "../layout/DateAgo.vue";
import Kicon from "../Kicon.vue"
import Labels from "../layout/Labels.vue"
import RestoreUrl from "../../mixins/restoreUrl";
@@ -423,6 +437,7 @@
import {storageKeys} from "../../utils/constants";
import LabelInput from "../../components/labels/LabelInput.vue";
import {ElMessageBox, ElSwitch, ElFormItem, ElAlert, ElCheckbox} from "element-plus";
import DateAgo from "../layout/DateAgo.vue";
import {h, ref} from "vue";
import {filterLabels} from "./utils"
@@ -493,6 +508,7 @@
dblClickRouteName: "executions/update",
flowTriggerDetails: undefined,
recomputeInterval: false,
showChart: ["true", null].includes(localStorage.getItem(storageKeys.SHOW_CHART)),
optionalColumns: [
{
label: "start date",
@@ -568,7 +584,6 @@
}
this.displayColumns = localStorage.getItem(this.storageKey)?.split(",")
|| this.optionalColumns.filter(col => col.default).map(col => col.prop);
},
computed: {
...mapState("execution", ["executions", "total"]),
@@ -641,6 +656,17 @@
displayColumn(column) {
return this.hidden ? !this.hidden.includes(column) : this.displayColumns.includes(column);
},
onShowChartChange(value) {
this.showChart = value;
localStorage.setItem(storageKeys.SHOW_CHART, value);
if (this.showChart) {
this.loadStats();
}
},
showStatChart() {
return this.isDisplayedTop && this.showChart;
},
refresh() {
this.recomputeInterval = !this.recomputeInterval;
this.load();
@@ -676,19 +702,22 @@
return _merge(base, queryFilter)
},
loadStats() {
this.dailyReady = false;
this.$store
.dispatch("stat/daily", this.loadQuery({
startDate: this.$moment(this.startDate).toISOString(true),
endDate: this.$moment(this.endDate).toISOString(true)
}, true))
.then(() => {
this.dailyReady = true;
});
},
loadData(callback) {
this.refreshDates = !this.refreshDates;
if (this.isDisplayedTop) {
this.dailyReady = false;
this.$store
.dispatch("stat/daily", this.loadQuery({
startDate: this.$moment(this.startDate).toISOString(true),
endDate: this.$moment(this.endDate).toISOString(true)
}, true))
.then(() => {
this.dailyReady = true;
});
if (this.showStatChart()) {
this.loadStats();
}
this.$store.dispatch("execution/findExecutions", this.loadQuery({
@@ -765,7 +794,7 @@
},
deleteExecutions() {
const includeNonTerminated = ref(false);
const deleteLogs = ref(true);
const deleteMetrics = ref(true);
const deleteStorage = ref(true);
@@ -784,7 +813,7 @@
"onUpdate:modelValue": (val) => {
includeNonTerminated.value = val
},
}),
}),
]),
h(ElAlert, {
title: this.$t("execution-warn-deleting-still-running"),
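The `showChart` switch added above persists its state to `localStorage` and defaults to visible when the key was never written. A sketch of that read/write pattern, with a `Map` standing in for `window.localStorage` (whose `getItem` returns `null` rather than `undefined` for a missing key):

```typescript
// Sketch: persist a boolean UI toggle, defaulting to "on" when unset.
// A Map stands in for window.localStorage here; key name is hypothetical.
const storage = new Map<string, string>();
const SHOW_CHART = "showChart";

// Visible when the key was never written, or was written as "true".
const readShowChart = (): boolean =>
    ["true", undefined].includes(storage.get(SHOW_CHART));

const writeShowChart = (value: boolean): void => {
    storage.set(SHOW_CHART, String(value));
};
```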


@@ -34,6 +34,7 @@
</el-form-item>
<el-form-item>
<el-button-group>
<restart :execution="execution" class="ms-0" @follow="forwardEvent('follow', $event)" />
<el-button @click="downloadContent()">
<kicon :tooltip="$t('download logs')">
<download />
@@ -79,6 +80,7 @@
import State from "../../utils/state";
import Utils from "../../utils/utils";
import LogLine from "../logs/LogLine.vue";
import Restart from "./Restart.vue";
export default {
components: {
@@ -88,7 +90,8 @@
Kicon,
Download,
Magnify,
Collapse
Collapse,
Restart
},
data() {
return {


@@ -10,6 +10,13 @@
</el-button>
</el-button-group>
<el-button-group v-else-if="isURI(value)">
<a class="el-button el-button--small el-button--primary" :href="value" target="_blank">
<OpenInNew />
{{ $t('open') }}
</a>
</el-button-group>
<span v-else-if="value === null">
<em>null</em>
</span>
@@ -20,6 +27,7 @@
<script setup>
import Download from "vue-material-design-icons/Download.vue";
import OpenInNew from "vue-material-design-icons/OpenInNew.vue";
import FilePreview from "./FilePreview.vue";
</script>
@@ -37,6 +45,14 @@
isFile(value) {
return typeof(value) === "string" && value.startsWith("kestra:///")
},
isURI(value) {
try {
new URL(value);
return true;
} catch (e) {
return false;
}
},
itemUrl(value) {
return `${apiUrl(this.$store)}/executions/${this.execution.id}/file?path=${value}`;
},
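One subtlety in the `isURI` check added above: the WHATWG `URL` constructor accepts any string with a scheme, including internal `kestra:///` storage links, so the `isFile` branch must be evaluated first. A standalone sketch:

```typescript
// Sketch: order matters — check internal links before generic URI validation.
const isFile = (value: unknown): boolean =>
    typeof value === "string" && value.startsWith("kestra:///");

const isURI = (value: string): boolean => {
    try {
        new URL(value);
        return true;
    } catch {
        return false;
    }
};

// "kestra:///ns/file.json" parses as a valid URL, so isURI alone
// would misclassify internal storage links as external ones.
```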


@@ -17,12 +17,12 @@
@expand-change="() => scrollRight()"
>
<template #default="{data}">
<div v-if="data.heading" class="pe-none d-flex fs-5">
<div v-if="data.heading" @click="expandedValue = data.path" class="pe-none d-flex fs-5">
<component :is="data.component" class="me-2" />
<span>{{ data.label }}</span>
</div>
<div v-else class="w-100 d-flex justify-content-between">
<div v-else @click="expandedValue = data.path" class="w-100 d-flex justify-content-between">
<div class="pe-5 d-flex task">
<TaskIcon v-if="data.icon" :icons="allIcons" :cls="icons[data.taskId]" only-icon />
<span :class="{'ms-3': data.icon}">{{ data.label }}</span>
@@ -65,6 +65,7 @@
:input="true"
:navbar="false"
:model-value="computedDebugValue"
@confirm="onDebugExpression($event)"
class="w-100"
/>
@@ -91,9 +92,13 @@
</el-collapse-item>
</el-collapse>
<el-alert v-if="debugError" type="error" :closable="false">
<el-alert v-if="debugError" type="error" :closable="false" class="overflow-auto">
<p><strong>{{ debugError }}</strong></p>
<pre class="mb-0">{{ debugStackTrace }}</pre>
<div class="my-2">
<CopyToClipboard :text="debugError" label="Copy Error" class="d-inline-block me-2" />
<CopyToClipboard :text="debugStackTrace" label="Copy Stack Trace" class="d-inline-block" />
</div>
<pre class="mb-0" style="overflow: scroll;">{{ debugStackTrace }}</pre>
</el-alert>
<VarValue :value="selectedValue" :execution="execution" />
@@ -115,15 +120,25 @@
import {apiUrl} from "override/utils/route";
import CopyToClipboard from "../../layout/CopyToClipboard.vue"
import Editor from "../../inputs/Editor.vue";
const debugEditor = ref(null);
const debugExpression = ref("");
const computedDebugValue = computed(() => `{{ outputs${selectedTask()?.taskId ? `.${selectedTask().taskId}` : ""} }}`);
const computedDebugValue = computed(() => {
const task = selectedTask()?.taskId;
if(!task) return "";
const path = expandedValue.value;
if(!path) return `{{ outputs.${task} }}`
return `{{ outputs.${path} }}`
});
const debugError = ref("");
const debugStackTrace = ref("");
const isJSON = ref(false);
const selectedTask = () => {
const filter = selected.value.length ? selected.value[0] : (cascader.value as any).menuList?.[0]?.panel?.expandingNode?.label;
const filter = selected.value?.length ? selected.value[0] : (cascader.value as any).menuList?.[0]?.panel?.expandingNode?.label;
const taskRunList = [...execution.value.taskRunList];
return taskRunList.find(e => e.taskId === filter);
};
@@ -162,22 +177,41 @@
const execution = computed(() => store.state.execution.execution);
const processedValue = (data): { label: string, regular: boolean; } => {
function isValidURL(url) {
try {
new URL(url);
return true;
} catch (e) {
return false;
}
}
const processedValue = (data) => {
const regular = false;
if (!data.value && !data.children?.length) return {label: data.value, regular};
else if (data?.children?.length) {
if (!data.value && !data.children?.length) {
return {label: data.value, regular};
} else if (data?.children?.length) {
const message = (length) => ({label: `${length} items`, regular});
const length = data.children.length;
return data.children[0].isFirstPass ? message(length - 1) : message(length);
}
return data.value.toString().startsWith("kestra:///") ? {label: "Internal link", regular} : {label: trim(data.value), regular: true};
// Check if the value is a valid URL and not an internal "kestra:///" link
if (isValidURL(data.value)) {
return data.value.startsWith("kestra:///")
? {label: "Internal link", regular}
: {label: "External link", regular};
}
return {label: trim(data.value), regular: true};
};
const expandedValue = ref([])
const selected = ref([]);
const selectedValue = computed(() => {
if (selected.value.length) return selected.value[selected.value.length - 1];
if (selected.value?.length) return selected.value[selected.value.length - 1];
return undefined;
});
const selectedNode = () => {
@@ -190,21 +224,34 @@
return {label, value};
};
const transform = (o, isFirstPass = true) => {
const transform = (o, isFirstPass, path = "") => {
const result = Object.keys(o).map(key => {
const value = o[key];
const isObject = typeof value === "object" && value !== null;
const currentPath = `${path}["${key}"]`;
// If the value is an array with exactly one element, use that element as the value
if (Array.isArray(value) && value.length === 1) {
return {label: key, value: value[0], children: []};
return {label: key, value: value[0], children: [], path: currentPath};
}
return {label: key, value: isObject && !Array.isArray(value) ? null : value, children: isObject ? transform(value, false) : []};
return {
label: key,
value: isObject && !Array.isArray(value) ? key : value,
children: isObject ? transform(value, false, currentPath) : [],
path: currentPath
};
});
if (isFirstPass) {
const OUTPUTS = {label: t("outputs"), heading: true, component: shallowRef(TextBoxSearchOutline), isFirstPass: true};
const OUTPUTS = {
label: t("outputs"),
heading: true,
component: shallowRef(TextBoxSearchOutline),
isFirstPass: true,
path: path
};
result.unshift(OUTPUTS);
}
@@ -212,7 +259,7 @@
};
const outputs = computed(() => {
const tasks = store.state.execution.execution.taskRunList.map((task) => {
return {label: task.taskId, value: task.taskId, ...task, icon: true, children: task?.outputs ? transform(task.outputs) : []};
return {label: task.taskId, value: task.taskId, ...task, icon: true, children: task?.outputs ? transform(task.outputs, true, task.taskId) : []};
});
const HEADING = {label: t("tasks"), heading: true, component: shallowRef(TimelineTextOutline)};
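The reworked `transform` above threads a bracketed `path` string through the recursion so each output node can later be referenced in a `{{ outputs... }}` expression. The path accumulation, sketched standalone for plain nested objects:

```typescript
// Sketch: accumulate bracketed paths while walking a nested object.
// Simplified from the component's transform(); arrays and the
// single-element special case are omitted.
type Node = {label: string; path: string; children: Node[]};

const walk = (o: Record<string, unknown>, path = ""): Node[] =>
    Object.keys(o).map(key => {
        const value = o[key];
        const currentPath = `${path}["${key}"]`;
        const isObject = typeof value === "object" && value !== null;
        return {
            label: key,
            path: currentPath,
            children: isObject ? walk(value as Record<string, unknown>, currentPath) : [],
        };
    });

const nodes = walk({a: {b: 1}}, "myTask");
```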


@@ -36,9 +36,23 @@
default: true
}
},
data() {
return {
exampleFileName: "kestra.json"
}
},
computed: {
curlCommand() {
return this.generateCurlCommand();
const mainCommand = this.generateCurlCommand();
if (this.flow.inputs && this.flow.inputs.find((input) => input.type === "FILE")) {
return `${this.toShell(this.generatePrefix())} && \\\n${this.toShell(mainCommand)}`;
} else {
return `${this.toShell(mainCommand)}`;
}
},
exampleFileInputUrl() {
return `https://huggingface.co/datasets/kestra/datasets/resolve/main/json/${this.exampleFileName}`;
}
},
methods: {
@@ -55,16 +69,25 @@
switch (input.type) {
case "FILE": {
const fileInput = this.inputs[input.id];
if (fileInput) {
inputValue = fileInput.name;
}
inputValue = this.exampleFileName;
break;
}
case "SECRET": {
inputValue = this.inputs[input.id] ? "******" : undefined;
break;
}
case "DURATION": {
inputValue = this.$moment.duration(this.$moment(this.inputs[input.id]).format("hh:mm:ss")).toJSON();
break;
}
case "DATE": {
inputValue = this.$moment(this.inputs[input.id]).format("YYYY-MM-DD");
break;
}
case "TIME": {
inputValue = this.$moment(this.inputs[input.id]).format("hh:mm:ss");
break;
}
default:
inputValue = this.inputs[input.id];
}
@@ -115,8 +138,22 @@
command.push(`'${this.generateUrl()}'`);
return command
},
generatePrefix() {
return ["curl", "-O", `'${this.exampleFileInputUrl}'`];
},
toShell(command) {
return command.join(" ");
}
}
}
</script>
</script>
<style lang="scss" scoped>
/* Allow line-wraps */
code {
display: block;
white-space: pre-wrap;
}
</style>
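The curl generator above chains a file-download prefix and the main request with `&& \` so both run as one shell line. A minimal sketch of that chaining (URLs hypothetical):

```typescript
// Sketch: chain a file-download prefix with the main curl invocation.
const toShell = (command: string[]): string => command.join(" ");

// Hypothetical commands; the real component builds these from flow inputs.
const prefix = ["curl", "-O", "'https://example.com/kestra.json'"];
const main = ["curl", "-X", "POST", "'https://kestra.example/api'"];

// "cmd1 && \" + newline + "cmd2": the second runs only if the first succeeds.
const combined = `${toShell(prefix)} && \\\n${toShell(main)}`;
```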


@@ -59,7 +59,7 @@
async setupFlow() {
if (this.$route.query.copy && this.flow){
this.source = this.flow.source;
} else if (this.$route.query.blueprintId) {
} else if (this.$route.query.blueprintId && this.$route.query.blueprintSource) {
this.source = await this.queryBlueprint(this.$route.query.blueprintId)
} else {
this.source = `id: myflow
@@ -86,7 +86,7 @@ tasks:
};
},
blueprintUri() {
return `${apiUrl(this.$store)}/blueprints/community`
return `${apiUrl(this.$store)}/blueprints/${this.$route.query.blueprintSource}`
},
flowParsed() {
return YamlUtils.parse(this.source);


@@ -9,7 +9,7 @@
>
<el-table-column type="expand">
<template #default="props">
<LogsWrapper class="m-3" :filters="props.row" :charts="false" embed />
<LogsWrapper class="m-3" :filters="props.row" purge-filters :charts="false" embed />
</template>
</el-table-column>
<el-table-column prop="id" :label="$t('id')">


@@ -21,7 +21,7 @@
:icon="ContentSave"
@click="saveTask"
v-if="canSave && !readOnly"
:disabled="errors"
:disabled="errors && !!errors.length"
type="primary"
>
{{ $t("save task") }}
@@ -160,8 +160,7 @@
if (this.task) {
this.taskYaml = YamlUtils.stringify(this.task);
if (this.task.type) {
this.$store
.dispatch("plugin/load", {cls: this.task.type})
this.$store.dispatch("plugin/load", {cls: this.task.type})
}
} else {
this.taskYaml = "";
@@ -173,8 +172,7 @@
handler() {
const task = YamlUtils.parse(this.taskYaml);
if (task?.type && task.type !== this.type) {
this.$store
.dispatch("plugin/load", {cls: task.type})
this.$store.dispatch("plugin/load", {cls: task.type})
this.type = task.type
}
},


@@ -109,7 +109,7 @@
type: this.selectedTaskType
};
if (this.section !== SECTIONS.TRIGGERS) {
if (this.section !== SECTIONS.TRIGGERS && this.section !== SECTIONS.TASK_RUNNERS) {
value["id"] = this.taskObject && this.taskObject.id ? this.taskObject.id : "";
}


@@ -11,6 +11,7 @@
:expanded-subflows="expandedSubflows"
view-type="topology"
@expand-subflow="onExpandSubflow($event)"
@on-edit="(event) => emit('on-edit', event, true)"
/>
<el-alert v-else type="warning" :closable="false">
{{ $t("unable to generate graph") }}
@@ -27,7 +28,7 @@
LowCodeEditor,
},
emits: [
"expand-subflow"
"expand-subflow", "on-edit"
],
props: {
isReadOnly: {


@@ -41,8 +41,17 @@
<el-col :md="24" :lg="embed ? 24 : 18">
<h4>{{ $t("source") }}</h4>
<el-card>
<editor class="position-relative" :read-only="true" :input="true" :full-height="false" :minimap="false" :model-value="blueprint.flow" lang="yaml">
<template #nav>
<editor
class="position-relative"
:read-only="true"
:input="true"
:full-height="false"
:minimap="false"
:model-value="blueprint.flow"
lang="yaml"
:navbar="false"
>
<template #absolute>
<copy-to-clipboard class="position-absolute" :text="blueprint.flow" />
</template>
</editor>
@@ -91,7 +100,7 @@
label: this.$t("blueprints.title"),
link: {
name: "blueprints",
params: this.$route.params
params: this.$route.params.tab ? this.$route.params.tab : {...this.$route.params, tab: this.tab},
}
}
]
@@ -109,6 +118,10 @@
tab: {
type: String,
default: "community"
},
blueprintBaseUri: {
type: String,
default: undefined,
}
},
methods: {
@@ -119,22 +132,26 @@
this.$router.push({
name: "blueprints",
params: {
tenant: this.$route.params.tenant
tenant: this.$route.params.tenant,
tab: this.tab
}
})
}
}
},
async created() {
this.blueprint = (await this.$http.get(`${this.blueprintBaseUri}/${this.blueprintId}`)).data
const TAB = this.$route.query?.tab ?? (this.embed ? this.tab : (this.$route?.params?.tab ?? "community"));
const URL = this.blueprintBaseUri ?? `${apiUrl(this.$store)}/blueprints/` + TAB;
this.blueprint = (await this.$http.get(`${URL}/${this.blueprintId}`)).data;
try {
if (this.blueprintBaseUri.endsWith("community")) {
if (this.blueprintBaseUri?.endsWith("community")) {
this.flowGraph = (await this.$http.get(`${this.blueprintBaseUri}/${this.blueprintId}/graph`, {
validateStatus: (status) => {
return status === 200;
}
})).data;
}))?.data;
} else {
this.flowGraph = await this.$store.dispatch("flow/getGraphFromSourceResponse", {
flow: this.blueprint.flow, config: {
@@ -159,9 +176,6 @@
...YamlUtils.parse(this.blueprint.flow),
source: this.blueprint.flow
}
},
blueprintBaseUri() {
return `${apiUrl(this.$store)}/blueprints/` + (this.embed ? this.tab : (this.$route?.params?.tab ?? "community"));
}
}
};
@@ -257,4 +271,4 @@
}
}
}
</style>
</style>


@@ -49,6 +49,10 @@ export default {
return "condition"
}
if (property.$ref.includes("tasks.runners.TaskRunner")) {
return "task-runner"
}
return "complex";
}


@@ -32,13 +32,13 @@
mixins: [Task],
emits: ["update:modelValue"],
created() {
if (!Array.isArray(this.modelValue)) {
if (!Array.isArray(this.modelValue) && this.modelValue !== undefined) {
this.$emit("update:modelValue", []);
}
},
computed: {
values() {
if (this.modelValue === undefined || (Array.isArray(this.modelValue) && this.modelValue.length === 0)) {
if (this.modelValue === undefined) {
return this.schema.default || [undefined];
}


@@ -19,7 +19,7 @@
<task-editor
ref="editor"
:model-value="taskYaml"
:section="SECTIONS.TASKS"
:section="section"
@update:model-value="onInput"
/>
</el-form>
@@ -34,7 +34,6 @@
<script setup>
import TextSearch from "vue-material-design-icons/TextSearch.vue";
import ContentSave from "vue-material-design-icons/ContentSave.vue";
import {SECTIONS} from "../../../utils/constants.js";
</script>
<script>
@@ -42,11 +41,18 @@
import YamlUtils from "../../../utils/yamlUtils";
import TaskEditor from "../TaskEditor.vue"
import Drawer from "../../Drawer.vue"
import {SECTIONS as SECTION} from "../../../utils/constants.js";
export default {
mixins: [Task],
components: {TaskEditor, Drawer},
emits: ["update:modelValue"],
props: {
section: {
type: String,
default: SECTION.TASKS
},
},
data() {
return {
isOpen: false,


@@ -0,0 +1,9 @@
<template>
<task-task @update:model-value="$emit('update:modelValue', $event)" v-bind="$attrs" :section="SECTION.TASK_RUNNERS" />
</template>
<script setup>
import {SECTIONS as SECTION} from "../../../utils/constants.js";
import TaskTask from "./TaskTask.vue";
defineEmits(["update:modelValue"]);
</script>


@@ -45,6 +45,14 @@
@update:filter-value="updateQuery"
/>
</el-form-item>
<el-form-item>
<scope-filter-buttons
:label="$t('data')"
:value="scope"
:system="namespace === 'system'"
@update:model-value="onScopeSelect"
/>
</el-form-item>
<el-form-item>
<refresh-button class="float-right" @refresh="load" :can-auto-refresh="canAutoRefresh" />
</el-form-item>
@@ -236,7 +244,8 @@
namespaceRestricted: !!this.namespace,
refreshDates: false,
canAutoRefresh: false,
state: []
state: [],
scope: []
};
},
methods: {


@@ -27,7 +27,7 @@
</div>
</slot>
</nav>
<slot name="absolute" />
<div class="editor-container" ref="container" :class="containerClass">
<div ref="editorContainer" class="editor-wrapper position-relative">
<monaco-editor
@@ -96,7 +96,7 @@
components: {
MonacoEditor,
},
emits: ["save", "execute", "focusout", "tab", "update:modelValue", "cursor"],
emits: ["save", "execute", "focusout", "tab", "update:modelValue", "cursor", "confirm"],
editor: undefined,
data() {
return {
@@ -155,11 +155,14 @@
vertical: "hidden",
horizontal: "hidden",
alwaysConsumeMouseWheel: false,
handleMouseWheel: false,
handleMouseWheel: true,
horizontalScrollbarSize: 0,
verticalScrollbarSize: 0,
useShadows: false,
};
options.stickyScroll = {
enabled: false
};
options.find = {
addExtraSpaceOnTop: false,
autoFindInSelection: "never",
@@ -262,6 +265,19 @@
}
});
this.editor.addAction({
id: "confirm",
label: "Confirm",
keybindings: [
KeyMod.CtrlCmd | KeyCode.Enter,
],
contextMenuGroupId: "navigation",
contextMenuOrder: 1.5,
run: (ed) => {
this.$emit("confirm", ed.getValue())
}
});
// TabFocus is global to all editors, so revert the behavior on non-inputs
this.editor.onDidFocusEditorText?.(() => {
TabFocus.setTabFocusMode(this.input);
@@ -316,6 +332,7 @@
if (!this.fullHeight) {
editor.onDidContentSizeChange(e => {
if(!this.$refs.container) return;
this.$refs.container.style.height = (e.contentHeight + this.customHeight) + "px";
});
}

Some files were not shown because too many files have changed in this diff.