Compare commits

...

69 Commits

Author SHA1 Message Date
brian.mulier
9da8ba2f22 fix(tests): wrong task run count 2025-12-04 17:46:34 +01:00
brian-mulier-p
20330384ca fix(core): safeguard for null flow when trying to reset trigger in JdbcExecutor (#13381) 2025-12-04 12:47:31 +01:00
Miloš Paunović
d5ba45acba refactor(core): remove all traces of the old e2e setup (#13356) 2025-12-04 12:15:28 +01:00
github-actions[bot]
f701f15dcb chore(version): update to version '1.0.16' 2025-12-04 10:10:34 +00:00
brian.mulier
55a507b621 fix(core): deprecate Await util (#13369)
This reverts commit 9fa94deba9.
2025-12-04 10:14:47 +01:00
Loïc Mathieu
3cd357f311 chore(system): compilation issue after merge 2025-12-04 10:07:09 +01:00
Loïc Mathieu
c3e0b6d740 fix(execution): NORMAL kind should also be retrieved
Fixes #13262
2025-12-03 13:01:52 +01:00
YannC
3209ea9657 fix: correct regex when importing flow (#13320) 2025-12-03 09:06:53 +01:00
Loïc Mathieu
72b261129d fix(executions): support Download content dispositions with brackets
By escaping them with %5B and %5D.

Fixes #13299
2025-12-02 16:03:47 +01:00
github-actions[bot]
26c83390ba chore(version): update to version '1.0.15' 2025-12-02 14:01:04 +00:00
brian-mulier-p
7ba6bc6d30 fix(executions): avoid infinite loop in some cases of execution failure (#13293) 2025-12-02 13:12:49 +01:00
kkash08
fc86ef7bb4 Fix ZIP download so that file extension remains .yaml 2025-12-02 09:25:49 +01:00
brian.mulier
69b46fa3b8 fix(tests): concurrency limit test was emitting duplicate execution 2025-12-01 22:40:57 +01:00
brian.mulier
d681e349a1 refacto(core): compilation issue after Await.until renaming 2025-12-01 19:51:07 +01:00
brian.mulier
165951a8f3 refacto(core): rename Await.until(sleep) and (timeout) to avoid confusions 2025-12-01 18:55:50 +01:00
brian.mulier
be8de252ae fix(tests): ensure Executor is running before proceeding 2025-12-01 18:45:27 +01:00
brian.mulier
8a6093615a fix(executions): avoid JdbcExecutor from being stuck due to missing flow
In tests it can occur for example
2025-12-01 18:43:18 +01:00
brian.mulier
09b6964f16 fix(tests): use another db name on webserver to avoid colliding with repositories 2025-12-01 18:34:40 +01:00
brian-mulier-p
7f2d4d02d6 fix(core): concurrency limit on JDBC was decrementing when using FAIL or CANCEL behavior (#13220)
closes https://github.com/kestra-io/kestra/issues/13141
2025-12-01 13:09:16 +01:00
Roman Acevedo
7e200d9ebc fix(core): make sure inputs use defaults 2025-11-28 15:54:41 +01:00
Roman Acevedo
d361c33f63 fix(backfills): avoid console error after backfilling 2025-11-28 15:54:41 +01:00
Roman Acevedo
31438ffff0 test: useAxios is not available in this version 2025-11-28 15:54:41 +01:00
Roman Acevedo
18caf45521 fix(backfills): inputs was always the default one in the ui
- fix https://github.com/kestra-io/kestra/issues/13143
2025-11-28 15:54:41 +01:00
Loïc Mathieu
50d6de75f4 fix(executions): don't ends flowable if any subtasks should be retried
Fixes #11444
2025-11-28 11:08:14 +01:00
Loïc Mathieu
4c054f9d24 fix(execution): sequential with empty subtasks should ends in SUCCESS
Fixes https://github.com/kestra-io/kestra-ee/issues/5714

It fixes the aforementionned issue as there is a race with Parallel and restart which is caused by subsequent updates on the execution ending in a state where the parallel has no more task to process by didn't ends normally as it should have some.
2025-11-26 18:12:10 +01:00
Loïc Mathieu
5bad8dd3c7 feat(execution): add an attemps on skipped tasks 2025-11-26 18:12:03 +01:00
github-actions[bot]
69b1921236 chore(version): update to version '1.0.14' 2025-11-25 12:53:12 +00:00
Miloš Paunović
4e99a253e3 fix(core): redirect welcome page action button to flow creation in the enterprise edition (#13136)
Closes https://github.com/kestra-io/kestra-ee/issues/5933.
2025-11-25 08:16:04 +01:00
Loïc Mathieu
97d0a93e01 fix(system): WorkerTask should not FAILED when interrupting so they would be resubmitted
When a Worker is stopping, it will first wait for all running tasks to stop, then kill them. For those that didn't implement kill their thread would be interrupted.

But if the task is properly killed, or support interrupts (like the Sleep task), it would ends in FAILED then a WorkerTaskWould be send that would fail the flow preventing the WorkerTask to be resubmitted.

We nows check if the worker is terminating and should resubmit, in this case we didn't emit any WorkerTaskResult

Fixes #13108
Part-of: https://github.com/kestra-io/kestra-ee/issues/5556
2025-11-24 12:26:59 +01:00
Loïc Mathieu
e2d8d51843 fix(execution): improve property skip cache
When using Property.ofExpression(), the cache should never be used as this is usually used as providing a default value inside a task, which can change from rendering to rendering as it's an expression.

Also retain skipCache in a boolean so it can be rendered more than 2 times ans still skip the cache.

It should prevent future issues like #13027
2025-11-20 10:40:41 +01:00
github-actions[bot]
8567ff5490 chore(version): update to version '1.0.13' 2025-11-18 13:10:22 +00:00
Loïc Mathieu
050e22dd09 fix(execution): use jdbcRepository.findOne to be tolerant of multiple results
It uses findAny() under the cover which does not throw if more than one result is returned.

Fixes #12943
2025-11-18 10:23:39 +01:00
Florian Hussonnois
3552eeefbb fix(scheduler): mysql convert 'now' to UTC to avoid any offset error on next_execution_date
Fixed a previous commit to only apply the change for MySQL

Related-to: kestra-io/kestra-ee#5611
2025-11-18 09:58:01 +01:00
Loïc Mathieu
2e47fb8285 fix(core): compilation issue 2025-11-17 15:07:33 +01:00
Piyush Bhaskar
b52a07e562 fix(core): add resize observer for editor container (#12991) 2025-11-17 14:00:19 +05:30
Loïc Mathieu
3f7c01db41 fix(flow): flow trigger with both conditions and preconditions
When a flow have both a condition and a precondition, the condition was evaluated twice which lead to double execution triggered.

Fixes
2025-11-14 18:11:24 +01:00
MilosPaunovic
f5dbec96e0 chore(core): count only direct dependencies for badge number
Closes https://github.com/kestra-io/kestra/issues/12817.
2025-11-14 08:20:14 +01:00
github-actions[bot]
fe7a6d9af9 chore(version): update to version '1.0.12' 2025-11-13 13:42:02 +00:00
Loïc Mathieu
06c8c35061 fix(flow): don't URLEncode the fileName inside the Download task
Also provide a `fileName` property that when set would override any filename from the content disposition in case it causes issues.
2025-11-13 11:12:18 +01:00
Loïc Mathieu
8f23e813f2 fix(system): consume the trigger queue so it is properly cleaned
Fixes https://github.com/kestra-io/kestra/issues/11671
2025-11-13 11:12:02 +01:00
Piyush Bhaskar
47b7c7cd2e fix(core): adjust overflow behavior (#12879) 2025-11-13 13:59:42 +05:30
Loïc Mathieu
aca7c2f694 fix(system): access log configuration
Due to a change in the configuration file, access log configuration was in the wrong sub-document.

Fixes https://github.com/kestra-io/kestra-ee/issues/5670
2025-11-12 15:09:06 +01:00
mustafatarek
a0f29b7d5d feat(core): add attempts for flowable tasks 2025-11-12 09:35:31 +01:00
Piyush Bhaskar
0176c8c101 fix(secrets): NS update for a secret should be disabled properly with correct prop (#12834) 2025-11-12 12:41:55 +05:30
YannC
b0036bbfca fix: where prop can be null (#12828) 2025-11-10 18:41:22 +01:00
github-actions[bot]
fad5edbde8 chore(version): update to version '1.0.11' 2025-11-10 14:35:23 +00:00
Loïc Mathieu
f125f63ae5 fix(executions): allow reading from subflow even if we have a parent
This fixes an issue where you cannot read from a Subflow file if the execution has iteself be triggered by another Subflow task.
It was caused by the trigger check beeing too aggressive, if it didn't pass the check it fail instead of return false so the other check would not be processed.

Fixes #12629
2025-11-10 13:27:06 +01:00
Florian Hussonnois
6db1bfb2ce fix(core): fix plugin stable version resolution (kestra-io/kestra-ee#5129)
Rename incremental field to patch

Fixes: kestra-io/kestra-ee#5129
2025-11-10 11:05:40 +01:00
Florian Hussonnois
0957e07c78 fix(plugins): remove regex validation on version property
Changes:
* Fixes stable method in Version class
* Remove regex validation on 'version' property

Related-to: kestra-io/kestra-ee#5090
2025-11-10 11:05:39 +01:00
Florian Hussonnois
5a4a5e44df fix(core): resolution of plugin must be done with a stable version 2025-11-10 11:05:39 +01:00
Florian Hussonnois
faee3f1827 fix(core): fix PluginCatalogService resolve method 2025-11-10 11:05:39 +01:00
Florian Hussonnois
3604762da0 fix(system): add resolveVersions method to PluginCatalogService
Related-to: kestra-io/kestra-ee#5171
2025-11-10 11:05:38 +01:00
YannC
6ceb0de1d5 fix: when removing a queued execution, directly delete instead of fetching then delete to reduce deadlock (#12789) 2025-11-10 10:32:23 +01:00
Loïc Mathieu
4a62f9c818 fix(executions): don't urlencode files as they would already be inside the storage 2025-11-10 09:28:30 +01:00
brian-mulier-p
d14f3e3317 fix(tests): bump amount of threads on tests (#12777) 2025-11-07 09:44:44 +01:00
Piyush Bhaskar
7e9030dfcf refactor(core): properly do trigger filter (#12780) 2025-11-07 11:46:23 +05:30
Ludovic DEHON
2fce17a8a9 feat(cli): add --flow-path on executor to preload some flows
close kestra-io/kestra-ee#5721
2025-11-06 19:26:04 +01:00
Loïc Mathieu
67d8509106 fix(system): killing a paused flow should kill the Pause task attempt
Fixes #12421
2025-11-06 15:34:19 +01:00
Piyush Bhaskar
01e92a6d79 Revert "fix(core): apply timeRange filter in triggers (#12721)" 2025-11-06 19:07:27 +05:30
Piyush Bhaskar
883b7c8610 fix(core): apply timeRange filter in triggers (#12721) 2025-11-06 16:31:48 +05:30
Piyush Bhaskar
11ef823567 fix(core): remove double info icon (#12623) 2025-11-06 11:54:07 +05:30
Loïc Mathieu
771cca1441 fix(system): trigger an execution once per condition on flow triggers
Fixes #12560
2025-11-05 15:33:44 +01:00
YannC.
53e8674dfc fix: set FlowWithSource as implementation for getFlow method 2025-11-04 16:14:51 +01:00
github-actions[bot]
59016ae1af chore(version): update to version '1.0.10' 2025-11-04 13:52:28 +00:00
Roman Acevedo
7503d6fa21 test: set retryWithFlowableErrors as FlakyTest 2025-11-04 13:46:49 +01:00
Roman Acevedo
0234a4c64c test(kv): only plain text header is sent now 2025-11-04 13:15:36 +01:00
Roman Acevedo
98c9c4d21f Fix/sdk changes (#12411)
* fix: kv controller remove namespace check

* clean(API): add query to filter parameter

* fix: flow update not deprecated

* clean(API): add deprecated on open api

* feat: executions annotations for skipping, follow method generation in sdk

* feat: add typing indication to validateTask

* fix(flowController): set correct hidden for json method in

* fix: optional params in delete executions endpoints

* fix: inputs/outputs as object

* change KV schema type to be object

* add back , deprecated = false on flow update, otherwise its marked as deprecated

* Revert "add back , deprecated = false on flow update, otherwise its marked as deprecated"

This reverts commit 3772404b68f14f0a80af9e0adb9952d58e9102b4.

* feat(API): add multipart to openAPI

* feat(API): add multipart to openAPI

* fix: only use plain-text for setKeyValue endpoint

* fix: KV command test

* chore: add multipart vendor annotations for custom generation on SDK

---------

Co-authored-by: YannC. <ycoornaert@kestra.io>
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-11-03 18:05:46 +01:00
github-actions[bot]
8e54183a44 chore(version): update to version '1.0.9' 2025-11-03 11:11:56 +00:00
github-actions[bot]
8aa332c629 chore(core): localize to languages other than english (#12550)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-11-03 10:22:11 +01:00
113 changed files with 3590 additions and 921 deletions

View File

@@ -62,7 +62,7 @@ public class KvUpdateCommand extends AbstractApiCommand {
Duration ttl = expiration == null ? null : Duration.parse(expiration);
MutableHttpRequest<String> request = HttpRequest
.PUT(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/kv/" + key, value)
.contentType(MediaType.APPLICATION_JSON_TYPE);
.contentType(MediaType.TEXT_PLAIN);
if (ttl != null) {
request.header("ttl", ttl.toString());

View File

@@ -1,7 +1,9 @@
package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
@@ -10,6 +12,8 @@ import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -19,6 +23,9 @@ import java.util.Map;
description = "Start the Kestra executor"
)
public class ExecutorCommand extends AbstractServerCommand {
@CommandLine.Spec
CommandLine.Model.CommandSpec spec;
@Inject
private ApplicationContext applicationContext;
@@ -28,22 +35,28 @@ public class ExecutorCommand extends AbstractServerCommand {
@Inject
private StartExecutorService startExecutorService;
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "The list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"-f", "--flow-path"}, description = "Tenant identifier required to load flows from the specified path")
private File flowPath;
@CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path")
private String tenantId;
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "List of execution IDs to skip, separated by commas; for troubleshooting only")
private List<String> skipExecutions = Collections.emptyList();
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "The list of flow identifiers (tenant|namespace|flowId) to skip, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "List of flow identifiers (tenant|namespace|flowId) to skip, separated by a coma; for troubleshooting only")
private List<String> skipFlows = Collections.emptyList();
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "The list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "List of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting only")
private List<String> skipNamespaces = Collections.emptyList();
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "The list of tenants to skip, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "List of tenants to skip, separated by a coma; for troubleshooting only")
private List<String> skipTenants = Collections.emptyList();
@CommandLine.Option(names = {"--start-executors"}, split=",", description = "The list of Kafka Stream executors to start, separated by a command. Use it only with the Kafka queue, for debugging purpose.")
@CommandLine.Option(names = {"--start-executors"}, split=",", description = "List of Kafka Stream executors to start, separated by a command. Use it only with the Kafka queue; for debugging only")
private List<String> startExecutors = Collections.emptyList();
@CommandLine.Option(names = {"--not-start-executors"}, split=",", description = "The list of Kafka Stream executors to not start, separated by a command. Use it only with the Kafka queue, for debugging purpose.")
@CommandLine.Option(names = {"--not-start-executors"}, split=",", description = "Lst of Kafka Stream executors to not start, separated by a command. Use it only with the Kafka queue; for debugging only")
private List<String> notStartExecutors = Collections.emptyList();
@SuppressWarnings("unused")
@@ -64,6 +77,16 @@ public class ExecutorCommand extends AbstractServerCommand {
super.call();
if (flowPath != null) {
try {
LocalFlowRepositoryLoader localFlowRepositoryLoader = applicationContext.getBean(LocalFlowRepositoryLoader.class);
TenantIdSelectorService tenantIdSelectorService = applicationContext.getBean(TenantIdSelectorService.class);
localFlowRepositoryLoader.load(tenantIdSelectorService.getTenantId(this.tenantId), this.flowPath);
} catch (IOException e) {
throw new CommandLine.ParameterException(this.spec.commandLine(), "Invalid flow path", e);
}
}
ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
executorService.run();

View File

@@ -23,7 +23,7 @@ public class IndexerCommand extends AbstractServerCommand {
@Inject
private SkipExecutionService skipExecutionService;
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting only")
private List<String> skipIndexerRecords = Collections.emptyList();
@SuppressWarnings("unused")

View File

@@ -42,7 +42,7 @@ public class StandAloneCommand extends AbstractServerCommand {
@Nullable
private FileChangedEventListener fileWatcher;
@CommandLine.Option(names = {"-f", "--flow-path"}, description = "the flow path containing flow to inject at startup (when running with a memory flow repository)")
@CommandLine.Option(names = {"-f", "--flow-path"}, description = "Tenant identifier required to load flows from the specified path")
private File flowPath;
@CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path with the enterprise edition")
@@ -51,19 +51,19 @@ public class StandAloneCommand extends AbstractServerCommand {
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to eight times the number of available processors. Set it to 0 to avoid starting a worker.")
private int workerThread = defaultWorkerThread();
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting only")
private List<String> skipExecutions = Collections.emptyList();
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (namespace.flowId) to skip, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (namespace.flowId) to skip, separated by a coma; for troubleshooting only")
private List<String> skipFlows = Collections.emptyList();
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "a list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "a list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting only")
private List<String> skipNamespaces = Collections.emptyList();
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting only")
private List<String> skipTenants = Collections.emptyList();
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting only")
private List<String> skipIndexerRecords = Collections.emptyList();
@CommandLine.Option(names = {"--no-tutorials"}, description = "Flag to disable auto-loading of tutorial flows.")

View File

@@ -40,7 +40,7 @@ public class WebServerCommand extends AbstractServerCommand {
@Option(names = {"--no-indexer"}, description = "Flag to disable starting an embedded indexer.")
private boolean indexerDisabled = false;
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting only")
private List<String> skipIndexerRecords = Collections.emptyList();
@Override

View File

@@ -30,15 +30,15 @@ micronaut:
read-idle-timeout: 60m
write-idle-timeout: 60m
idle-timeout: 60m
netty:
max-zstd-encode-size: 67108864 # increased to 64MB from the default of 32MB
max-chunk-size: 10MB
max-header-size: 32768 # increased from the default of 8k
responses:
file:
cache-seconds: 86400
cache-control:
public: true
netty:
max-zstd-encode-size: 67108864 # increased to 64MB from the default of 32MB
max-chunk-size: 10MB
max-header-size: 32768 # increased from the default of 8k
# Access log configuration, see https://docs.micronaut.io/latest/guide/index.html#accessLogger
access-logger:

View File

@@ -0,0 +1,23 @@
package io.kestra.core.exceptions;
/**
* Exception that can be thrown when a Flow is not found.
*/
public class FlowNotFoundException extends NotFoundException {
/**
* Creates a new {@link FlowNotFoundException} instance.
*/
public FlowNotFoundException() {
super();
}
/**
* Creates a new {@link NotFoundException} instance.
*
* @param message the error message.
*/
public FlowNotFoundException(final String message) {
super(message);
}
}

View File

@@ -7,7 +7,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
@@ -65,7 +64,7 @@ public interface HasSource {
if (isYAML(fileName)) {
byte[] bytes = inputStream.readAllBytes();
List<String> sources = List.of(new String(bytes).split("---"));
List<String> sources = List.of(new String(bytes).split("(?m)^---\\s*$"));
for (int i = 0; i < sources.size(); i++) {
String source = sources.get(i);
reader.accept(source, String.valueOf(i));

View File

@@ -1,16 +1,33 @@
package io.kestra.core.models;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Pattern;
import java.util.List;
import java.util.Map;
/**
* Interface that can be implemented by classes supporting plugin versioning.
*
* @see Plugin
*/
public interface PluginVersioning {
String TITLE = "Plugin Version";
String DESCRIPTION = """
Defines the version of the plugin to use.
@Pattern(regexp="\\d+\\.\\d+\\.\\d+(-[a-zA-Z0-9-]+)?|([a-zA-Z0-9]+)")
@Schema(title = "The version of the plugin to use.")
The version must follow the Semantic Versioning (SemVer) specification:
- A single-digit MAJOR version (e.g., `1`).
- A MAJOR.MINOR version (e.g., `1.1`).
- A MAJOR.MINOR.PATCH version, optionally with any qualifier
(e.g., `1.1.2`, `1.1.0-SNAPSHOT`).
""";
@Schema(
title = TITLE,
description = DESCRIPTION
)
String getVersion();
}

View File

@@ -5,6 +5,8 @@ import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.dashboards.filters.AbstractFilter;
import io.kestra.core.repositories.QueryBuilderInterface;
import io.kestra.plugin.core.dashboard.data.IData;
import jakarta.annotation.Nullable;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@@ -36,6 +38,8 @@ public abstract class DataFilter<F extends Enum<F>, C extends ColumnDescriptor<F
private Map<String, C> columns;
@Setter
@Valid
@Nullable
private List<AbstractFilter<F>> where;
private List<OrderBy> orderBy;

View File

@@ -28,6 +28,7 @@ import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@@ -77,10 +78,12 @@ public class Execution implements DeletedInterface, TenantInterface {
@With
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Schema(implementation = Object.class)
Map<String, Object> inputs;
@With
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Schema(implementation = Object.class)
Map<String, Object> outputs;
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@@ -88,6 +91,7 @@ public class Execution implements DeletedInterface, TenantInterface {
List<Label> labels;
@With
@Schema(implementation = Object.class)
Map<String, Object> variables;
@NotNull
@@ -647,18 +651,20 @@ public class Execution implements DeletedInterface, TenantInterface {
public boolean hasFailedNoRetry(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun) {
return this.findTaskRunByTasks(resolvedTasks, parentTaskRun)
.stream()
.anyMatch(taskRun -> {
ResolvedTask resolvedTask = resolvedTasks.stream()
.filter(t -> t.getTask().getId().equals(taskRun.getTaskId())).findFirst()
.orElse(null);
if (resolvedTask == null) {
log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'",
taskRun.getId(), parentTaskRun.getId());
return false;
}
return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry())
&& taskRun.getState().isFailed();
});
// NOTE: we check on isFailed first to avoid the costly shouldBeRetried() method
.anyMatch(taskRun -> taskRun.getState().isFailed() && shouldNotBeRetried(resolvedTasks, parentTaskRun, taskRun));
}
private static boolean shouldNotBeRetried(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun, TaskRun taskRun) {
ResolvedTask resolvedTask = resolvedTasks.stream()
.filter(t -> t.getTask().getId().equals(taskRun.getTaskId())).findFirst()
.orElse(null);
if (resolvedTask == null) {
log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'",
taskRun.getId(), parentTaskRun.getId());
return false;
}
return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry());
}
public boolean hasCreated() {

View File

@@ -3,10 +3,13 @@ package io.kestra.core.models.executions;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@@ -53,6 +56,7 @@ public class TaskRun implements TenantInterface {
@With
@JsonInclude(JsonInclude.Include.ALWAYS)
@Nullable
@Schema(implementation = Object.class)
Variables outputs;
@NotNull
@@ -310,4 +314,11 @@ public class TaskRun implements TenantInterface {
.build();
}
public TaskRun addAttempt(TaskRunAttempt attempt) {
if (this.attempts == null) {
this.attempts = new ArrayList<>();
}
this.attempts.add(attempt);
return this;
}
}

View File

@@ -24,4 +24,8 @@ public class Concurrency {
public enum Behavior {
QUEUE, CANCEL, FAIL;
}
public static boolean possibleTransitions(State.Type type) {
return type.equals(State.Type.CANCELLED) || type.equals(State.Type.FAILED);
}
}

View File

@@ -35,7 +35,6 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
@JsonDeserialize(using = Property.PropertyDeserializer.class)
@JsonSerialize(using = Property.PropertySerializer.class)
@Builder
@NoArgsConstructor
@AllArgsConstructor(access = AccessLevel.PACKAGE)
@Schema(
oneOf = {
@@ -51,6 +50,7 @@ public class Property<T> {
.copy()
.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
private final boolean skipCache;
private String expression;
private T value;
@@ -60,13 +60,23 @@ public class Property<T> {
@Deprecated
// Note: when not used, this constructor would not be deleted but made private so it can only be used by ofExpression(String) and the deserializer
public Property(String expression) {
this.expression = expression;
this(expression, false);
}
private Property(String expression, boolean skipCache) {
this.expression = expression;
this.skipCache = skipCache;
}
/**
* @deprecated use {@link #ofValue(Object)} instead.
*/
@VisibleForTesting
@Deprecated
public Property(Map<?, ?> map) {
try {
expression = MAPPER.writeValueAsString(map);
this.skipCache = false;
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
@@ -79,9 +89,6 @@ public class Property<T> {
/**
* Returns a new {@link Property} with no cached rendered value,
* so that the next render will evaluate its original Pebble expression.
* <p>
* The returned property will still cache its rendered result.
* To re-evaluate on a subsequent render, call {@code skipCache()} again.
*
* @return a new {@link Property} without a pre-rendered value
*/
@@ -133,6 +140,7 @@ public class Property<T> {
/**
* Build a new Property object with a Pebble expression.<br>
* This property object will not cache its rendered value.
* <p>
* Use {@link #ofValue(Object)} to build a property with a value instead.
*/
@@ -142,11 +150,11 @@ public class Property<T> {
throw new IllegalArgumentException("'expression' must be a valid Pebble expression");
}
return new Property<>(expression);
return new Property<>(expression, true);
}
/**
* Render a property then convert it to its target type.<br>
* Render a property, then convert it to its target type.<br>
* <p>
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
@@ -164,7 +172,7 @@ public class Property<T> {
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
*/
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
if (property.skipCache || property.value == null) {
String rendered = context.render(property.expression, variables);
property.value = MAPPER.convertValue(rendered, clazz);
}
@@ -192,7 +200,7 @@ public class Property<T> {
*/
@SuppressWarnings("unchecked")
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
if (property.skipCache || property.value == null) {
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
try {
String trimmedExpression = property.expression.trim();
@@ -244,7 +252,7 @@ public class Property<T> {
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
if (property.skipCache || property.value == null) {
JavaType targetMapType = MAPPER.getTypeFactory().constructMapType(Map.class, keyClass, valueClass);
try {

View File

@@ -1,5 +1,6 @@
package io.kestra.core.plugins;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Builder;
import java.io.File;
@@ -33,7 +34,7 @@ public record PluginArtifact(
String version,
URI uri
) implements Comparable<PluginArtifact> {
private static final Pattern ARTIFACT_PATTERN = Pattern.compile(
"([^: ]+):([^: ]+)(:([^: ]*)(:([^: ]+))?)?:([^: ]+)"
);
@@ -42,7 +43,8 @@ public record PluginArtifact(
);
public static final String JAR_EXTENSION = "jar";
public static final String KESTRA_GROUP_ID = "io.kestra";
/**
* Static helper method for constructing a new {@link PluginArtifact} from a JAR file.
*
@@ -135,6 +137,11 @@ public record PluginArtifact(
public String toString() {
return toCoordinates();
}
@JsonIgnore
public boolean isOfficial() {
return groupId.startsWith(KESTRA_GROUP_ID);
}
public String toCoordinates() {
return Stream.of(groupId, artifactId, extension, classifier, version)

View File

@@ -1,9 +1,13 @@
package io.kestra.core.plugins;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.Version;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.HttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -15,9 +19,12 @@ import java.util.Base64;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Services for retrieving available plugin artifacts for Kestra.
@@ -39,6 +46,8 @@ public class PluginCatalogService {
private final boolean icons;
private final boolean oss;
private final Version currentStableVersion;
/**
* Creates a new {@link PluginCatalogService} instance.
@@ -53,11 +62,55 @@ public class PluginCatalogService {
this.httpClient = httpClient;
this.icons = icons;
this.oss = communityOnly;
Version version = Version.of(KestraContext.getContext().getVersion());
this.currentStableVersion = new Version(version.majorVersion(), version.minorVersion(), version.patchVersion(), null);
// Immediately trigger an async load of plugin artifacts.
this.isLoaded.set(true);
this.plugins = CompletableFuture.supplyAsync(this::load);
}
/**
* Resolves the version for the given artifacts.
*
* @param artifacts The list of artifacts to resolve.
* @return The list of results.
*/
public List<PluginResolutionResult> resolveVersions(List<PluginArtifact> artifacts) {
if (ListUtils.isEmpty(artifacts)) {
return List.of();
}
final Map<String, ApiPluginArtifact> pluginsByGroupAndArtifactId = getAllCompatiblePlugins().stream()
.collect(Collectors.toMap(it -> it.groupId() + ":" + it.artifactId(), Function.identity()));
return artifacts.stream().map(it -> {
// Get all compatible versions for current artifact
List<String> versions = Optional
.ofNullable(pluginsByGroupAndArtifactId.get(it.groupId() + ":" + it.artifactId()))
.map(ApiPluginArtifact::versions)
.orElse(List.of());
// Try to resolve the version
String resolvedVersion = null;
if (!versions.isEmpty()) {
if (it.version().equalsIgnoreCase("LATEST")) {
resolvedVersion = versions.getFirst();
} else {
resolvedVersion = versions.contains(it.version()) ? it.version() : null;
}
}
// Build the PluginResolutionResult
return new PluginResolutionResult(
it,
resolvedVersion,
versions,
resolvedVersion != null
);
}).toList();
}
public synchronized List<PluginManifest> get() {
try {
@@ -140,7 +193,27 @@ public class PluginCatalogService {
isLoaded.set(false);
}
}
private List<ApiPluginArtifact> getAllCompatiblePlugins() {
MutableHttpRequest<Object> request = HttpRequest.create(
HttpMethod.GET,
"/v1/plugins/artifacts/core-compatibility/" + currentStableVersion
);
if (oss) {
request.getParameters().add("license", "OPENSOURCE");
}
try {
return httpClient
.toBlocking()
.exchange(request, Argument.listOf(ApiPluginArtifact.class))
.body();
} catch (Exception e) {
log.debug("Failed to retrieve available plugins from Kestra API. Cause: ", e);
return List.of();
}
}
public record PluginManifest(
String title,
String icon,
@@ -153,4 +226,11 @@ public class PluginCatalogService {
return groupId + ":" + artifactId + ":LATEST";
}
}
public record ApiPluginArtifact(
String groupId,
String artifactId,
String license,
List<String> versions
) {}
}

View File

@@ -144,7 +144,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
static String extractPluginRawIdentifier(final JsonNode node, final boolean isVersioningSupported) {
String type = Optional.ofNullable(node.get(TYPE)).map(JsonNode::textValue).orElse(null);
String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::textValue).orElse(null);
String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::asText).orElse(null);
if (type == null || type.isEmpty()) {
return null;

View File

@@ -56,12 +56,10 @@ public final class ExecutableUtils {
}
public static SubflowExecutionResult subflowExecutionResult(TaskRun parentTaskrun, Execution execution) {
List<TaskRunAttempt> attempts = parentTaskrun.getAttempts() == null ? new ArrayList<>() : new ArrayList<>(parentTaskrun.getAttempts());
attempts.add(TaskRunAttempt.builder().state(parentTaskrun.getState()).build());
return SubflowExecutionResult.builder()
.executionId(execution.getId())
.state(parentTaskrun.getState().getCurrent())
.parentTaskRun(parentTaskrun.withAttempts(attempts))
.parentTaskRun(parentTaskrun.addAttempt(TaskRunAttempt.builder().state(parentTaskrun.getState()).build()))
.build();
}

View File

@@ -82,8 +82,7 @@ public abstract class FilesService {
}
private static String resolveUniqueNameForFile(final Path path) {
String filename = path.getFileName().toString();
String encodedFilename = java.net.URLEncoder.encode(filename, java.nio.charset.StandardCharsets.UTF_8);
return IdUtils.from(path.toString()) + "-" + encodedFilename;
String filename = path.getFileName().toString().replace(' ', '+');
return IdUtils.from(path.toString()) + "-" + filename;
}
}

View File

@@ -11,6 +11,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.ListUtils;
import io.kestra.plugin.core.flow.Dag;
import java.util.*;
@@ -152,6 +153,35 @@ public class FlowableUtils {
return Collections.emptyList();
}
public static Optional<State.Type> resolveSequentialState(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> _finally,
TaskRun parentTaskRun,
RunContext runContext,
boolean allowFailure,
boolean allowWarning
) {
if (ListUtils.emptyOnNull(tasks).stream()
.filter(resolvedTask -> !resolvedTask.getTask().getDisabled())
.findAny()
.isEmpty()) {
return Optional.of(State.Type.SUCCESS);
}
return resolveState(
execution,
tasks,
errors,
_finally,
parentTaskRun,
runContext,
allowFailure,
allowWarning
);
}
public static Optional<State.Type> resolveState(
Execution execution,
List<ResolvedTask> tasks,
@@ -207,7 +237,7 @@ public class FlowableUtils {
}
} else {
// first call, the error flow is not ready, we need to notify the parent task that can be failed to init error flows
if (execution.hasFailed(tasks, parentTaskRun) || terminalState == State.Type.FAILED) {
if (execution.hasFailedNoRetry(tasks, parentTaskRun) || terminalState == State.Type.FAILED) {
return Optional.of(execution.guessFinalState(tasks, parentTaskRun, allowFailure, allowWarning, terminalState));
}
}

View File

@@ -3,14 +3,19 @@ package io.kestra.core.runners;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledExecution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.utils.Await;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@@ -34,9 +39,16 @@ public class RunnerUtils {
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Inject
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private ExecutionRepositoryInterface executionRepository;
@Inject
private ExecutionService executionService;
@@ -146,6 +158,11 @@ public class RunnerUtils {
}), duration);
}
@VisibleForTesting
public Execution awaitExecution(Predicate<Execution> predicate, Duration duration) throws TimeoutException {
return awaitExecution(predicate, () -> {}, duration);
}
@VisibleForTesting
public Execution awaitExecution(Predicate<Execution> predicate, Runnable executionEmitter, Duration duration) throws TimeoutException {
AtomicReference<Execution> receive = new AtomicReference<>();
@@ -172,6 +189,62 @@ public class RunnerUtils {
return receive.get();
}
public List<Execution> awaitFlowExecutionNumber(int number, String tenantId, String namespace, String flowId) {
return awaitFlowExecutionNumber(number, tenantId, namespace, flowId, null);
}
public List<Execution> awaitFlowExecutionNumber(int number, String tenantId, String namespace, String flowId, Duration duration) {
AtomicReference<List<Execution>> receive = new AtomicReference<>();
Flow flow = flowRepository
.findById(tenantId, namespace, flowId, Optional.empty())
.orElseThrow(
() -> new IllegalArgumentException("Unable to find flow '" + flowId + "'"));
try {
if (duration == null){
duration = Duration.ofSeconds(20);
}
Await.until(() -> {
ArrayListTotal<Execution> byFlowId = executionRepository.findByFlowId(
tenantId, namespace, flowId, Pageable.UNPAGED);
if (byFlowId.size() == number
&& byFlowId.stream()
.filter(e -> executionService.isTerminated(flow, e))
.toList().size() == number) {
receive.set(byFlowId);
return true;
}
return false;
}, Duration.ofMillis(50), duration);
} catch (TimeoutException e) {
ArrayListTotal<Execution> byFlowId = executionRepository.findByFlowId(
tenantId, namespace, flowId, Pageable.UNPAGED);
if (!byFlowId.isEmpty()) {
throw new RuntimeException("%d Execution found for flow %s, but %d where awaited".formatted(byFlowId.size(), flowId, number));
} else {
throw new RuntimeException("No execution for flow %s exist in the database".formatted(flowId));
}
}
return receive.get();
}
public Execution killExecution(Execution execution) throws QueueException, TimeoutException {
killQueue.emit(ExecutionKilledExecution.builder()
.executionId(execution.getId())
.isOnKillCascade(true)
.state(ExecutionKilled.State.REQUESTED)
.tenantId(execution.getTenantId())
.build());
return awaitExecution(isTerminatedExecution(
execution,
flowRepository
.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), Optional.ofNullable(execution.getFlowRevision()))
.orElse(null)
), throwRunnable(() -> this.executionQueue.emit(execution)), Duration.ofSeconds(60));
}
@VisibleForTesting
public Execution awaitChildExecution(Flow flow, Execution parentExecution, Runnable executionEmitter, Duration duration) throws TimeoutException {
return this.awaitExecution(isTerminatedChildExecution(parentExecution, flow), executionEmitter, duration);

View File

@@ -151,10 +151,7 @@ abstract class AbstractFileFunction implements Function {
// if there is a trigger of type execution, we also allow accessing a file from the parent execution
Map<String, String> trigger = (Map<String, String>) context.getVariable(TRIGGER);
if (!isFileUriValid(trigger.get(NAMESPACE), trigger.get("flowId"), trigger.get("executionId"), path)) {
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it didn't belong to the parent execution");
}
return true;
return isFileUriValid(trigger.get(NAMESPACE), trigger.get("flowId"), trigger.get("executionId"), path);
}
return false;
}

View File

@@ -383,6 +383,7 @@ public class ExecutionService {
if (!isFlowable || s.equals(taskRunId)) {
TaskRun newTaskRun;
State.Type targetState = newState;
if (task instanceof Pause pauseTask) {
State.Type terminalState = newState == State.Type.RUNNING ? State.Type.SUCCESS : newState;
Pause.Resumed _resumed = resumed != null ? resumed : Pause.Resumed.now(terminalState);
@@ -392,23 +393,23 @@ public class ExecutionService {
// if it's a Pause task with no subtask, we terminate the task
if (ListUtils.isEmpty(pauseTask.getTasks()) && ListUtils.isEmpty(pauseTask.getErrors()) && ListUtils.isEmpty(pauseTask.getFinally())) {
if (newState == State.Type.RUNNING) {
newTaskRun = newTaskRun.withState(State.Type.SUCCESS);
targetState = State.Type.SUCCESS;
} else if (newState == State.Type.KILLING) {
newTaskRun = newTaskRun.withState(State.Type.KILLED);
} else {
newTaskRun = newTaskRun.withState(newState);
targetState = State.Type.KILLED;
}
} else {
// we should set the state to RUNNING so that subtasks are executed
newTaskRun = newTaskRun.withState(State.Type.RUNNING);
targetState = State.Type.RUNNING;
}
newTaskRun = newTaskRun.withState(targetState);
} else {
newTaskRun = originalTaskRun.withState(newState);
newTaskRun = originalTaskRun.withState(targetState);
}
if (originalTaskRun.getAttempts() != null && !originalTaskRun.getAttempts().isEmpty()) {
ArrayList<TaskRunAttempt> attempts = new ArrayList<>(originalTaskRun.getAttempts());
attempts.set(attempts.size() - 1, attempts.getLast().withState(newState));
attempts.set(attempts.size() - 1, attempts.getLast().withState(targetState));
newTaskRun = newTaskRun.withAttempts(attempts);
}

View File

@@ -32,48 +32,84 @@ public class Version implements Comparable<Version> {
* @param version the version.
* @return a new {@link Version} instance.
*/
public static Version of(String version) {
public static Version of(final Object version) {
if (version.startsWith("v")) {
version = version.substring(1);
if (Objects.isNull(version)) {
throw new IllegalArgumentException("Invalid version, cannot parse null version");
}
String strVersion = version.toString();
if (strVersion.startsWith("v")) {
strVersion = strVersion.substring(1);
}
int qualifier = version.indexOf("-");
int qualifier = strVersion.indexOf("-");
final String[] versions = qualifier > 0 ?
version.substring(0, qualifier).split("\\.") :
version.split("\\.");
strVersion.substring(0, qualifier).split("\\.") :
strVersion.split("\\.");
try {
final int majorVersion = Integer.parseInt(versions[0]);
final int minorVersion = versions.length > 1 ? Integer.parseInt(versions[1]) : 0;
final int incrementalVersion = versions.length > 2 ? Integer.parseInt(versions[2]) : 0;
final Integer minorVersion = versions.length > 1 ? Integer.parseInt(versions[1]) : null;
final Integer incrementalVersion = versions.length > 2 ? Integer.parseInt(versions[2]) : null;
return new Version(
majorVersion,
minorVersion,
incrementalVersion,
qualifier > 0 ? version.substring(qualifier + 1) : null,
version
qualifier > 0 ? strVersion.substring(qualifier + 1) : null,
strVersion
);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid version, cannot parse '" + version + "'");
}
}
/**
* Static helper method for returning the most recent stable version for a current {@link Version}.
* Resolves the most appropriate stable version from a collection, based on a given input version.
* <p>
* The matching rules are:
* <ul>
* <li>If {@code from} specifies only a major version (e.g. {@code 1}), return the latest stable version
* with the same major (e.g. {@code 1.2.3}).</li>
* <li>If {@code from} specifies a major and minor version only (e.g. {@code 1.2}), return the latest
* stable version with the same major and minor (e.g. {@code 1.2.3}).</li>
* <li>If {@code from} specifies a full version with major, minor, and patch (e.g. {@code 1.2.2}),
* then only return it if it is exactly present (and stable) in {@code versions}.
* No "upgrade" is performed in this case.</li>
* <li>If no suitable version is found, returns {@code null}.</li>
* </ul>
*
* @param from the current version.
* @param versions the list of version.
*
* @return the last stable version.
* @param from the reference version (may specify only major, or major+minor, or major+minor+patch).
* @param versions the collection of candidate versions to resolve against.
* @return the best matching stable version, or {@code null} if none match.
*/
public static Version getStable(final Version from, final Collection<Version> versions) {
List<Version> compatibleVersions = versions.stream()
.filter(v -> v.majorVersion() == from.majorVersion() && v.minorVersion() == from.minorVersion())
.toList();
if (compatibleVersions.isEmpty()) return null;
return Version.getLatest(compatibleVersions);
// Case 1: "from" is only a major (e.g. 1)
if (from.hasOnlyMajor()) {
List<Version> sameMajor = versions.stream()
.filter(v -> v.majorVersion() == from.majorVersion())
.toList();
return sameMajor.isEmpty() ? null : Version.getLatest(sameMajor);
}
// Case 2: "from" is major+minor only (e.g. 1.2)
if (from.hasMajorAndMinorOnly()) {
List<Version> sameMinor = versions.stream()
.filter(v -> v.majorVersion() == from.majorVersion()
&& v.minorVersion() == from.minorVersion())
.toList();
return sameMinor.isEmpty() ? null : Version.getLatest(sameMinor);
}
// Case 3: "from" is full version (major+minor+patch)
if (versions.contains(from)) {
return from;
}
// No match
return null;
}
/**
@@ -123,8 +159,8 @@ public class Version implements Comparable<Version> {
}
private final int majorVersion;
private final int minorVersion;
private final int incrementalVersion;
private final Integer minorVersion;
private final Integer patchVersion;
private final Qualifier qualifier;
private final String originalVersion;
@@ -134,14 +170,14 @@ public class Version implements Comparable<Version> {
*
* @param majorVersion the major version (must be superior or equal to 0).
* @param minorVersion the minor version (must be superior or equal to 0).
* @param incrementalVersion the incremental version (must be superior or equal to 0).
* @param patchVersion the incremental version (must be superior or equal to 0).
* @param qualifier the qualifier.
*/
public Version(final int majorVersion,
final int minorVersion,
final int incrementalVersion,
final int patchVersion,
final String qualifier) {
this(majorVersion, minorVersion, incrementalVersion, qualifier, null);
this(majorVersion, minorVersion, patchVersion, qualifier, null);
}
/**
@@ -149,25 +185,25 @@ public class Version implements Comparable<Version> {
*
* @param majorVersion the major version (must be superior or equal to 0).
* @param minorVersion the minor version (must be superior or equal to 0).
* @param incrementalVersion the incremental version (must be superior or equal to 0).
* @param patchVersion the incremental version (must be superior or equal to 0).
* @param qualifier the qualifier.
* @param originalVersion the original string version.
*/
private Version(final int majorVersion,
final int minorVersion,
final int incrementalVersion,
private Version(final Integer majorVersion,
final Integer minorVersion,
final Integer patchVersion,
final String qualifier,
final String originalVersion) {
this.majorVersion = requirePositive(majorVersion, "major");
this.minorVersion = requirePositive(minorVersion, "minor");
this.incrementalVersion = requirePositive(incrementalVersion, "incremental");
this.patchVersion = requirePositive(patchVersion, "incremental");
this.qualifier = qualifier != null ? new Qualifier(qualifier) : null;
this.originalVersion = originalVersion;
}
private static int requirePositive(int version, final String message) {
if (version < 0) {
private static Integer requirePositive(Integer version, final String message) {
if (version != null && version < 0) {
throw new IllegalArgumentException(String.format("The '%s' version must super or equal to 0", message));
}
return version;
@@ -178,11 +214,11 @@ public class Version implements Comparable<Version> {
}
public int minorVersion() {
return minorVersion;
return minorVersion != null ? minorVersion : 0;
}
public int incrementalVersion() {
return incrementalVersion;
public int patchVersion() {
return patchVersion != null ? patchVersion : 0;
}
public Qualifier qualifier() {
@@ -197,9 +233,9 @@ public class Version implements Comparable<Version> {
if (this == o) return true;
if (!(o instanceof Version)) return false;
Version version = (Version) o;
return majorVersion == version.majorVersion &&
minorVersion == version.minorVersion &&
incrementalVersion == version.incrementalVersion &&
return Objects.equals(majorVersion,version.majorVersion) &&
Objects.equals(minorVersion, version.minorVersion) &&
Objects.equals(patchVersion,version.patchVersion) &&
Objects.equals(qualifier, version.qualifier);
}
@@ -208,7 +244,7 @@ public class Version implements Comparable<Version> {
*/
@Override
public int hashCode() {
return Objects.hash(majorVersion, minorVersion, incrementalVersion, qualifier);
return Objects.hash(majorVersion, minorVersion, patchVersion, qualifier);
}
/**
@@ -218,7 +254,7 @@ public class Version implements Comparable<Version> {
public String toString() {
if (originalVersion != null) return originalVersion;
String version = majorVersion + "." + minorVersion + "." + incrementalVersion;
String version = majorVersion + "." + minorVersion + "." + patchVersion;
return (qualifier != null) ? version +"-" + qualifier : version;
}
@@ -238,7 +274,7 @@ public class Version implements Comparable<Version> {
return compareMinor;
}
int compareIncremental = Integer.compare(that.incrementalVersion, this.incrementalVersion);
int compareIncremental = Integer.compare(that.patchVersion, this.patchVersion);
if (compareIncremental != 0) {
return compareIncremental;
}
@@ -253,6 +289,21 @@ public class Version implements Comparable<Version> {
return this.qualifier.compareTo(that.qualifier);
}
/**
* @return true if only major is specified (e.g. "1")
*/
private boolean hasOnlyMajor() {
return minorVersion == null && patchVersion == null;
}
/**
* @return true if major+minor are specified, but no patch (e.g. "1.2")
*/
private boolean hasMajorAndMinorOnly() {
return minorVersion != null && patchVersion == null;
}
/**
* Checks whether this version is before the given one.

View File

@@ -33,11 +33,13 @@ public class ExecutionsDataFilterValidator implements ConstraintValidator<Execut
}
});
executionsDataFilter.getWhere().forEach(filter -> {
if (filter.getField() == Executions.Fields.LABELS && filter.getLabelKey() == null) {
violations.add("Label filters must have a `labelKey`.");
}
});
if (executionsDataFilter.getWhere() != null) {
executionsDataFilter.getWhere().forEach(filter -> {
if (filter.getField() == Executions.Fields.LABELS && filter.getLabelKey() == null) {
violations.add("Label filters must have a `labelKey`.");
}
});
}
if (!violations.isEmpty()) {
context.disableDefaultConstraintViolation();

View File

@@ -8,6 +8,7 @@ import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.property.Property;
@@ -15,6 +16,7 @@ import io.kestra.core.models.tasks.*;
import io.kestra.core.runners.FlowableUtils;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.validations.DagTaskValidation;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
@@ -176,6 +178,22 @@ public class Dag extends Task implements FlowableTask<VoidOutput> {
);
}
@Override
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
List<ResolvedTask> childTasks = this.childTasks(runContext, parentTaskRun);
return FlowableUtils.resolveSequentialState(
execution,
childTasks,
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
parentTaskRun,
runContext,
this.isAllowFailure(),
this.isAllowWarning()
);
}
public List<String> dagCheckNotExistTask(List<DagTask> taskDepends) {
List<String> dependenciesIds = taskDepends
.stream()

View File

@@ -163,15 +163,9 @@ public class EachParallel extends Parallel implements FlowableTask<VoidOutput> {
@Override
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
List<ResolvedTask> childTasks = ListUtils.emptyOnNull(this.childTasks(runContext, parentTaskRun)).stream()
.filter(resolvedTask -> !resolvedTask.getTask().getDisabled())
.toList();
List<ResolvedTask> childTasks = this.childTasks(runContext, parentTaskRun);
if (childTasks.isEmpty()) {
return Optional.of(State.Type.SUCCESS);
}
return FlowableUtils.resolveState(
return FlowableUtils.resolveSequentialState(
execution,
childTasks,
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),

View File

@@ -127,14 +127,9 @@ public class EachSequential extends Sequential implements FlowableTask<VoidOutpu
@Override
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
List<ResolvedTask> childTasks = ListUtils.emptyOnNull(this.childTasks(runContext, parentTaskRun)).stream()
.filter(resolvedTask -> !resolvedTask.getTask().getDisabled())
.toList();
if (childTasks.isEmpty()) {
return Optional.of(State.Type.SUCCESS);
}
List<ResolvedTask> childTasks = this.childTasks(runContext, parentTaskRun);
return FlowableUtils.resolveState(
return FlowableUtils.resolveSequentialState(
execution,
childTasks,
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),

View File

@@ -245,15 +245,9 @@ public class ForEach extends Sequential implements FlowableTask<VoidOutput> {
@Override
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
List<ResolvedTask> childTasks = ListUtils.emptyOnNull(this.childTasks(runContext, parentTaskRun)).stream()
.filter(resolvedTask -> !resolvedTask.getTask().getDisabled())
.toList();
List<ResolvedTask> childTasks = this.childTasks(runContext, parentTaskRun);
if (childTasks.isEmpty()) {
return Optional.of(State.Type.SUCCESS);
}
return FlowableUtils.resolveState(
return FlowableUtils.resolveSequentialState(
execution,
childTasks,
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),

View File

@@ -2,7 +2,9 @@ package io.kestra.plugin.core.flow;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.utils.ListUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
@@ -23,6 +25,7 @@ import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.GraphUtils;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
@@ -176,4 +179,20 @@ public class Parallel extends Task implements FlowableTask<VoidOutput> {
runContext.render(this.concurrent).as(Integer.class).orElseThrow()
);
}
@Override
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
List<ResolvedTask> childTasks = this.childTasks(runContext, parentTaskRun);
return FlowableUtils.resolveSequentialState(
execution,
childTasks,
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
parentTaskRun,
runContext,
this.isAllowFailure(),
this.isAllowWarning()
);
}
}

View File

@@ -8,6 +8,7 @@ import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.AbstractGraph;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
@@ -23,6 +24,7 @@ import lombok.experimental.SuperBuilder;
import jakarta.validation.Valid;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
@SuperBuilder
@@ -113,6 +115,22 @@ public class Sequential extends Task implements FlowableTask<VoidOutput> {
return FlowableUtils.resolveTasks(this.getTasks(), parentTaskRun);
}
@Override
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
List<ResolvedTask> childTasks = this.childTasks(runContext, parentTaskRun);
return FlowableUtils.resolveSequentialState(
execution,
childTasks,
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
parentTaskRun,
runContext,
this.isAllowFailure(),
this.isAllowWarning()
);
}
@Override
public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
return FlowableUtils.resolveSequentialNexts(

View File

@@ -60,7 +60,15 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
public class Download extends AbstractHttp implements RunnableTask<Download.Output> {
@Schema(title = "Should the task fail when downloading an empty file.")
@Builder.Default
private final Property<Boolean> failOnEmptyResponse = Property.ofValue(true);
private Property<Boolean> failOnEmptyResponse = Property.ofValue(true);
@Schema(
title = "Name of the file inside the output.",
description = """
If not provided, the filename will be extracted from the `Content-Disposition` header.
If no `Content-Disposition` header, a name would be generated."""
)
private Property<String> saveAs;
public Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
@@ -111,20 +119,26 @@ public class Download extends AbstractHttp implements RunnableTask<Download.Outp
}
}
String filename = null;
if (response.getHeaders().firstValue("Content-Disposition").isPresent()) {
String contentDisposition = response.getHeaders().firstValue("Content-Disposition").orElseThrow();
filename = filenameFromHeader(runContext, contentDisposition);
}
if (filename != null) {
filename = URLEncoder.encode(filename, StandardCharsets.UTF_8);
String rFilename = runContext.render(this.saveAs).as(String.class).orElse(null);
if (rFilename == null) {
if (response.getHeaders().firstValue("Content-Disposition").isPresent()) {
String contentDisposition = response.getHeaders().firstValue("Content-Disposition").orElseThrow();
rFilename = filenameFromHeader(runContext, contentDisposition);
if (rFilename != null) {
URLEncoder.encode(rFilename, StandardCharsets.UTF_8);
rFilename = rFilename.replace(' ', '+');
// brackets are IPv6 reserved characters
rFilename = rFilename.replace("[", "%5B");
rFilename = rFilename.replace("]", "%5D");
}
}
}
logger.debug("File '{}' downloaded with size '{}'", from, size);
return Output.builder()
.code(response.getStatus().getCode())
.uri(runContext.storage().putFile(tempFile, filename))
.uri(runContext.storage().putFile(tempFile, rFilename))
.headers(response.getHeaders().map())
.length(size.get())
.build();

View File

@@ -154,6 +154,15 @@ public abstract class AbstractExecutionRepositoryTest {
).trigger(executionTrigger).build());
}
// add a NORMAL kind execution, it should be fetched correctly
executionRepository.save(builder(
State.Type.SUCCESS,
null
)
.trigger(executionTrigger)
.kind(ExecutionKind.NORMAL)
.build());
// add a test execution, this should be ignored in search & statistics
executionRepository.save(builder(
State.Type.SUCCESS,
@@ -176,16 +185,16 @@ public abstract class AbstractExecutionRepositoryTest {
static Stream<Arguments> filterCombinations() {
return Stream.of(
Arguments.of(QueryFilter.builder().field(Field.QUERY).value("unittest").operation(Op.EQUALS).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).value(List.of(USER)).operation(Op.EQUALS).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).value("io.kestra.unittest").operation(Op.EQUALS).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.QUERY).value("unittest").operation(Op.EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).value(List.of(USER)).operation(Op.EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).value("io.kestra.unittest").operation(Op.EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.LABELS).value(Map.of("key", "value")).operation(Op.EQUALS).build(), 1),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).value(FLOW).operation(Op.EQUALS).build(), 15),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.GREATER_THAN).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).value(ZonedDateTime.now().plusMinutes(1)).operation(Op.LESS_THAN).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).value(FLOW).operation(Op.EQUALS).build(), 16),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.GREATER_THAN).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).value(ZonedDateTime.now().plusMinutes(1)).operation(Op.LESS_THAN).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.STATE).value(Type.RUNNING).operation(Op.EQUALS).build(), 5),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("executionTriggerId").operation(Op.EQUALS).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(), 28)
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("executionTriggerId").operation(Op.EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(), 29)
);
}
@@ -211,7 +220,7 @@ public abstract class AbstractExecutionRepositoryTest {
inject();
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, null);
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.getTotal()).isEqualTo(29L);
assertThat(executions.size()).isEqualTo(10);
List<QueryFilter> filters = List.of(QueryFilter.builder()
@@ -275,7 +284,7 @@ public abstract class AbstractExecutionRepositoryTest {
.value("io.kestra")
.build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.getTotal()).isEqualTo(29L);
}
@Test
@@ -291,7 +300,7 @@ public abstract class AbstractExecutionRepositoryTest {
.value(executionTriggerId)
.build());
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.getTotal()).isEqualTo(29L);
assertThat(executions.size()).isEqualTo(10);
assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId);
filters = List.of(QueryFilter.builder()
@@ -301,7 +310,7 @@ public abstract class AbstractExecutionRepositoryTest {
.build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.getTotal()).isEqualTo(29L);
assertThat(executions.size()).isEqualTo(10);
assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId);
@@ -312,12 +321,12 @@ public abstract class AbstractExecutionRepositoryTest {
.build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters );
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.getTotal()).isEqualTo(29L);
assertThat(executions.size()).isEqualTo(10);
assertThat(executions.getFirst().getTrigger()).isNull();
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, null);
assertThat(executions.getTotal()).isEqualTo(56L);
assertThat(executions.getTotal()).isEqualTo(58L);
}
@Test
@@ -325,7 +334,7 @@ public abstract class AbstractExecutionRepositoryTest {
inject();
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), MAIN_TENANT, null);
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.getTotal()).isEqualTo(29L);
assertThat(executions.size()).isEqualTo(10);
var filters = List.of(QueryFilter.builder()
@@ -342,7 +351,7 @@ public abstract class AbstractExecutionRepositoryTest {
inject();
ArrayListTotal<TaskRun> taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), MAIN_TENANT, null);
assertThat(taskRuns.getTotal()).isEqualTo(74L);
assertThat(taskRuns.getTotal()).isEqualTo(77L);
assertThat(taskRuns.size()).isEqualTo(10);
var filters = List.of(QueryFilter.builder()
@@ -732,7 +741,7 @@ public abstract class AbstractExecutionRepositoryTest {
inject();
List<Execution> executions = executionRepository.findAllAsync(MAIN_TENANT).collectList().block();
assertThat(executions).hasSize(29); // used by the backup so it contains TEST executions
assertThat(executions).hasSize(30); // used by the backup so it contains TEST executions
}
@Test

View File

@@ -249,31 +249,33 @@ public abstract class AbstractLogRepositoryTest {
for (int i = 0; i < 20; i++) {
logRepository.save(builder.build());
}
// normal kind should also be retrieved
logRepository.save(builder.executionKind(ExecutionKind.NORMAL).build());
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(MAIN_TENANT, executionId, null, Pageable.from(1, 50));
assertThat(find.size()).isEqualTo(50);
assertThat(find.getTotal()).isEqualTo(101L);
assertThat(find.getTotal()).isEqualTo(102L);
find = logRepository.findByExecutionId(MAIN_TENANT, executionId, null, Pageable.from(3, 50));
assertThat(find.size()).isEqualTo(1);
assertThat(find.getTotal()).isEqualTo(101L);
assertThat(find.size()).isEqualTo(2);
assertThat(find.getTotal()).isEqualTo(102L);
find = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, executionId, logEntry2.getTaskId(), null, Pageable.from(1, 50));
assertThat(find.size()).isEqualTo(21);
assertThat(find.getTotal()).isEqualTo(21L);
assertThat(find.size()).isEqualTo(22);
assertThat(find.getTotal()).isEqualTo(22L);
find = logRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, Pageable.from(1, 10));
assertThat(find.size()).isEqualTo(10);
assertThat(find.getTotal()).isEqualTo(21L);
assertThat(find.getTotal()).isEqualTo(22L);
find = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, 0, Pageable.from(1, 10));
assertThat(find.size()).isEqualTo(10);
assertThat(find.getTotal()).isEqualTo(21L);
assertThat(find.getTotal()).isEqualTo(22L);
find = logRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, Pageable.from(10, 10));

View File

@@ -31,20 +31,22 @@ public abstract class AbstractMetricRepositoryTest {
TaskRun taskRun1 = taskRun(executionId, "task");
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
MetricEntry testCounter = MetricEntry.of(taskRun1, counter("test"), ExecutionKind.TEST);
MetricEntry normalCounter = MetricEntry.of(taskRun1, counter("normal"), ExecutionKind.NORMAL);
TaskRun taskRun2 = taskRun(executionId, "task");
MetricEntry timer = MetricEntry.of(taskRun2, timer(), null);
metricRepository.save(counter);
metricRepository.save(testCounter); // should only be retrieved by execution id
metricRepository.save(normalCounter);
metricRepository.save(timer);
List<MetricEntry> results = metricRepository.findByExecutionId(null, executionId, Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(3);
assertThat(results.size()).isEqualTo(4);
results = metricRepository.findByExecutionIdAndTaskId(null, executionId, taskRun1.getTaskId(), Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(3);
assertThat(results.size()).isEqualTo(4);
results = metricRepository.findByExecutionIdAndTaskRunId(null, executionId, taskRun1.getId(), Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(2);
assertThat(results.size()).isEqualTo(3);
MetricAggregations aggregationResults = metricRepository.aggregateByFlowId(
null,

View File

@@ -33,6 +33,7 @@ import org.junitpioneer.jupiter.RetryingTest;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
// must be per-class to allow calling once init() which took a lot of time
public abstract class AbstractRunnerTest {
public static final String TENANT_1 = "tenant1";
@Inject
protected RunnerUtils runnerUtils;
@@ -436,9 +437,9 @@ public abstract class AbstractRunnerTest {
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = TENANT_1)
protected void flowConcurrencyWithForEachItem() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem();
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem(TENANT_1);
}
@Test
@@ -597,4 +598,4 @@ public abstract class AbstractRunnerTest {
public void shouldCallTasksAfterListener(Execution execution) {
afterExecutionTestCase.shouldCallTasksAfterListener(execution);
}
}
}

View File

@@ -330,7 +330,7 @@ class ExecutionServiceTest {
assertThat(restart.findTaskRunByTaskIdAndValue("1_each", List.of()).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getState().getHistories()).hasSize(4);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getAttempts()).isNull();
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
restart = executionService.markAs(execution, flow, execution.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getId(), State.Type.FAILED);
@@ -441,6 +441,7 @@ class ExecutionServiceTest {
assertThat(killed.getState().getCurrent()).isEqualTo(State.Type.CANCELLED);
assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getState().getCurrent()).isEqualTo(State.Type.KILLED);
assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.KILLED);
assertThat(killed.getState().getHistories()).hasSize(5);
}

View File

@@ -103,28 +103,28 @@ class FilesServiceTest {
var runContext = runContextFactory.of();
Path fileWithSpace = tempDir.resolve("with space.txt");
Path fileWithUnicode = tempDir.resolve("สวัสดี.txt");
Path fileWithUnicode = tempDir.resolve("สวัสดี&.txt");
Files.writeString(fileWithSpace, "content");
Files.writeString(fileWithUnicode, "content");
Path targetFileWithSpace = runContext.workingDir().path().resolve("with space.txt");
Path targetFileWithUnicode = runContext.workingDir().path().resolve("สวัสดี.txt");
Path targetFileWithUnicode = runContext.workingDir().path().resolve("สวัสดี&.txt");
Files.copy(fileWithSpace, targetFileWithSpace);
Files.copy(fileWithUnicode, targetFileWithUnicode);
Map<String, URI> outputFiles = FilesService.outputFiles(
runContext,
List.of("with space.txt", "สวัสดี.txt")
List.of("with space.txt", "สวัสดี&.txt")
);
assertThat(outputFiles).hasSize(2);
assertThat(outputFiles).containsKey("with space.txt");
assertThat(outputFiles).containsKey("สวัสดี.txt");
assertThat(outputFiles).containsKey("สวัสดี&.txt");
assertThat(runContext.storage().getFile(outputFiles.get("with space.txt"))).isNotNull();
assertThat(runContext.storage().getFile(outputFiles.get("สวัสดี.txt"))).isNotNull();
assertThat(runContext.storage().getFile(outputFiles.get("สวัสดี&.txt"))).isNotNull();
}
private URI createFile() throws IOException {

View File

@@ -2,14 +2,12 @@ package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledExecution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.State.Type;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.reporter.model.Count;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
@@ -31,16 +29,18 @@ import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Singleton
public class FlowConcurrencyCaseTest {
public static final String NAMESPACE = "io.kestra.tests";
@Inject
private StorageInterface storageInterface;
@@ -64,50 +64,34 @@ public class FlowConcurrencyCaseTest {
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;
public void flowConcurrencyCancel() throws TimeoutException, QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel");
public void flowConcurrencyCancel() throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
try {
List<Execution> shouldFailExecutions = List.of(
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel"),
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel")
);
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CANCELLED);
CountDownLatch latch1 = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getId().equals(execution1.getId())) {
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch1.countDown();
}
}
// FIXME we should fail if we receive the cancel execution again but on Kafka it happens
});
assertTrue(latch1.await(1, TimeUnit.MINUTES));
receive.blockLast();
assertThat(shouldFailExecutions.stream().map(Execution::getState).map(State::getCurrent)).allMatch(Type.CANCELLED::equals);
} finally {
runnerUtils.killExecution(execution1);
}
}
public void flowConcurrencyFail() throws TimeoutException, QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-fail");
public void flowConcurrencyFail() throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
try {
List<Execution> shouldFailExecutions = List.of(
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail"),
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail")
);
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.FAILED);
CountDownLatch latch1 = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getId().equals(execution1.getId())) {
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch1.countDown();
}
}
// FIXME we should fail if we receive the cancel execution again but on Kafka it happens
});
assertTrue(latch1.await(1, TimeUnit.MINUTES));
receive.blockLast();
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(shouldFailExecutions.stream().map(Execution::getState).map(State::getCurrent)).allMatch(State.Type.FAILED::equals);
} finally {
runnerUtils.killExecution(execution1);
}
}
public void flowConcurrencyQueue() throws TimeoutException, QueueException, InterruptedException {
@@ -265,28 +249,25 @@ public class FlowConcurrencyCaseTest {
assertThat(secondExecutionResult.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.CANCELLED);
}
public void flowConcurrencyWithForEachItem() throws TimeoutException, QueueException, InterruptedException, URISyntaxException, IOException {
URI file = storageUpload();
public void flowConcurrencyWithForEachItem(String tenantId) throws QueueException, URISyntaxException, IOException, TimeoutException {
URI file = storageUpload(tenantId);
Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
Execution forEachItem = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-for-each-item", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs), Duration.ofSeconds(5));
Execution forEachItem = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-for-each-item", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs), Duration.ofSeconds(5));
assertThat(forEachItem.getState().getCurrent()).isEqualTo(Type.RUNNING);
Set<String> executionIds = new HashSet<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if ("flow-concurrency-queue".equals(e.getLeft().getFlowId()) && e.getLeft().getState().isRunning()) {
executionIds.add(e.getLeft().getId());
}
});
// wait a little to be sure there are not too many executions started
Thread.sleep(500);
assertThat(executionIds).hasSize(1);
receive.blockLast();
Execution terminated = runnerUtils.awaitExecution(e -> e.getId().equals(forEachItem.getId()) && e.getState().isTerminated(), () -> {}, Duration.ofSeconds(10));
Execution terminated = runnerUtils.awaitExecution(e -> e.getState().isTerminated(), Duration.ofSeconds(60));
assertThat(terminated.getState().getCurrent()).isEqualTo(Type.SUCCESS);
List<Execution> executions = runnerUtils.awaitFlowExecutionNumber(2, tenantId, NAMESPACE, "flow-concurrency-queue");
assertThat(executions).extracting(e -> e.getState().getCurrent()).containsOnly(Type.SUCCESS);
assertThat(executions.stream()
.map(e -> e.getState().getHistories())
.flatMap(List::stream)
.map(State.History::getState)
.toList()).contains(Type.QUEUED);
}
public void flowConcurrencyQueueRestarted() throws Exception {
@@ -445,12 +426,16 @@ public class FlowConcurrencyCaseTest {
}
private URI storageUpload() throws URISyntaxException, IOException {
return storageUpload(MAIN_TENANT);
}
private URI storageUpload(String tenantId) throws URISyntaxException, IOException {
File tempFile = File.createTempFile("file", ".txt");
Files.write(tempFile.toPath(), content());
return storageInterface.put(
MAIN_TENANT,
tenantId,
null,
new URI("/file/storage/file.txt"),
new FileInputStream(tempFile)

View File

@@ -83,24 +83,37 @@ class RunContextPropertyTest {
runContextProperty = new RunContextProperty<>(Property.<Map<String, String>>builder().expression("{ \"key\": \"{{ key }}\"}").build(), runContext);
assertThat(runContextProperty.asMap(String.class, String.class, Map.of("key", "value"))).containsEntry("key", "value");
}
@Test
void asShouldReturnCachedRenderedProperty() throws IllegalVariableEvaluationException {
var runContext = runContextFactory.of();
var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);
assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
assertThat(runContextProperty.as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value1"));
}
@Test
void asShouldNotReturnCachedRenderedPropertyWithSkipCache() throws IllegalVariableEvaluationException {
var runContext = runContextFactory.of();
var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);
assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
assertThat(runContextProperty.skipCache().as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value2"));
var skippedCache = runContextProperty.skipCache();
assertThat(skippedCache.as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value2"));
// assure skipCache is preserved across calls
assertThat(skippedCache.as(String.class, Map.of("variable", "value3"))).isEqualTo(Optional.of("value3"));
}
@Test
void asShouldNotReturnCachedRenderedPropertyWithOfExpression() throws IllegalVariableEvaluationException {
var runContext = runContextFactory.of();
var runContextProperty = new RunContextProperty<String>(Property.ofExpression("{{ variable }}"), runContext);
assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
assertThat(runContextProperty.as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value2"));
}
}

View File

@@ -20,7 +20,9 @@ class TaskWithRunIfTest {
assertThat(execution.getTaskRunList()).hasSize(5);
assertThat(execution.findTaskRunsByTaskId("executed").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.findTaskRunsByTaskId("notexecuted").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("notexecuted").getFirst().getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("notexecutedflowable").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("notexecutedflowable").getFirst().getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("willfailedtheflow").getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
}
@@ -31,6 +33,7 @@ class TaskWithRunIfTest {
assertThat(execution.getTaskRunList()).hasSize(3);
assertThat(execution.findTaskRunsByTaskId("log_orders").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.findTaskRunsByTaskId("log_test").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("log_test").getFirst().getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
}
@Test
@@ -39,7 +42,9 @@ class TaskWithRunIfTest {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getTaskRunList()).hasSize(5);
assertThat(execution.findTaskRunsByTaskId("skipSetVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("skipSetVariables").getFirst().getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("skipUnsetVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("skipUnsetVariables").getFirst().getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("unsetVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.findTaskRunsByTaskId("setVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getVariables()).containsEntry("list", List.of(42));

View File

@@ -109,33 +109,6 @@ public class FileSizeFunctionTest {
assertThat(size).isEqualTo(FILE_SIZE);
}
@Test
void shouldThrowIllegalArgumentException_givenTrigger_andParentExecution_andMissingNamespace() throws IOException {
String executionId = IdUtils.create();
URI internalStorageURI = getInternalStorageURI(executionId);
URI internalStorageFile = getInternalStorageFile(internalStorageURI);
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "subflow",
"namespace", NAMESPACE,
"tenantId", MAIN_TENANT),
"execution", Map.of("id", IdUtils.create()),
"trigger", Map.of(
"flowId", FLOW,
"executionId", executionId,
"tenantId", MAIN_TENANT
)
);
Exception ex = assertThrows(
IllegalArgumentException.class,
() -> variableRenderer.render("{{ fileSize('" + internalStorageFile + "') }}", variables)
);
assertTrue(ex.getMessage().startsWith("Unable to read the file"), "Exception message doesn't match expected one");
}
@Test
void returnsCorrectSize_givenUri_andCurrentExecution() throws IOException, IllegalVariableEvaluationException {
String executionId = IdUtils.create();

View File

@@ -256,6 +256,27 @@ class ReadFileFunctionTest {
assertThat(variableRenderer.render("{{ read(nsfile) }}", variables)).isEqualTo("Hello World");
}
@Test
void shouldReadChildFileEvenIfTrigger() throws IOException, IllegalVariableEvaluationException {
String namespace = "my.namespace";
String flowId = "flow";
String executionId = IdUtils.create();
URI internalStorageURI = URI.create("/" + namespace.replace(".", "/") + "/" + flowId + "/executions/" + executionId + "/tasks/task/" + IdUtils.create() + "/123456.ion");
URI internalStorageFile = storageInterface.put(MAIN_TENANT, namespace, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes()));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "flow",
"namespace", "notme",
"tenantId", MAIN_TENANT),
"execution", Map.of("id", "notme"),
"trigger", Map.of("namespace", "notme", "flowId", "parent", "executionId", "parent")
);
String render = variableRenderer.render("{{ read('" + internalStorageFile + "') }}", variables);
assertThat(render).isEqualTo("Hello from a task output");
}
private URI createFile() throws IOException {
File tempFile = File.createTempFile("file", ".txt");
Files.write(tempFile.toPath(), "Hello World".getBytes());

View File

@@ -5,8 +5,17 @@ import org.junit.jupiter.api.Test;
import java.util.List;
class VersionTest {
import static org.assertj.core.api.Assertions.assertThatObject;
import static org.hamcrest.MatcherAssert.assertThat;
class VersionTest {
@Test
void shouldCreateVersionFromIntegerGivenMajorVersion() {
Version version = Version.of(1);
Assertions.assertEquals(1, version.majorVersion());
}
@Test
void shouldCreateVersionFromStringGivenMajorVersion() {
Version version = Version.of("1");
@@ -21,27 +30,27 @@ class VersionTest {
}
@Test
void shouldCreateVersionFromStringGivenMajorMinorIncrementVersion() {
void shouldCreateVersionFromStringGivenMajorMinorPatchVersion() {
Version version = Version.of("1.2.3");
Assertions.assertEquals(1, version.majorVersion());
Assertions.assertEquals(2, version.minorVersion());
Assertions.assertEquals(3, version.incrementalVersion());
Assertions.assertEquals(3, version.patchVersion());
}
@Test
void shouldCreateVersionFromPrefixedStringGivenMajorMinorIncrementVersion() {
void shouldCreateVersionFromPrefixedStringGivenMajorMinorPatchVersion() {
Version version = Version.of("v1.2.3");
Assertions.assertEquals(1, version.majorVersion());
Assertions.assertEquals(2, version.minorVersion());
Assertions.assertEquals(3, version.incrementalVersion());
Assertions.assertEquals(3, version.patchVersion());
}
@Test
void shouldCreateVersionFromStringGivenMajorMinorIncrementAndQualifierVersion() {
void shouldCreateVersionFromStringGivenMajorMinorPatchAndQualifierVersion() {
Version version = Version.of("1.2.3-SNAPSHOT");
Assertions.assertEquals(1, version.majorVersion());
Assertions.assertEquals(2, version.minorVersion());
Assertions.assertEquals(3, version.incrementalVersion());
Assertions.assertEquals(3, version.patchVersion());
Assertions.assertEquals("SNAPSHOT", version.qualifier().toString());
}
@@ -50,7 +59,7 @@ class VersionTest {
Version version = Version.of("1.2.3-RC0-SNAPSHOT");
Assertions.assertEquals(1, version.majorVersion());
Assertions.assertEquals(2, version.minorVersion());
Assertions.assertEquals(3, version.incrementalVersion());
Assertions.assertEquals(3, version.patchVersion());
Assertions.assertEquals("RC0-SNAPSHOT", version.qualifier().toString());
}
@@ -76,13 +85,13 @@ class VersionTest {
}
@Test
void shouldGetLatestVersionGivenMajorMinorIncrementalVersions() {
void shouldGetLatestVersionGivenMajorMinorPatchVersions() {
Version result = Version.getLatest(Version.of("1.0.9"), Version.of("1.0.10"), Version.of("1.0.11"));
Assertions.assertEquals(Version.of("1.0.11"), result);
}
@Test
public void shouldGetOldestVersionGivenMajorMinorIncrementalVersions() {
public void shouldGetOldestVersionGivenMajorMinorPatchVersions() {
Version result = Version.getOldest(Version.of("1.0.9"), Version.of("1.0.10"), Version.of("1.0.11"));
Assertions.assertEquals(Version.of("1.0.9"), result);
}
@@ -135,14 +144,50 @@ class VersionTest {
}
@Test
public void shouldGetStableVersionGivenMajorMinorVersions() {
Version result = Version.getStable(Version.of("1.2.0"), List.of(Version.of("1.2.1"), Version.of("1.2.2"), Version.of("0.99.0")));
Assertions.assertEquals(Version.of("1.2.2"), result);
public void shouldGetStableVersionGivenMajorMinorPatchVersion() {
// Given
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));
// When - Then
assertThatObject(Version.getStable(Version.of("1.2.1"), versions)).isEqualTo(Version.of("1.2.1"));
assertThatObject(Version.getStable(Version.of("1.2.0"), versions)).isNull();
assertThatObject(Version.getStable(Version.of("1.2.4"), versions)).isNull();
}
@Test
public void shouldGetStableGivenMajorAndMinorVersionOnly() {
// Given
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));
// When - Then
assertThatObject(Version.getStable(Version.of("1.2"), versions)).isEqualTo(Version.of("1.2.3"));
}
@Test
public void shouldGetStableGivenMajorVersionOnly() {
// Given
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));
// When - Then
assertThatObject(Version.getStable(Version.of("1"), versions)).isEqualTo(Version.of("1.2.3"));
}
@Test
public void shouldGetNullForStableVersionGivenNoCompatibleVersions() {
Version result = Version.getStable(Version.of("1.2.0"), List.of(Version.of("1.3.0"), Version.of("2.0.0"), Version.of("0.99.0")));
Assertions.assertNull(result);
public void shouldGetNullForStableGivenMajorAndMinorVersionOnly() {
// Given
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));
// When - Then
assertThatObject(Version.getStable(Version.of("2.0"), versions)).isNull();
assertThatObject(Version.getStable(Version.of("0.1"), versions)).isNull();
}
@Test
public void shouldGetNullForStableGivenMajorVersionOnly() {
// Given
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));
// When - Then
assertThatObject(Version.getStable(Version.of("2"), versions)).isNull();
}
}

View File

@@ -4,6 +4,7 @@ import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.RunnerUtils;
@@ -11,6 +12,7 @@ import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@@ -28,11 +30,15 @@ class IfTest {
void ifTruthy() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", true) , Duration.ofSeconds(120));
List<TaskRunAttempt> flowableAttempts=execution.findTaskRunsByTaskId("if").getFirst().getAttempts();
assertThat(execution.getTaskRunList()).hasSize(2);
assertThat(execution.findTaskRunsByTaskId("when-true").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(flowableAttempts).isNotNull();
assertThat(flowableAttempts.getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", "true") , Duration.ofSeconds(120));

View File

@@ -5,22 +5,32 @@ import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.State;
import org.junit.jupiter.api.Test;
import java.util.List;
@KestraTest(startRunner = true)
class SequentialTest {
@Test
@ExecuteFlow("flows/valids/sequential.yaml")
void sequential(Execution execution) {
List<TaskRunAttempt> flowableAttempts=execution.findTaskRunsByTaskId("1-seq").getFirst().getAttempts();
assertThat(execution.getTaskRunList()).hasSize(11);
assertThat(flowableAttempts).isNotNull();
assertThat(flowableAttempts.getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
@Test
@ExecuteFlow("flows/valids/sequential-with-global-errors.yaml")
void sequentialWithGlobalErrors(Execution execution) {
List<TaskRunAttempt> flowableAttempts=execution.findTaskRunsByTaskId("parent-seq").getFirst().getAttempts();
assertThat(execution.getTaskRunList()).hasSize(6);
assertThat(flowableAttempts).isNotNull();
assertThat(flowableAttempts.getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
}
@@ -30,4 +40,11 @@ class SequentialTest {
assertThat(execution.getTaskRunList()).hasSize(6);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
}
@Test
@ExecuteFlow("flows/valids/sequential-with-disabled.yaml")
void sequentialWithDisabled(Execution execution) {
assertThat(execution.getTaskRunList()).hasSize(2);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
}

View File

@@ -14,6 +14,8 @@ import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -87,4 +89,28 @@ class SubflowRunnerTest {
assertThat(childExecution.get().getTaskRunList()).hasSize(1);
closing.run();
}
@Test
@LoadFlows({"flows/valids/subflow-parent-retry.yaml", "flows/valids/subflow-to-retry.yaml"})
void subflowOutputWithWait() throws QueueException, TimeoutException, InterruptedException {
List<Execution> childExecution = new ArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(4);
Runnable closing = executionQueue.receive(either -> {
if (either.isLeft() && either.getLeft().getFlowId().equals("subflow-to-retry") && either.getLeft().getState().isTerminated()) {
childExecution.add(either.getLeft());
countDownLatch.countDown();
}
});
Execution parentExecution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "subflow-parent-retry");
assertThat(parentExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(parentExecution.getTaskRunList()).hasSize(5);
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
// we should have 4 executions, two in SUCCESS and two in FAILED
assertThat(childExecution).hasSize(4);
assertThat(childExecution.stream().filter(e -> e.getState().getCurrent() == State.Type.SUCCESS).count()).isEqualTo(2);
assertThat(childExecution.stream().filter(e -> e.getState().getCurrent() == State.Type.FAILED).count()).isEqualTo(2);
closing.run();
}
}

View File

@@ -156,6 +156,26 @@ class DownloadTest {
assertThat(output.getUri().toString()).endsWith("filename.jpg");
}
@Test
void fileNameShouldOverrideContentDisposition() throws Exception {
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
embeddedServer.start();
Download task = Download.builder()
.id(DownloadTest.class.getSimpleName())
.type(DownloadTest.class.getName())
.uri(Property.ofValue(embeddedServer.getURI() + "/content-disposition"))
.saveAs(Property.ofValue("hardcoded-filename.jpg"))
.build();
RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, task, ImmutableMap.of());
Download.Output output = task.run(runContext);
assertThat(output.getUri().toString()).endsWith("hardcoded-filename.jpg");
}
@Test
void contentDispositionWithPath() throws Exception {
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
@@ -232,10 +252,27 @@ class DownloadTest {
Download.Output output = task.run(runContext);
assertThat(output.getUri().toString()).doesNotContain("/secure-path/");
assertThat(output.getUri().toString()).endsWith("file.with+spaces.txt");
}
@Test
void contentDispositionWithBrackets() throws Exception {
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
embeddedServer.start();
Download task = Download.builder()
.id(DownloadTest.class.getSimpleName())
.type(DownloadTest.class.getName())
.uri(Property.ofValue(embeddedServer.getURI() + "/content-disposition-with-brackets"))
.build();
RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, task, ImmutableMap.of());
Download.Output output = task.run(runContext);
assertThat(output.getUri().toString()).endsWith("file.with%5B%5Dbrackets.txt");
}
@Controller()
public static class SlackWebController {
@Get("500")
@@ -282,5 +319,10 @@ class DownloadTest {
return HttpResponse.ok("Hello World".getBytes())
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"file.with spaces.txt\"");
}
@Get("content-disposition-with-brackets")
public HttpResponse<byte[]> contentDispositionWithBrackets() {
return HttpResponse.ok("Hello World".getBytes())
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"file.with[]brackets.txt\"");
}
}
}

View File

@@ -8,4 +8,4 @@ concurrency:
tasks:
- id: sleep
type: io.kestra.plugin.core.flow.Sleep
duration: PT2S
duration: PT10S

View File

@@ -8,4 +8,4 @@ concurrency:
tasks:
- id: sleep
type: io.kestra.plugin.core.flow.Sleep
duration: PT2S
duration: PT10S

View File

@@ -0,0 +1,14 @@
id: sequential-with-disabled
namespace: io.kestra.tests
tasks:
- id: Sequential
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: Hello World! 🚀
disabled: true
- id: log
type: io.kestra.plugin.core.log.Log
message: Hello World!

View File

@@ -0,0 +1,33 @@
id: subflow-parent-retry
namespace: io.kestra.tests
tasks:
- id: parallel
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: seq1
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: subflow1
type: io.kestra.plugin.core.flow.Subflow
flowId: subflow-to-retry
namespace: io.kestra.tests
inputs:
counter: "{{ taskrun.attemptsCount }}"
retry:
type: constant
maxAttempts: 3
interval: PT1S
- id: seq2
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: subflow2
type: io.kestra.plugin.core.flow.Subflow
flowId: subflow-to-retry
namespace: io.kestra.tests
inputs:
counter: "{{ taskrun.attemptsCount }}"
retry:
type: constant
maxAttempts: 3
interval: PT1S

View File

@@ -0,0 +1,14 @@
id: subflow-to-retry
namespace: io.kestra.tests
inputs:
- id: counter
type: INT
tasks:
- id: fail
type: io.kestra.plugin.core.execution.Fail
runIf: "{{inputs.counter < 1}}"
- id: hello
type: io.kestra.plugin.core.log.Log
message: Hello World! 🚀

View File

@@ -6,9 +6,11 @@ services:
MYSQL_USER: kestra
MYSQL_PASSWORD: k3str4
MYSQL_ROOT_PASSWORD: "p4ssw0rd"
command:
- --log-bin-trust-function-creators=1
- --sort-buffer-size=10485760
entrypoint: |
sh -c "
echo \"CREATE DATABASE IF NOT EXISTS kestra_unit_webserver;GRANT ALL PRIVILEGES ON kestra_unit_webserver.* TO 'kestra'@'%' WITH GRANT OPTION;\" > /docker-entrypoint-initdb.d/init.sql;
/usr/local/bin/docker-entrypoint.sh --log-bin-trust-function-creators=1 --innodb_ft_min_token_size=1 --ft_min_word_len=1 --sort-buffer-size=10485760
"
ports:
- 3306:3306
restart: on-failure
@@ -34,4 +36,4 @@ services:
# - "4318:4318" # OTLP HTTP receiver
# - "14250:14250" # Receive from external otel-collector, optional
# environment:
# - COLLECTOR_OTLP_ENABLED=true
# - COLLECTOR_OTLP_ENABLED=true

View File

@@ -1,28 +0,0 @@
configurations {
implementation.extendsFrom(micronaut)
}
dependencies {
testImplementation project(':tests')
testImplementation("com.microsoft.playwright:playwright")
}
/**********************************************************************************************************************\
* ./gradlew playwright
**********************************************************************************************************************/
tasks.register('playwright', JavaExec) {
classpath sourceSets.test.runtimeClasspath
mainClass = 'com.microsoft.playwright.CLI'
}
/**********************************************************************************************************************\
* Test
**********************************************************************************************************************/
test {
useJUnitPlatform {
if (project.hasProperty('tags')) {
includeTags project.getProperty('tags').split(',')
}
// Setting the number of parallel forks
maxParallelForks = Runtime.runtime.availableProcessors()
}
}

View File

@@ -127,7 +127,7 @@ public class ExecutorService {
case CANCEL ->
executionRunning
.withExecution(executionRunning.getExecution().withState(State.Type.CANCELLED))
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
.withConcurrencyState(ExecutionRunning.ConcurrencyState.CANCELLED);
case FAIL -> {
var failedExecution = executionRunning.getExecution().failedExecutionFromExecutor(new IllegalStateException("Execution is FAILED due to concurrency limit exceeded"));
try {
@@ -137,7 +137,7 @@ public class ExecutorService {
}
yield executionRunning
.withExecution(failedExecution.getExecution())
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
.withConcurrencyState(ExecutionRunning.ConcurrencyState.FAILED);
}
};
@@ -247,7 +247,7 @@ public class ExecutorService {
// first find the normal ended child tasks and send result
Optional<State.Type> state;
try {
state = flowableParent.resolveState(runContext, execution, parentTaskRun);
state = flowableParent.resolveState(runContext, execution, parentTaskRun);
} catch (Exception e) {
// This will lead to the next task being still executed, but at least Kestra will not crash.
// This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
@@ -268,9 +268,17 @@ public class ExecutorService {
Output outputs = flowableParent.outputs(runContext);
Map<String, Object> outputMap = MapUtils.merge(workerTaskResult.getTaskRun().getOutputs(), outputs == null ? null : outputs.toMap());
Variables variables = variablesService.of(StorageContext.forTask(workerTaskResult.getTaskRun()), outputMap);
// flowable attempt state transition to terminated
List<TaskRunAttempt> attempts = Optional.ofNullable(parentTaskRun.getAttempts())
.map(ArrayList::new)
.orElseGet(ArrayList::new);
State.Type endedState = endedTask.get().getTaskRun().getState().getCurrent();
TaskRunAttempt updated = attempts.getLast().withState(endedState);
attempts.set( attempts.size() - 1, updated);
return Optional.of(new WorkerTaskResult(workerTaskResult
.getTaskRun()
.withOutputs(variables)
.withAttempts(attempts)
));
} catch (Exception e) {
runContext.logger().error("Unable to resolve outputs from the Flowable task: {}", e.getMessage(), e);
@@ -320,7 +328,6 @@ public class ExecutorService {
private List<TaskRun> childNextsTaskRun(Executor executor, TaskRun parentTaskRun) throws InternalException {
Task parent = executor.getFlow().findTaskByTaskId(parentTaskRun.getTaskId());
if (parent instanceof FlowableTask<?> flowableParent) {
// Count the number of flowable tasks executions, some flowable are being called multiple times,
// so this is not exactly the number of flowable taskruns but the number of times they are executed.
@@ -375,6 +382,7 @@ public class ExecutorService {
Output outputs = flowableTask.outputs(runContext);
Variables variables = variablesService.of(StorageContext.forTask(taskRun), outputs);
taskRun = taskRun.withOutputs(variables);
} catch (Exception e) {
runContext.logger().warn("Unable to save output on taskRun '{}'", taskRun, e);
}
@@ -995,7 +1003,7 @@ public class ExecutorService {
executor.withExecution(
executor
.getExecution()
.withTaskRun(executableTaskRun.withState(State.Type.SKIPPED)),
.withTaskRun(executableTaskRun.withState(State.Type.SKIPPED).addAttempt(TaskRunAttempt.builder().state(new State().withState(State.Type.SKIPPED)).build())),
"handleExecutableTaskSkipped"
);
return false;
@@ -1077,7 +1085,7 @@ public class ExecutorService {
executor.withExecution(
executor
.getExecution()
.withTaskRun(workerTask.getTaskRun().withState(State.Type.SKIPPED)),
.withTaskRun(workerTask.getTaskRun().withState(State.Type.SKIPPED).addAttempt(TaskRunAttempt.builder().state(new State().withState(State.Type.SKIPPED)).build())),
"handleExecutionUpdatingTaskSkipped"
);
return false;

View File

@@ -50,16 +50,147 @@ public class FlowTriggerService {
.map(io.kestra.plugin.core.trigger.Flow.class::cast);
}
public List<Execution> computeExecutionsFromFlowTriggers(Execution execution, List<? extends Flow> allFlows, Optional<MultipleConditionStorageInterface> multipleConditionStorage) {
List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
/**
* This method computes executions to trigger from flow triggers from a given execution.
* It only computes those depending on standard (non-multiple / non-preconditions) conditions, so it must be used
* in conjunction with {@link #computeExecutionsFromFlowTriggerPreconditions(Execution, Flow, MultipleConditionStorageInterface)}.
*/
public List<Execution> computeExecutionsFromFlowTriggerConditions(Execution execution, Flow flow) {
List<FlowWithFlowTrigger> flowWithFlowTriggers = computeFlowTriggers(execution, flow)
.stream()
// we must filter on no multiple conditions and no preconditions to avoid evaluating two times triggers that have standard conditions and multiple conditions
.filter(it -> it.getTrigger().getPreconditions() == null && ListUtils.emptyOnNull(it.getTrigger().getConditions()).stream().noneMatch(MultipleCondition.class::isInstance))
.toList();
// short-circuit empty triggers to evaluate
if (flowWithFlowTriggers.isEmpty()) {
return Collections.emptyList();
}
// compute all executions to create from flow triggers without taken into account multiple conditions
return flowWithFlowTriggers.stream()
.map(f -> f.getTrigger().evaluate(
Optional.empty(),
runContextFactory.of(f.getFlow(), execution),
f.getFlow(),
execution
))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
}
/**
* This method computes executions to trigger from flow triggers from a given execution.
* It only computes those depending on multiple conditions and preconditions, so it must be used
* in conjunction with {@link #computeExecutionsFromFlowTriggerConditions(Execution, Flow)}.
*/
public List<Execution> computeExecutionsFromFlowTriggerPreconditions(Execution execution, Flow flow, MultipleConditionStorageInterface multipleConditionStorage) {
List<FlowWithFlowTrigger> flowWithFlowTriggers = computeFlowTriggers(execution, flow)
.stream()
// we must filter on multiple conditions or preconditions to avoid evaluating two times triggers that only have standard conditions
.filter(flowWithFlowTrigger -> flowWithFlowTrigger.getTrigger().getPreconditions() != null || ListUtils.emptyOnNull(flowWithFlowTrigger.getTrigger().getConditions()).stream().anyMatch(MultipleCondition.class::isInstance))
.toList();
// short-circuit empty triggers to evaluate
if (flowWithFlowTriggers.isEmpty()) {
return Collections.emptyList();
}
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = flowWithFlowTriggers.stream()
.flatMap(flowWithFlowTrigger -> flowTriggerMultipleConditions(flowWithFlowTrigger)
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
flowWithFlowTrigger.getFlow(),
multipleConditionStorage.getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition, execution.getOutputs()),
flowWithFlowTrigger.getTrigger(),
multipleCondition
)
)
)
// avoid evaluating expired windows (for ex for daily time window or deadline)
.filter(flowWithFlowTriggerAndMultipleCondition -> flowWithFlowTriggerAndMultipleCondition.getMultipleConditionWindow().isValid(ZonedDateTime.now()))
.toList();
// evaluate multiple conditions
Map<FlowWithFlowTriggerAndMultipleCondition, MultipleConditionWindow> multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> {
Map<String, Boolean> results = f.getMultipleCondition()
.getConditions()
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
conditionService.isValid(e.getValue(), f.getFlow(), execution)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return Map.entry(f, f.getMultipleConditionWindow().with(results));
})
.filter(e -> !e.getValue().getResults().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// persist results
multipleConditionStorage.save(new ArrayList<>(multipleConditionWindowsByFlow.values()));
// compute all executions to create from flow triggers now that multiple conditions storage is populated
List<Execution> executions = flowWithFlowTriggers.stream()
// will evaluate conditions
.filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage
)
)
// will evaluate preconditions
.filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger().getPreconditions(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage
)
)
.map(f -> f.getTrigger().evaluate(
Optional.of(multipleConditionStorage),
runContextFactory.of(f.getFlow(), execution),
f.getFlow(),
execution
))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
// purge fulfilled or expired multiple condition windows
Stream.concat(
multipleConditionWindowsByFlow.entrySet().stream()
.map(e -> Map.entry(
e.getKey().getMultipleCondition(),
e.getValue()
))
.filter(e -> !Boolean.FALSE.equals(e.getKey().getResetOnSuccess()) &&
e.getKey().getConditions().size() == Optional.ofNullable(e.getValue().getResults()).map(Map::size).orElse(0)
)
.map(Map.Entry::getValue),
multipleConditionStorage.expired(execution.getTenantId()).stream()
).forEach(multipleConditionStorage::delete);
return executions;
}
private List<FlowWithFlowTrigger> computeFlowTriggers(Execution execution, Flow flow) {
if (
// prevent recursive flow triggers
.filter(flow -> flowService.removeUnwanted(flow, execution))
// filter out Test Executions
.filter(flow -> execution.getKind() == null)
// ensure flow & triggers are enabled
.filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)))
!flowService.removeUnwanted(flow, execution) ||
// filter out Test Executions
execution.getKind() != null ||
// ensure flow & triggers are enabled
flow.isDisabled() || flow instanceof FlowWithException ||
flow.getTriggers() == null || flow.getTriggers().isEmpty()) {
return Collections.emptyList();
}
return flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger))
// filter on the execution state the flow listen to
.filter(flowWithFlowTrigger -> flowWithFlowTrigger.getTrigger().getStates().contains(execution.getState().getCurrent()))
// validate flow triggers conditions excluding multiple conditions
@@ -74,96 +205,6 @@ public class FlowTriggerService {
execution
)
)).toList();
// short-circuit empty triggers to evaluate
if (validTriggersBeforeMultipleConditionEval.isEmpty()) {
return Collections.emptyList();
}
Map<FlowWithFlowTriggerAndMultipleCondition, MultipleConditionWindow> multipleConditionWindowsByFlow = null;
if (multipleConditionStorage.isPresent()) {
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream()
.flatMap(flowWithFlowTrigger -> flowTriggerMultipleConditions(flowWithFlowTrigger)
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
flowWithFlowTrigger.getFlow(),
multipleConditionStorage.get().getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition, execution.getOutputs()),
flowWithFlowTrigger.getTrigger(),
multipleCondition
)
)
)
// avoid evaluating expired windows (for ex for daily time window or deadline)
.filter(flowWithFlowTriggerAndMultipleCondition -> flowWithFlowTriggerAndMultipleCondition.getMultipleConditionWindow().isValid(ZonedDateTime.now()))
.toList();
// evaluate multiple conditions
multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> {
Map<String, Boolean> results = f.getMultipleCondition()
.getConditions()
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
conditionService.isValid(e.getValue(), f.getFlow(), execution)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return Map.entry(f, f.getMultipleConditionWindow().with(results));
})
.filter(e -> !e.getValue().getResults().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// persist results
multipleConditionStorage.get().save(new ArrayList<>(multipleConditionWindowsByFlow.values()));
}
// compute all executions to create from flow triggers now that multiple conditions storage is populated
List<Execution> executions = validTriggersBeforeMultipleConditionEval.stream()
// will evaluate conditions
.filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage.orElse(null)
)
)
// will evaluate preconditions
.filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger().getPreconditions(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage.orElse(null)
)
)
.map(f -> f.getTrigger().evaluate(
multipleConditionStorage,
runContextFactory.of(f.getFlow(), execution),
f.getFlow(),
execution
))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
if (multipleConditionStorage.isPresent()) {
// purge fulfilled or expired multiple condition windows
Stream.concat(
multipleConditionWindowsByFlow.entrySet().stream()
.map(e -> Map.entry(
e.getKey().getMultipleCondition(),
e.getValue()
))
.filter(e -> !Boolean.FALSE.equals(e.getKey().getResetOnSuccess()) &&
e.getKey().getConditions().size() == Optional.ofNullable(e.getValue().getResults()).map(Map::size).orElse(0)
)
.map(Map.Entry::getValue),
multipleConditionStorage.get().expired(execution.getTenantId()).stream()
).forEach(multipleConditionStorage.get()::delete);
}
return executions;
}
private Stream<MultipleCondition> flowTriggerMultipleConditions(FlowWithFlowTrigger flowWithFlowTrigger) {

View File

@@ -26,8 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class FlowTriggerServiceTest {
public static final List<Label> EMPTY_LABELS = List.of();
public static final Optional<MultipleConditionStorageInterface> EMPTY_MULTIPLE_CONDITION_STORAGE = Optional.empty();
@Inject
private TestRunContextFactory runContextFactory;
@Inject
@@ -56,10 +55,9 @@ class FlowTriggerServiceTest {
var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.SUCCESS);
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
simpleFlowExecution,
List.of(simpleFlow, flowWithFlowTrigger),
EMPTY_MULTIPLE_CONDITION_STORAGE
flowWithFlowTrigger
);
assertThat(resultingExecutionsToRun).size().isEqualTo(1);
@@ -81,10 +79,9 @@ class FlowTriggerServiceTest {
var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.CREATED);
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
simpleFlowExecution,
List.of(simpleFlow, flowWithFlowTrigger),
EMPTY_MULTIPLE_CONDITION_STORAGE
flowWithFlowTrigger
);
assertThat(resultingExecutionsToRun).size().isEqualTo(0);
@@ -109,10 +106,9 @@ class FlowTriggerServiceTest {
.kind(ExecutionKind.TEST)
.build();
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
simpleFlowExecutionComingFromATest,
List.of(simpleFlow, flowWithFlowTrigger),
EMPTY_MULTIPLE_CONDITION_STORAGE
flowWithFlowTrigger
);
assertThat(resultingExecutionsToRun).size().isEqualTo(0);

View File

@@ -1,4 +1,4 @@
version=1.0.8
version=1.0.16
org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError
org.gradle.parallel=true

View File

@@ -1,8 +1,10 @@
package io.kestra.repository.mysql;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.runners.ScheduleContextInterface;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.kestra.jdbc.runner.JdbcSchedulerContext;
import io.kestra.jdbc.services.JdbcFilterService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -11,6 +13,10 @@ import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.Temporal;
import java.util.Date;
import java.util.List;
@@ -45,4 +51,11 @@ public class MysqlTriggerRepository extends AbstractJdbcTriggerRepository {
throw new IllegalArgumentException("Unsupported GroupType: " + groupType);
}
}
@Override
protected Temporal toNextExecutionTime(ZonedDateTime now) {
// next_execution_date in the table is stored in UTC
// convert 'now' to UTC LocalDateTime to avoid any timezone/offset interpretation by the database.
return now.withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime();
}
}

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.filters.*;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.ExecutionStatistics;
@@ -60,7 +61,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
private static final Field<String> STATE_CURRENT_FIELD = field("state_current", String.class);
private static final Field<String> NAMESPACE_FIELD = field("namespace", String.class);
private static final Field<Object> START_DATE_FIELD = field("start_date");
private static final Condition NORMAL_KIND_CONDITION = field("kind").isNull();
private static final Condition NORMAL_KIND_CONDITION = field("kind").isNull().or(field("kind").eq(ExecutionKind.NORMAL.name()));
protected final io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository;
private final ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;

View File

@@ -7,6 +7,7 @@ import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.filters.AbstractFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.LogRepositoryInterface;
@@ -31,8 +32,8 @@ import java.util.stream.Stream;
public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository implements LogRepositoryInterface {
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull();
public static final String DATE_COLUMN = "timestamp";
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull().or(field("execution_kind").eq(ExecutionKind.NORMAL.name()));
private static final String DATE_COLUMN = "timestamp";
protected io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository;
public AbstractJdbcLogRepository(io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository,

View File

@@ -5,6 +5,7 @@ import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.filters.AbstractFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.executions.metrics.MetricAggregation;
import io.kestra.core.models.executions.metrics.MetricAggregations;
@@ -34,7 +35,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepository implements MetricRepositoryInterface {
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull();
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull().or(field("execution_kind").eq(ExecutionKind.NORMAL.name()));
protected io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository;
public AbstractJdbcMetricRepository(io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository,

View File

@@ -32,6 +32,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.time.ZonedDateTime;
import java.time.temporal.Temporal;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -151,7 +152,7 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(
(field("next_execution_date").lessThan(now.toOffsetDateTime())
(field("next_execution_date").lessThan(toNextExecutionTime(now))
// we check for null for backwards compatibility
.or(field("next_execution_date").isNull()))
.and(field("execution_id").isNull())
@@ -162,14 +163,14 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
.fetch()
.map(r -> this.jdbcRepository.deserialize(r.get("value", String.class)));
}
public List<Trigger> findByNextExecutionDateReadyButLockedTriggers(ZonedDateTime now) {
return this.jdbcRepository.getDslContextWrapper()
.transactionResult(configuration -> DSL.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(
(field("next_execution_date").lessThan(now.toOffsetDateTime())
(field("next_execution_date").lessThan(toNextExecutionTime(now))
// we check for null for backwards compatibility
.or(field("next_execution_date").isNull()))
.and(field("execution_id").isNotNull())
@@ -178,6 +179,10 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
.fetch()
.map(r -> this.jdbcRepository.deserialize(r.get("value", String.class))));
}
protected Temporal toNextExecutionTime(ZonedDateTime now) {
return now.toOffsetDateTime();
}
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface) {
JdbcSchedulerContext jdbcSchedulerContext = (JdbcSchedulerContext) scheduleContextInterface;

View File

@@ -24,10 +24,10 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
}
/**
* Fetch the concurrency limit counter then process the count using the consumer function.
* It locked the raw and is wrapped in a transaction so the consumer should use the provided dslContext for any database access.
* Fetch the concurrency limit counter, then process the count using the consumer function.
* It locked the raw and is wrapped in a transaction, so the consumer should use the provided dslContext for any database access.
* <p>
* Note that to avoid a race when no concurrency limit counter exists, it first always try to insert a 0 counter.
* Note that to avoid a race when no concurrency limit counter exists, it first always tries to insert a 0 counter.
*/
public ExecutionRunning countThenProcess(FlowInterface flow, BiFunction<DSLContext, ConcurrencyLimit, Pair<ExecutionRunning, ConcurrencyLimit>> consumer) {
return this.jdbcRepository
@@ -106,8 +106,7 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
.and(field("namespace").eq(flow.getNamespace()))
.and(field("flow_id").eq(flow.getId()));
return Optional.ofNullable(select.forUpdate().fetchOne())
.map(record -> this.jdbcRepository.map(record));
return this.jdbcRepository.fetchOne(select.forUpdate());
}
private void save(DSLContext dslContext, ConcurrencyLimit concurrencyLimit) {

View File

@@ -12,7 +12,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public abstract class AbstractJdbcExecutionQueuedStorage extends AbstractJdbcRepository {
protected io.kestra.jdbc.AbstractJdbcRepository<ExecutionQueued> jdbcRepository;
@@ -70,18 +69,12 @@ public abstract class AbstractJdbcExecutionQueuedStorage extends AbstractJdbcRep
this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
var select = DSL
.using(configuration)
.select(AbstractJdbcRepository.field("value"))
.from(this.jdbcRepository.getTable())
.where(buildTenantCondition(execution.getTenantId()))
.and(field("key").eq(IdUtils.fromParts(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId())))
.forUpdate();
Optional<ExecutionQueued> maybeExecution = this.jdbcRepository.fetchOne(select);
if (maybeExecution.isPresent()) {
this.jdbcRepository.delete(maybeExecution.get());
}
DSL
.using(configuration)
.deleteFrom(this.jdbcRepository.getTable())
.where(buildTenantCondition(execution.getTenantId()))
.and(field("key").eq(IdUtils.fromParts(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId())))
.execute();
});
}
}

View File

@@ -3,6 +3,7 @@ package io.kestra.jdbc.runner;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.FlowNotFoundException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.*;
@@ -241,7 +242,7 @@ public class JdbcExecutor implements ExecutorInterface {
final ExecutorsUtils executorsUtils,
final MaintenanceService maintenanceService,
@Value("${kestra.jdbc.executor.thread-count:0}") final int threadCount
) {
) {
this.serviceLivenessCoordinator = serviceLivenessCoordinator;
this.flowMetaStore = flowMetaStore;
this.flowTopologyRepository = flowTopologyRepository;
@@ -423,7 +424,7 @@ public class JdbcExecutor implements ExecutorInterface {
MultipleConditionEvent multipleConditionEvent = either.getLeft();
flowTriggerService.computeExecutionsFromFlowTriggers(multipleConditionEvent.execution(), List.of(multipleConditionEvent.flow()), Optional.of(multipleConditionStorage))
flowTriggerService.computeExecutionsFromFlowTriggerPreconditions(multipleConditionEvent.execution(), multipleConditionEvent.flow(), multipleConditionStorage)
.forEach(exec -> {
try {
executionQueue.emit(exec);
@@ -472,7 +473,7 @@ public class JdbcExecutor implements ExecutorInterface {
void reEmitWorkerJobsForWorkers(final Configuration configuration,
final List<String> ids) {
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_WORKER_JOB_RESUBMIT_COUNT, MetricRegistry.METRIC_EXECUTOR_WORKER_JOB_RESUBMIT_COUNT_DESCRIPTION)
.increment(ids.size());
.increment(ids.size());
workerJobRunningRepository.getWorkerJobWithWorkerDead(configuration.dsl(), ids)
.forEach(workerJobRunning -> {
@@ -551,10 +552,10 @@ public class JdbcExecutor implements ExecutorInterface {
execution,
FlowId.uidWithoutRevision(execution),
() -> {
Executor executor = new Executor(execution, null);
try {
final FlowWithSource flow = findFlow(execution);
Executor executor = new Executor(execution, null).withFlow(flow);
final FlowWithSource flow = findFlowOrThrow(execution);
executor = executor.withFlow(flow);
// schedule it for later if needed
if (execution.getState().getCurrent() == State.Type.CREATED && execution.getScheduleDate() != null && execution.getScheduleDate().isAfter(Instant.now())) {
@@ -642,7 +643,7 @@ public class JdbcExecutor implements ExecutorInterface {
.forEach(throwConsumer(workerTask -> {
try {
if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED)));
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED).addAttempt(TaskRunAttempt.builder().state(new State().withState(State.Type.SKIPPED)).build())));
} else {
if (workerTask.getTask().isSendToWorkerTask()) {
Optional<WorkerGroup> maybeWorkerGroup = workerGroupService.resolveGroupFromJob(flow, workerTask);
@@ -650,8 +651,24 @@ public class JdbcExecutor implements ExecutorInterface {
.orElse(null);
workerJobQueue.emit(workerGroupKey, workerTask);
}
/// flowable attempt state transition to running
if (workerTask.getTask().isFlowable()) {
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.RUNNING)));
List<TaskRunAttempt> attempts = Optional.ofNullable(workerTask.getTaskRun().getAttempts())
.map(ArrayList::new)
.orElseGet(ArrayList::new);
attempts.add(
TaskRunAttempt.builder()
.state(new State().withState(State.Type.RUNNING))
.build()
);
TaskRun updatedTaskRun = workerTask.getTaskRun()
.withAttempts(attempts)
.withState(State.Type.RUNNING);
workerTaskResults.add(new WorkerTaskResult(updatedTaskRun));
}
}
} catch (Exception e) {
@@ -687,20 +704,20 @@ public class JdbcExecutor implements ExecutorInterface {
.filter(subflowExecution -> this.deduplicateSubflowExecution(execution, executorState, subflowExecution.getParentTaskRun()))
.toList();
subflowExecutionDedup
.forEach(throwConsumer(subflowExecution -> {
Execution subExecution = subflowExecution.getExecution();
String log = String.format("Created new execution [[link execution=\"%s\" flowId=\"%s\" namespace=\"%s\"]]", subExecution.getId(), subExecution.getFlowId(), subExecution.getNamespace());
subflowExecutionDedup
.forEach(throwConsumer(subflowExecution -> {
Execution subExecution = subflowExecution.getExecution();
String log = String.format("Created new execution [[link execution=\"%s\" flowId=\"%s\" namespace=\"%s\"]]", subExecution.getId(), subExecution.getFlowId(), subExecution.getNamespace());
JdbcExecutor.log.info(log);
JdbcExecutor.log.info(log);
logQueue.emit(LogEntry.of(subflowExecution.getParentTaskRun(), subflowExecution.getExecution().getKind()).toBuilder()
.level(Level.INFO)
.message(log)
.timestamp(subflowExecution.getParentTaskRun().getState().getStartDate())
.thread(Thread.currentThread().getName())
.build()
);
logQueue.emit(LogEntry.of(subflowExecution.getParentTaskRun(), subflowExecution.getExecution().getKind()).toBuilder()
.level(Level.INFO)
.message(log)
.timestamp(subflowExecution.getParentTaskRun().getState().getStartDate())
.thread(Thread.currentThread().getName())
.build()
);
executionQueue.emit(subflowExecution.getExecution());
}));
@@ -721,7 +738,20 @@ public class JdbcExecutor implements ExecutorInterface {
Span.current().recordException(e).setStatus(StatusCode.ERROR);
return null;
} catch (FlowNotFoundException e) {
// avoid infinite loop
if (!executor.getExecution().getState().getCurrent().isFailed()) {
return Pair.of(
handleFailedExecutionFromExecutor(executor, e),
executorState
);
}
}
return Pair.of(
executor,
executorState
);
}
);
});
@@ -758,7 +788,7 @@ public class JdbcExecutor implements ExecutorInterface {
if (execution.hasTaskRunJoinable(message.getTaskRun())) {
try {
// process worker task result
executorService.addWorkerTaskResult(current, () -> findFlow(execution), message);
executorService.addWorkerTaskResult(current, throwSupplier(() -> findFlowOrThrow(execution)), message);
// join worker result
return Pair.of(
current,
@@ -769,7 +799,20 @@ public class JdbcExecutor implements ExecutorInterface {
handleFailedExecutionFromExecutor(current, e),
pair.getRight()
);
} catch (FlowNotFoundException e) {
// avoid infinite loop
if (!current.getExecution().getState().getCurrent().isFailed()) {
return Pair.of(
handleFailedExecutionFromExecutor(current, e),
pair.getRight()
);
}
}
return Pair.of(
current,
pair.getRight()
);
}
return null;
@@ -810,7 +853,7 @@ public class JdbcExecutor implements ExecutorInterface {
if (execution.hasTaskRunJoinable(message.getParentTaskRun())) { // TODO if we remove this check, we can avoid adding 'iteration' on the 'isSame()' method
try {
FlowWithSource flow = findFlow(execution);
FlowWithSource flow = findFlowOrThrow(execution);
Task task = flow.findTaskByTaskId(message.getParentTaskRun().getTaskId());
TaskRun taskRun;
@@ -872,7 +915,20 @@ public class JdbcExecutor implements ExecutorInterface {
handleFailedExecutionFromExecutor(current, e),
pair.getRight()
);
} catch (FlowNotFoundException e) {
// avoid infinite loop
if (!current.getExecution().getState().getCurrent().isFailed()) {
return Pair.of(
handleFailedExecutionFromExecutor(current, e),
pair.getRight()
);
}
}
return Pair.of(
current,
pair.getRight()
);
}
return null;
@@ -910,8 +966,8 @@ public class JdbcExecutor implements ExecutorInterface {
throw new IllegalStateException("Execution state don't exist for " + message.getParentExecutionId() + ", receive " + message);
}
FlowWithSource flow = findFlow(execution);
try {
FlowWithSource flow = findFlowOrThrow(execution);
ExecutableTask<?> executableTask = (ExecutableTask<?>) flow.findTaskByTaskId(message.getTaskId());
if (!executableTask.waitForExecution()) {
return null;
@@ -934,7 +990,7 @@ public class JdbcExecutor implements ExecutorInterface {
log.error("Unable to emit the subflow execution result", ex);
}
}
} catch (InternalException e) {
} catch (InternalException | FlowNotFoundException e) {
log.error("Unable to process the subflow execution end", e);
}
return null;
@@ -1063,11 +1119,18 @@ public class JdbcExecutor implements ExecutorInterface {
// We need to detect that and reset them as they will never reach the reset code later on this method.
if (execution.getTrigger() != null && execution.getState().isFailed() && ListUtils.isEmpty(execution.getTaskRunList())) {
FlowWithSource flow = executor.getFlow();
triggerRepository
.findByExecution(execution)
.ifPresent(trigger -> {
this.triggerState.update(executionService.resetExecution(flow, execution, trigger));
});
if (flow == null) {
log.error("Couldn't reset trigger for execution {} as flow {} is missing. Trigger {} might stay stuck.",
execution.getId(),
execution.getTenantId() + "/" + execution.getNamespace() + "/" + execution.getFlowId(),
execution.getTrigger().getId()
);
} else {
triggerRepository
.findByExecution(execution)
.ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
}
}
return;
@@ -1080,7 +1143,7 @@ public class JdbcExecutor implements ExecutorInterface {
// the terminated state can come from the execution queue, in this case we always have a flow in the executor
// or from a worker task in an afterExecution block, in this case we need to load the flow
if (executor.getFlow() == null && executor.getExecution().getState().isTerminated()) {
executor = executor.withFlow(findFlow(executor.getExecution()));
executor = executor.withFlow(findFlowOrThrow(executor.getExecution()));
}
boolean isTerminated = executor.getFlow() != null && executionService.isTerminated(executor.getFlow(), executor.getExecution());
@@ -1133,7 +1196,9 @@ public class JdbcExecutor implements ExecutorInterface {
boolean queuedThenKilled = execution.getState().getCurrent() == State.Type.KILLED
&& execution.getState().getHistories().stream().anyMatch(h -> h.getState().isQueued())
&& execution.getState().getHistories().stream().noneMatch(h -> h.getState().isRunning());
if (!queuedThenKilled) {
boolean concurrencyShortCircuitState = Concurrency.possibleTransitions(execution.getState().getCurrent())
&& execution.getState().getHistories().get(execution.getState().getHistories().size() - 2).getState().isCreated();
if (!queuedThenKilled && !concurrencyShortCircuitState) {
concurrencyLimitStorage.decrement(executor.getFlow());
if (executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
@@ -1177,7 +1242,7 @@ public class JdbcExecutor implements ExecutorInterface {
((JdbcQueue<WorkerJob>) workerJobQueue).deleteByKeys(taskRunKeys);
}
}
} catch (QueueException e) {
} catch (QueueException | FlowNotFoundException e) {
if (!ignoreFailure) {
// If we cannot add the new worker task result to the execution, we fail it
executionRepository.lock(executor.getExecution().getId(), pair -> {
@@ -1196,8 +1261,10 @@ public class JdbcExecutor implements ExecutorInterface {
private void processFlowTriggers(Execution execution) throws QueueException {
// directly process simple conditions
flowTriggerService.withFlowTriggersOnly(allFlows.stream())
.filter(f ->ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().noneMatch(c -> c instanceof MultipleCondition) && f.getTrigger().getPreconditions() == null)
.flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggers(execution, List.of(f.getFlow()), Optional.empty()).stream())
.filter(f -> ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().noneMatch(c -> c instanceof MultipleCondition) && f.getTrigger().getPreconditions() == null)
.map(f -> f.getFlow())
.distinct() // as computeExecutionsFromFlowTriggers is based on flow, we must map FlowWithFlowTrigger to a flow and distinct to avoid multiple execution for the same flow
.flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggerConditions(execution, f).stream())
.forEach(throwConsumer(exec -> executionQueue.emit(exec)));
// send multiple conditions to the multiple condition queue for later processing
@@ -1208,8 +1275,17 @@ public class JdbcExecutor implements ExecutorInterface {
.forEach(throwConsumer(multipleCondition -> multipleConditionEventQueue.emit(multipleCondition)));
}
private FlowWithSource findFlow(Execution execution) {
FlowInterface flow = this.flowMetaStore.findByExecution(execution).orElseThrow();
private FlowWithSource findFlowOrThrow(Execution execution) throws FlowNotFoundException {
return findFlow(execution).orElseThrow(() -> new FlowNotFoundException("Unable to find flow %s for execution %s".formatted(execution.getTenantId() + "/" + execution.getNamespace() + "/" + execution.getFlowId(), execution.getId())));
}
private Optional<FlowWithSource> findFlow(Execution execution) {
Optional<FlowInterface> maybeFlow = this.flowMetaStore.findByExecution(execution);
if (maybeFlow.isEmpty()) {
return Optional.empty();
}
FlowInterface flow = maybeFlow.get();
FlowWithSource flowWithSource = pluginDefaultService.injectDefaults(flow, execution);
if (templateExecutorInterface.isPresent()) {
@@ -1224,7 +1300,7 @@ public class JdbcExecutor implements ExecutorInterface {
}
}
return flowWithSource;
return Optional.of(flowWithSource);
}
/**
@@ -1271,7 +1347,7 @@ public class JdbcExecutor implements ExecutorInterface {
else if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.RESTART_FAILED_TASK)) {
Execution newAttempt = executionService.retryTask(
pair.getKey(),
findFlow(pair.getKey()),
findFlowOrThrow(pair.getKey()),
executionDelay.getTaskRunId()
);
executor = executor.withExecution(newAttempt, "retryFailedTask");
@@ -1283,7 +1359,7 @@ public class JdbcExecutor implements ExecutorInterface {
}
// Handle WaitFor
else if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.CONTINUE_FLOWABLE)) {
Execution execution = executionService.retryWaitFor(executor.getExecution(), executionDelay.getTaskRunId());
Execution execution = executionService.retryWaitFor(executor.getExecution(), executionDelay.getTaskRunId());
executor = executor.withExecution(execution, "continueLoop");
}
} catch (Exception e) {
@@ -1309,20 +1385,22 @@ public class JdbcExecutor implements ExecutorInterface {
slaMonitorStorage.processExpired(Instant.now(), slaMonitor -> {
Executor result = executionRepository.lock(slaMonitor.getExecutionId(), pair -> {
FlowWithSource flow = findFlow(pair.getLeft());
Executor executor = new Executor(pair.getLeft(), null).withFlow(flow);
Optional<SLA> sla = flow.getSla().stream().filter(s -> s.getId().equals(slaMonitor.getSlaId())).findFirst();
if (sla.isEmpty()) {
// this can happen in case the flow has been updated and the SLA removed
log.debug("Cannot find the SLA '{}' in the flow for execution '{}', ignoring it.", slaMonitor.getSlaId(), slaMonitor.getExecutionId());
return null;
}
metricRegistry
.counter(MetricRegistry.METRIC_EXECUTOR_SLA_EXPIRED_COUNT, MetricRegistry.METRIC_EXECUTOR_SLA_EXPIRED_COUNT_DESCRIPTION, metricRegistry.tags(executor.getExecution()))
.increment();
Executor executor = new Executor(pair.getLeft(), null);
try {
// TODO flow with source is not needed here. Maybe the best would be to not add the flow inside the executor to trace all usage
FlowWithSource flow = findFlowOrThrow(pair.getLeft());
executor = executor.withFlow(flow);
Optional<SLA> sla = flow.getSla().stream().filter(s -> s.getId().equals(slaMonitor.getSlaId())).findFirst();
if (sla.isEmpty()) {
// this can happen in case the flow has been updated and the SLA removed
log.debug("Cannot find the SLA '{}' in the flow for execution '{}', ignoring it.", slaMonitor.getSlaId(), slaMonitor.getExecutionId());
return null;
}
metricRegistry
.counter(MetricRegistry.METRIC_EXECUTOR_SLA_EXPIRED_COUNT, MetricRegistry.METRIC_EXECUTOR_SLA_EXPIRED_COUNT_DESCRIPTION, metricRegistry.tags(executor.getExecution()))
.increment();
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
Optional<Violation> violation = slaService.evaluateExecutionMonitoringSLA(runContext, executor.getExecution(), sla.get());
if (violation.isPresent()) { // should always be true

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.runners.ScheduleContextInterface;
import io.kestra.core.runners.Scheduler;
import io.kestra.core.runners.SchedulerTriggerStateInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.FlowService;
@@ -56,6 +57,9 @@ public class JdbcScheduler extends AbstractScheduler {
.forEach(abstractTrigger -> triggerRepository.delete(Trigger.of(flow, abstractTrigger)));
}
});
// No-op consumption of the trigger queue, so the events are purged from the queue
this.triggerQueue.receive(Scheduler.class, trigger -> { });
}
@Override

View File

@@ -14,6 +14,7 @@ import org.jooq.impl.DSL;
import javax.sql.DataSource;
import java.util.List;
import java.util.Optional;
import static io.kestra.core.utils.Rethrow.throwPredicate;
@@ -45,10 +46,7 @@ public class JdbcTestUtils {
.meta()
.getTables()
.stream()
.filter(throwPredicate(table -> (table.getSchema().getName().equals(dataSource.getConnection().getCatalog())) ||
table.getSchema().getName().equals("public") || // for Postgres
table.getSchema().getName().equals("dbo") // for SQLServer
))
.filter(throwPredicate(table -> (table.getSchema().getName().equals(Optional.ofNullable(dataSource.getConnection().getSchema()).orElse(dataSource.getConnection().getCatalog())))))
.filter(table -> tableConfigs.getTableConfigs().stream().anyMatch(conf -> conf.table().equalsIgnoreCase(table.getName())))
.toList();
});

View File

@@ -1,6 +1,7 @@
package io.kestra.jdbc.runner;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
@@ -138,6 +139,7 @@ public abstract class JdbcRunnerRetryTest {
retryCaseTest.retryDynamicTask(execution);
}
@FlakyTest(description = "it seems this flow sometimes stay stuck in RUNNING")
@Test
@ExecuteFlow("flows/valids/retry-with-flowable-errors.yaml")
void retryWithFlowableErrors(Execution execution){

View File

@@ -6,21 +6,21 @@ import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.MessageTooBigException;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.AbstractRunnerTest;
import io.kestra.core.runners.InputsTest;
import io.kestra.core.utils.TestsUtils;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.RetryingTest;
import org.slf4j.event.Level;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
@@ -28,11 +28,32 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
public abstract class JdbcRunnerTest extends AbstractRunnerTest {
public static final String NAMESPACE = "io.kestra.tests";
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Inject
private JdbcTestUtils jdbcTestUtils;
@Test
void avoidInfiniteExecutionLoop() throws QueueException, InterruptedException {
Flux<Execution> executionFlux = TestsUtils.receive(executionQueue);
Execution execution = Execution.newExecution(TestsUtils.mockFlow(), Collections.emptyList());
executionQueue.emit(execution);
// Wait some time to ensure no infinite loop occurs
Thread.sleep(500);
// We expect the initial execution message + the failed due to missing flow
assertThat(
Objects.requireNonNull(executionFlux.collectList().block()).stream()
.filter(e -> e.getId().equals(execution.getId()))
.toList()
).hasSize(2);
}
@Test
@LoadFlows({"flows/valids/waitfor-child-task-warning.yaml"})
void waitForChildTaskWarning() throws Exception {

View File

@@ -114,6 +114,10 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
if (either.getLeft().getTaskRun().getState().getCurrent() == Type.RUNNING) {
runningLatch.countDown();
}
if (either.getLeft().getTaskRun().getState().getCurrent() == Type.FAILED) {
fail("Worker task result should not be in FAILED state as it should be resubmitted");
}
});
workerJobQueue.emit(workerTask(Duration.ofSeconds(5)));

View File

@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Singleton
public class TestRunner implements Runnable, AutoCloseable {
@Setter private int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors());
@Setter private int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors()) * 16;
@Setter private boolean schedulerEnabled = true;
@Setter private boolean workerEnabled = true;
@@ -49,7 +49,9 @@ public class TestRunner implements Runnable, AutoCloseable {
running.set(true);
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
ExecutorInterface executor = applicationContext.getBean(ExecutorInterface.class);
servers.add(executor);
poolExecutor.execute(executor);
if (workerEnabled) {
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils

View File

@@ -32,8 +32,8 @@
<strong>We're sorry but Kestra doesn't work properly without JavaScript enabled. Please enable it to continue.</strong>
</noscript>
<div id="loader-wrapper" data-test-id="loader-wrapper">
<div id="loader" data-test-id="loader"></div>
<div id="loader-wrapper">
<div id="loader"></div>
</div>
<div id="app-container">

View File

@@ -9,7 +9,7 @@
:data-component="`FILENAME_PLACEHOLDER#${tab}`"
>
<template #label>
<component :is="embedActiveTab || tab.disabled ? 'a' : 'router-link'" @click="embeddedTabChange(tab)" :to="embedActiveTab ? undefined : to(tab)" :data-test-id="tab.name">
<component :is="embedActiveTab || tab.disabled ? 'a' : 'router-link'" @click="embeddedTabChange(tab)" :to="embedActiveTab ? undefined : to(tab)">
<el-tooltip v-if="tab.disabled && tab.props && tab.props.showTooltip" :content="$t('add-trigger-in-editor')" placement="top">
<span><strong>{{ tab.title }}</strong></span>
</el-tooltip>

View File

@@ -272,6 +272,7 @@
import Id from "../Id.vue";
import SelectTableActions from "../../mixins/selectTableActions";
import _merge from "lodash/merge";
import moment from "moment";
import LogsWrapper from "../logs/LogsWrapper.vue";
import KestraFilter from "../filter/KestraFilter.vue"
import {mapStores} from "pinia";
@@ -500,6 +501,15 @@
loadQuery(base) {
let queryFilter = this.queryWithFilter();
const timeRange = queryFilter["filters[timeRange][EQUALS]"];
if (timeRange) {
const end = new Date();
const start = new Date(end.getTime() - moment.duration(timeRange).asMilliseconds());
queryFilter["filters[startDate][GREATER_THAN_OR_EQUAL_TO]"] = start.toISOString();
queryFilter["filters[endDate][LESS_THAN_OR_EQUAL_TO]"] = end.toISOString();
delete queryFilter["filters[timeRange][EQUALS]"];
}
return _merge(base, queryFilter)
},
},

View File

@@ -56,7 +56,7 @@
this.follow();
window.addEventListener("popstate", this.follow)
this.dependenciesCount = (await this.flowStore.loadDependencies({namespace: this.$route.params.namespace, id: this.$route.params.flowId})).count;
this.dependenciesCount = (await this.flowStore.loadDependencies({namespace: this.$route.params.namespace, id: this.$route.params.flowId}, true)).count;
},
mounted() {
this.previousExecutionId = this.$route.params.id

View File

@@ -6,13 +6,11 @@
class="filter"
>
<el-radio-button
data-test-id="date-filter-relative-selector"
:value="filterType.RELATIVE"
>
{{ $t("relative") }}
</el-radio-button>
<el-radio-button
data-test-id="date-filter-absolute-selector"
:value="filterType.ABSOLUTE"
>
{{ $t("absolute") }}
@@ -38,13 +36,11 @@
class="filter"
>
<el-radio-button
data-test-id="date-filter-relative-selector"
:value="filterType.RELATIVE"
>
{{ $t("relative") }}
</el-radio-button>
<el-radio-button
data-test-id="date-filter-absolute-selector"
:value="filterType.ABSOLUTE"
>
{{ $t("absolute") }}

View File

@@ -1,7 +1,6 @@
<template>
<el-tooltip :disabled="tooltip === undefined" :content="tooltip" effect="light">
<el-select
data-test-id="time-selector"
:model-value="value"
:placeholder="placeholder"
@change="$emit('change', $event)"

View File

@@ -528,27 +528,25 @@
}
.content-container {
height: calc(100vh - 0px);
overflow-y: auto !important;
overflow-y: scroll;
overflow-x: hidden;
scrollbar-gutter: stable;
word-wrap: break-word;
word-break: break-word;
}
:deep(.el-collapse) {
.el-collapse-item__wrap {
overflow-y: auto !important;
max-height: none !important;
}
.el-collapse-item__content {
overflow-y: auto !important;
word-wrap: break-word;
word-break: break-word;
}
}
:deep(.var-value) {
overflow-y: auto !important;
word-wrap: break-word;
word-break: break-word;
}

View File

@@ -46,7 +46,6 @@
<div class="right-align">
<el-form-item class="submit">
<el-button
:data-test-id="buttonTestId"
:icon="buttonIcon"
:disabled="!flowCanBeExecuted"
:class="{'flow-run-trigger-button': true, 'onboarding-glow': coreStore.guidedProperties.tourStarted}"

View File

@@ -213,7 +213,7 @@
/>
<template #footer>
<router-link
v-if="isSchedule(selectedTrigger.type)"
v-if="isSchedule(selectedTrigger?.type)"
:to="{
name: 'admin/triggers',
query: {

View File

@@ -28,7 +28,7 @@
:navbar="false"
v-if="(input.type === 'ENUM' || input.type === 'SELECT') && !input.isRadio"
:data-testid="`input-form-${input.id}`"
v-model="selectedTriggerLocal[input.id]"
v-model="inputsValues[input.id]"
@update:model-value="onChange(input)"
:allow-create="input.allowCustomValue"
filterable
@@ -238,7 +238,6 @@
/>
<duration-picker
v-if="input.type === 'DURATION'"
:data-testid="`input-form-${input.id}`"
v-model="inputsValues[input.id]"
@update:model-value="onChange(input)"
/>
@@ -334,7 +333,6 @@
multiSelectInputs: {},
inputsValidated: new Set(),
debouncedValidation: () => {},
selectedTriggerLocal: {},
editingArrayId: null,
editableItems: {},
};
@@ -344,8 +342,9 @@
this.inputsMetaData = JSON.parse(JSON.stringify(this.initialInputs));
this.debouncedValidation = debounce(this.validateInputs, 500)
if(this.selectedTrigger?.inputs) this.selectedTriggerLocal = toRaw(this.selectedTrigger.inputs);
else this.selectedTriggerLocal = this.inputsValues;
if(this.selectedTrigger?.inputs){
this.inputsValues = toRaw(this.selectedTrigger.inputs);
}
this.validateInputs().then(() => {
this.$watch("inputsValues", {
@@ -362,6 +361,10 @@
},
deep: true
});
// on first load default values need to be sent to the parent
// since they are part of the actual value
this.$emit("update:modelValue", this.inputsValues)
});
},
mounted() {
@@ -403,12 +406,12 @@
},
updateDefaults() {
for (const input of this.inputsMetaData || []) {
const {type, id, value} = input;
const {type, id, value, defaults} = input;
if (this.inputsValues[id] === undefined || this.inputsValues[id] === null || input.isDefault) {
if (type === "MULTISELECT") {
this.multiSelectInputs[id] = value;
this.multiSelectInputs[id] = value ?? defaults;
}
this.inputsValues[id] = Inputs.normalize(type, value);
this.inputsValues[id] = Inputs.normalize(type, value ?? defaults);
}
}
},
@@ -469,9 +472,9 @@
if (this.inputsMetaData === undefined || this.inputsMetaData.length === 0) {
return;
}
const inputsValuesWithNoDefault = this.inputsValuesWithNoDefault();
const formData = inputsToFormData(this, this.inputsMetaData, inputsValuesWithNoDefault);
const metadataCallback = (response) => {

View File

@@ -247,6 +247,7 @@
const suggestWidgetResizeObserver = ref<MutationObserver>()
const suggestWidgetObserver = ref<MutationObserver>()
const suggestWidget = ref<HTMLElement>()
const resizeObserver = ref<ResizeObserver>()
@@ -796,6 +797,20 @@
setTimeout(() => monaco.editor.remeasureFonts(), 1)
emit("editorDidMount", editorResolved.value);
/* Hhandle resizing. */
resizeObserver.value = new ResizeObserver(() => {
if (localEditor.value) {
localEditor.value.layout();
}
if (localDiffEditor.value) {
localDiffEditor.value.getModifiedEditor().layout();
localDiffEditor.value.getOriginalEditor().layout();
}
});
if (editorRef.value) {
resizeObserver.value.observe(editorRef.value);
}
highlightLine();
}
@@ -853,6 +868,8 @@
function destroy() {
disposeObservers();
disposeCompletions.value?.();
resizeObserver.value?.disconnect();
resizeObserver.value = undefined;
if (localDiffEditor.value !== undefined) {
localDiffEditor.value?.dispose();
localDiffEditor.value?.getModel()?.modified?.dispose();

View File

@@ -475,7 +475,7 @@
return this.namespacesStore
.createKv({
...this.kv,
contentType: ["DATE", "DATETIME"].includes(type) ? "text/plain" : "application/json",
contentType: "text/plain",
value
})
.then(() => {

View File

@@ -26,7 +26,9 @@
<p class="section-1-desc">
{{ $t("welcome_page.start") }}
</p>
<el-button
v-if="isOSS"
@click="startTour"
:icon="Plus"
size="large"
@@ -35,6 +37,18 @@
>
{{ $t("welcome button create") }}
</el-button>
<el-button
v-else
:icon="Plus"
tag="router-link"
:to="{name: 'flows/create'}"
size="large"
type="primary"
class="px-3 p-4 section-1-link product-link"
>
{{ $t("welcome button create") }}
</el-button>
<el-button
:icon="Play"
tag="a"
@@ -70,6 +84,7 @@
import permission from "../../models/permission";
import action from "../../models/action";
import {useAuthStore} from "override/stores/auth";
import {useMiscStore} from "override/stores/misc";
const {topbar = true} = defineProps<{topbar?: boolean}>();
@@ -87,6 +102,8 @@
const authStore = useAuthStore();
const isOSS = computed(() => useMiscStore().configs?.edition === "OSS")
const canCreate = computed(() => {
return authStore.user.hasAnyActionOnAnyNamespace(permission.FLOW, action.CREATE);
});

View File

@@ -100,7 +100,7 @@
>
<NamespaceSelect
v-model="secret.namespace"
:readonly="secret.update"
:read-only="secret.update"
:include-system-namespace="true"
all
/>

View File

@@ -506,7 +506,7 @@ export const useFlowStore = defineStore("flow", () => {
}
function loadDependencies(options: { namespace: string, id: string, subtype: "FLOW" | "EXECUTION" }, onlyCount = false) {
return store.$http.get(`${apiUrl(store)}/flows/${options.namespace}/${options.id}/dependencies?expandAll=true`).then(response => {
return store.$http.get(`${apiUrl(store)}/flows/${options.namespace}/${options.id}/dependencies?expandAll=${onlyCount ? false : true}`).then(response => {
return {
...(!onlyCount ? {data: transformResponse(response.data, options.subtype)} : {}),
count: response.data.nodes ? [...new Set(response.data.nodes.map((r:{uid:string}) => r.uid))].length : 0

View File

@@ -107,25 +107,11 @@
:deep(.alert-info) {
display: flex;
gap: 12px;
padding: 16px 16px 0 16px;
padding: .5rem !important;
background-color: var(--ks-background-info);
border: 1px solid var(--ks-border-info);
border-left-width: 5px;
border-radius: 8px;
&::before {
content: '!';
min-width: 20px;
height: 20px;
margin-top: 4px;
border-radius: 50%;
background: var(--ks-content-info);
border: 1px solid var(--ks-border-info);
color: var(--ks-content-inverse);
font: 600 13px/20px sans-serif;
text-align: center;
}
border-left-width: 0.25rem;
border-radius: 0.5rem;
p { color: var(--ks-content-info); }
}
@@ -135,7 +121,7 @@
color: var(--ks-content-info);
border: 1px solid var(--ks-border-info);
font-family: 'Courier New', Courier, monospace;
white-space: nowrap; // Prevent button text from wrapping
white-space: nowrap;
.material-design-icon {
position: absolute;

View File

@@ -870,19 +870,10 @@
"adding": "+ {what} hinzufügen",
"adding_default": "+ Neuen Wert hinzufügen",
"clearSelection": "Auswahl aufheben",
"close": {
"afterExecution": "Nach Ausführung Task schließen",
"conditions": "Bedingung schließen",
"errors": "Fehlerbehandler schließen",
"finally": "Task schließen",
"input": "Eingabe schließen",
"pluginDefaults": "Plugin-Standard schließen",
"tasks": "Task schließen",
"triggers": "Trigger schließen"
},
"creation": {
"afterExecution": "Fügen Sie einen Block nach der Ausführung hinzu",
"conditions": "Bedingung hinzufügen",
"default": "Hinzufügen",
"errors": "Einen Fehler-Handler hinzufügen",
"finally": "Fügen Sie einen Finally-Block hinzu",
"inputs": "Ein Eingabefeld hinzufügen",
@@ -916,6 +907,10 @@
"variable": "Variable",
"yaml": "YAML-Editor"
},
"remove": {
"cases": "Diesen Fall entfernen",
"default": "Diesen Eintrag entfernen"
},
"sections": {
"afterExecution": "Nach Ausführung",
"connection": "Verbindungseigenschaften",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Wählen Sie eine Task aus",
"conditions": "Wählen Sie eine Bedingung aus",
"default": "Wählen Sie einen Typ aus",
"errors": "Wählen Sie eine Task aus",
"finally": "Wählen Sie eine Task aus",
"inputs": "Wählen Sie einen Input-Feldtyp aus",

View File

@@ -870,19 +870,10 @@
"adding": "+ Agregar un {what}",
"adding_default": "+ Añadir un nuevo value",
"clearSelection": "Borrar selección",
"close": {
"afterExecution": "Cerrar después de la ejecución de la task",
"conditions": "Condición de cierre",
"errors": "Cerrar el manejador de errores",
"finally": "Cerrar task",
"input": "Cerrar input",
"pluginDefaults": "Cerrar plugin predeterminado",
"tasks": "Cerrar task",
"triggers": "Cerrar trigger"
},
"creation": {
"afterExecution": "Agregar un bloque después de la ejecución",
"conditions": "Agregar una condición",
"default": "Agregar",
"errors": "Agregar un manejador de errores",
"finally": "Agregar un bloque finally",
"inputs": "Agregar un campo de input",
@@ -916,6 +907,10 @@
"variable": "Variable",
"yaml": "Editor YAML"
},
"remove": {
"cases": "Eliminar este caso",
"default": "Eliminar esta entrada"
},
"sections": {
"afterExecution": "Después de la Ejecución",
"connection": "Propiedades de conexión",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Seleccionar una task",
"conditions": "Seleccione una condición",
"default": "Seleccionar un tipo",
"errors": "Selecciona una task",
"finally": "Seleccionar una task",
"inputs": "Seleccione un tipo de campo de input",

View File

@@ -870,19 +870,10 @@
"adding": "+ Ajouter un {what}",
"adding_default": "+ Ajouter une nouvelle valeur",
"clearSelection": "Effacer la sélection",
"close": {
"afterExecution": "Fermer après l'exécution de la task",
"conditions": "Condition de fermeture",
"errors": "Fermer le gestionnaire d'erreurs",
"finally": "Fermer la task",
"input": "Fermer l'input",
"pluginDefaults": "Fermer le plugin par défaut",
"tasks": "Fermer la task",
"triggers": "Fermer le trigger"
},
"creation": {
"afterExecution": "Ajouter un bloc après l'exécution",
"conditions": "Ajouter une condition",
"default": "Ajouter",
"errors": "Ajouter un gestionnaire d'erreurs",
"finally": "Ajouter un bloc finally",
"inputs": "Ajouter un champ d'input",
@@ -916,6 +907,10 @@
"variable": "Variable",
"yaml": "Éditeur YAML"
},
"remove": {
"cases": "Supprimer ce cas",
"default": "Supprimer cette entrée"
},
"sections": {
"afterExecution": "Après l'Exécution",
"connection": "Propriétés de connexion",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Sélectionnez une task",
"conditions": "Sélectionner une condition",
"default": "Sélectionner un type",
"errors": "Sélectionner une task",
"finally": "Sélectionnez une task",
"inputs": "Sélectionnez un type de champ input",

View File

@@ -870,19 +870,10 @@
"adding": "+ एक {what} जोड़ें",
"adding_default": "+ एक नया value जोड़ें",
"clearSelection": "चयन साफ़ करें",
"close": {
"afterExecution": "कार्य पूरा होने के बाद बंद करें",
"conditions": "बंद करने की शर्त",
"errors": "त्रुटि हैंडलर बंद करें",
"finally": "टास्क बंद करें",
"input": "इनपुट बंद करें",
"pluginDefaults": "प्लगइन डिफ़ॉल्ट बंद करें",
"tasks": "टास्क बंद करें",
"triggers": "ट्रिगर बंद करें"
},
"creation": {
"afterExecution": "एक निष्पादन के बाद ब्लॉक जोड़ें",
"conditions": "एक शर्त जोड़ें",
"default": "जोड़ें",
"errors": "त्रुटि हैंडलर जोड़ें",
"finally": "अंत में एक finally ब्लॉक जोड़ें",
"inputs": "इनपुट फ़ील्ड जोड़ें",
@@ -916,6 +907,10 @@
"variable": "वेरिएबल",
"yaml": "YAML संपादक"
},
"remove": {
"cases": "इस केस को हटाएं",
"default": "इस प्रविष्टि को हटाएं"
},
"sections": {
"afterExecution": "Execution के बाद",
"connection": "कनेक्शन गुणधर्म",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "एक task चुनें",
"conditions": "कंडीशन चुनें",
"default": "एक प्रकार चुनें",
"errors": "एक task चुनें",
"finally": "एक task चुनें",
"inputs": "इनपुट फ़ील्ड प्रकार चुनें",

View File

@@ -870,19 +870,10 @@
"adding": "+ Aggiungi un {what}",
"adding_default": "+ Aggiungi un nuovo value",
"clearSelection": "Cancella selezione",
"close": {
"afterExecution": "Chiudi dopo l'esecuzione del task",
"conditions": "Condizione di chiusura",
"errors": "Chiudi error handler",
"finally": "Chiudi task",
"input": "Chiudi input",
"pluginDefaults": "Chiudi plugin predefinito",
"tasks": "Chiudi task",
"triggers": "Chiudi trigger"
},
"creation": {
"afterExecution": "Aggiungi un blocco after execution",
"conditions": "Aggiungi una condizione",
"default": "Aggiungi",
"errors": "Aggiungi un gestore degli errori",
"finally": "Aggiungi un blocco finally",
"inputs": "Aggiungi un campo di input",
@@ -916,6 +907,10 @@
"variable": "Variabile",
"yaml": "Editor YAML"
},
"remove": {
"cases": "Rimuovi questo caso",
"default": "Rimuovi questa voce"
},
"sections": {
"afterExecution": "Dopo l'Esecuzione",
"connection": "Proprietà di connessione",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Seleziona un task",
"conditions": "Seleziona una condizione",
"default": "Seleziona un tipo",
"errors": "Seleziona un task",
"finally": "Seleziona un task",
"inputs": "Seleziona un tipo di input field",

View File

@@ -870,19 +870,10 @@
"adding": "+ {what}を追加",
"adding_default": "+ 新しいvalueを追加",
"clearSelection": "選択をクリア",
"close": {
"afterExecution": "実行後にタスクを閉じる",
"conditions": "クローズ条件",
"errors": "エラーハンドラーを閉じる",
"finally": "タスクを閉じる",
"input": "入力を閉じる",
"pluginDefaults": "プラグインのデフォルトを閉じる",
"tasks": "タスクを閉じる",
"triggers": "トリガーを閉じる"
},
"creation": {
"afterExecution": "実行後ブロックを追加",
"conditions": "条件を追加",
"default": "追加",
"errors": "エラーハンドラーを追加",
"finally": "finally ブロックを追加",
"inputs": "入力フィールドを追加",
@@ -916,6 +907,10 @@
"variable": "変数",
"yaml": "YAMLエディター"
},
"remove": {
"cases": "このケースを削除",
"default": "このエントリを削除"
},
"sections": {
"afterExecution": "実行後",
"connection": "接続プロパティ",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "タスクを選択",
"conditions": "条件を選択",
"default": "タイプを選択",
"errors": "タスクを選択",
"finally": "タスクを選択",
"inputs": "入力フィールドタイプを選択",

View File

@@ -870,19 +870,10 @@
"adding": "+ {what} 추가",
"adding_default": "+ 새 value 추가",
"clearSelection": "선택 해제",
"close": {
"afterExecution": "실행 task 후 닫기",
"conditions": "닫기 조건",
"errors": "오류 처리기 닫기",
"finally": "작업 닫기",
"input": "입력 닫기",
"pluginDefaults": "플러그인 기본값 닫기",
"tasks": "작업 닫기",
"triggers": "트리거 닫기"
},
"creation": {
"afterExecution": "실행 후 블록 추가",
"conditions": "조건 추가",
"default": "추가",
"errors": "오류 처리기 추가",
"finally": "마지막 블록 추가",
"inputs": "입력 필드 추가",
@@ -916,6 +907,10 @@
"variable": "변수",
"yaml": "YAML 편집기"
},
"remove": {
"cases": "이 케이스를 제거하십시오",
"default": "이 항목 제거"
},
"sections": {
"afterExecution": "실행 후",
"connection": "연결 속성",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "작업 선택",
"conditions": "조건 선택",
"default": "유형 선택",
"errors": "작업 선택",
"finally": "작업 선택",
"inputs": "입력 필드 유형 선택",

View File

@@ -870,19 +870,10 @@
"adding": "+ Dodaj {what}",
"adding_default": "+ Dodaj nową wartość",
"clearSelection": "Wyczyść zaznaczenie",
"close": {
"afterExecution": "Zamknij po wykonaniu task",
"conditions": "Zamknij warunek",
"errors": "Zamknij obsługę błędów",
"finally": "Zamknij task",
"input": "Zamknij input",
"pluginDefaults": "Zamknij domyślną wtyczkę",
"tasks": "Zamknij task",
"triggers": "Zamknij trigger"
},
"creation": {
"afterExecution": "Dodaj blok po wykonaniu",
"conditions": "Dodaj warunek",
"default": "Dodaj",
"errors": "Dodaj obsługę błędów",
"finally": "Dodaj blok finally",
"inputs": "Dodaj pole input",
@@ -916,6 +907,10 @@
"variable": "Zmienna",
"yaml": "Edytor YAML"
},
"remove": {
"cases": "Usuń ten przypadek",
"default": "Usuń ten wpis"
},
"sections": {
"afterExecution": "Po wykonaniu",
"connection": "Właściwości połączenia",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Wybierz task",
"conditions": "Wybierz warunek",
"default": "Wybierz typ",
"errors": "Wybierz task",
"finally": "Wybierz task",
"inputs": "Wybierz typ pola input",

View File

@@ -870,19 +870,10 @@
"adding": "+ Adicionar um {what}",
"adding_default": "+ Adicionar um novo value",
"clearSelection": "Limpar seleção",
"close": {
"afterExecution": "Fechar após execução da task",
"conditions": "Condição de fechamento",
"errors": "Fechar manipulador de erro",
"finally": "Fechar task",
"input": "Fechar input",
"pluginDefaults": "Fechar plugin padrão",
"tasks": "Fechar task",
"triggers": "Fechar trigger"
},
"creation": {
"afterExecution": "Adicionar um bloco após a execução",
"conditions": "Adicionar uma condição",
"default": "Adicionar",
"errors": "Adicionar um manipulador de erro",
"finally": "Adicionar um bloco finally",
"inputs": "Adicionar um campo de input",
@@ -916,6 +907,10 @@
"variable": "Variável",
"yaml": "Editor YAML"
},
"remove": {
"cases": "Remover este caso",
"default": "Remover esta entrada"
},
"sections": {
"afterExecution": "Após a Execução",
"connection": "Propriedades de conexão",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Selecione uma task",
"conditions": "Selecione uma condição",
"default": "Selecione um tipo",
"errors": "Selecione uma task",
"finally": "Selecione uma task",
"inputs": "Selecione um tipo de campo de input",

File diff suppressed because it is too large Load Diff

View File

@@ -870,19 +870,10 @@
"adding": "+ Добавить {what}",
"adding_default": "+ Добавить новое value",
"clearSelection": "Очистить выбор",
"close": {
"afterExecution": "Закрыть после выполнения task",
"conditions": "Закрыть условие",
"errors": "Закрыть обработчик ошибок",
"finally": "Закрыть task",
"input": "Закрыть input",
"pluginDefaults": "Закрыть плагин по умолчанию",
"tasks": "Закрыть task",
"triggers": "Закрыть trigger"
},
"creation": {
"afterExecution": "Добавить блок после выполнения",
"conditions": "Добавить условие",
"default": "Добавить",
"errors": "Добавить обработчик ошибок",
"finally": "Добавить блок finally",
"inputs": "Добавить поле input",
@@ -916,6 +907,10 @@
"variable": "Переменная",
"yaml": "Редактор YAML"
},
"remove": {
"cases": "Удалить этот кейс",
"default": "Удалить эту запись"
},
"sections": {
"afterExecution": "После выполнения",
"connection": "Свойства подключения",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Выберите task",
"conditions": "Выберите условие",
"default": "Выберите тип",
"errors": "Выберите task",
"finally": "Выберите task",
"inputs": "Выберите тип поля input",

Some files were not shown because too many files have changed in this diff Show More