Mirror of https://github.com/kestra-io/kestra.git (synced 2025-12-26 14:00:23 -05:00)
Compare commits
69 Commits
fix/sdk-ch ... v1.0.16
| SHA1 |
|---|
| 9da8ba2f22 |
| 20330384ca |
| d5ba45acba |
| f701f15dcb |
| 55a507b621 |
| 3cd357f311 |
| c3e0b6d740 |
| 3209ea9657 |
| 72b261129d |
| 26c83390ba |
| 7ba6bc6d30 |
| fc86ef7bb4 |
| 69b46fa3b8 |
| d681e349a1 |
| 165951a8f3 |
| be8de252ae |
| 8a6093615a |
| 09b6964f16 |
| 7f2d4d02d6 |
| 7e200d9ebc |
| d361c33f63 |
| 31438ffff0 |
| 18caf45521 |
| 50d6de75f4 |
| 4c054f9d24 |
| 5bad8dd3c7 |
| 69b1921236 |
| 4e99a253e3 |
| 97d0a93e01 |
| e2d8d51843 |
| 8567ff5490 |
| 050e22dd09 |
| 3552eeefbb |
| 2e47fb8285 |
| b52a07e562 |
| 3f7c01db41 |
| f5dbec96e0 |
| fe7a6d9af9 |
| 06c8c35061 |
| 8f23e813f2 |
| 47b7c7cd2e |
| aca7c2f694 |
| a0f29b7d5d |
| 0176c8c101 |
| b0036bbfca |
| fad5edbde8 |
| f125f63ae5 |
| 6db1bfb2ce |
| 0957e07c78 |
| 5a4a5e44df |
| faee3f1827 |
| 3604762da0 |
| 6ceb0de1d5 |
| 4a62f9c818 |
| d14f3e3317 |
| 7e9030dfcf |
| 2fce17a8a9 |
| 67d8509106 |
| 01e92a6d79 |
| 883b7c8610 |
| 11ef823567 |
| 771cca1441 |
| 53e8674dfc |
| 59016ae1af |
| 7503d6fa21 |
| 0234a4c64c |
| 98c9c4d21f |
| 8e54183a44 |
| 8aa332c629 |
@@ -62,7 +62,7 @@ public class KvUpdateCommand extends AbstractApiCommand {
         Duration ttl = expiration == null ? null : Duration.parse(expiration);

         MutableHttpRequest<String> request = HttpRequest
             .PUT(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/kv/" + key, value)
-            .contentType(MediaType.APPLICATION_JSON_TYPE);
+            .contentType(MediaType.TEXT_PLAIN);

         if (ttl != null) {
             request.header("ttl", ttl.toString());
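Note on this hunk: the KV update now sends the raw value as `text/plain` instead of JSON, so the body is stored verbatim and the optional TTL travels in a header. A minimal, hypothetical Micronaut client sketch (the path and values are illustrative, not taken from this PR):

```java
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;

import java.time.Duration;

class KvPutSketch {
    // Hypothetical example: PUT a raw value with an optional TTL header,
    // sending it as text/plain so the server stores the body verbatim.
    static MutableHttpRequest<String> buildRequest(String value, Duration ttl) {
        MutableHttpRequest<String> request = HttpRequest
            .PUT("/api/v1/namespaces/dev/kv/my-key", value) // illustrative path
            .contentType(MediaType.TEXT_PLAIN);             // was APPLICATION_JSON_TYPE
        if (ttl != null) {
            request.header("ttl", ttl.toString());          // e.g. "PT5M"
        }
        return request;
    }
}
```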
@@ -1,7 +1,9 @@
 package io.kestra.cli.commands.servers;

 import com.google.common.collect.ImmutableMap;
+import io.kestra.cli.services.TenantIdSelectorService;
 import io.kestra.core.models.ServerType;
+import io.kestra.core.repositories.LocalFlowRepositoryLoader;
 import io.kestra.core.runners.ExecutorInterface;
 import io.kestra.core.services.SkipExecutionService;
 import io.kestra.core.services.StartExecutorService;
@@ -10,6 +12,8 @@ import io.micronaut.context.ApplicationContext;
 import jakarta.inject.Inject;
 import picocli.CommandLine;

+import java.io.File;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -19,6 +23,9 @@ import java.util.Map;
     description = "Start the Kestra executor"
 )
 public class ExecutorCommand extends AbstractServerCommand {
+    @CommandLine.Spec
+    CommandLine.Model.CommandSpec spec;
+
     @Inject
     private ApplicationContext applicationContext;

@@ -28,22 +35,28 @@ public class ExecutorCommand extends AbstractServerCommand {
     @Inject
     private StartExecutorService startExecutorService;

-    @CommandLine.Option(names = {"--skip-executions"}, split=",", description = "The list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
+    @CommandLine.Option(names = {"-f", "--flow-path"}, description = "Tenant identifier required to load flows from the specified path")
+    private File flowPath;
+
+    @CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path")
+    private String tenantId;
+
+    @CommandLine.Option(names = {"--skip-executions"}, split=",", description = "List of execution IDs to skip, separated by commas; for troubleshooting only")
     private List<String> skipExecutions = Collections.emptyList();

-    @CommandLine.Option(names = {"--skip-flows"}, split=",", description = "The list of flow identifiers (tenant|namespace|flowId) to skip, separated by a coma; for troubleshooting purpose only")
+    @CommandLine.Option(names = {"--skip-flows"}, split=",", description = "List of flow identifiers (tenant|namespace|flowId) to skip, separated by a coma; for troubleshooting only")
     private List<String> skipFlows = Collections.emptyList();

-    @CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "The list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting purpose only")
+    @CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "List of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting only")
     private List<String> skipNamespaces = Collections.emptyList();

-    @CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "The list of tenants to skip, separated by a coma; for troubleshooting purpose only")
+    @CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "List of tenants to skip, separated by a coma; for troubleshooting only")
     private List<String> skipTenants = Collections.emptyList();

-    @CommandLine.Option(names = {"--start-executors"}, split=",", description = "The list of Kafka Stream executors to start, separated by a command. Use it only with the Kafka queue, for debugging purpose.")
+    @CommandLine.Option(names = {"--start-executors"}, split=",", description = "List of Kafka Stream executors to start, separated by a command. Use it only with the Kafka queue; for debugging only")
     private List<String> startExecutors = Collections.emptyList();

-    @CommandLine.Option(names = {"--not-start-executors"}, split=",", description = "The list of Kafka Stream executors to not start, separated by a command. Use it only with the Kafka queue, for debugging purpose.")
+    @CommandLine.Option(names = {"--not-start-executors"}, split=",", description = "Lst of Kafka Stream executors to not start, separated by a command. Use it only with the Kafka queue; for debugging only")
     private List<String> notStartExecutors = Collections.emptyList();

     @SuppressWarnings("unused")
@@ -64,6 +77,16 @@ public class ExecutorCommand extends AbstractServerCommand {
         super.call();

+        if (flowPath != null) {
+            try {
+                LocalFlowRepositoryLoader localFlowRepositoryLoader = applicationContext.getBean(LocalFlowRepositoryLoader.class);
+                TenantIdSelectorService tenantIdSelectorService = applicationContext.getBean(TenantIdSelectorService.class);
+                localFlowRepositoryLoader.load(tenantIdSelectorService.getTenantId(this.tenantId), this.flowPath);
+            } catch (IOException e) {
+                throw new CommandLine.ParameterException(this.spec.commandLine(), "Invalid flow path", e);
+            }
+        }
+
         ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
         executorService.run();
@@ -23,7 +23,7 @@ public class IndexerCommand extends AbstractServerCommand {
     @Inject
     private SkipExecutionService skipExecutionService;

-    @CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
+    @CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting only")
     private List<String> skipIndexerRecords = Collections.emptyList();

     @SuppressWarnings("unused")
@@ -42,7 +42,7 @@ public class StandAloneCommand extends AbstractServerCommand {
     @Nullable
     private FileChangedEventListener fileWatcher;

-    @CommandLine.Option(names = {"-f", "--flow-path"}, description = "the flow path containing flow to inject at startup (when running with a memory flow repository)")
+    @CommandLine.Option(names = {"-f", "--flow-path"}, description = "Tenant identifier required to load flows from the specified path")
     private File flowPath;

     @CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path with the enterprise edition")
@@ -51,19 +51,19 @@ public class StandAloneCommand extends AbstractServerCommand {
     @CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to eight times the number of available processors. Set it to 0 to avoid starting a worker.")
     private int workerThread = defaultWorkerThread();

-    @CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
+    @CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting only")
     private List<String> skipExecutions = Collections.emptyList();

-    @CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (namespace.flowId) to skip, separated by a coma; for troubleshooting purpose only")
+    @CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (namespace.flowId) to skip, separated by a coma; for troubleshooting only")
     private List<String> skipFlows = Collections.emptyList();

-    @CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "a list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting purpose only")
+    @CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "a list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting only")
     private List<String> skipNamespaces = Collections.emptyList();

-    @CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting purpose only")
+    @CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting only")
     private List<String> skipTenants = Collections.emptyList();

-    @CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
+    @CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting only")
     private List<String> skipIndexerRecords = Collections.emptyList();

     @CommandLine.Option(names = {"--no-tutorials"}, description = "Flag to disable auto-loading of tutorial flows.")
@@ -40,7 +40,7 @@ public class WebServerCommand extends AbstractServerCommand {
     @Option(names = {"--no-indexer"}, description = "Flag to disable starting an embedded indexer.")
     private boolean indexerDisabled = false;

-    @CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
+    @CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting only")
     private List<String> skipIndexerRecords = Collections.emptyList();

     @Override
@@ -30,15 +30,15 @@ micronaut:
       read-idle-timeout: 60m
       write-idle-timeout: 60m
       idle-timeout: 60m
-      netty:
-        max-zstd-encode-size: 67108864 # increased to 64MB from the default of 32MB
-        max-chunk-size: 10MB
-        max-header-size: 32768 # increased from the default of 8k
     responses:
       file:
         cache-seconds: 86400
         cache-control:
           public: true
+    netty:
+      max-zstd-encode-size: 67108864 # increased to 64MB from the default of 32MB
+      max-chunk-size: 10MB
+      max-header-size: 32768 # increased from the default of 8k

   # Access log configuration, see https://docs.micronaut.io/latest/guide/index.html#accessLogger
   access-logger:
@@ -0,0 +1,23 @@
+package io.kestra.core.exceptions;
+
+/**
+ * Exception that can be thrown when a Flow is not found.
+ */
+public class FlowNotFoundException extends NotFoundException {
+
+    /**
+     * Creates a new {@link FlowNotFoundException} instance.
+     */
+    public FlowNotFoundException() {
+        super();
+    }
+
+    /**
+     * Creates a new {@link NotFoundException} instance.
+     *
+     * @param message the error message.
+     */
+    public FlowNotFoundException(final String message) {
+        super(message);
+    }
+}
@@ -7,7 +7,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
@@ -65,7 +64,7 @@ public interface HasSource {

         if (isYAML(fileName)) {
             byte[] bytes = inputStream.readAllBytes();
-            List<String> sources = List.of(new String(bytes).split("---"));
+            List<String> sources = List.of(new String(bytes).split("(?m)^---\\s*$"));
             for (int i = 0; i < sources.size(); i++) {
                 String source = sources.get(i);
                 reader.accept(source, String.valueOf(i));
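Note on the split change: the literal `split("---")` also breaks on `---` sequences appearing inside a document's values, while `(?m)^---\s*$` only matches a line that is exactly the YAML document separator. A self-contained sketch of the difference:

```java
import java.util.List;

class YamlSplitSketch {
    public static void main(String[] args) {
        String yaml = """
            id: first
            description: "dashes --- inside a value"
            ---
            id: second
            """;

        // Old behavior: the literal split also breaks on the "---" inside the string value.
        List<String> naive = List.of(yaml.split("---"));            // 3 chunks
        // New behavior: only lines consisting of "---" (plus trailing spaces) separate documents.
        List<String> byLine = List.of(yaml.split("(?m)^---\\s*$")); // 2 chunks

        System.out.println(naive.size() + " vs " + byLine.size()); // 3 vs 2
    }
}
```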
@@ -1,16 +1,33 @@
 package io.kestra.core.models;

 import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.validation.Valid;
 import jakarta.validation.constraints.Pattern;

+import java.util.List;
+import java.util.Map;
+
 /**
  * Interface that can be implemented by classes supporting plugin versioning.
  *
  * @see Plugin
  */
 public interface PluginVersioning {

+    String TITLE = "Plugin Version";
+    String DESCRIPTION = """
+        Defines the version of the plugin to use.
+
-    @Pattern(regexp="\\d+\\.\\d+\\.\\d+(-[a-zA-Z0-9-]+)?|([a-zA-Z0-9]+)")
-    @Schema(title = "The version of the plugin to use.")
+        The version must follow the Semantic Versioning (SemVer) specification:
+        - A single-digit MAJOR version (e.g., `1`).
+        - A MAJOR.MINOR version (e.g., `1.1`).
+        - A MAJOR.MINOR.PATCH version, optionally with any qualifier
+          (e.g., `1.1.2`, `1.1.0-SNAPSHOT`).
+        """;
+
+    @Schema(
+        title = TITLE,
+        description = DESCRIPTION
+    )
     String getVersion();
 }
@@ -5,6 +5,8 @@ import io.kestra.core.models.annotations.Plugin;
 import io.kestra.core.models.dashboards.filters.AbstractFilter;
 import io.kestra.core.repositories.QueryBuilderInterface;
 import io.kestra.plugin.core.dashboard.data.IData;
+import jakarta.annotation.Nullable;
+import jakarta.validation.Valid;
 import jakarta.validation.constraints.NotBlank;
 import jakarta.validation.constraints.NotNull;
 import jakarta.validation.constraints.Pattern;
@@ -36,6 +38,8 @@ public abstract class DataFilter<F extends Enum<F>, C extends ColumnDescriptor<F>>
     private Map<String, C> columns;

     @Setter
+    @Valid
+    @Nullable
     private List<AbstractFilter<F>> where;

     private List<OrderBy> orderBy;
@@ -28,6 +28,7 @@ import io.kestra.core.utils.IdUtils;
 import io.kestra.core.utils.ListUtils;
 import io.kestra.core.utils.MapUtils;
 import io.swagger.v3.oas.annotations.Hidden;
 import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.annotation.Nullable;
 import jakarta.validation.constraints.NotNull;
 import jakarta.validation.constraints.Pattern;
@@ -77,10 +78,12 @@ public class Execution implements DeletedInterface, TenantInterface {

     @With
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
+    @Schema(implementation = Object.class)
     Map<String, Object> inputs;

     @With
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
+    @Schema(implementation = Object.class)
     Map<String, Object> outputs;

     @JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@@ -88,6 +91,7 @@ public class Execution implements DeletedInterface, TenantInterface {
     List<Label> labels;

     @With
+    @Schema(implementation = Object.class)
     Map<String, Object> variables;

     @NotNull
@@ -647,18 +651,20 @@ public class Execution implements DeletedInterface, TenantInterface {
     public boolean hasFailedNoRetry(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun) {
         return this.findTaskRunByTasks(resolvedTasks, parentTaskRun)
             .stream()
-            .anyMatch(taskRun -> {
-                ResolvedTask resolvedTask = resolvedTasks.stream()
-                    .filter(t -> t.getTask().getId().equals(taskRun.getTaskId())).findFirst()
-                    .orElse(null);
-                if (resolvedTask == null) {
-                    log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'",
-                        taskRun.getId(), parentTaskRun.getId());
-                    return false;
-                }
-                return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry())
-                    && taskRun.getState().isFailed();
-            });
+            // NOTE: we check on isFailed first to avoid the costly shouldBeRetried() method
+            .anyMatch(taskRun -> taskRun.getState().isFailed() && shouldNotBeRetried(resolvedTasks, parentTaskRun, taskRun));
+    }
+
+    private static boolean shouldNotBeRetried(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun, TaskRun taskRun) {
+        ResolvedTask resolvedTask = resolvedTasks.stream()
+            .filter(t -> t.getTask().getId().equals(taskRun.getTaskId())).findFirst()
+            .orElse(null);
+        if (resolvedTask == null) {
+            log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'",
+                taskRun.getId(), parentTaskRun.getId());
+            return false;
+        }
+        return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry());
+    }

     public boolean hasCreated() {
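Note on the `hasFailedNoRetry` refactor: `&&` short-circuits, so checking the cheap `isFailed()` state first means the costly retry lookup only runs for task runs that actually failed. A reduced sketch with stand-in types (not the real model):

```java
import java.util.List;

class ShortCircuitSketch {
    record Run(boolean failed) {
        boolean shouldBeRetried() {
            // Stand-in for the expensive retry evaluation in the real code.
            System.out.println("expensive check");
            return false;
        }
    }

    public static void main(String[] args) {
        List<Run> runs = List.of(new Run(false), new Run(false), new Run(true));

        // Because && short-circuits, the expensive check only runs for failed task runs.
        boolean failedNoRetry = runs.stream()
            .anyMatch(r -> r.failed() && !r.shouldBeRetried());

        System.out.println(failedNoRetry); // true; "expensive check" printed only once
    }
}
```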
@@ -3,10 +3,13 @@ package io.kestra.core.models.executions;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import io.kestra.core.models.TenantInterface;
 import io.kestra.core.models.flows.State;
 import io.kestra.core.models.tasks.FlowableTask;
 import io.kestra.core.models.tasks.ResolvedTask;
 import io.kestra.core.models.tasks.Task;
 import io.kestra.core.models.tasks.retrys.AbstractRetry;
 import io.kestra.core.utils.IdUtils;
 import io.swagger.v3.oas.annotations.Hidden;
 import io.swagger.v3.oas.annotations.media.Schema;
 import jakarta.annotation.Nullable;
 import jakarta.validation.constraints.NotNull;
 import jakarta.validation.constraints.Pattern;
@@ -53,6 +56,7 @@ public class TaskRun implements TenantInterface {
     @With
     @JsonInclude(JsonInclude.Include.ALWAYS)
     @Nullable
+    @Schema(implementation = Object.class)
     Variables outputs;

     @NotNull
@@ -310,4 +314,11 @@ public class TaskRun implements TenantInterface {
             .build();
     }

+    public TaskRun addAttempt(TaskRunAttempt attempt) {
+        if (this.attempts == null) {
+            this.attempts = new ArrayList<>();
+        }
+        this.attempts.add(attempt);
+        return this;
+    }
 }
@@ -24,4 +24,8 @@ public class Concurrency {
     public enum Behavior {
         QUEUE, CANCEL, FAIL;
     }
+
+    public static boolean possibleTransitions(State.Type type) {
+        return type.equals(State.Type.CANCELLED) || type.equals(State.Type.FAILED);
+    }
 }
@@ -35,7 +35,6 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
 @JsonDeserialize(using = Property.PropertyDeserializer.class)
 @JsonSerialize(using = Property.PropertySerializer.class)
 @Builder
-@NoArgsConstructor
 @AllArgsConstructor(access = AccessLevel.PACKAGE)
 @Schema(
     oneOf = {
@@ -51,6 +50,7 @@ public class Property<T> {
         .copy()
         .configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);

+    private final boolean skipCache;
     private String expression;
     private T value;

@@ -60,13 +60,23 @@ public class Property<T> {
     @Deprecated
     // Note: when not used, this constructor would not be deleted but made private so it can only be used by ofExpression(String) and the deserializer
     public Property(String expression) {
-        this.expression = expression;
+        this(expression, false);
+    }
+
+    private Property(String expression, boolean skipCache) {
+        this.expression = expression;
+        this.skipCache = skipCache;
     }

     /**
      * @deprecated use {@link #ofValue(Object)} instead.
      */
     @VisibleForTesting
     @Deprecated
     public Property(Map<?, ?> map) {
         try {
             expression = MAPPER.writeValueAsString(map);
+            this.skipCache = false;
         } catch (JsonProcessingException e) {
             throw new IllegalArgumentException(e);
         }
@@ -79,9 +89,6 @@ public class Property<T> {
     /**
      * Returns a new {@link Property} with no cached rendered value,
      * so that the next render will evaluate its original Pebble expression.
-     * <p>
-     * The returned property will still cache its rendered result.
-     * To re-evaluate on a subsequent render, call {@code skipCache()} again.
      *
      * @return a new {@link Property} without a pre-rendered value
      */
@@ -133,6 +140,7 @@ public class Property<T> {

     /**
      * Build a new Property object with a Pebble expression.<br>
+     * This property object will not cache its rendered value.
      * <p>
      * Use {@link #ofValue(Object)} to build a property with a value instead.
      */
@@ -142,11 +150,11 @@ public class Property<T> {
             throw new IllegalArgumentException("'expression' must be a valid Pebble expression");
         }

-        return new Property<>(expression);
+        return new Property<>(expression, true);
     }

     /**
-     * Render a property then convert it to its target type.<br>
+     * Render a property, then convert it to its target type.<br>
      * <p>
      * This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
      *
@@ -164,7 +172,7 @@ public class Property<T> {
      * @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
      */
     public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
-        if (property.value == null) {
+        if (property.skipCache || property.value == null) {
             String rendered = context.render(property.expression, variables);
             property.value = MAPPER.convertValue(rendered, clazz);
         }
@@ -192,7 +200,7 @@ public class Property<T> {
      */
     @SuppressWarnings("unchecked")
     public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
-        if (property.value == null) {
+        if (property.skipCache || property.value == null) {
             JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
             try {
                 String trimmedExpression = property.expression.trim();
@@ -244,7 +252,7 @@ public class Property<T> {
      */
     @SuppressWarnings({"rawtypes", "unchecked"})
     public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
-        if (property.value == null) {
+        if (property.skipCache || property.value == null) {
             JavaType targetMapType = MAPPER.getTypeFactory().constructMapType(Map.class, keyClass, valueClass);

             try {
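Note on `skipCache`: `ofExpression` now builds the property with `skipCache = true`, so `as`/`asList`/`asMap` re-render the Pebble expression on every call instead of reusing the first rendered value. A reduced sketch of the flag's effect (not the real class):

```java
import java.util.function.Supplier;

class PropertyCacheSketch<T> {
    private final boolean skipCache;
    private T value; // cached rendered result

    PropertyCacheSketch(boolean skipCache) {
        this.skipCache = skipCache;
    }

    // Mirrors the pattern `if (property.skipCache || property.value == null)` from the diff:
    // with skipCache=true the renderer runs on every call, otherwise only on the first.
    T render(Supplier<T> renderer) {
        if (skipCache || value == null) {
            value = renderer.get();
        }
        return value;
    }
}
```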
@@ -1,5 +1,6 @@
 package io.kestra.core.plugins;

+import com.fasterxml.jackson.annotation.JsonIgnore;
 import lombok.Builder;

 import java.io.File;
@@ -33,7 +34,7 @@ public record PluginArtifact(
     String version,
     URI uri
 ) implements Comparable<PluginArtifact> {

     private static final Pattern ARTIFACT_PATTERN = Pattern.compile(
         "([^: ]+):([^: ]+)(:([^: ]*)(:([^: ]+))?)?:([^: ]+)"
     );
@@ -42,7 +43,8 @@ public record PluginArtifact(
     );

     public static final String JAR_EXTENSION = "jar";

+    public static final String KESTRA_GROUP_ID = "io.kestra";
+
     /**
      * Static helper method for constructing a new {@link PluginArtifact} from a JAR file.
      *
@@ -135,6 +137,11 @@ public record PluginArtifact(
     public String toString() {
         return toCoordinates();
     }

+    @JsonIgnore
+    public boolean isOfficial() {
+        return groupId.startsWith(KESTRA_GROUP_ID);
+    }
+
     public String toCoordinates() {
         return Stream.of(groupId, artifactId, extension, classifier, version)
@@ -1,9 +1,13 @@
 package io.kestra.core.plugins;

 import io.kestra.core.contexts.KestraContext;
+import io.kestra.core.utils.ListUtils;
+import io.kestra.core.utils.Version;
+import io.micronaut.core.type.Argument;
+import io.micronaut.http.HttpMethod;
 import io.micronaut.http.HttpRequest;
 import io.micronaut.http.HttpResponse;
 import io.micronaut.http.MutableHttpRequest;
 import io.micronaut.http.client.HttpClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -15,9 +19,12 @@ import java.util.Base64;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;

 /**
  * Services for retrieving available plugin artifacts for Kestra.
@@ -39,6 +46,8 @@ public class PluginCatalogService {

     private final boolean icons;
     private final boolean oss;

+    private final Version currentStableVersion;
+
     /**
      * Creates a new {@link PluginCatalogService} instance.
@@ -53,11 +62,55 @@ public class PluginCatalogService {
         this.httpClient = httpClient;
         this.icons = icons;
         this.oss = communityOnly;

+        Version version = Version.of(KestraContext.getContext().getVersion());
+        this.currentStableVersion = new Version(version.majorVersion(), version.minorVersion(), version.patchVersion(), null);
+
         // Immediately trigger an async load of plugin artifacts.
         this.isLoaded.set(true);
         this.plugins = CompletableFuture.supplyAsync(this::load);
     }

+    /**
+     * Resolves the version for the given artifacts.
+     *
+     * @param artifacts The list of artifacts to resolve.
+     * @return The list of results.
+     */
+    public List<PluginResolutionResult> resolveVersions(List<PluginArtifact> artifacts) {
+        if (ListUtils.isEmpty(artifacts)) {
+            return List.of();
+        }
+
+        final Map<String, ApiPluginArtifact> pluginsByGroupAndArtifactId = getAllCompatiblePlugins().stream()
+            .collect(Collectors.toMap(it -> it.groupId() + ":" + it.artifactId(), Function.identity()));
+
+        return artifacts.stream().map(it -> {
+            // Get all compatible versions for current artifact
+            List<String> versions = Optional
+                .ofNullable(pluginsByGroupAndArtifactId.get(it.groupId() + ":" + it.artifactId()))
+                .map(ApiPluginArtifact::versions)
+                .orElse(List.of());
+
+            // Try to resolve the version
+            String resolvedVersion = null;
+            if (!versions.isEmpty()) {
+                if (it.version().equalsIgnoreCase("LATEST")) {
+                    resolvedVersion = versions.getFirst();
+                } else {
+                    resolvedVersion = versions.contains(it.version()) ? it.version() : null;
+                }
+            }
+
+            // Build the PluginResolutionResult
+            return new PluginResolutionResult(
+                it,
+                resolvedVersion,
+                versions,
+                resolvedVersion != null
+            );
+        }).toList();
+    }
+
     public synchronized List<PluginManifest> get() {
         try {
@@ -140,7 +193,27 @@ public class PluginCatalogService {
             isLoaded.set(false);
         }
     }

+    private List<ApiPluginArtifact> getAllCompatiblePlugins() {
+        MutableHttpRequest<Object> request = HttpRequest.create(
+            HttpMethod.GET,
+            "/v1/plugins/artifacts/core-compatibility/" + currentStableVersion
+        );
+        if (oss) {
+            request.getParameters().add("license", "OPENSOURCE");
+        }
+        try {
+            return httpClient
+                .toBlocking()
+                .exchange(request, Argument.listOf(ApiPluginArtifact.class))
+                .body();
+        } catch (Exception e) {
+            log.debug("Failed to retrieve available plugins from Kestra API. Cause: ", e);
+            return List.of();
+        }
+    }
+
     public record PluginManifest(
         String title,
         String icon,
@@ -153,4 +226,11 @@ public class PluginCatalogService {
         return groupId + ":" + artifactId + ":LATEST";
     }
+
+    public record ApiPluginArtifact(
+        String groupId,
+        String artifactId,
+        String license,
+        List<String> versions
+    ) {}
 }
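Note on `resolveVersions`: `LATEST` resolves to the first entry of the compatibility list (assuming the catalog returns versions newest-first), and any other requested version must match exactly. A minimal sketch with hypothetical catalog data:

```java
import java.util.List;

class ResolveVersionSketch {
    // Mirrors the resolution rule from the diff: "LATEST" picks the first entry,
    // anything else must be present verbatim in the compatible-version list.
    static String resolve(String requested, List<String> versions) {
        if (versions.isEmpty()) return null;
        if (requested.equalsIgnoreCase("LATEST")) return versions.getFirst();
        return versions.contains(requested) ? requested : null;
    }

    public static void main(String[] args) {
        List<String> versions = List.of("1.2.0", "1.1.0", "1.0.0"); // hypothetical catalog entries
        System.out.println(resolve("LATEST", versions)); // 1.2.0
        System.out.println(resolve("1.1.0", versions));  // 1.1.0
        System.out.println(resolve("2.0.0", versions));  // null -> unresolved
    }
}
```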
@@ -144,7 +144,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer

     static String extractPluginRawIdentifier(final JsonNode node, final boolean isVersioningSupported) {
         String type = Optional.ofNullable(node.get(TYPE)).map(JsonNode::textValue).orElse(null);
-        String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::textValue).orElse(null);
+        String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::asText).orElse(null);

         if (type == null || type.isEmpty()) {
             return null;
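Note on `textValue()` vs `asText()`: `textValue()` returns `null` for non-textual nodes, so a version written as a bare number (e.g. `version: 1.2`) was silently dropped; `asText()` coerces the node to its string form. A self-contained Jackson sketch:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

class AsTextSketch {
    public static void main(String[] args) throws Exception {
        JsonNode node = new ObjectMapper().readTree("{\"version\": 1.2}");

        // textValue() only returns a value for textual nodes; a numeric `version` yields null.
        System.out.println(node.get("version").textValue()); // null
        // asText() coerces non-textual nodes to their string form instead.
        System.out.println(node.get("version").asText());    // "1.2"
    }
}
```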
@@ -56,12 +56,10 @@ public final class ExecutableUtils {
     }

     public static SubflowExecutionResult subflowExecutionResult(TaskRun parentTaskrun, Execution execution) {
-        List<TaskRunAttempt> attempts = parentTaskrun.getAttempts() == null ? new ArrayList<>() : new ArrayList<>(parentTaskrun.getAttempts());
-        attempts.add(TaskRunAttempt.builder().state(parentTaskrun.getState()).build());
         return SubflowExecutionResult.builder()
             .executionId(execution.getId())
             .state(parentTaskrun.getState().getCurrent())
-            .parentTaskRun(parentTaskrun.withAttempts(attempts))
+            .parentTaskRun(parentTaskrun.addAttempt(TaskRunAttempt.builder().state(parentTaskrun.getState()).build()))
             .build();
     }
@@ -82,8 +82,7 @@ public abstract class FilesService {
     }

     private static String resolveUniqueNameForFile(final Path path) {
-        String filename = path.getFileName().toString();
-        String encodedFilename = java.net.URLEncoder.encode(filename, java.nio.charset.StandardCharsets.UTF_8);
-        return IdUtils.from(path.toString()) + "-" + encodedFilename;
+        String filename = path.getFileName().toString().replace(' ', '+');
+        return IdUtils.from(path.toString()) + "-" + filename;
     }
 }
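Note on this change: `URLEncoder.encode` rewrites every reserved character, not just spaces, which altered file names more than intended; the replacement only normalizes spaces to `+`. A quick comparison sketch:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

class FileNameSketch {
    public static void main(String[] args) {
        String filename = "my report #1.csv";

        // Full URL-encoding rewrites every reserved character, not just spaces.
        System.out.println(URLEncoder.encode(filename, StandardCharsets.UTF_8)); // my+report+%231.csv
        // The replacement keeps the name readable and only normalizes spaces.
        System.out.println(filename.replace(' ', '+'));                          // my+report+#1.csv
    }
}
```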
@@ -11,6 +11,7 @@ import io.kestra.core.models.flows.State;
 import io.kestra.core.models.tasks.ResolvedTask;
 import io.kestra.core.models.tasks.Task;
 import io.kestra.core.serializers.JacksonMapper;
+import io.kestra.core.utils.ListUtils;
 import io.kestra.plugin.core.flow.Dag;

 import java.util.*;
@@ -152,6 +153,35 @@ public class FlowableUtils {
         return Collections.emptyList();
     }

+    public static Optional<State.Type> resolveSequentialState(
+        Execution execution,
+        List<ResolvedTask> tasks,
+        List<ResolvedTask> errors,
+        List<ResolvedTask> _finally,
+        TaskRun parentTaskRun,
+        RunContext runContext,
+        boolean allowFailure,
+        boolean allowWarning
+    ) {
+        if (ListUtils.emptyOnNull(tasks).stream()
+            .filter(resolvedTask -> !resolvedTask.getTask().getDisabled())
+            .findAny()
+            .isEmpty()) {
+            return Optional.of(State.Type.SUCCESS);
+        }
+
+        return resolveState(
+            execution,
+            tasks,
+            errors,
+            _finally,
+            parentTaskRun,
+            runContext,
+            allowFailure,
+            allowWarning
+        );
+    }
+
     public static Optional<State.Type> resolveState(
         Execution execution,
         List<ResolvedTask> tasks,
@@ -207,7 +237,7 @@ public class FlowableUtils {
             }
         } else {
             // first call, the error flow is not ready, we need to notify the parent task that can be failed to init error flows
-            if (execution.hasFailed(tasks, parentTaskRun) || terminalState == State.Type.FAILED) {
+            if (execution.hasFailedNoRetry(tasks, parentTaskRun) || terminalState == State.Type.FAILED) {
                 return Optional.of(execution.guessFinalState(tasks, parentTaskRun, allowFailure, allowWarning, terminalState));
             }
         }
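Note on `resolveSequentialState`: the helper centralizes a guard previously duplicated in `EachParallel`, `EachSequential`, and `ForEach` (see the hunks below): when every child task is disabled, or there are none, the parent resolves to `SUCCESS` without inspecting task runs. A reduced sketch of that guard:

```java
import java.util.List;

class DisabledChildrenSketch {
    record Task(boolean disabled) {}

    // Mirrors the guard in resolveSequentialState: if no enabled child remains,
    // the parent can be resolved to SUCCESS without resolving any task run state.
    static boolean allDisabledOrEmpty(List<Task> tasks) {
        return tasks.stream().filter(t -> !t.disabled()).findAny().isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(allDisabledOrEmpty(List.of()));                               // true
        System.out.println(allDisabledOrEmpty(List.of(new Task(true))));                 // true
        System.out.println(allDisabledOrEmpty(List.of(new Task(true), new Task(false)))); // false
    }
}
```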
@@ -3,14 +3,19 @@ package io.kestra.core.runners;
 import com.google.common.annotations.VisibleForTesting;
 import io.kestra.core.models.Label;
 import io.kestra.core.models.executions.Execution;
+import io.kestra.core.models.executions.ExecutionKilled;
+import io.kestra.core.models.executions.ExecutionKilledExecution;
 import io.kestra.core.models.flows.Flow;
 import io.kestra.core.models.flows.FlowInterface;
 import io.kestra.core.queues.QueueException;
 import io.kestra.core.queues.QueueFactoryInterface;
 import io.kestra.core.queues.QueueInterface;
+import io.kestra.core.repositories.ArrayListTotal;
 import io.kestra.core.repositories.ExecutionRepositoryInterface;
 import io.kestra.core.repositories.FlowRepositoryInterface;
+import io.kestra.core.services.ExecutionService;
 import io.kestra.core.utils.Await;
+import io.micronaut.data.model.Pageable;
 import jakarta.inject.Inject;
 import jakarta.inject.Named;
 import jakarta.inject.Singleton;
@@ -34,9 +39,16 @@ public class RunnerUtils {
     @Named(QueueFactoryInterface.EXECUTION_NAMED)
     protected QueueInterface<Execution> executionQueue;

+    @Inject
+    @Named(QueueFactoryInterface.KILL_NAMED)
+    protected QueueInterface<ExecutionKilled> killQueue;
+
     @Inject
     private FlowRepositoryInterface flowRepository;

     @Inject
     private ExecutionRepositoryInterface executionRepository;

+    @Inject
+    private ExecutionService executionService;
+
     @Inject
@@ -146,6 +158,11 @@ public class RunnerUtils {
         }), duration);
     }

+    @VisibleForTesting
+    public Execution awaitExecution(Predicate<Execution> predicate, Duration duration) throws TimeoutException {
+        return awaitExecution(predicate, () -> {}, duration);
+    }
+
     @VisibleForTesting
     public Execution awaitExecution(Predicate<Execution> predicate, Runnable executionEmitter, Duration duration) throws TimeoutException {
         AtomicReference<Execution> receive = new AtomicReference<>();
@@ -172,6 +189,62 @@ public class RunnerUtils {
         return receive.get();
     }

+    public List<Execution> awaitFlowExecutionNumber(int number, String tenantId, String namespace, String flowId) {
+        return awaitFlowExecutionNumber(number, tenantId, namespace, flowId, null);
+    }
+
+    public List<Execution> awaitFlowExecutionNumber(int number, String tenantId, String namespace, String flowId, Duration duration) {
+        AtomicReference<List<Execution>> receive = new AtomicReference<>();
+        Flow flow = flowRepository
+            .findById(tenantId, namespace, flowId, Optional.empty())
+            .orElseThrow(
+                () -> new IllegalArgumentException("Unable to find flow '" + flowId + "'"));
+        try {
+            if (duration == null){
+                duration = Duration.ofSeconds(20);
+            }
+            Await.until(() -> {
+                ArrayListTotal<Execution> byFlowId = executionRepository.findByFlowId(
+                    tenantId, namespace, flowId, Pageable.UNPAGED);
+                if (byFlowId.size() == number
+                    && byFlowId.stream()
+                    .filter(e -> executionService.isTerminated(flow, e))
+                    .toList().size() == number) {
+                    receive.set(byFlowId);
+                    return true;
+                }
+                return false;
+            }, Duration.ofMillis(50), duration);
+
+        } catch (TimeoutException e) {
+            ArrayListTotal<Execution> byFlowId = executionRepository.findByFlowId(
+                tenantId, namespace, flowId, Pageable.UNPAGED);
+            if (!byFlowId.isEmpty()) {
+                throw new RuntimeException("%d Execution found for flow %s, but %d where awaited".formatted(byFlowId.size(), flowId, number));
+            } else {
+                throw new RuntimeException("No execution for flow %s exist in the database".formatted(flowId));
+            }
+        }
+
+        return receive.get();
+    }
+
+    public Execution killExecution(Execution execution) throws QueueException, TimeoutException {
+        killQueue.emit(ExecutionKilledExecution.builder()
+            .executionId(execution.getId())
+            .isOnKillCascade(true)
+            .state(ExecutionKilled.State.REQUESTED)
+            .tenantId(execution.getTenantId())
+            .build());
+
+        return awaitExecution(isTerminatedExecution(
+            execution,
+            flowRepository
+                .findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), Optional.ofNullable(execution.getFlowRevision()))
+                .orElse(null)
+        ), throwRunnable(() -> this.executionQueue.emit(execution)), Duration.ofSeconds(60));
+    }
+
     @VisibleForTesting
     public Execution awaitChildExecution(Flow flow, Execution parentExecution, Runnable executionEmitter, Duration duration) throws TimeoutException {
         return this.awaitExecution(isTerminatedChildExecution(parentExecution, flow), executionEmitter, duration);
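Note on `awaitFlowExecutionNumber`: it polls the execution repository until the expected number of terminated executions appears, defaulting to a 20-second window. A self-contained stand-in for the `Await.until(condition, interval, timeout)` pattern used here (not Kestra's actual implementation):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

class PollSketch {
    // Re-check a condition every `interval` until it holds or `timeout` elapses.
    static void until(BooleanSupplier condition, Duration interval, Duration timeout)
        throws TimeoutException, InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (!condition.getAsBoolean()) {
            if (Instant.now().isAfter(deadline)) {
                throw new TimeoutException("condition not met within " + timeout);
            }
            Thread.sleep(interval.toMillis());
        }
    }
}
```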
@@ -151,10 +151,7 @@ abstract class AbstractFileFunction implements Function {
             // if there is a trigger of type execution, we also allow accessing a file from the parent execution
             Map<String, String> trigger = (Map<String, String>) context.getVariable(TRIGGER);

-            if (!isFileUriValid(trigger.get(NAMESPACE), trigger.get("flowId"), trigger.get("executionId"), path)) {
-                throw new IllegalArgumentException("Unable to read the file '" + path + "' as it didn't belong to the parent execution");
-            }
-            return true;
+            return isFileUriValid(trigger.get(NAMESPACE), trigger.get("flowId"), trigger.get("executionId"), path);
         }
         return false;
     }
@@ -383,6 +383,7 @@ public class ExecutionService {
         if (!isFlowable || s.equals(taskRunId)) {
             TaskRun newTaskRun;

+            State.Type targetState = newState;
             if (task instanceof Pause pauseTask) {
                 State.Type terminalState = newState == State.Type.RUNNING ? State.Type.SUCCESS : newState;
                 Pause.Resumed _resumed = resumed != null ? resumed : Pause.Resumed.now(terminalState);
@@ -392,23 +393,23 @@ public class ExecutionService {
                 // if it's a Pause task with no subtask, we terminate the task
                 if (ListUtils.isEmpty(pauseTask.getTasks()) && ListUtils.isEmpty(pauseTask.getErrors()) && ListUtils.isEmpty(pauseTask.getFinally())) {
                     if (newState == State.Type.RUNNING) {
-                        newTaskRun = newTaskRun.withState(State.Type.SUCCESS);
+                        targetState = State.Type.SUCCESS;
                     } else if (newState == State.Type.KILLING) {
-                        newTaskRun = newTaskRun.withState(State.Type.KILLED);
-                    } else {
-                        newTaskRun = newTaskRun.withState(newState);
+                        targetState = State.Type.KILLED;
                     }
                 } else {
                     // we should set the state to RUNNING so that subtasks are executed
-                    newTaskRun = newTaskRun.withState(State.Type.RUNNING);
+                    targetState = State.Type.RUNNING;
                 }
+                newTaskRun = newTaskRun.withState(targetState);
             } else {
-                newTaskRun = originalTaskRun.withState(newState);
+                newTaskRun = originalTaskRun.withState(targetState);
             }

             if (originalTaskRun.getAttempts() != null && !originalTaskRun.getAttempts().isEmpty()) {
                 ArrayList<TaskRunAttempt> attempts = new ArrayList<>(originalTaskRun.getAttempts());
-                attempts.set(attempts.size() - 1, attempts.getLast().withState(newState));
+                attempts.set(attempts.size() - 1, attempts.getLast().withState(targetState));
                 newTaskRun = newTaskRun.withAttempts(attempts);
             }
@@ -32,48 +32,84 @@ public class Version implements Comparable<Version> {
      * @param version the version.
      * @return a new {@link Version} instance.
      */
-    public static Version of(String version) {
+    public static Version of(final Object version) {
+        if (Objects.isNull(version)) {
+            throw new IllegalArgumentException("Invalid version, cannot parse null version");
+        }
+
+        String strVersion = version.toString();

-        if (version.startsWith("v")) {
-            version = version.substring(1);
+        if (strVersion.startsWith("v")) {
+            strVersion = strVersion.substring(1);
         }

-        int qualifier = version.indexOf("-");
+        int qualifier = strVersion.indexOf("-");

         final String[] versions = qualifier > 0 ?
-            version.substring(0, qualifier).split("\\.") :
-            version.split("\\.");
+            strVersion.substring(0, qualifier).split("\\.") :
+            strVersion.split("\\.");
         try {
             final int majorVersion = Integer.parseInt(versions[0]);
-            final int minorVersion = versions.length > 1 ? Integer.parseInt(versions[1]) : 0;
-            final int incrementalVersion = versions.length > 2 ? Integer.parseInt(versions[2]) : 0;
+            final Integer minorVersion = versions.length > 1 ? Integer.parseInt(versions[1]) : null;
+            final Integer incrementalVersion = versions.length > 2 ? Integer.parseInt(versions[2]) : null;

             return new Version(
                 majorVersion,
                 minorVersion,
                 incrementalVersion,
-                qualifier > 0 ? version.substring(qualifier + 1) : null,
-                version
+                qualifier > 0 ? strVersion.substring(qualifier + 1) : null,
+                strVersion
             );
         } catch (NumberFormatException e) {
             throw new IllegalArgumentException("Invalid version, cannot parse '" + version + "'");
         }
     }

     /**
-     * Static helper method for returning the most recent stable version for a current {@link Version}.
+     * Resolves the most appropriate stable version from a collection, based on a given input version.
+     * <p>
+     * The matching rules are:
+     * <ul>
+     *   <li>If {@code from} specifies only a major version (e.g. {@code 1}), return the latest stable version
+     *       with the same major (e.g. {@code 1.2.3}).</li>
+     *   <li>If {@code from} specifies a major and minor version only (e.g. {@code 1.2}), return the latest
+     *       stable version with the same major and minor (e.g. {@code 1.2.3}).</li>
+     *   <li>If {@code from} specifies a full version with major, minor, and patch (e.g. {@code 1.2.2}),
+     *       then only return it if it is exactly present (and stable) in {@code versions}.
+     *       No "upgrade" is performed in this case.</li>
+     *   <li>If no suitable version is found, returns {@code null}.</li>
+     * </ul>
      *
-     * @param from the current version.
-     * @param versions the list of version.
-     *
-     * @return the last stable version.
+     * @param from the reference version (may specify only major, or major+minor, or major+minor+patch).
+     * @param versions the collection of candidate versions to resolve against.
+     * @return the best matching stable version, or {@code null} if none match.
      */
     public static Version getStable(final Version from, final Collection<Version> versions) {
-        List<Version> compatibleVersions = versions.stream()
-            .filter(v -> v.majorVersion() == from.majorVersion() && v.minorVersion() == from.minorVersion())
-            .toList();
-        if (compatibleVersions.isEmpty()) return null;
-        return Version.getLatest(compatibleVersions);
+        // Case 1: "from" is only a major (e.g. 1)
+        if (from.hasOnlyMajor()) {
+            List<Version> sameMajor = versions.stream()
+                .filter(v -> v.majorVersion() == from.majorVersion())
+                .toList();
+            return sameMajor.isEmpty() ? null : Version.getLatest(sameMajor);
+        }
+
+        // Case 2: "from" is major+minor only (e.g. 1.2)
+        if (from.hasMajorAndMinorOnly()) {
+            List<Version> sameMinor = versions.stream()
+                .filter(v -> v.majorVersion() == from.majorVersion()
+                    && v.minorVersion() == from.minorVersion())
+                .toList();
+            return sameMinor.isEmpty() ? null : Version.getLatest(sameMinor);
+        }
+
+        // Case 3: "from" is full version (major+minor+patch)
+        if (versions.contains(from)) {
+            return from;
+        }
+
+        // No match
+        return null;
     }

     /**
@@ -123,8 +159,8 @@ public class Version implements Comparable<Version> {
     }

     private final int majorVersion;
-    private final int minorVersion;
-    private final int incrementalVersion;
+    private final Integer minorVersion;
+    private final Integer patchVersion;
     private final Qualifier qualifier;

     private final String originalVersion;
@@ -134,14 +170,14 @@ public class Version implements Comparable<Version> {
      *
      * @param majorVersion the major version (must be superior or equal to 0).
      * @param minorVersion the minor version (must be superior or equal to 0).
-     * @param incrementalVersion the incremental version (must be superior or equal to 0).
+     * @param patchVersion the incremental version (must be superior or equal to 0).
      * @param qualifier the qualifier.
      */
     public Version(final int majorVersion,
                    final int minorVersion,
-                   final int incrementalVersion,
+                   final int patchVersion,
                    final String qualifier) {
-        this(majorVersion, minorVersion, incrementalVersion, qualifier, null);
+        this(majorVersion, minorVersion, patchVersion, qualifier, null);
     }

     /**
@@ -149,25 +185,25 @@ public class Version implements Comparable<Version> {
      *
      * @param majorVersion the major version (must be superior or equal to 0).
      * @param minorVersion the minor version (must be superior or equal to 0).
-     * @param incrementalVersion the incremental version (must be superior or equal to 0).
+     * @param patchVersion the incremental version (must be superior or equal to 0).
      * @param qualifier the qualifier.
      * @param originalVersion the original string version.
     */
-    private Version(final int majorVersion,
-                    final int minorVersion,
-                    final int incrementalVersion,
+    private Version(final Integer majorVersion,
+                    final Integer minorVersion,
+                    final Integer patchVersion,
                     final String qualifier,
                     final String originalVersion) {
         this.majorVersion = requirePositive(majorVersion, "major");
         this.minorVersion = requirePositive(minorVersion, "minor");
-        this.incrementalVersion = requirePositive(incrementalVersion, "incremental");
+        this.patchVersion = requirePositive(patchVersion, "incremental");
         this.qualifier = qualifier != null ? new Qualifier(qualifier) : null;
         this.originalVersion = originalVersion;
     }

-    private static int requirePositive(int version, final String message) {
-        if (version < 0) {
+    private static Integer requirePositive(Integer version, final String message) {
+        if (version != null && version < 0) {
             throw new IllegalArgumentException(String.format("The '%s' version must super or equal to 0", message));
         }
         return version;
@@ -178,11 +214,11 @@ public class Version implements Comparable<Version> {
     }

     public int minorVersion() {
-        return minorVersion;
+        return minorVersion != null ? minorVersion : 0;
     }

-    public int incrementalVersion() {
-        return incrementalVersion;
+    public int patchVersion() {
+        return patchVersion != null ? patchVersion : 0;
     }

     public Qualifier qualifier() {
@@ -197,9 +233,9 @@ public class Version implements Comparable<Version> {
         if (this == o) return true;
         if (!(o instanceof Version)) return false;
         Version version = (Version) o;
-        return majorVersion == version.majorVersion &&
-            minorVersion == version.minorVersion &&
-            incrementalVersion == version.incrementalVersion &&
+        return Objects.equals(majorVersion, version.majorVersion) &&
+            Objects.equals(minorVersion, version.minorVersion) &&
+            Objects.equals(patchVersion, version.patchVersion) &&
             Objects.equals(qualifier, version.qualifier);
     }

@@ -208,7 +244,7 @@ public class Version implements Comparable<Version> {
     */
    @Override
    public int hashCode() {
-        return Objects.hash(majorVersion, minorVersion, incrementalVersion, qualifier);
+        return Objects.hash(majorVersion, minorVersion, patchVersion, qualifier);
    }

    /**
@@ -218,7 +254,7 @@ public class Version implements Comparable<Version> {
    public String toString() {
        if (originalVersion != null) return originalVersion;

-        String version = majorVersion + "." + minorVersion + "." + incrementalVersion;
+        String version = majorVersion + "." + minorVersion + "." + patchVersion;
        return (qualifier != null) ? version + "-" + qualifier : version;
    }

@@ -238,7 +274,7 @@ public class Version implements Comparable<Version> {
            return compareMinor;
        }

-        int compareIncremental = Integer.compare(that.incrementalVersion, this.incrementalVersion);
+        int compareIncremental = Integer.compare(that.patchVersion, this.patchVersion);
        if (compareIncremental != 0) {
            return compareIncremental;
        }
@@ -253,6 +289,21 @@ public class Version implements Comparable<Version> {

        return this.qualifier.compareTo(that.qualifier);
    }

+    /**
+     * @return true if only major is specified (e.g. "1")
+     */
+    private boolean hasOnlyMajor() {
+        return minorVersion == null && patchVersion == null;
+    }
+
+    /**
+     * @return true if major+minor are specified, but no patch (e.g. "1.2")
+     */
+    private boolean hasMajorAndMinorOnly() {
+        return minorVersion != null && patchVersion == null;
+    }
+
    /**
     * Checks whether this version is before the given one.
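Note on the `Version` changes: `of()` now accepts any `Object`, tolerates a missing minor or patch component, and `getStable` matches at exactly the precision the caller supplied. A usage sketch, assuming the class as shown in this diff (expected results in comments):

```java
import io.kestra.core.utils.Version;

import java.util.List;

class VersionSketch {
    public static void main(String[] args) {
        // of() strips a leading "v"; minor/patch may now be absent and report as 0.
        Version major = Version.of("v1");
        System.out.println(major.minorVersion()); // 0

        List<Version> catalog = List.of(
            Version.of("1.2.3"), Version.of("1.2.0"), Version.of("1.1.9"));

        // Precision-sensitive matching per the new getStable rules:
        System.out.println(Version.getStable(Version.of("1"), catalog));     // 1.2.3 (latest 1.x.y)
        System.out.println(Version.getStable(Version.of("1.2"), catalog));   // 1.2.3 (latest 1.2.y)
        System.out.println(Version.getStable(Version.of("1.2.2"), catalog)); // null (not exactly present)
    }
}
```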
@@ -33,11 +33,13 @@ public class ExecutionsDataFilterValidator implements ConstraintValidator<Execut
             }
         });

-        executionsDataFilter.getWhere().forEach(filter -> {
-            if (filter.getField() == Executions.Fields.LABELS && filter.getLabelKey() == null) {
-                violations.add("Label filters must have a `labelKey`.");
-            }
-        });
+        if (executionsDataFilter.getWhere() != null) {
+            executionsDataFilter.getWhere().forEach(filter -> {
+                if (filter.getField() == Executions.Fields.LABELS && filter.getLabelKey() == null) {
+                    violations.add("Label filters must have a `labelKey`.");
+                }
+            });
+        }

         if (!violations.isEmpty()) {
             context.disableDefaultConstraintViolation();
@@ -8,6 +8,7 @@ import io.kestra.core.models.annotations.PluginProperty;
 import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.executions.NextTaskRun;
 import io.kestra.core.models.executions.TaskRun;
+import io.kestra.core.models.flows.State;
 import io.kestra.core.models.hierarchies.GraphCluster;
 import io.kestra.core.models.hierarchies.RelationType;
 import io.kestra.core.models.property.Property;
@@ -15,6 +16,7 @@ import io.kestra.core.models.tasks.*;
 import io.kestra.core.runners.FlowableUtils;
 import io.kestra.core.runners.RunContext;
 import io.kestra.core.utils.GraphUtils;
+import io.kestra.core.utils.ListUtils;
 import io.kestra.core.validations.DagTaskValidation;
 import io.micronaut.core.annotation.Introspected;
 import io.swagger.v3.oas.annotations.media.Schema;
@@ -176,6 +178,22 @@ public class Dag extends Task implements FlowableTask<VoidOutput> {
         );
     }

+    @Override
+    public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
+        List<ResolvedTask> childTasks = this.childTasks(runContext, parentTaskRun);
+
+        return FlowableUtils.resolveSequentialState(
+            execution,
+            childTasks,
+            FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
+            FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
+            parentTaskRun,
+            runContext,
+            this.isAllowFailure(),
+            this.isAllowWarning()
+        );
+    }
+
     public List<String> dagCheckNotExistTask(List<DagTask> taskDepends) {
         List<String> dependenciesIds = taskDepends
             .stream()
@@ -163,15 +163,9 @@ public class EachParallel extends Parallel implements FlowableTask<VoidOutput> {

     @Override
     public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
-        List<ResolvedTask> childTasks = ListUtils.emptyOnNull(this.childTasks(runContext, parentTaskRun)).stream()
-            .filter(resolvedTask -> !resolvedTask.getTask().getDisabled())
-            .toList();
+        List<ResolvedTask> childTasks = this.childTasks(runContext, parentTaskRun);

-        if (childTasks.isEmpty()) {
-            return Optional.of(State.Type.SUCCESS);
-        }
-
-        return FlowableUtils.resolveState(
+        return FlowableUtils.resolveSequentialState(
             execution,
             childTasks,
             FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
@@ -127,14 +127,9 @@ public class EachSequential extends Sequential implements FlowableTask<VoidOutput> {

     @Override
     public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
-        List<ResolvedTask> childTasks = ListUtils.emptyOnNull(this.childTasks(runContext, parentTaskRun)).stream()
-            .filter(resolvedTask -> !resolvedTask.getTask().getDisabled())
-            .toList();
-        if (childTasks.isEmpty()) {
-            return Optional.of(State.Type.SUCCESS);
-        }
+        List<ResolvedTask> childTasks = this.childTasks(runContext, parentTaskRun);

-        return FlowableUtils.resolveState(
+        return FlowableUtils.resolveSequentialState(
             execution,
             childTasks,
             FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
@@ -245,15 +245,9 @@ public class ForEach extends Sequential implements FlowableTask<VoidOutput> {
|
||||
|
||||
@Override
|
||||
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
List<ResolvedTask> childTasks = ListUtils.emptyOnNull(this.childTasks(runContext, parentTaskRun)).stream()
|
||||
.filter(resolvedTask -> !resolvedTask.getTask().getDisabled())
|
||||
.toList();
|
||||
List<ResolvedTask> childTasks = this.childTasks(runContext, parentTaskRun);
|
||||
|
||||
if (childTasks.isEmpty()) {
|
||||
return Optional.of(State.Type.SUCCESS);
|
||||
}
|
||||
|
||||
return FlowableUtils.resolveState(
|
||||
return FlowableUtils.resolveSequentialState(
|
||||
execution,
|
||||
childTasks,
|
||||
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
|
||||
|
@@ -2,7 +2,9 @@ package io.kestra.plugin.core.flow;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.utils.ListUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
@@ -23,6 +25,7 @@ import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.GraphUtils;

import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
@@ -176,4 +179,20 @@ public class Parallel extends Task implements FlowableTask<VoidOutput> {
runContext.render(this.concurrent).as(Integer.class).orElseThrow()
);
}

@Override
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
List<ResolvedTask> childTasks = this.childTasks(runContext, parentTaskRun);

return FlowableUtils.resolveSequentialState(
execution,
childTasks,
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
parentTaskRun,
runContext,
this.isAllowFailure(),
this.isAllowWarning()
);
}
}
@@ -8,6 +8,7 @@ import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.AbstractGraph;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
@@ -23,6 +24,7 @@ import lombok.experimental.SuperBuilder;
import jakarta.validation.Valid;

import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

@SuperBuilder
@@ -113,6 +115,22 @@ public class Sequential extends Task implements FlowableTask<VoidOutput> {
return FlowableUtils.resolveTasks(this.getTasks(), parentTaskRun);
}

@Override
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
List<ResolvedTask> childTasks = this.childTasks(runContext, parentTaskRun);

return FlowableUtils.resolveSequentialState(
execution,
childTasks,
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
parentTaskRun,
runContext,
this.isAllowFailure(),
this.isAllowWarning()
);
}

@Override
public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
return FlowableUtils.resolveSequentialNexts(
@@ -60,7 +60,15 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
public class Download extends AbstractHttp implements RunnableTask<Download.Output> {
@Schema(title = "Should the task fail when downloading an empty file.")
@Builder.Default
private final Property<Boolean> failOnEmptyResponse = Property.ofValue(true);
private Property<Boolean> failOnEmptyResponse = Property.ofValue(true);

@Schema(
title = "Name of the file inside the output.",
description = """
If not provided, the filename will be extracted from the `Content-Disposition` header.
If no `Content-Disposition` header, a name would be generated."""
)
private Property<String> saveAs;

public Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
@@ -111,20 +119,26 @@ public class Download extends AbstractHttp implements RunnableTask<Download.Output> {
}
}

String filename = null;
if (response.getHeaders().firstValue("Content-Disposition").isPresent()) {
String contentDisposition = response.getHeaders().firstValue("Content-Disposition").orElseThrow();
filename = filenameFromHeader(runContext, contentDisposition);
}
if (filename != null) {
filename = URLEncoder.encode(filename, StandardCharsets.UTF_8);
String rFilename = runContext.render(this.saveAs).as(String.class).orElse(null);
if (rFilename == null) {
if (response.getHeaders().firstValue("Content-Disposition").isPresent()) {
String contentDisposition = response.getHeaders().firstValue("Content-Disposition").orElseThrow();
rFilename = filenameFromHeader(runContext, contentDisposition);
if (rFilename != null) {
URLEncoder.encode(rFilename, StandardCharsets.UTF_8);
rFilename = rFilename.replace(' ', '+');
// brackets are IPv6 reserved characters
rFilename = rFilename.replace("[", "%5B");
rFilename = rFilename.replace("]", "%5D");
}
}
}

logger.debug("File '{}' downloaded with size '{}'", from, size);

return Output.builder()
.code(response.getStatus().getCode())
.uri(runContext.storage().putFile(tempFile, filename))
.uri(runContext.storage().putFile(tempFile, rFilename))
.headers(response.getHeaders().map())
.length(size.get())
.build();
@@ -154,6 +154,15 @@ public abstract class AbstractExecutionRepositoryTest {
).trigger(executionTrigger).build());
}

// add a NORMAL kind execution, it should be fetched correctly
executionRepository.save(builder(
State.Type.SUCCESS,
null
)
.trigger(executionTrigger)
.kind(ExecutionKind.NORMAL)
.build());

// add a test execution, this should be ignored in search & statistics
executionRepository.save(builder(
State.Type.SUCCESS,
@@ -176,16 +185,16 @@ public abstract class AbstractExecutionRepositoryTest {

static Stream<Arguments> filterCombinations() {
return Stream.of(
Arguments.of(QueryFilter.builder().field(Field.QUERY).value("unittest").operation(Op.EQUALS).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).value(List.of(USER)).operation(Op.EQUALS).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).value("io.kestra.unittest").operation(Op.EQUALS).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.QUERY).value("unittest").operation(Op.EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.SCOPE).value(List.of(USER)).operation(Op.EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).value("io.kestra.unittest").operation(Op.EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.LABELS).value(Map.of("key", "value")).operation(Op.EQUALS).build(), 1),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).value(FLOW).operation(Op.EQUALS).build(), 15),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.GREATER_THAN).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).value(ZonedDateTime.now().plusMinutes(1)).operation(Op.LESS_THAN).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).value(FLOW).operation(Op.EQUALS).build(), 16),
Arguments.of(QueryFilter.builder().field(Field.START_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.GREATER_THAN).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.END_DATE).value(ZonedDateTime.now().plusMinutes(1)).operation(Op.LESS_THAN).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.STATE).value(Type.RUNNING).operation(Op.EQUALS).build(), 5),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("executionTriggerId").operation(Op.EQUALS).build(), 28),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(), 28)
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("executionTriggerId").operation(Op.EQUALS).build(), 29),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(), 29)
);
}
@@ -211,7 +220,7 @@ public abstract class AbstractExecutionRepositoryTest {
inject();

ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, null);
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.getTotal()).isEqualTo(29L);
assertThat(executions.size()).isEqualTo(10);

List<QueryFilter> filters = List.of(QueryFilter.builder()
@@ -275,7 +284,7 @@ public abstract class AbstractExecutionRepositoryTest {
.value("io.kestra")
.build());
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.getTotal()).isEqualTo(29L);
}

@Test
@@ -291,7 +300,7 @@ public abstract class AbstractExecutionRepositoryTest {
.value(executionTriggerId)
.build());
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.getTotal()).isEqualTo(29L);
assertThat(executions.size()).isEqualTo(10);
assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId);
filters = List.of(QueryFilter.builder()
@@ -301,7 +310,7 @@ public abstract class AbstractExecutionRepositoryTest {
.build());

executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.getTotal()).isEqualTo(29L);
assertThat(executions.size()).isEqualTo(10);
assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId);

@@ -312,12 +321,12 @@ public abstract class AbstractExecutionRepositoryTest {
.build());

executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters );
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.getTotal()).isEqualTo(29L);
assertThat(executions.size()).isEqualTo(10);
assertThat(executions.getFirst().getTrigger()).isNull();

executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, null);
assertThat(executions.getTotal()).isEqualTo(56L);
assertThat(executions.getTotal()).isEqualTo(58L);
}
@Test
@@ -325,7 +334,7 @@ public abstract class AbstractExecutionRepositoryTest {
inject();

ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), MAIN_TENANT, null);
assertThat(executions.getTotal()).isEqualTo(28L);
assertThat(executions.getTotal()).isEqualTo(29L);
assertThat(executions.size()).isEqualTo(10);

var filters = List.of(QueryFilter.builder()
@@ -342,7 +351,7 @@ public abstract class AbstractExecutionRepositoryTest {
inject();

ArrayListTotal<TaskRun> taskRuns = executionRepository.findTaskRun(Pageable.from(1, 10), MAIN_TENANT, null);
assertThat(taskRuns.getTotal()).isEqualTo(74L);
assertThat(taskRuns.getTotal()).isEqualTo(77L);
assertThat(taskRuns.size()).isEqualTo(10);

var filters = List.of(QueryFilter.builder()
@@ -732,7 +741,7 @@ public abstract class AbstractExecutionRepositoryTest {
inject();

List<Execution> executions = executionRepository.findAllAsync(MAIN_TENANT).collectList().block();
assertThat(executions).hasSize(29); // used by the backup so it contains TEST executions
assertThat(executions).hasSize(30); // used by the backup so it contains TEST executions
}

@Test
@@ -249,31 +249,33 @@ public abstract class AbstractLogRepositoryTest {
for (int i = 0; i < 20; i++) {
logRepository.save(builder.build());
}
// normal kind should also be retrieved
logRepository.save(builder.executionKind(ExecutionKind.NORMAL).build());

ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(MAIN_TENANT, executionId, null, Pageable.from(1, 50));

assertThat(find.size()).isEqualTo(50);
assertThat(find.getTotal()).isEqualTo(101L);
assertThat(find.getTotal()).isEqualTo(102L);

find = logRepository.findByExecutionId(MAIN_TENANT, executionId, null, Pageable.from(3, 50));

assertThat(find.size()).isEqualTo(1);
assertThat(find.getTotal()).isEqualTo(101L);
assertThat(find.size()).isEqualTo(2);
assertThat(find.getTotal()).isEqualTo(102L);

find = logRepository.findByExecutionIdAndTaskId(MAIN_TENANT, executionId, logEntry2.getTaskId(), null, Pageable.from(1, 50));

assertThat(find.size()).isEqualTo(21);
assertThat(find.getTotal()).isEqualTo(21L);
assertThat(find.size()).isEqualTo(22);
assertThat(find.getTotal()).isEqualTo(22L);

find = logRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, Pageable.from(1, 10));

assertThat(find.size()).isEqualTo(10);
assertThat(find.getTotal()).isEqualTo(21L);
assertThat(find.getTotal()).isEqualTo(22L);

find = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, 0, Pageable.from(1, 10));

assertThat(find.size()).isEqualTo(10);
assertThat(find.getTotal()).isEqualTo(21L);
assertThat(find.getTotal()).isEqualTo(22L);

find = logRepository.findByExecutionIdAndTaskRunId(MAIN_TENANT, executionId, logEntry2.getTaskRunId(), null, Pageable.from(10, 10));
@@ -31,20 +31,22 @@ public abstract class AbstractMetricRepositoryTest {
TaskRun taskRun1 = taskRun(executionId, "task");
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
MetricEntry testCounter = MetricEntry.of(taskRun1, counter("test"), ExecutionKind.TEST);
MetricEntry normalCounter = MetricEntry.of(taskRun1, counter("normal"), ExecutionKind.NORMAL);
TaskRun taskRun2 = taskRun(executionId, "task");
MetricEntry timer = MetricEntry.of(taskRun2, timer(), null);
metricRepository.save(counter);
metricRepository.save(testCounter); // should only be retrieved by execution id
metricRepository.save(normalCounter);
metricRepository.save(timer);

List<MetricEntry> results = metricRepository.findByExecutionId(null, executionId, Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(3);
assertThat(results.size()).isEqualTo(4);

results = metricRepository.findByExecutionIdAndTaskId(null, executionId, taskRun1.getTaskId(), Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(3);
assertThat(results.size()).isEqualTo(4);

results = metricRepository.findByExecutionIdAndTaskRunId(null, executionId, taskRun1.getId(), Pageable.from(1, 10));
assertThat(results.size()).isEqualTo(2);
assertThat(results.size()).isEqualTo(3);

MetricAggregations aggregationResults = metricRepository.aggregateByFlowId(
null,
@@ -33,6 +33,7 @@ import org.junitpioneer.jupiter.RetryingTest;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
// must be per-class to allow calling once init() which took a lot of time
public abstract class AbstractRunnerTest {
public static final String TENANT_1 = "tenant1";

@Inject
protected RunnerUtils runnerUtils;
@@ -436,9 +437,9 @@ public abstract class AbstractRunnerTest {
}

@Test
@LoadFlows({"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = TENANT_1)
protected void flowConcurrencyWithForEachItem() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem();
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem(TENANT_1);
}

@Test
@@ -597,4 +598,4 @@ public abstract class AbstractRunnerTest {
public void shouldCallTasksAfterListener(Execution execution) {
afterExecutionTestCase.shouldCallTasksAfterListener(execution);
}
}
}
@@ -330,7 +330,7 @@ class ExecutionServiceTest {
assertThat(restart.findTaskRunByTaskIdAndValue("1_each", List.of()).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getState().getHistories()).hasSize(4);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getAttempts()).isNull();
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);

restart = executionService.markAs(execution, flow, execution.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getId(), State.Type.FAILED);

@@ -441,6 +441,7 @@ class ExecutionServiceTest {

assertThat(killed.getState().getCurrent()).isEqualTo(State.Type.CANCELLED);
assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getState().getCurrent()).isEqualTo(State.Type.KILLED);
assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.KILLED);
assertThat(killed.getState().getHistories()).hasSize(5);
}
@@ -103,28 +103,28 @@ class FilesServiceTest {
var runContext = runContextFactory.of();

Path fileWithSpace = tempDir.resolve("with space.txt");
Path fileWithUnicode = tempDir.resolve("สวัสดี.txt");
Path fileWithUnicode = tempDir.resolve("สวัสดี&.txt");

Files.writeString(fileWithSpace, "content");
Files.writeString(fileWithUnicode, "content");

Path targetFileWithSpace = runContext.workingDir().path().resolve("with space.txt");
Path targetFileWithUnicode = runContext.workingDir().path().resolve("สวัสดี.txt");
Path targetFileWithUnicode = runContext.workingDir().path().resolve("สวัสดี&.txt");

Files.copy(fileWithSpace, targetFileWithSpace);
Files.copy(fileWithUnicode, targetFileWithUnicode);

Map<String, URI> outputFiles = FilesService.outputFiles(
runContext,
List.of("with space.txt", "สวัสดี.txt")
List.of("with space.txt", "สวัสดี&.txt")
);

assertThat(outputFiles).hasSize(2);
assertThat(outputFiles).containsKey("with space.txt");
assertThat(outputFiles).containsKey("สวัสดี.txt");
assertThat(outputFiles).containsKey("สวัสดี&.txt");

assertThat(runContext.storage().getFile(outputFiles.get("with space.txt"))).isNotNull();
assertThat(runContext.storage().getFile(outputFiles.get("สวัสดี.txt"))).isNotNull();
assertThat(runContext.storage().getFile(outputFiles.get("สวัสดี&.txt"))).isNotNull();
}

private URI createFile() throws IOException {
@@ -2,14 +2,12 @@ package io.kestra.core.runners;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledExecution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.State.Type;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.reporter.model.Count;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
@@ -31,16 +29,18 @@ import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Singleton
public class FlowConcurrencyCaseTest {
public static final String NAMESPACE = "io.kestra.tests";

@Inject
private StorageInterface storageInterface;
@@ -64,50 +64,34 @@ public class FlowConcurrencyCaseTest {
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;

public void flowConcurrencyCancel() throws TimeoutException, QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel");
public void flowConcurrencyCancel() throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
try {
List<Execution> shouldFailExecutions = List.of(
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel"),
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel")
);
assertThat(execution1.getState().isRunning()).isTrue();

assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CANCELLED);

CountDownLatch latch1 = new CountDownLatch(1);

Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getId().equals(execution1.getId())) {
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch1.countDown();
}
}

// FIXME we should fail if we receive the cancel execution again but on Kafka it happens
});

assertTrue(latch1.await(1, TimeUnit.MINUTES));
receive.blockLast();
assertThat(shouldFailExecutions.stream().map(Execution::getState).map(State::getCurrent)).allMatch(Type.CANCELLED::equals);
} finally {
runnerUtils.killExecution(execution1);
}
}
public void flowConcurrencyFail() throws TimeoutException, QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-fail");
public void flowConcurrencyFail() throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
try {
List<Execution> shouldFailExecutions = List.of(
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail"),
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail")
);

assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.FAILED);

CountDownLatch latch1 = new CountDownLatch(1);

Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getId().equals(execution1.getId())) {
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch1.countDown();
}
}

// FIXME we should fail if we receive the cancel execution again but on Kafka it happens
});

assertTrue(latch1.await(1, TimeUnit.MINUTES));
receive.blockLast();
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(shouldFailExecutions.stream().map(Execution::getState).map(State::getCurrent)).allMatch(State.Type.FAILED::equals);
} finally {
runnerUtils.killExecution(execution1);
}
}
public void flowConcurrencyQueue() throws TimeoutException, QueueException, InterruptedException {
@@ -265,28 +249,25 @@ public class FlowConcurrencyCaseTest {
assertThat(secondExecutionResult.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.CANCELLED);
}

public void flowConcurrencyWithForEachItem() throws TimeoutException, QueueException, InterruptedException, URISyntaxException, IOException {
URI file = storageUpload();
public void flowConcurrencyWithForEachItem(String tenantId) throws QueueException, URISyntaxException, IOException, TimeoutException {
URI file = storageUpload(tenantId);
Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
Execution forEachItem = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-for-each-item", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs), Duration.ofSeconds(5));
Execution forEachItem = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-for-each-item", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs), Duration.ofSeconds(5));
assertThat(forEachItem.getState().getCurrent()).isEqualTo(Type.RUNNING);

Set<String> executionIds = new HashSet<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if ("flow-concurrency-queue".equals(e.getLeft().getFlowId()) && e.getLeft().getState().isRunning()) {
executionIds.add(e.getLeft().getId());
}
});

// wait a little to be sure there are not too many executions started
Thread.sleep(500);

assertThat(executionIds).hasSize(1);
receive.blockLast();

Execution terminated = runnerUtils.awaitExecution(e -> e.getId().equals(forEachItem.getId()) && e.getState().isTerminated(), () -> {}, Duration.ofSeconds(10));
Execution terminated = runnerUtils.awaitExecution(e -> e.getState().isTerminated(), Duration.ofSeconds(60));
assertThat(terminated.getState().getCurrent()).isEqualTo(Type.SUCCESS);

List<Execution> executions = runnerUtils.awaitFlowExecutionNumber(2, tenantId, NAMESPACE, "flow-concurrency-queue");

assertThat(executions).extracting(e -> e.getState().getCurrent()).containsOnly(Type.SUCCESS);
assertThat(executions.stream()
.map(e -> e.getState().getHistories())
.flatMap(List::stream)
.map(State.History::getState)
.toList()).contains(Type.QUEUED);
}

public void flowConcurrencyQueueRestarted() throws Exception {
@@ -445,12 +426,16 @@ public class FlowConcurrencyCaseTest {
}

private URI storageUpload() throws URISyntaxException, IOException {
return storageUpload(MAIN_TENANT);
}

private URI storageUpload(String tenantId) throws URISyntaxException, IOException {
File tempFile = File.createTempFile("file", ".txt");

Files.write(tempFile.toPath(), content());

return storageInterface.put(
MAIN_TENANT,
tenantId,
null,
new URI("/file/storage/file.txt"),
new FileInputStream(tempFile)
@@ -83,24 +83,37 @@ class RunContextPropertyTest {
runContextProperty = new RunContextProperty<>(Property.<Map<String, String>>builder().expression("{ \"key\": \"{{ key }}\"}").build(), runContext);
assertThat(runContextProperty.asMap(String.class, String.class, Map.of("key", "value"))).containsEntry("key", "value");
}

@Test
void asShouldReturnCachedRenderedProperty() throws IllegalVariableEvaluationException {
var runContext = runContextFactory.of();

var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);

assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
assertThat(runContextProperty.as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value1"));
}

@Test
void asShouldNotReturnCachedRenderedPropertyWithSkipCache() throws IllegalVariableEvaluationException {
var runContext = runContextFactory.of();

var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);

assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
assertThat(runContextProperty.skipCache().as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value2"));
var skippedCache = runContextProperty.skipCache();
assertThat(skippedCache.as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value2"));
// assure skipCache is preserved across calls
assertThat(skippedCache.as(String.class, Map.of("variable", "value3"))).isEqualTo(Optional.of("value3"));
}

@Test
void asShouldNotReturnCachedRenderedPropertyWithOfExpression() throws IllegalVariableEvaluationException {
var runContext = runContextFactory.of();

var runContextProperty = new RunContextProperty<String>(Property.ofExpression("{{ variable }}"), runContext);

assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
assertThat(runContextProperty.as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value2"));
}
}
@@ -20,7 +20,9 @@ class TaskWithRunIfTest {
assertThat(execution.getTaskRunList()).hasSize(5);
assertThat(execution.findTaskRunsByTaskId("executed").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.findTaskRunsByTaskId("notexecuted").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("notexecuted").getFirst().getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("notexecutedflowable").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("notexecutedflowable").getFirst().getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("willfailedtheflow").getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
}

@@ -31,6 +33,7 @@ class TaskWithRunIfTest {
assertThat(execution.getTaskRunList()).hasSize(3);
assertThat(execution.findTaskRunsByTaskId("log_orders").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.findTaskRunsByTaskId("log_test").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("log_test").getFirst().getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
}

@Test
@@ -39,7 +42,9 @@ class TaskWithRunIfTest {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getTaskRunList()).hasSize(5);
assertThat(execution.findTaskRunsByTaskId("skipSetVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("skipSetVariables").getFirst().getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("skipUnsetVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("skipUnsetVariables").getFirst().getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("unsetVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.findTaskRunsByTaskId("setVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getVariables()).containsEntry("list", List.of(42));
@@ -109,33 +109,6 @@ public class FileSizeFunctionTest {
assertThat(size).isEqualTo(FILE_SIZE);
}

@Test
void shouldThrowIllegalArgumentException_givenTrigger_andParentExecution_andMissingNamespace() throws IOException {
String executionId = IdUtils.create();
URI internalStorageURI = getInternalStorageURI(executionId);
URI internalStorageFile = getInternalStorageFile(internalStorageURI);

Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "subflow",
"namespace", NAMESPACE,
"tenantId", MAIN_TENANT),
"execution", Map.of("id", IdUtils.create()),
"trigger", Map.of(
"flowId", FLOW,
"executionId", executionId,
"tenantId", MAIN_TENANT
)
);

Exception ex = assertThrows(
IllegalArgumentException.class,
() -> variableRenderer.render("{{ fileSize('" + internalStorageFile + "') }}", variables)
);

assertTrue(ex.getMessage().startsWith("Unable to read the file"), "Exception message doesn't match expected one");
}

@Test
void returnsCorrectSize_givenUri_andCurrentExecution() throws IOException, IllegalVariableEvaluationException {
String executionId = IdUtils.create();
@@ -256,6 +256,27 @@ class ReadFileFunctionTest {
assertThat(variableRenderer.render("{{ read(nsfile) }}", variables)).isEqualTo("Hello World");
}

@Test
void shouldReadChildFileEvenIfTrigger() throws IOException, IllegalVariableEvaluationException {
String namespace = "my.namespace";
String flowId = "flow";
String executionId = IdUtils.create();
URI internalStorageURI = URI.create("/" + namespace.replace(".", "/") + "/" + flowId + "/executions/" + executionId + "/tasks/task/" + IdUtils.create() + "/123456.ion");
URI internalStorageFile = storageInterface.put(MAIN_TENANT, namespace, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes()));

Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "flow",
"namespace", "notme",
"tenantId", MAIN_TENANT),
"execution", Map.of("id", "notme"),
"trigger", Map.of("namespace", "notme", "flowId", "parent", "executionId", "parent")
);

String render = variableRenderer.render("{{ read('" + internalStorageFile + "') }}", variables);
assertThat(render).isEqualTo("Hello from a task output");
}

private URI createFile() throws IOException {
File tempFile = File.createTempFile("file", ".txt");
Files.write(tempFile.toPath(), "Hello World".getBytes());
@@ -5,8 +5,17 @@ import org.junit.jupiter.api.Test;

import java.util.List;

class VersionTest {
import static org.assertj.core.api.Assertions.assertThatObject;
import static org.hamcrest.MatcherAssert.assertThat;

class VersionTest {

@Test
void shouldCreateVersionFromIntegerGivenMajorVersion() {
Version version = Version.of(1);
Assertions.assertEquals(1, version.majorVersion());
}

@Test
void shouldCreateVersionFromStringGivenMajorVersion() {
Version version = Version.of("1");
@@ -21,27 +30,27 @@ class VersionTest {
}

@Test
void shouldCreateVersionFromStringGivenMajorMinorIncrementVersion() {
void shouldCreateVersionFromStringGivenMajorMinorPatchVersion() {
Version version = Version.of("1.2.3");
Assertions.assertEquals(1, version.majorVersion());
Assertions.assertEquals(2, version.minorVersion());
Assertions.assertEquals(3, version.incrementalVersion());
Assertions.assertEquals(3, version.patchVersion());
}

@Test
void shouldCreateVersionFromPrefixedStringGivenMajorMinorIncrementVersion() {
void shouldCreateVersionFromPrefixedStringGivenMajorMinorPatchVersion() {
Version version = Version.of("v1.2.3");
Assertions.assertEquals(1, version.majorVersion());
Assertions.assertEquals(2, version.minorVersion());
Assertions.assertEquals(3, version.incrementalVersion());
Assertions.assertEquals(3, version.patchVersion());
}

@Test
void shouldCreateVersionFromStringGivenMajorMinorIncrementAndQualifierVersion() {
void shouldCreateVersionFromStringGivenMajorMinorPatchAndQualifierVersion() {
Version version = Version.of("1.2.3-SNAPSHOT");
Assertions.assertEquals(1, version.majorVersion());
Assertions.assertEquals(2, version.minorVersion());
Assertions.assertEquals(3, version.incrementalVersion());
Assertions.assertEquals(3, version.patchVersion());
Assertions.assertEquals("SNAPSHOT", version.qualifier().toString());
}
@@ -50,7 +59,7 @@ class VersionTest {
Version version = Version.of("1.2.3-RC0-SNAPSHOT");
Assertions.assertEquals(1, version.majorVersion());
Assertions.assertEquals(2, version.minorVersion());
Assertions.assertEquals(3, version.incrementalVersion());
Assertions.assertEquals(3, version.patchVersion());
Assertions.assertEquals("RC0-SNAPSHOT", version.qualifier().toString());
}

@@ -76,13 +85,13 @@ class VersionTest {
}

@Test
void shouldGetLatestVersionGivenMajorMinorIncrementalVersions() {
void shouldGetLatestVersionGivenMajorMinorPatchVersions() {
Version result = Version.getLatest(Version.of("1.0.9"), Version.of("1.0.10"), Version.of("1.0.11"));
Assertions.assertEquals(Version.of("1.0.11"), result);
}

@Test
public void shouldGetOldestVersionGivenMajorMinorIncrementalVersions() {
public void shouldGetOldestVersionGivenMajorMinorPatchVersions() {
Version result = Version.getOldest(Version.of("1.0.9"), Version.of("1.0.10"), Version.of("1.0.11"));
Assertions.assertEquals(Version.of("1.0.9"), result);
}
@@ -135,14 +144,50 @@ class VersionTest {
}

@Test
public void shouldGetStableVersionGivenMajorMinorVersions() {
Version result = Version.getStable(Version.of("1.2.0"), List.of(Version.of("1.2.1"), Version.of("1.2.2"), Version.of("0.99.0")));
Assertions.assertEquals(Version.of("1.2.2"), result);
public void shouldGetStableVersionGivenMajorMinorPatchVersion() {
// Given
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));

// When - Then
assertThatObject(Version.getStable(Version.of("1.2.1"), versions)).isEqualTo(Version.of("1.2.1"));
assertThatObject(Version.getStable(Version.of("1.2.0"), versions)).isNull();
assertThatObject(Version.getStable(Version.of("1.2.4"), versions)).isNull();
}

@Test
public void shouldGetStableGivenMajorAndMinorVersionOnly() {
// Given
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));

// When - Then
assertThatObject(Version.getStable(Version.of("1.2"), versions)).isEqualTo(Version.of("1.2.3"));
}

@Test
public void shouldGetStableGivenMajorVersionOnly() {
// Given
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));

// When - Then
assertThatObject(Version.getStable(Version.of("1"), versions)).isEqualTo(Version.of("1.2.3"));
}

@Test
public void shouldGetNullForStableVersionGivenNoCompatibleVersions() {
Version result = Version.getStable(Version.of("1.2.0"), List.of(Version.of("1.3.0"), Version.of("2.0.0"), Version.of("0.99.0")));
Assertions.assertNull(result);
public void shouldGetNullForStableGivenMajorAndMinorVersionOnly() {
// Given
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));

// When - Then
assertThatObject(Version.getStable(Version.of("2.0"), versions)).isNull();
assertThatObject(Version.getStable(Version.of("0.1"), versions)).isNull();
}

@Test
public void shouldGetNullForStableGivenMajorVersionOnly() {
// Given
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));

// When - Then
assertThatObject(Version.getStable(Version.of("2"), versions)).isNull();
}
}
@@ -4,6 +4,7 @@ import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.RunnerUtils;
@@ -11,6 +12,7 @@ import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@@ -28,11 +30,15 @@ class IfTest {
void ifTruthy() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", true) , Duration.ofSeconds(120));
List<TaskRunAttempt> flowableAttempts=execution.findTaskRunsByTaskId("if").getFirst().getAttempts();

assertThat(execution.getTaskRunList()).hasSize(2);
assertThat(execution.findTaskRunsByTaskId("when-true").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);

assertThat(flowableAttempts).isNotNull();
assertThat(flowableAttempts.getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);

execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", "true") , Duration.ofSeconds(120));
@@ -5,22 +5,32 @@ import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.State;
import org.junit.jupiter.api.Test;
import java.util.List;

@KestraTest(startRunner = true)
class SequentialTest {
@Test
@ExecuteFlow("flows/valids/sequential.yaml")
void sequential(Execution execution) {
List<TaskRunAttempt> flowableAttempts=execution.findTaskRunsByTaskId("1-seq").getFirst().getAttempts();

assertThat(execution.getTaskRunList()).hasSize(11);
assertThat(flowableAttempts).isNotNull();
assertThat(flowableAttempts.getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}

@Test
@ExecuteFlow("flows/valids/sequential-with-global-errors.yaml")
void sequentialWithGlobalErrors(Execution execution) {
List<TaskRunAttempt> flowableAttempts=execution.findTaskRunsByTaskId("parent-seq").getFirst().getAttempts();

assertThat(execution.getTaskRunList()).hasSize(6);
assertThat(flowableAttempts).isNotNull();
assertThat(flowableAttempts.getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
}

@@ -30,4 +40,11 @@ class SequentialTest {
assertThat(execution.getTaskRunList()).hasSize(6);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
}

@Test
@ExecuteFlow("flows/valids/sequential-with-disabled.yaml")
void sequentialWithDisabled(Execution execution) {
assertThat(execution.getTaskRunList()).hasSize(2);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
}
@@ -14,6 +14,8 @@ import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -87,4 +89,28 @@ class SubflowRunnerTest {
assertThat(childExecution.get().getTaskRunList()).hasSize(1);
closing.run();
}

@Test
@LoadFlows({"flows/valids/subflow-parent-retry.yaml", "flows/valids/subflow-to-retry.yaml"})
void subflowOutputWithWait() throws QueueException, TimeoutException, InterruptedException {
List<Execution> childExecution = new ArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(4);
Runnable closing = executionQueue.receive(either -> {
if (either.isLeft() && either.getLeft().getFlowId().equals("subflow-to-retry") && either.getLeft().getState().isTerminated()) {
childExecution.add(either.getLeft());
countDownLatch.countDown();
}
});

Execution parentExecution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "subflow-parent-retry");
assertThat(parentExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(parentExecution.getTaskRunList()).hasSize(5);

assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
// we should have 4 executions, two in SUCCESS and two in FAILED
assertThat(childExecution).hasSize(4);
assertThat(childExecution.stream().filter(e -> e.getState().getCurrent() == State.Type.SUCCESS).count()).isEqualTo(2);
assertThat(childExecution.stream().filter(e -> e.getState().getCurrent() == State.Type.FAILED).count()).isEqualTo(2);
closing.run();
}
}
@@ -156,6 +156,26 @@ class DownloadTest {
assertThat(output.getUri().toString()).endsWith("filename.jpg");
}

@Test
void fileNameShouldOverrideContentDisposition() throws Exception {
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
embeddedServer.start();

Download task = Download.builder()
.id(DownloadTest.class.getSimpleName())
.type(DownloadTest.class.getName())
.uri(Property.ofValue(embeddedServer.getURI() + "/content-disposition"))
.saveAs(Property.ofValue("hardcoded-filename.jpg"))
.build();

RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, task, ImmutableMap.of());

Download.Output output = task.run(runContext);

assertThat(output.getUri().toString()).endsWith("hardcoded-filename.jpg");
}

@Test
void contentDispositionWithPath() throws Exception {
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
@@ -232,10 +252,27 @@ class DownloadTest {

Download.Output output = task.run(runContext);

assertThat(output.getUri().toString()).doesNotContain("/secure-path/");
assertThat(output.getUri().toString()).endsWith("file.with+spaces.txt");
}

@Test
void contentDispositionWithBrackets() throws Exception {
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
embeddedServer.start();

Download task = Download.builder()
.id(DownloadTest.class.getSimpleName())
.type(DownloadTest.class.getName())
.uri(Property.ofValue(embeddedServer.getURI() + "/content-disposition-with-brackets"))
.build();

RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, task, ImmutableMap.of());

Download.Output output = task.run(runContext);

assertThat(output.getUri().toString()).endsWith("file.with%5B%5Dbrackets.txt");
}

@Controller()
public static class SlackWebController {
@Get("500")
@@ -282,5 +319,10 @@ class DownloadTest {
return HttpResponse.ok("Hello World".getBytes())
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"file.with spaces.txt\"");
}
@Get("content-disposition-with-brackets")
public HttpResponse<byte[]> contentDispositionWithBrackets() {
return HttpResponse.ok("Hello World".getBytes())
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"file.with[]brackets.txt\"");
}
}
}
@@ -8,4 +8,4 @@ concurrency:
tasks:
- id: sleep
type: io.kestra.plugin.core.flow.Sleep
duration: PT2S
duration: PT10S

@@ -8,4 +8,4 @@ concurrency:
tasks:
- id: sleep
type: io.kestra.plugin.core.flow.Sleep
duration: PT2S
duration: PT10S

@@ -0,0 +1,14 @@
id: sequential-with-disabled
namespace: io.kestra.tests

tasks:
- id: Sequential
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: Hello World! 🚀
disabled: true
- id: log
type: io.kestra.plugin.core.log.Log
message: Hello World!
@@ -0,0 +1,33 @@
id: subflow-parent-retry
namespace: io.kestra.tests

tasks:
- id: parallel
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: seq1
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: subflow1
type: io.kestra.plugin.core.flow.Subflow
flowId: subflow-to-retry
namespace: io.kestra.tests
inputs:
counter: "{{ taskrun.attemptsCount }}"
retry:
type: constant
maxAttempts: 3
interval: PT1S
- id: seq2
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: subflow2
type: io.kestra.plugin.core.flow.Subflow
flowId: subflow-to-retry
namespace: io.kestra.tests
inputs:
counter: "{{ taskrun.attemptsCount }}"
retry:
type: constant
maxAttempts: 3
interval: PT1S
core/src/test/resources/flows/valids/subflow-to-retry.yaml
@@ -0,0 +1,14 @@
id: subflow-to-retry
namespace: io.kestra.tests

inputs:
- id: counter
type: INT

tasks:
- id: fail
type: io.kestra.plugin.core.execution.Fail
runIf: "{{inputs.counter < 1}}"
- id: hello
type: io.kestra.plugin.core.log.Log
message: Hello World! 🚀
@@ -6,9 +6,11 @@ services:
MYSQL_USER: kestra
MYSQL_PASSWORD: k3str4
MYSQL_ROOT_PASSWORD: "p4ssw0rd"
command:
- --log-bin-trust-function-creators=1
- --sort-buffer-size=10485760
entrypoint: |
sh -c "
echo \"CREATE DATABASE IF NOT EXISTS kestra_unit_webserver;GRANT ALL PRIVILEGES ON kestra_unit_webserver.* TO 'kestra'@'%' WITH GRANT OPTION;\" > /docker-entrypoint-initdb.d/init.sql;
/usr/local/bin/docker-entrypoint.sh --log-bin-trust-function-creators=1 --innodb_ft_min_token_size=1 --ft_min_word_len=1 --sort-buffer-size=10485760
"
ports:
- 3306:3306
restart: on-failure
@@ -34,4 +36,4 @@ services:
# - "4318:4318" # OTLP HTTP receiver
# - "14250:14250" # Receive from external otel-collector, optional
# environment:
# - COLLECTOR_OTLP_ENABLED=true
# - COLLECTOR_OTLP_ENABLED=true
@@ -1,28 +0,0 @@
configurations {
implementation.extendsFrom(micronaut)
}

dependencies {
testImplementation project(':tests')
testImplementation("com.microsoft.playwright:playwright")
}

/**********************************************************************************************************************\
* ./gradlew playwright
**********************************************************************************************************************/
tasks.register('playwright', JavaExec) {
classpath sourceSets.test.runtimeClasspath
mainClass = 'com.microsoft.playwright.CLI'
}
/**********************************************************************************************************************\
* Test
**********************************************************************************************************************/
test {
useJUnitPlatform {
if (project.hasProperty('tags')) {
includeTags project.getProperty('tags').split(',')
}
// Setting the number of parallel forks
maxParallelForks = Runtime.runtime.availableProcessors()
}
}
||||
@@ -127,7 +127,7 @@ public class ExecutorService {
            case CANCEL ->
                executionRunning
                    .withExecution(executionRunning.getExecution().withState(State.Type.CANCELLED))
                    .withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
                    .withConcurrencyState(ExecutionRunning.ConcurrencyState.CANCELLED);
            case FAIL -> {
                var failedExecution = executionRunning.getExecution().failedExecutionFromExecutor(new IllegalStateException("Execution is FAILED due to concurrency limit exceeded"));
                try {
@@ -137,7 +137,7 @@ public class ExecutorService {
                }
                yield executionRunning
                    .withExecution(failedExecution.getExecution())
                    .withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
                    .withConcurrencyState(ExecutionRunning.ConcurrencyState.FAILED);
            }
        };
@@ -247,7 +247,7 @@ public class ExecutorService {
        // first find the normal ended child tasks and send result
        Optional<State.Type> state;
        try {
            state = flowableParent.resolveState(runContext, execution, parentTaskRun);
        } catch (Exception e) {
            // This will lead to the next task being still executed, but at least Kestra will not crash.
            // This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
@@ -268,9 +268,17 @@ public class ExecutorService {
            Output outputs = flowableParent.outputs(runContext);
            Map<String, Object> outputMap = MapUtils.merge(workerTaskResult.getTaskRun().getOutputs(), outputs == null ? null : outputs.toMap());
            Variables variables = variablesService.of(StorageContext.forTask(workerTaskResult.getTaskRun()), outputMap);
            // flowable attempt state transition to terminated
            List<TaskRunAttempt> attempts = Optional.ofNullable(parentTaskRun.getAttempts())
                .map(ArrayList::new)
                .orElseGet(ArrayList::new);
            State.Type endedState = endedTask.get().getTaskRun().getState().getCurrent();
            TaskRunAttempt updated = attempts.getLast().withState(endedState);
            attempts.set(attempts.size() - 1, updated);
            return Optional.of(new WorkerTaskResult(workerTaskResult
                .getTaskRun()
                .withOutputs(variables)
                .withAttempts(attempts)
            ));
        } catch (Exception e) {
            runContext.logger().error("Unable to resolve outputs from the Flowable task: {}", e.getMessage(), e);
@@ -320,7 +328,6 @@ public class ExecutorService {

    private List<TaskRun> childNextsTaskRun(Executor executor, TaskRun parentTaskRun) throws InternalException {
        Task parent = executor.getFlow().findTaskByTaskId(parentTaskRun.getTaskId());

        if (parent instanceof FlowableTask<?> flowableParent) {
            // Count the number of flowable tasks executions, some flowable are being called multiple times,
            // so this is not exactly the number of flowable taskruns but the number of times they are executed.
@@ -375,6 +382,7 @@ public class ExecutorService {
            Output outputs = flowableTask.outputs(runContext);
            Variables variables = variablesService.of(StorageContext.forTask(taskRun), outputs);
            taskRun = taskRun.withOutputs(variables);

        } catch (Exception e) {
            runContext.logger().warn("Unable to save output on taskRun '{}'", taskRun, e);
        }
@@ -995,7 +1003,7 @@ public class ExecutorService {
        executor.withExecution(
            executor
                .getExecution()
                .withTaskRun(executableTaskRun.withState(State.Type.SKIPPED)),
                .withTaskRun(executableTaskRun.withState(State.Type.SKIPPED).addAttempt(TaskRunAttempt.builder().state(new State().withState(State.Type.SKIPPED)).build())),
            "handleExecutableTaskSkipped"
        );
        return false;
@@ -1077,7 +1085,7 @@ public class ExecutorService {
        executor.withExecution(
            executor
                .getExecution()
                .withTaskRun(workerTask.getTaskRun().withState(State.Type.SKIPPED)),
                .withTaskRun(workerTask.getTaskRun().withState(State.Type.SKIPPED).addAttempt(TaskRunAttempt.builder().state(new State().withState(State.Type.SKIPPED)).build())),
            "handleExecutionUpdatingTaskSkipped"
        );
        return false;
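Several hunks above apply the same pattern when a flowable task's attempt list changes state: copy the list, replace or append the last entry, and hand the copy back via withAttempts. A self-contained sketch of that copy-then-replace step (the Attempt record is a stand-in for Kestra's TaskRunAttempt, not the real type; List.getLast() assumes Java 21, which the diff itself uses):

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class AttemptUpdateSketch {
    // Stand-in for TaskRunAttempt: immutable, with a wither for the state.
    record Attempt(String state) {
        Attempt withState(String newState) {
            return new Attempt(newState);
        }
    }

    // Copy-on-write update: never mutate the task run's own list, always
    // build a fresh copy, close the last attempt, and return the copy.
    static List<Attempt> closeLastAttempt(List<Attempt> current, String endedState) {
        List<Attempt> attempts = Optional.ofNullable(current)
            .map(ArrayList::new)
            .orElseGet(ArrayList::new);
        if (!attempts.isEmpty()) {
            attempts.set(attempts.size() - 1, attempts.getLast().withState(endedState));
        }
        return attempts;
    }

    public static void main(String[] args) {
        List<Attempt> updated = closeLastAttempt(List.of(new Attempt("RUNNING")), "SUCCESS");
        System.out.println(updated); // [Attempt[state=SUCCESS]]
    }
}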
@@ -50,16 +50,147 @@ public class FlowTriggerService {
            .map(io.kestra.plugin.core.trigger.Flow.class::cast);
    }

    public List<Execution> computeExecutionsFromFlowTriggers(Execution execution, List<? extends Flow> allFlows, Optional<MultipleConditionStorageInterface> multipleConditionStorage) {
        List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
    /**
     * This method computes executions to trigger from flow triggers for a given execution.
     * It only computes those depending on standard (non-multiple / non-preconditions) conditions, so it must be used
     * in conjunction with {@link #computeExecutionsFromFlowTriggerPreconditions(Execution, Flow, MultipleConditionStorageInterface)}.
     */
    public List<Execution> computeExecutionsFromFlowTriggerConditions(Execution execution, Flow flow) {
        List<FlowWithFlowTrigger> flowWithFlowTriggers = computeFlowTriggers(execution, flow)
            .stream()
            // we must filter on no multiple conditions and no preconditions to avoid evaluating triggers twice when they have both standard and multiple conditions
            .filter(it -> it.getTrigger().getPreconditions() == null && ListUtils.emptyOnNull(it.getTrigger().getConditions()).stream().noneMatch(MultipleCondition.class::isInstance))
            .toList();

        // short-circuit when there are no triggers to evaluate
        if (flowWithFlowTriggers.isEmpty()) {
            return Collections.emptyList();
        }

        // compute all executions to create from flow triggers without taking multiple conditions into account
        return flowWithFlowTriggers.stream()
            .map(f -> f.getTrigger().evaluate(
                Optional.empty(),
                runContextFactory.of(f.getFlow(), execution),
                f.getFlow(),
                execution
            ))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .toList();
    }

    /**
     * This method computes executions to trigger from flow triggers for a given execution.
     * It only computes those depending on multiple conditions and preconditions, so it must be used
     * in conjunction with {@link #computeExecutionsFromFlowTriggerConditions(Execution, Flow)}.
     */
    public List<Execution> computeExecutionsFromFlowTriggerPreconditions(Execution execution, Flow flow, MultipleConditionStorageInterface multipleConditionStorage) {
        List<FlowWithFlowTrigger> flowWithFlowTriggers = computeFlowTriggers(execution, flow)
            .stream()
            // we must filter on multiple conditions or preconditions to avoid evaluating triggers twice when they only have standard conditions
            .filter(flowWithFlowTrigger -> flowWithFlowTrigger.getTrigger().getPreconditions() != null || ListUtils.emptyOnNull(flowWithFlowTrigger.getTrigger().getConditions()).stream().anyMatch(MultipleCondition.class::isInstance))
            .toList();

        // short-circuit when there are no triggers to evaluate
        if (flowWithFlowTriggers.isEmpty()) {
            return Collections.emptyList();
        }

        List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = flowWithFlowTriggers.stream()
            .flatMap(flowWithFlowTrigger -> flowTriggerMultipleConditions(flowWithFlowTrigger)
                .map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
                    flowWithFlowTrigger.getFlow(),
                    multipleConditionStorage.getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition, execution.getOutputs()),
                    flowWithFlowTrigger.getTrigger(),
                    multipleCondition
                ))
            )
            // avoid evaluating expired windows (for example for a daily time window or deadline)
            .filter(flowWithFlowTriggerAndMultipleCondition -> flowWithFlowTriggerAndMultipleCondition.getMultipleConditionWindow().isValid(ZonedDateTime.now()))
            .toList();

        // evaluate multiple conditions
        Map<FlowWithFlowTriggerAndMultipleCondition, MultipleConditionWindow> multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> {
            Map<String, Boolean> results = f.getMultipleCondition()
                .getConditions()
                .entrySet()
                .stream()
                .map(e -> new AbstractMap.SimpleEntry<>(
                    e.getKey(),
                    conditionService.isValid(e.getValue(), f.getFlow(), execution)
                ))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

            return Map.entry(f, f.getMultipleConditionWindow().with(results));
        })
            .filter(e -> !e.getValue().getResults().isEmpty())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        // persist results
        multipleConditionStorage.save(new ArrayList<>(multipleConditionWindowsByFlow.values()));

        // compute all executions to create from flow triggers now that the multiple conditions storage is populated
        List<Execution> executions = flowWithFlowTriggers.stream()
            // will evaluate conditions
            .filter(flowWithFlowTrigger ->
                conditionService.isValid(
                    flowWithFlowTrigger.getTrigger(),
                    flowWithFlowTrigger.getFlow(),
                    execution,
                    multipleConditionStorage
                )
            )
            // will evaluate preconditions
            .filter(flowWithFlowTrigger ->
                conditionService.isValid(
                    flowWithFlowTrigger.getTrigger().getPreconditions(),
                    flowWithFlowTrigger.getFlow(),
                    execution,
                    multipleConditionStorage
                )
            )
            .map(f -> f.getTrigger().evaluate(
                Optional.of(multipleConditionStorage),
                runContextFactory.of(f.getFlow(), execution),
                f.getFlow(),
                execution
            ))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .toList();

        // purge fulfilled or expired multiple condition windows
        Stream.concat(
            multipleConditionWindowsByFlow.entrySet().stream()
                .map(e -> Map.entry(
                    e.getKey().getMultipleCondition(),
                    e.getValue()
                ))
                .filter(e -> !Boolean.FALSE.equals(e.getKey().getResetOnSuccess()) &&
                    e.getKey().getConditions().size() == Optional.ofNullable(e.getValue().getResults()).map(Map::size).orElse(0)
                )
                .map(Map.Entry::getValue),
            multipleConditionStorage.expired(execution.getTenantId()).stream()
        ).forEach(multipleConditionStorage::delete);

        return executions;
    }

    private List<FlowWithFlowTrigger> computeFlowTriggers(Execution execution, Flow flow) {
        if (
            // prevent recursive flow triggers
            .filter(flow -> flowService.removeUnwanted(flow, execution))
            // filter out Test Executions
            .filter(flow -> execution.getKind() == null)
            // ensure flow & triggers are enabled
            .filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
            .filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
            .flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)))
            !flowService.removeUnwanted(flow, execution) ||
            // filter out Test Executions
            execution.getKind() != null ||
            // ensure flow & triggers are enabled
            flow.isDisabled() || flow instanceof FlowWithException ||
            flow.getTriggers() == null || flow.getTriggers().isEmpty()) {
            return Collections.emptyList();
        }

        return flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger))
            // filter on the execution states the flow listens to
            .filter(flowWithFlowTrigger -> flowWithFlowTrigger.getTrigger().getStates().contains(execution.getState().getCurrent()))
            // validate flow trigger conditions excluding multiple conditions
@@ -74,96 +205,6 @@ public class FlowTriggerService {
                execution
            )
        )).toList();

        // short-circuit empty triggers to evaluate
        if (validTriggersBeforeMultipleConditionEval.isEmpty()) {
            return Collections.emptyList();
        }

        Map<FlowWithFlowTriggerAndMultipleCondition, MultipleConditionWindow> multipleConditionWindowsByFlow = null;
        if (multipleConditionStorage.isPresent()) {
            List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream()
                .flatMap(flowWithFlowTrigger -> flowTriggerMultipleConditions(flowWithFlowTrigger)
                    .map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
                        flowWithFlowTrigger.getFlow(),
                        multipleConditionStorage.get().getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition, execution.getOutputs()),
                        flowWithFlowTrigger.getTrigger(),
                        multipleCondition
                    ))
                )
                // avoid evaluating expired windows (for ex for daily time window or deadline)
                .filter(flowWithFlowTriggerAndMultipleCondition -> flowWithFlowTriggerAndMultipleCondition.getMultipleConditionWindow().isValid(ZonedDateTime.now()))
                .toList();

            // evaluate multiple conditions
            multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> {
                Map<String, Boolean> results = f.getMultipleCondition()
                    .getConditions()
                    .entrySet()
                    .stream()
                    .map(e -> new AbstractMap.SimpleEntry<>(
                        e.getKey(),
                        conditionService.isValid(e.getValue(), f.getFlow(), execution)
                    ))
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

                return Map.entry(f, f.getMultipleConditionWindow().with(results));
            })
                .filter(e -> !e.getValue().getResults().isEmpty())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

            // persist results
            multipleConditionStorage.get().save(new ArrayList<>(multipleConditionWindowsByFlow.values()));
        }

        // compute all executions to create from flow triggers now that multiple conditions storage is populated
        List<Execution> executions = validTriggersBeforeMultipleConditionEval.stream()
            // will evaluate conditions
            .filter(flowWithFlowTrigger ->
                conditionService.isValid(
                    flowWithFlowTrigger.getTrigger(),
                    flowWithFlowTrigger.getFlow(),
                    execution,
                    multipleConditionStorage.orElse(null)
                )
            )
            // will evaluate preconditions
            .filter(flowWithFlowTrigger ->
                conditionService.isValid(
                    flowWithFlowTrigger.getTrigger().getPreconditions(),
                    flowWithFlowTrigger.getFlow(),
                    execution,
                    multipleConditionStorage.orElse(null)
                )
            )
            .map(f -> f.getTrigger().evaluate(
                multipleConditionStorage,
                runContextFactory.of(f.getFlow(), execution),
                f.getFlow(),
                execution
            ))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .toList();

        if (multipleConditionStorage.isPresent()) {
            // purge fulfilled or expired multiple condition windows
            Stream.concat(
                multipleConditionWindowsByFlow.entrySet().stream()
                    .map(e -> Map.entry(
                        e.getKey().getMultipleCondition(),
                        e.getValue()
                    ))
                    .filter(e -> !Boolean.FALSE.equals(e.getKey().getResetOnSuccess()) &&
                        e.getKey().getConditions().size() == Optional.ofNullable(e.getValue().getResults()).map(Map::size).orElse(0)
                    )
                    .map(Map.Entry::getValue),
                multipleConditionStorage.get().expired(execution.getTenantId()).stream()
            ).forEach(multipleConditionStorage.get()::delete);
        }

        return executions;
    }

    private Stream<MultipleCondition> flowTriggerMultipleConditions(FlowWithFlowTrigger flowWithFlowTrigger) {
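The pair of methods above replaces the former single computeExecutionsFromFlowTriggers: standard conditions are evaluated inline, while multiple conditions and preconditions go through the shared window storage. A hedged sketch of a caller wiring them together (hypothetical caller, not Kestra's actual executor code; Kestra imports omitted, and names like listeningFlow are ours):

import java.util.ArrayList;
import java.util.List;

class FlowTriggerUsageSketch {
    static List<Execution> executionsToRun(FlowTriggerService flowTriggerService,
                                           Execution execution,
                                           Flow listeningFlow,
                                           MultipleConditionStorageInterface storage) {
        List<Execution> toRun = new ArrayList<>();
        // standard (single) conditions: evaluated immediately
        toRun.addAll(flowTriggerService.computeExecutionsFromFlowTriggerConditions(execution, listeningFlow));
        // multiple conditions / preconditions: evaluated against the shared window storage
        toRun.addAll(flowTriggerService.computeExecutionsFromFlowTriggerPreconditions(execution, listeningFlow, storage));
        return toRun;
    }
}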
@@ -26,8 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class FlowTriggerServiceTest {
    public static final List<Label> EMPTY_LABELS = List.of();
    public static final Optional<MultipleConditionStorageInterface> EMPTY_MULTIPLE_CONDITION_STORAGE = Optional.empty();

    @Inject
    private TestRunContextFactory runContextFactory;
    @Inject
@@ -56,10 +55,9 @@ class FlowTriggerServiceTest {

        var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.SUCCESS);

        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
            simpleFlowExecution,
            List.of(simpleFlow, flowWithFlowTrigger),
            EMPTY_MULTIPLE_CONDITION_STORAGE
            flowWithFlowTrigger
        );

        assertThat(resultingExecutionsToRun).size().isEqualTo(1);
@@ -81,10 +79,9 @@ class FlowTriggerServiceTest {

        var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.CREATED);

        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
            simpleFlowExecution,
            List.of(simpleFlow, flowWithFlowTrigger),
            EMPTY_MULTIPLE_CONDITION_STORAGE
            flowWithFlowTrigger
        );

        assertThat(resultingExecutionsToRun).size().isEqualTo(0);
@@ -109,10 +106,9 @@ class FlowTriggerServiceTest {
            .kind(ExecutionKind.TEST)
            .build();

        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
            simpleFlowExecutionComingFromATest,
            List.of(simpleFlow, flowWithFlowTrigger),
            EMPTY_MULTIPLE_CONDITION_STORAGE
            flowWithFlowTrigger
        );

        assertThat(resultingExecutionsToRun).size().isEqualTo(0);

@@ -1,4 +1,4 @@
version=1.0.8
version=1.0.16

org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError
org.gradle.parallel=true
@@ -1,8 +1,10 @@
package io.kestra.repository.mysql;

import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.runners.ScheduleContextInterface;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.kestra.jdbc.runner.JdbcSchedulerContext;
import io.kestra.jdbc.services.JdbcFilterService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -11,6 +13,10 @@ import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.Temporal;
import java.util.Date;
import java.util.List;

@@ -45,4 +51,11 @@ public class MysqlTriggerRepository extends AbstractJdbcTriggerRepository {
            throw new IllegalArgumentException("Unsupported GroupType: " + groupType);
        }
    }

    @Override
    protected Temporal toNextExecutionTime(ZonedDateTime now) {
        // next_execution_date in the table is stored in UTC:
        // convert 'now' to a UTC LocalDateTime to avoid any timezone/offset interpretation by the database.
        return now.withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime();
    }
}
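The override's comment is the whole story: the column holds UTC wall-clock values, so the comparison operand must be normalized first. A minimal standalone sketch of the same normalization, assuming nothing beyond java.time (class and variable names are ours):

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class UtcNormalizationSketch {
    public static void main(String[] args) {
        ZonedDateTime now = ZonedDateTime.now(ZoneId.of("Europe/Paris"));
        // Same instant, expressed as a UTC wall-clock value with no offset attached,
        // so a DATETIME comparison cannot be skewed by the session timezone.
        LocalDateTime utcNow = now.withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime();
        System.out.println(now + " -> " + utcNow);
    }
}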
@@ -9,6 +9,7 @@ import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.filters.*;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.ExecutionStatistics;
@@ -60,7 +61,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
    private static final Field<String> STATE_CURRENT_FIELD = field("state_current", String.class);
    private static final Field<String> NAMESPACE_FIELD = field("namespace", String.class);
    private static final Field<Object> START_DATE_FIELD = field("start_date");
    private static final Condition NORMAL_KIND_CONDITION = field("kind").isNull();
    private static final Condition NORMAL_KIND_CONDITION = field("kind").isNull().or(field("kind").eq(ExecutionKind.NORMAL.name()));

    protected final io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository;
    private final ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
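The widened condition treats both legacy rows (null kind, written before the column existed) and rows explicitly marked NORMAL as normal executions. A hedged standalone sketch of the same jOOQ construction (the string literal stands in for ExecutionKind.NORMAL.name()):

import org.jooq.Condition;
import static org.jooq.impl.DSL.field;

public class KindConditionSketch {
    public static void main(String[] args) {
        // Legacy executions predate the 'kind' column, so they carry NULL;
        // both NULL and the explicit NORMAL value must count as "normal".
        Condition normalKind = field("kind").isNull()
            .or(field("kind").eq("NORMAL"));
        System.out.println(normalKind);
    }
}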
@@ -7,6 +7,7 @@ import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.filters.AbstractFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.LogRepositoryInterface;
@@ -31,8 +32,8 @@ import java.util.stream.Stream;

public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository implements LogRepositoryInterface {

    private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull();
    public static final String DATE_COLUMN = "timestamp";
    private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull().or(field("execution_kind").eq(ExecutionKind.NORMAL.name()));
    private static final String DATE_COLUMN = "timestamp";
    protected io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository;

    public AbstractJdbcLogRepository(io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository,

@@ -5,6 +5,7 @@ import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.filters.AbstractFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.executions.metrics.MetricAggregation;
import io.kestra.core.models.executions.metrics.MetricAggregations;
@@ -34,7 +35,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;

public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepository implements MetricRepositoryInterface {
    private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull();
    private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull().or(field("execution_kind").eq(ExecutionKind.NORMAL.name()));
    protected io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository;

    public AbstractJdbcMetricRepository(io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository,

@@ -32,6 +32,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.time.ZonedDateTime;
import java.time.temporal.Temporal;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -151,7 +152,7 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
                .select(field("value"))
                .from(this.jdbcRepository.getTable())
                .where(
                    (field("next_execution_date").lessThan(now.toOffsetDateTime())
                    (field("next_execution_date").lessThan(toNextExecutionTime(now))
                        // we check for null for backwards compatibility
                        .or(field("next_execution_date").isNull()))
                    .and(field("execution_id").isNull())
@@ -162,14 +163,14 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
            .fetch()
            .map(r -> this.jdbcRepository.deserialize(r.get("value", String.class)));
    }

    public List<Trigger> findByNextExecutionDateReadyButLockedTriggers(ZonedDateTime now) {
        return this.jdbcRepository.getDslContextWrapper()
            .transactionResult(configuration -> DSL.using(configuration)
                .select(field("value"))
                .from(this.jdbcRepository.getTable())
                .where(
                    (field("next_execution_date").lessThan(now.toOffsetDateTime())
                    (field("next_execution_date").lessThan(toNextExecutionTime(now))
                        // we check for null for backwards compatibility
                        .or(field("next_execution_date").isNull()))
                    .and(field("execution_id").isNotNull())
@@ -178,6 +179,10 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
            .fetch()
            .map(r -> this.jdbcRepository.deserialize(r.get("value", String.class))));
    }

    protected Temporal toNextExecutionTime(ZonedDateTime now) {
        return now.toOffsetDateTime();
    }

    public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface) {
        JdbcSchedulerContext jdbcSchedulerContext = (JdbcSchedulerContext) scheduleContextInterface;
@@ -24,10 +24,10 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
    }

    /**
     * Fetch the concurrency limit counter then process the count using the consumer function.
     * It locked the raw and is wrapped in a transaction so the consumer should use the provided dslContext for any database access.
     * Fetch the concurrency limit counter, then process the count using the consumer function.
     * It locks the row and is wrapped in a transaction, so the consumer should use the provided dslContext for any database access.
     * <p>
     * Note that to avoid a race when no concurrency limit counter exists, it first always try to insert a 0 counter.
     * Note that to avoid a race when no concurrency limit counter exists, it first always tries to insert a 0 counter.
     */
    public ExecutionRunning countThenProcess(FlowInterface flow, BiFunction<DSLContext, ConcurrencyLimit, Pair<ExecutionRunning, ConcurrencyLimit>> consumer) {
        return this.jdbcRepository
@@ -106,8 +106,7 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
            .and(field("namespace").eq(flow.getNamespace()))
            .and(field("flow_id").eq(flow.getId()));

        return Optional.ofNullable(select.forUpdate().fetchOne())
            .map(record -> this.jdbcRepository.map(record));
        return this.jdbcRepository.fetchOne(select.forUpdate());
    }

    private void save(DSLContext dslContext, ConcurrencyLimit concurrencyLimit) {
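The javadoc above describes a classic race-avoidance protocol: unconditionally attempt to insert a zero counter, then lock the row with SELECT ... FOR UPDATE before reading or mutating it inside the transaction. A hedged jOOQ-style sketch of that protocol (table and column names are illustrative, not the repository's actual schema):

import org.jooq.DSLContext;
import org.jooq.Record;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.table;

public class CounterLockSketch {
    // Ensures a row exists, then locks it so concurrent executors serialize on it.
    static Record fetchCounterForUpdate(DSLContext dsl, String flowId) {
        // Step 1: best-effort insert of a 0 counter; the duplicate-key case is ignored.
        dsl.insertInto(table("concurrency_counter"))
            .columns(field("flow_id"), field("count"))
            .values(flowId, 0)
            .onDuplicateKeyIgnore()
            .execute();

        // Step 2: lock the (now guaranteed) row for the rest of the transaction.
        return dsl.selectFrom(table("concurrency_counter"))
            .where(field("flow_id").eq(flowId))
            .forUpdate()
            .fetchOne();
    }
}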
@@ -12,7 +12,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public abstract class AbstractJdbcExecutionQueuedStorage extends AbstractJdbcRepository {
    protected io.kestra.jdbc.AbstractJdbcRepository<ExecutionQueued> jdbcRepository;
@@ -70,18 +69,12 @@ public abstract class AbstractJdbcExecutionQueuedStorage extends AbstractJdbcRep
        this.jdbcRepository
            .getDslContextWrapper()
            .transaction(configuration -> {
                var select = DSL
                    .using(configuration)
                    .select(AbstractJdbcRepository.field("value"))
                    .from(this.jdbcRepository.getTable())
                    .where(buildTenantCondition(execution.getTenantId()))
                    .and(field("key").eq(IdUtils.fromParts(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId())))
                    .forUpdate();

                Optional<ExecutionQueued> maybeExecution = this.jdbcRepository.fetchOne(select);
                if (maybeExecution.isPresent()) {
                    this.jdbcRepository.delete(maybeExecution.get());
                }
                DSL
                    .using(configuration)
                    .deleteFrom(this.jdbcRepository.getTable())
                    .where(buildTenantCondition(execution.getTenantId()))
                    .and(field("key").eq(IdUtils.fromParts(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId())))
                    .execute();
            });
    }
}
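The refactor above replaces a locking read followed by an entity delete with a single DELETE statement, which is atomic on its own and saves a round trip. A minimal sketch of the resulting shape (table and column names are illustrative):

import org.jooq.DSLContext;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.table;

public class DirectDeleteSketch {
    // One statement instead of lock-read-then-delete; returns the number of rows removed.
    static int deleteQueued(DSLContext dsl, String tenant, String key) {
        return dsl.deleteFrom(table("execution_queued"))
            .where(field("tenant_id").eq(tenant))
            .and(field("key").eq(key))
            .execute();
    }
}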
@@ -3,6 +3,7 @@ package io.kestra.jdbc.runner;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.FlowNotFoundException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.*;
@@ -241,7 +242,7 @@ public class JdbcExecutor implements ExecutorInterface {
        final ExecutorsUtils executorsUtils,
        final MaintenanceService maintenanceService,
        @Value("${kestra.jdbc.executor.thread-count:0}") final int threadCount
    ) {
        this.serviceLivenessCoordinator = serviceLivenessCoordinator;
        this.flowMetaStore = flowMetaStore;
        this.flowTopologyRepository = flowTopologyRepository;
@@ -423,7 +424,7 @@ public class JdbcExecutor implements ExecutorInterface {

        MultipleConditionEvent multipleConditionEvent = either.getLeft();

        flowTriggerService.computeExecutionsFromFlowTriggers(multipleConditionEvent.execution(), List.of(multipleConditionEvent.flow()), Optional.of(multipleConditionStorage))
        flowTriggerService.computeExecutionsFromFlowTriggerPreconditions(multipleConditionEvent.execution(), multipleConditionEvent.flow(), multipleConditionStorage)
            .forEach(exec -> {
                try {
                    executionQueue.emit(exec);
@@ -472,7 +473,7 @@ public class JdbcExecutor implements ExecutorInterface {
    void reEmitWorkerJobsForWorkers(final Configuration configuration,
                                    final List<String> ids) {
        metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_WORKER_JOB_RESUBMIT_COUNT, MetricRegistry.METRIC_EXECUTOR_WORKER_JOB_RESUBMIT_COUNT_DESCRIPTION)
            .increment(ids.size());

        workerJobRunningRepository.getWorkerJobWithWorkerDead(configuration.dsl(), ids)
            .forEach(workerJobRunning -> {
@@ -551,10 +552,10 @@ public class JdbcExecutor implements ExecutorInterface {
        execution,
        FlowId.uidWithoutRevision(execution),
        () -> {
            Executor executor = new Executor(execution, null);
            try {
                final FlowWithSource flow = findFlow(execution);
                Executor executor = new Executor(execution, null).withFlow(flow);
                final FlowWithSource flow = findFlowOrThrow(execution);
                executor = executor.withFlow(flow);

                // schedule it for later if needed
                if (execution.getState().getCurrent() == State.Type.CREATED && execution.getScheduleDate() != null && execution.getScheduleDate().isAfter(Instant.now())) {
@@ -642,7 +643,7 @@ public class JdbcExecutor implements ExecutorInterface {
            .forEach(throwConsumer(workerTask -> {
                try {
                    if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
                        workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED)));
                        workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED).addAttempt(TaskRunAttempt.builder().state(new State().withState(State.Type.SKIPPED)).build())));
                    } else {
                        if (workerTask.getTask().isSendToWorkerTask()) {
                            Optional<WorkerGroup> maybeWorkerGroup = workerGroupService.resolveGroupFromJob(flow, workerTask);
@@ -650,8 +651,24 @@ public class JdbcExecutor implements ExecutorInterface {
                                .orElse(null);
                            workerJobQueue.emit(workerGroupKey, workerTask);
                        }
                        // flowable attempt state transition to running
                        if (workerTask.getTask().isFlowable()) {
                            workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.RUNNING)));
                            List<TaskRunAttempt> attempts = Optional.ofNullable(workerTask.getTaskRun().getAttempts())
                                .map(ArrayList::new)
                                .orElseGet(ArrayList::new);

                            attempts.add(
                                TaskRunAttempt.builder()
                                    .state(new State().withState(State.Type.RUNNING))
                                    .build()
                            );

                            TaskRun updatedTaskRun = workerTask.getTaskRun()
                                .withAttempts(attempts)
                                .withState(State.Type.RUNNING);

                            workerTaskResults.add(new WorkerTaskResult(updatedTaskRun));
                        }
                    }
                } catch (Exception e) {
@@ -687,20 +704,20 @@ public class JdbcExecutor implements ExecutorInterface {
            .filter(subflowExecution -> this.deduplicateSubflowExecution(execution, executorState, subflowExecution.getParentTaskRun()))
            .toList();

        subflowExecutionDedup
            .forEach(throwConsumer(subflowExecution -> {
                Execution subExecution = subflowExecution.getExecution();
                String log = String.format("Created new execution [[link execution=\"%s\" flowId=\"%s\" namespace=\"%s\"]]", subExecution.getId(), subExecution.getFlowId(), subExecution.getNamespace());

                JdbcExecutor.log.info(log);

                logQueue.emit(LogEntry.of(subflowExecution.getParentTaskRun(), subflowExecution.getExecution().getKind()).toBuilder()
                    .level(Level.INFO)
                    .message(log)
                    .timestamp(subflowExecution.getParentTaskRun().getState().getStartDate())
                    .thread(Thread.currentThread().getName())
                    .build()
                );

                executionQueue.emit(subflowExecution.getExecution());
            }));
@@ -721,7 +738,20 @@ public class JdbcExecutor implements ExecutorInterface {
            Span.current().recordException(e).setStatus(StatusCode.ERROR);

            return null;
        } catch (FlowNotFoundException e) {
            // avoid infinite loop
            if (!executor.getExecution().getState().getCurrent().isFailed()) {
                return Pair.of(
                    handleFailedExecutionFromExecutor(executor, e),
                    executorState
                );
            }
        }

        return Pair.of(
            executor,
            executorState
        );
    }
);
});
@@ -758,7 +788,7 @@ public class JdbcExecutor implements ExecutorInterface {
        if (execution.hasTaskRunJoinable(message.getTaskRun())) {
            try {
                // process worker task result
                executorService.addWorkerTaskResult(current, () -> findFlow(execution), message);
                executorService.addWorkerTaskResult(current, throwSupplier(() -> findFlowOrThrow(execution)), message);
                // join worker result
                return Pair.of(
                    current,
@@ -769,7 +799,20 @@ public class JdbcExecutor implements ExecutorInterface {
                    handleFailedExecutionFromExecutor(current, e),
                    pair.getRight()
                );
            } catch (FlowNotFoundException e) {
                // avoid infinite loop
                if (!current.getExecution().getState().getCurrent().isFailed()) {
                    return Pair.of(
                        handleFailedExecutionFromExecutor(current, e),
                        pair.getRight()
                    );
                }
            }

            return Pair.of(
                current,
                pair.getRight()
            );
        }

        return null;
@@ -810,7 +853,7 @@ public class JdbcExecutor implements ExecutorInterface {

        if (execution.hasTaskRunJoinable(message.getParentTaskRun())) { // TODO if we remove this check, we can avoid adding 'iteration' on the 'isSame()' method
            try {
                FlowWithSource flow = findFlow(execution);
                FlowWithSource flow = findFlowOrThrow(execution);
                Task task = flow.findTaskByTaskId(message.getParentTaskRun().getTaskId());
                TaskRun taskRun;

@@ -872,7 +915,20 @@ public class JdbcExecutor implements ExecutorInterface {
                    handleFailedExecutionFromExecutor(current, e),
                    pair.getRight()
                );
            } catch (FlowNotFoundException e) {
                // avoid infinite loop
                if (!current.getExecution().getState().getCurrent().isFailed()) {
                    return Pair.of(
                        handleFailedExecutionFromExecutor(current, e),
                        pair.getRight()
                    );
                }
            }

            return Pair.of(
                current,
                pair.getRight()
            );
        }

        return null;
@@ -910,8 +966,8 @@ public class JdbcExecutor implements ExecutorInterface {
            throw new IllegalStateException("Execution state doesn't exist for " + message.getParentExecutionId() + ", received " + message);
        }

        FlowWithSource flow = findFlow(execution);
        try {
            FlowWithSource flow = findFlowOrThrow(execution);
            ExecutableTask<?> executableTask = (ExecutableTask<?>) flow.findTaskByTaskId(message.getTaskId());
            if (!executableTask.waitForExecution()) {
                return null;
@@ -934,7 +990,7 @@ public class JdbcExecutor implements ExecutorInterface {
                    log.error("Unable to emit the subflow execution result", ex);
                }
            }
        } catch (InternalException e) {
        } catch (InternalException | FlowNotFoundException e) {
            log.error("Unable to process the subflow execution end", e);
        }
        return null;
@@ -1063,11 +1119,18 @@ public class JdbcExecutor implements ExecutorInterface {
        // We need to detect that and reset them as they will never reach the reset code later on this method.
        if (execution.getTrigger() != null && execution.getState().isFailed() && ListUtils.isEmpty(execution.getTaskRunList())) {
            FlowWithSource flow = executor.getFlow();
            triggerRepository
                .findByExecution(execution)
                .ifPresent(trigger -> {
                    this.triggerState.update(executionService.resetExecution(flow, execution, trigger));
                });

            if (flow == null) {
                log.error("Couldn't reset trigger for execution {} as flow {} is missing. Trigger {} might stay stuck.",
                    execution.getId(),
                    execution.getTenantId() + "/" + execution.getNamespace() + "/" + execution.getFlowId(),
                    execution.getTrigger().getId()
                );
            } else {
                triggerRepository
                    .findByExecution(execution)
                    .ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
            }
        }

        return;
@@ -1080,7 +1143,7 @@ public class JdbcExecutor implements ExecutorInterface {
        // the terminated state can come from the execution queue, in this case we always have a flow in the executor
        // or from a worker task in an afterExecution block, in this case we need to load the flow
        if (executor.getFlow() == null && executor.getExecution().getState().isTerminated()) {
            executor = executor.withFlow(findFlow(executor.getExecution()));
            executor = executor.withFlow(findFlowOrThrow(executor.getExecution()));
        }
        boolean isTerminated = executor.getFlow() != null && executionService.isTerminated(executor.getFlow(), executor.getExecution());

@@ -1133,7 +1196,9 @@ public class JdbcExecutor implements ExecutorInterface {
        boolean queuedThenKilled = execution.getState().getCurrent() == State.Type.KILLED
            && execution.getState().getHistories().stream().anyMatch(h -> h.getState().isQueued())
            && execution.getState().getHistories().stream().noneMatch(h -> h.getState().isRunning());
        if (!queuedThenKilled) {
        boolean concurrencyShortCircuitState = Concurrency.possibleTransitions(execution.getState().getCurrent())
            && execution.getState().getHistories().get(execution.getState().getHistories().size() - 2).getState().isCreated();
        if (!queuedThenKilled && !concurrencyShortCircuitState) {
            concurrencyLimitStorage.decrement(executor.getFlow());

            if (executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
@@ -1177,7 +1242,7 @@ public class JdbcExecutor implements ExecutorInterface {
                ((JdbcQueue<WorkerJob>) workerJobQueue).deleteByKeys(taskRunKeys);
            }
        }
        } catch (QueueException e) {
        } catch (QueueException | FlowNotFoundException e) {
            if (!ignoreFailure) {
                // If we cannot add the new worker task result to the execution, we fail it
                executionRepository.lock(executor.getExecution().getId(), pair -> {
@@ -1196,8 +1261,10 @@ public class JdbcExecutor implements ExecutorInterface {
    private void processFlowTriggers(Execution execution) throws QueueException {
        // directly process simple conditions
        flowTriggerService.withFlowTriggersOnly(allFlows.stream())
            .filter(f -> ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().noneMatch(c -> c instanceof MultipleCondition) && f.getTrigger().getPreconditions() == null)
            .flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggers(execution, List.of(f.getFlow()), Optional.empty()).stream())
            .map(f -> f.getFlow())
            .distinct() // as computeExecutionsFromFlowTriggers is based on flow, we must map FlowWithFlowTrigger to a flow and distinct to avoid multiple execution for the same flow
            .flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggerConditions(execution, f).stream())
            .forEach(throwConsumer(exec -> executionQueue.emit(exec)));

        // send multiple conditions to the multiple condition queue for later processing
@@ -1208,8 +1275,17 @@ public class JdbcExecutor implements ExecutorInterface {
            .forEach(throwConsumer(multipleCondition -> multipleConditionEventQueue.emit(multipleCondition)));
    }

    private FlowWithSource findFlow(Execution execution) {
        FlowInterface flow = this.flowMetaStore.findByExecution(execution).orElseThrow();
    private FlowWithSource findFlowOrThrow(Execution execution) throws FlowNotFoundException {
        return findFlow(execution).orElseThrow(() -> new FlowNotFoundException("Unable to find flow %s for execution %s".formatted(execution.getTenantId() + "/" + execution.getNamespace() + "/" + execution.getFlowId(), execution.getId())));
    }

    private Optional<FlowWithSource> findFlow(Execution execution) {
        Optional<FlowInterface> maybeFlow = this.flowMetaStore.findByExecution(execution);
        if (maybeFlow.isEmpty()) {
            return Optional.empty();
        }

        FlowInterface flow = maybeFlow.get();
        FlowWithSource flowWithSource = pluginDefaultService.injectDefaults(flow, execution);

        if (templateExecutorInterface.isPresent()) {
@@ -1224,7 +1300,7 @@ public class JdbcExecutor implements ExecutorInterface {
        }
    }

        return flowWithSource;
        return Optional.of(flowWithSource);
    }
    /**
@@ -1271,7 +1347,7 @@ public class JdbcExecutor implements ExecutorInterface {
            else if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.RESTART_FAILED_TASK)) {
                Execution newAttempt = executionService.retryTask(
                    pair.getKey(),
                    findFlow(pair.getKey()),
                    findFlowOrThrow(pair.getKey()),
                    executionDelay.getTaskRunId()
                );
                executor = executor.withExecution(newAttempt, "retryFailedTask");
@@ -1283,7 +1359,7 @@ public class JdbcExecutor implements ExecutorInterface {
            }
            // Handle WaitFor
            else if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.CONTINUE_FLOWABLE)) {
                Execution execution = executionService.retryWaitFor(executor.getExecution(), executionDelay.getTaskRunId());
                executor = executor.withExecution(execution, "continueLoop");
            }
        } catch (Exception e) {
@@ -1309,20 +1385,22 @@ public class JdbcExecutor implements ExecutorInterface {

        slaMonitorStorage.processExpired(Instant.now(), slaMonitor -> {
            Executor result = executionRepository.lock(slaMonitor.getExecutionId(), pair -> {
                FlowWithSource flow = findFlow(pair.getLeft());
                Executor executor = new Executor(pair.getLeft(), null).withFlow(flow);
                Optional<SLA> sla = flow.getSla().stream().filter(s -> s.getId().equals(slaMonitor.getSlaId())).findFirst();
                if (sla.isEmpty()) {
                    // this can happen in case the flow has been updated and the SLA removed
                    log.debug("Cannot find the SLA '{}' in the flow for execution '{}', ignoring it.", slaMonitor.getSlaId(), slaMonitor.getExecutionId());
                    return null;
                }

                metricRegistry
                    .counter(MetricRegistry.METRIC_EXECUTOR_SLA_EXPIRED_COUNT, MetricRegistry.METRIC_EXECUTOR_SLA_EXPIRED_COUNT_DESCRIPTION, metricRegistry.tags(executor.getExecution()))
                    .increment();

                Executor executor = new Executor(pair.getLeft(), null);
                try {
                    // TODO flow with source is not needed here. Maybe the best would be to not add the flow inside the executor to trace all usage
                    FlowWithSource flow = findFlowOrThrow(pair.getLeft());
                    executor = executor.withFlow(flow);
                    Optional<SLA> sla = flow.getSla().stream().filter(s -> s.getId().equals(slaMonitor.getSlaId())).findFirst();
                    if (sla.isEmpty()) {
                        // this can happen in case the flow has been updated and the SLA removed
                        log.debug("Cannot find the SLA '{}' in the flow for execution '{}', ignoring it.", slaMonitor.getSlaId(), slaMonitor.getExecutionId());
                        return null;
                    }

                    metricRegistry
                        .counter(MetricRegistry.METRIC_EXECUTOR_SLA_EXPIRED_COUNT, MetricRegistry.METRIC_EXECUTOR_SLA_EXPIRED_COUNT_DESCRIPTION, metricRegistry.tags(executor.getExecution()))
                        .increment();

                    RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
                    Optional<Violation> violation = slaService.evaluateExecutionMonitoringSLA(runContext, executor.getExecution(), sla.get());
                    if (violation.isPresent()) { // should always be true
@@ -4,6 +4,7 @@ import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.runners.ScheduleContextInterface;
import io.kestra.core.runners.Scheduler;
import io.kestra.core.runners.SchedulerTriggerStateInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.FlowService;
@@ -56,6 +57,9 @@ public class JdbcScheduler extends AbstractScheduler {
                    .forEach(abstractTrigger -> triggerRepository.delete(Trigger.of(flow, abstractTrigger)));
            }
        });

        // No-op consumption of the trigger queue, so the events are purged from the queue
        this.triggerQueue.receive(Scheduler.class, trigger -> { });
    }

    @Override

@@ -14,6 +14,7 @@ import org.jooq.impl.DSL;
import javax.sql.DataSource;

import java.util.List;
import java.util.Optional;

import static io.kestra.core.utils.Rethrow.throwPredicate;

@@ -45,10 +46,7 @@ public class JdbcTestUtils {
            .meta()
            .getTables()
            .stream()
            .filter(throwPredicate(table -> (table.getSchema().getName().equals(dataSource.getConnection().getCatalog())) ||
                table.getSchema().getName().equals("public") || // for Postgres
                table.getSchema().getName().equals("dbo") // for SQLServer
            ))
            .filter(throwPredicate(table -> (table.getSchema().getName().equals(Optional.ofNullable(dataSource.getConnection().getSchema()).orElse(dataSource.getConnection().getCatalog())))))
            .filter(table -> tableConfigs.getTableConfigs().stream().anyMatch(conf -> conf.table().equalsIgnoreCase(table.getName())))
            .toList();
        });
@@ -1,6 +1,7 @@
package io.kestra.jdbc.runner;

import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
@@ -138,6 +139,7 @@ public abstract class JdbcRunnerRetryTest {
        retryCaseTest.retryDynamicTask(execution);
    }

    @FlakyTest(description = "it seems this flow sometimes stays stuck in RUNNING")
    @Test
    @ExecuteFlow("flows/valids/retry-with-flowable-errors.yaml")
    void retryWithFlowableErrors(Execution execution) {

@@ -6,21 +6,21 @@ import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.MessageTooBigException;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.AbstractRunnerTest;
import io.kestra.core.runners.InputsTest;
import io.kestra.core.utils.TestsUtils;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.RetryingTest;
import org.slf4j.event.Level;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
@@ -28,11 +28,32 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

public abstract class JdbcRunnerTest extends AbstractRunnerTest {

    public static final String NAMESPACE = "io.kestra.tests";

    @Inject
    @Named(QueueFactoryInterface.EXECUTION_NAMED)
    protected QueueInterface<Execution> executionQueue;

    @Inject
    private JdbcTestUtils jdbcTestUtils;

    @Test
    void avoidInfiniteExecutionLoop() throws QueueException, InterruptedException {
        Flux<Execution> executionFlux = TestsUtils.receive(executionQueue);
        Execution execution = Execution.newExecution(TestsUtils.mockFlow(), Collections.emptyList());
        executionQueue.emit(execution);

        // Wait some time to ensure no infinite loop occurs
        Thread.sleep(500);

        // We expect the initial execution message + the one failed due to the missing flow
        assertThat(
            Objects.requireNonNull(executionFlux.collectList().block()).stream()
                .filter(e -> e.getId().equals(execution.getId()))
                .toList()
        ).hasSize(2);
    }

    @Test
    @LoadFlows({"flows/valids/waitfor-child-task-warning.yaml"})
    void waitForChildTaskWarning() throws Exception {
@@ -114,6 +114,10 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
            if (either.getLeft().getTaskRun().getState().getCurrent() == Type.RUNNING) {
                runningLatch.countDown();
            }

            if (either.getLeft().getTaskRun().getState().getCurrent() == Type.FAILED) {
                fail("Worker task result should not be in FAILED state as it should be resubmitted");
            }
        });

        workerJobQueue.emit(workerTask(Duration.ofSeconds(5)));

@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Singleton
public class TestRunner implements Runnable, AutoCloseable {
    @Setter private int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors());
    @Setter private int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors()) * 16;
    @Setter private boolean schedulerEnabled = true;
    @Setter private boolean workerEnabled = true;

@@ -49,7 +49,9 @@ public class TestRunner implements Runnable, AutoCloseable {
        running.set(true);

        poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
        poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
        ExecutorInterface executor = applicationContext.getBean(ExecutorInterface.class);
        servers.add(executor);
        poolExecutor.execute(executor);

        if (workerEnabled) {
            // FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils
@@ -32,8 +32,8 @@
    <strong>We're sorry but Kestra doesn't work properly without JavaScript enabled. Please enable it to continue.</strong>
</noscript>

<div id="loader-wrapper" data-test-id="loader-wrapper">
    <div id="loader" data-test-id="loader"></div>
<div id="loader-wrapper">
    <div id="loader"></div>
</div>

<div id="app-container">

@@ -9,7 +9,7 @@
    :data-component="`FILENAME_PLACEHOLDER#${tab}`"
>
    <template #label>
        <component :is="embedActiveTab || tab.disabled ? 'a' : 'router-link'" @click="embeddedTabChange(tab)" :to="embedActiveTab ? undefined : to(tab)" :data-test-id="tab.name">
        <component :is="embedActiveTab || tab.disabled ? 'a' : 'router-link'" @click="embeddedTabChange(tab)" :to="embedActiveTab ? undefined : to(tab)">
            <el-tooltip v-if="tab.disabled && tab.props && tab.props.showTooltip" :content="$t('add-trigger-in-editor')" placement="top">
                <span><strong>{{ tab.title }}</strong></span>
            </el-tooltip>

@@ -272,6 +272,7 @@
    import Id from "../Id.vue";
    import SelectTableActions from "../../mixins/selectTableActions";
    import _merge from "lodash/merge";
    import moment from "moment";
    import LogsWrapper from "../logs/LogsWrapper.vue";
    import KestraFilter from "../filter/KestraFilter.vue"
    import {mapStores} from "pinia";
@@ -500,6 +501,15 @@
            loadQuery(base) {
                let queryFilter = this.queryWithFilter();

                const timeRange = queryFilter["filters[timeRange][EQUALS]"];
                if (timeRange) {
                    const end = new Date();
                    const start = new Date(end.getTime() - moment.duration(timeRange).asMilliseconds());
                    queryFilter["filters[startDate][GREATER_THAN_OR_EQUAL_TO]"] = start.toISOString();
                    queryFilter["filters[endDate][LESS_THAN_OR_EQUAL_TO]"] = end.toISOString();
                    delete queryFilter["filters[timeRange][EQUALS]"];
                }

                return _merge(base, queryFilter)
            },
        },
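loadQuery above rewrites a relative timeRange filter (an ISO-8601 duration such as PT24H) into an absolute startDate/endDate pair before querying. The same arithmetic sketched standalone in Java for clarity (names are ours, not the frontend's):

import java.time.Duration;
import java.time.Instant;

public class TimeRangeSketch {
    public static void main(String[] args) {
        // e.g. "PT24H" selects the last 24 hours relative to now
        Duration timeRange = Duration.parse("PT24H");
        Instant end = Instant.now();
        Instant start = end.minus(timeRange);
        System.out.println("startDate >= " + start + " AND endDate <= " + end);
    }
}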
@@ -56,7 +56,7 @@
        this.follow();
        window.addEventListener("popstate", this.follow)

        this.dependenciesCount = (await this.flowStore.loadDependencies({namespace: this.$route.params.namespace, id: this.$route.params.flowId})).count;
        this.dependenciesCount = (await this.flowStore.loadDependencies({namespace: this.$route.params.namespace, id: this.$route.params.flowId}, true)).count;
    },
    mounted() {
        this.previousExecutionId = this.$route.params.id

@@ -6,13 +6,11 @@
    class="filter"
>
    <el-radio-button
        data-test-id="date-filter-relative-selector"
        :value="filterType.RELATIVE"
    >
        {{ $t("relative") }}
    </el-radio-button>
    <el-radio-button
        data-test-id="date-filter-absolute-selector"
        :value="filterType.ABSOLUTE"
    >
        {{ $t("absolute") }}
@@ -38,13 +36,11 @@
    class="filter"
>
    <el-radio-button
        data-test-id="date-filter-relative-selector"
        :value="filterType.RELATIVE"
    >
        {{ $t("relative") }}
    </el-radio-button>
    <el-radio-button
        data-test-id="date-filter-absolute-selector"
        :value="filterType.ABSOLUTE"
    >
        {{ $t("absolute") }}
@@ -1,7 +1,6 @@
<template>
<el-tooltip :disabled="tooltip === undefined" :content="tooltip" effect="light">
<el-select
data-test-id="time-selector"
:model-value="value"
:placeholder="placeholder"
@change="$emit('change', $event)"
@@ -528,27 +528,25 @@
}
.content-container {
height: calc(100vh - 0px);
overflow-y: auto !important;
overflow-y: scroll;
overflow-x: hidden;
scrollbar-gutter: stable;
word-wrap: break-word;
word-break: break-word;
}

:deep(.el-collapse) {
.el-collapse-item__wrap {
overflow-y: auto !important;
max-height: none !important;
}

.el-collapse-item__content {
overflow-y: auto !important;
word-wrap: break-word;
word-break: break-word;
}
}

:deep(.var-value) {
overflow-y: auto !important;
word-wrap: break-word;
word-break: break-word;
}
@@ -46,7 +46,6 @@
<div class="right-align">
<el-form-item class="submit">
<el-button
:data-test-id="buttonTestId"
:icon="buttonIcon"
:disabled="!flowCanBeExecuted"
:class="{'flow-run-trigger-button': true, 'onboarding-glow': coreStore.guidedProperties.tourStarted}"
@@ -213,7 +213,7 @@
/>
<template #footer>
<router-link
v-if="isSchedule(selectedTrigger.type)"
v-if="isSchedule(selectedTrigger?.type)"
:to="{
name: 'admin/triggers',
query: {
@@ -28,7 +28,7 @@
:navbar="false"
v-if="(input.type === 'ENUM' || input.type === 'SELECT') && !input.isRadio"
:data-testid="`input-form-${input.id}`"
v-model="selectedTriggerLocal[input.id]"
v-model="inputsValues[input.id]"
@update:model-value="onChange(input)"
:allow-create="input.allowCustomValue"
filterable

@@ -238,7 +238,6 @@
/>
<duration-picker
v-if="input.type === 'DURATION'"
:data-testid="`input-form-${input.id}`"
v-model="inputsValues[input.id]"
@update:model-value="onChange(input)"
/>

@@ -334,7 +333,6 @@
multiSelectInputs: {},
inputsValidated: new Set(),
debouncedValidation: () => {},
selectedTriggerLocal: {},
editingArrayId: null,
editableItems: {},
};

@@ -344,8 +342,9 @@
this.inputsMetaData = JSON.parse(JSON.stringify(this.initialInputs));
this.debouncedValidation = debounce(this.validateInputs, 500)

if(this.selectedTrigger?.inputs) this.selectedTriggerLocal = toRaw(this.selectedTrigger.inputs);
else this.selectedTriggerLocal = this.inputsValues;
if(this.selectedTrigger?.inputs){
this.inputsValues = toRaw(this.selectedTrigger.inputs);
}

this.validateInputs().then(() => {
this.$watch("inputsValues", {

@@ -362,6 +361,10 @@
},
deep: true
});

// on first load default values need to be sent to the parent
// since they are part of the actual value
this.$emit("update:modelValue", this.inputsValues)
});
},
mounted() {

@@ -403,12 +406,12 @@
},
updateDefaults() {
for (const input of this.inputsMetaData || []) {
const {type, id, value} = input;
const {type, id, value, defaults} = input;
if (this.inputsValues[id] === undefined || this.inputsValues[id] === null || input.isDefault) {
if (type === "MULTISELECT") {
this.multiSelectInputs[id] = value;
this.multiSelectInputs[id] = value ?? defaults;
}
this.inputsValues[id] = Inputs.normalize(type, value);
this.inputsValues[id] = Inputs.normalize(type, value ?? defaults);
}
}
},
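The updateDefaults hunk above adds a nullish-coalescing fallback from value to defaults, so inputs that declare only defaults are still pre-filled, while falsy-but-valid values like 0 or an empty string are preserved. A reduced TypeScript sketch of that fallback; the InputMeta shape here is a hypothetical simplification of the component's input metadata:

// Prefer the explicit value, otherwise use the declared defaults.
// `??` only falls through on null/undefined, so 0 and "" are kept.
interface InputMeta { id: string; value?: unknown; defaults?: unknown; }

function resolveDefaults(inputs: InputMeta[]): Record<string, unknown> {
    const resolved: Record<string, unknown> = {};
    for (const {id, value, defaults} of inputs) {
        resolved[id] = value ?? defaults;
    }
    return resolved;
}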
@@ -469,9 +472,9 @@
if (this.inputsMetaData === undefined || this.inputsMetaData.length === 0) {
return;
}

const inputsValuesWithNoDefault = this.inputsValuesWithNoDefault();

const formData = inputsToFormData(this, this.inputsMetaData, inputsValuesWithNoDefault);

const metadataCallback = (response) => {
@@ -247,6 +247,7 @@
const suggestWidgetResizeObserver = ref<MutationObserver>()
const suggestWidgetObserver = ref<MutationObserver>()
const suggestWidget = ref<HTMLElement>()
const resizeObserver = ref<ResizeObserver>()
@@ -796,6 +797,20 @@
setTimeout(() => monaco.editor.remeasureFonts(), 1)
emit("editorDidMount", editorResolved.value);

/* Handle resizing. */
resizeObserver.value = new ResizeObserver(() => {
if (localEditor.value) {
localEditor.value.layout();
}
if (localDiffEditor.value) {
localDiffEditor.value.getModifiedEditor().layout();
localDiffEditor.value.getOriginalEditor().layout();
}
});
if (editorRef.value) {
resizeObserver.value.observe(editorRef.value);
}

highlightLine();
}
@@ -853,6 +868,8 @@
function destroy() {
disposeObservers();
disposeCompletions.value?.();
resizeObserver.value?.disconnect();
resizeObserver.value = undefined;
if (localDiffEditor.value !== undefined) {
localDiffEditor.value?.dispose();
localDiffEditor.value?.getModel()?.modified?.dispose();
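Read together, the two editor hunks above pair a ResizeObserver created when the editor mounts with a disconnect in destroy(), so Monaco re-layouts whenever its container resizes and the observer is not leaked. A self-contained TypeScript sketch of that lifecycle, with a hypothetical LayoutTarget standing in for the Monaco editor instance:

// Observe a container and re-layout an editor on resize,
// returning a disposer for the component's teardown hook.
type LayoutTarget = { layout(): void };

function observeLayout(container: HTMLElement, editor: LayoutTarget): () => void {
    const observer = new ResizeObserver(() => editor.layout());
    observer.observe(container);
    // Calling the disposer disconnects the observer and avoids a leak.
    return () => observer.disconnect();
}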
@@ -475,7 +475,7 @@
return this.namespacesStore
.createKv({
...this.kv,
contentType: ["DATE", "DATETIME"].includes(type) ? "text/plain" : "application/json",
contentType: "text/plain",
value
})
.then(() => {
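This hunk pins the KV value's contentType to text/plain instead of switching between text/plain and application/json based on the value's type. A hedged TypeScript sketch of the resulting request shape using fetch; the endpoint path and payload are illustrative assumptions, and the real call goes through namespacesStore.createKv:

// Sketch only: the path below is illustrative, not the actual API contract.
async function putKv(namespace: string, key: string, value: string): Promise<void> {
    const response = await fetch(`/api/v1/namespaces/${namespace}/kv/${key}`, {
        method: "PUT",
        headers: {"Content-Type": "text/plain"}, // every value is sent as plain text
        body: value,
    });
    if (!response.ok) {
        throw new Error(`KV update failed: ${response.status}`);
    }
}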
@@ -26,7 +26,9 @@
<p class="section-1-desc">
{{ $t("welcome_page.start") }}
</p>

<el-button
v-if="isOSS"
@click="startTour"
:icon="Plus"
size="large"

@@ -35,6 +37,18 @@
>
{{ $t("welcome button create") }}
</el-button>
<el-button
v-else
:icon="Plus"
tag="router-link"
:to="{name: 'flows/create'}"
size="large"
type="primary"
class="px-3 p-4 section-1-link product-link"
>
{{ $t("welcome button create") }}
</el-button>

<el-button
:icon="Play"
tag="a"

@@ -70,6 +84,7 @@
import permission from "../../models/permission";
import action from "../../models/action";
import {useAuthStore} from "override/stores/auth";
import {useMiscStore} from "override/stores/misc";

const {topbar = true} = defineProps<{topbar?: boolean}>();

@@ -87,6 +102,8 @@

const authStore = useAuthStore();

const isOSS = computed(() => useMiscStore().configs?.edition === "OSS")

const canCreate = computed(() => {
return authStore.user.hasAnyActionOnAnyNamespace(permission.FLOW, action.CREATE);
});
@@ -100,7 +100,7 @@
>
<NamespaceSelect
v-model="secret.namespace"
:readonly="secret.update"
:read-only="secret.update"
:include-system-namespace="true"
all
/>
@@ -506,7 +506,7 @@ export const useFlowStore = defineStore("flow", () => {
}

function loadDependencies(options: { namespace: string, id: string, subtype: "FLOW" | "EXECUTION" }, onlyCount = false) {
return store.$http.get(`${apiUrl(store)}/flows/${options.namespace}/${options.id}/dependencies?expandAll=true`).then(response => {
return store.$http.get(`${apiUrl(store)}/flows/${options.namespace}/${options.id}/dependencies?expandAll=${onlyCount ? false : true}`).then(response => {
return {
...(!onlyCount ? {data: transformResponse(response.data, options.subtype)} : {}),
count: response.data.nodes ? [...new Set(response.data.nodes.map((r:{uid:string}) => r.uid))].length : 0
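With the onlyCount flag above, callers that only need a badge count request expandAll=false and skip the graph transform entirely; the count is the number of distinct node uids in the response. A usage sketch under those assumptions (the namespace and flow id are placeholders):

// Count distinct dependency nodes without materialising the full graph.
// `loadDependencies` is the store function from the hunk above.
async function showDependencyCount(flowStore: {loadDependencies: Function}): Promise<void> {
    const {count} = await flowStore.loadDependencies(
        {namespace: "company.team", id: "my-flow", subtype: "FLOW"},
        true, // onlyCount: requests expandAll=false and skips the data transform
    );
    console.log(`${count} dependencies`);
}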
@@ -107,25 +107,11 @@

:deep(.alert-info) {
display: flex;
gap: 12px;
padding: 16px 16px 0 16px;
padding: .5rem !important;
background-color: var(--ks-background-info);
border: 1px solid var(--ks-border-info);
border-left-width: 5px;
border-radius: 8px;

&::before {
content: '!';
min-width: 20px;
height: 20px;
margin-top: 4px;
border-radius: 50%;
background: var(--ks-content-info);
border: 1px solid var(--ks-border-info);
color: var(--ks-content-inverse);
font: 600 13px/20px sans-serif;
text-align: center;
}
border-left-width: 0.25rem;
border-radius: 0.5rem;

p { color: var(--ks-content-info); }
}
@@ -135,7 +121,7 @@
color: var(--ks-content-info);
border: 1px solid var(--ks-border-info);
font-family: 'Courier New', Courier, monospace;
white-space: nowrap; // Prevent button text from wrapping
white-space: nowrap;

.material-design-icon {
position: absolute;
@@ -870,19 +870,10 @@
"adding": "+ {what} hinzufügen",
"adding_default": "+ Neuen Wert hinzufügen",
"clearSelection": "Auswahl aufheben",
"close": {
"afterExecution": "Nach Ausführung Task schließen",
"conditions": "Bedingung schließen",
"errors": "Fehlerbehandler schließen",
"finally": "Task schließen",
"input": "Eingabe schließen",
"pluginDefaults": "Plugin-Standard schließen",
"tasks": "Task schließen",
"triggers": "Trigger schließen"
},
"creation": {
"afterExecution": "Fügen Sie einen Block nach der Ausführung hinzu",
"conditions": "Bedingung hinzufügen",
"default": "Hinzufügen",
"errors": "Einen Fehler-Handler hinzufügen",
"finally": "Fügen Sie einen Finally-Block hinzu",
"inputs": "Ein Eingabefeld hinzufügen",

@@ -916,6 +907,10 @@
"variable": "Variable",
"yaml": "YAML-Editor"
},
"remove": {
"cases": "Diesen Fall entfernen",
"default": "Diesen Eintrag entfernen"
},
"sections": {
"afterExecution": "Nach Ausführung",
"connection": "Verbindungseigenschaften",

@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Wählen Sie eine Task aus",
"conditions": "Wählen Sie eine Bedingung aus",
"default": "Wählen Sie einen Typ aus",
"errors": "Wählen Sie eine Task aus",
"finally": "Wählen Sie eine Task aus",
"inputs": "Wählen Sie einen Input-Feldtyp aus",
@@ -870,19 +870,10 @@
"adding": "+ Agregar un {what}",
"adding_default": "+ Añadir un nuevo value",
"clearSelection": "Borrar selección",
"close": {
"afterExecution": "Cerrar después de la ejecución de la task",
"conditions": "Condición de cierre",
"errors": "Cerrar el manejador de errores",
"finally": "Cerrar task",
"input": "Cerrar input",
"pluginDefaults": "Cerrar plugin predeterminado",
"tasks": "Cerrar task",
"triggers": "Cerrar trigger"
},
"creation": {
"afterExecution": "Agregar un bloque después de la ejecución",
"conditions": "Agregar una condición",
"default": "Agregar",
"errors": "Agregar un manejador de errores",
"finally": "Agregar un bloque finally",
"inputs": "Agregar un campo de input",

@@ -916,6 +907,10 @@
"variable": "Variable",
"yaml": "Editor YAML"
},
"remove": {
"cases": "Eliminar este caso",
"default": "Eliminar esta entrada"
},
"sections": {
"afterExecution": "Después de la Ejecución",
"connection": "Propiedades de conexión",

@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Seleccionar una task",
"conditions": "Seleccione una condición",
"default": "Seleccionar un tipo",
"errors": "Selecciona una task",
"finally": "Seleccionar una task",
"inputs": "Seleccione un tipo de campo de input",
@@ -870,19 +870,10 @@
"adding": "+ Ajouter un {what}",
"adding_default": "+ Ajouter une nouvelle valeur",
"clearSelection": "Effacer la sélection",
"close": {
"afterExecution": "Fermer après l'exécution de la task",
"conditions": "Condition de fermeture",
"errors": "Fermer le gestionnaire d'erreurs",
"finally": "Fermer la task",
"input": "Fermer l'input",
"pluginDefaults": "Fermer le plugin par défaut",
"tasks": "Fermer la task",
"triggers": "Fermer le trigger"
},
"creation": {
"afterExecution": "Ajouter un bloc après l'exécution",
"conditions": "Ajouter une condition",
"default": "Ajouter",
"errors": "Ajouter un gestionnaire d'erreurs",
"finally": "Ajouter un bloc finally",
"inputs": "Ajouter un champ d'input",

@@ -916,6 +907,10 @@
"variable": "Variable",
"yaml": "Éditeur YAML"
},
"remove": {
"cases": "Supprimer ce cas",
"default": "Supprimer cette entrée"
},
"sections": {
"afterExecution": "Après l'Exécution",
"connection": "Propriétés de connexion",

@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Sélectionnez une task",
"conditions": "Sélectionner une condition",
"default": "Sélectionner un type",
"errors": "Sélectionner une task",
"finally": "Sélectionnez une task",
"inputs": "Sélectionnez un type de champ input",
@@ -870,19 +870,10 @@
"adding": "+ एक {what} जोड़ें",
"adding_default": "+ एक नया value जोड़ें",
"clearSelection": "चयन साफ़ करें",
"close": {
"afterExecution": "कार्य पूरा होने के बाद बंद करें",
"conditions": "बंद करने की शर्त",
"errors": "त्रुटि हैंडलर बंद करें",
"finally": "टास्क बंद करें",
"input": "इनपुट बंद करें",
"pluginDefaults": "प्लगइन डिफ़ॉल्ट बंद करें",
"tasks": "टास्क बंद करें",
"triggers": "ट्रिगर बंद करें"
},
"creation": {
"afterExecution": "एक निष्पादन के बाद ब्लॉक जोड़ें",
"conditions": "एक शर्त जोड़ें",
"default": "जोड़ें",
"errors": "त्रुटि हैंडलर जोड़ें",
"finally": "अंत में एक finally ब्लॉक जोड़ें",
"inputs": "इनपुट फ़ील्ड जोड़ें",

@@ -916,6 +907,10 @@
"variable": "वेरिएबल",
"yaml": "YAML संपादक"
},
"remove": {
"cases": "इस केस को हटाएं",
"default": "इस प्रविष्टि को हटाएं"
},
"sections": {
"afterExecution": "Execution के बाद",
"connection": "कनेक्शन गुणधर्म",

@@ -932,6 +927,7 @@
"select": {
"afterExecution": "एक task चुनें",
"conditions": "कंडीशन चुनें",
"default": "एक प्रकार चुनें",
"errors": "एक task चुनें",
"finally": "एक task चुनें",
"inputs": "इनपुट फ़ील्ड प्रकार चुनें",
@@ -870,19 +870,10 @@
"adding": "+ Aggiungi un {what}",
"adding_default": "+ Aggiungi un nuovo value",
"clearSelection": "Cancella selezione",
"close": {
"afterExecution": "Chiudi dopo l'esecuzione del task",
"conditions": "Condizione di chiusura",
"errors": "Chiudi error handler",
"finally": "Chiudi task",
"input": "Chiudi input",
"pluginDefaults": "Chiudi plugin predefinito",
"tasks": "Chiudi task",
"triggers": "Chiudi trigger"
},
"creation": {
"afterExecution": "Aggiungi un blocco after execution",
"conditions": "Aggiungi una condizione",
"default": "Aggiungi",
"errors": "Aggiungi un gestore degli errori",
"finally": "Aggiungi un blocco finally",
"inputs": "Aggiungi un campo di input",

@@ -916,6 +907,10 @@
"variable": "Variabile",
"yaml": "Editor YAML"
},
"remove": {
"cases": "Rimuovi questo caso",
"default": "Rimuovi questa voce"
},
"sections": {
"afterExecution": "Dopo l'Esecuzione",
"connection": "Proprietà di connessione",

@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Seleziona un task",
"conditions": "Seleziona una condizione",
"default": "Seleziona un tipo",
"errors": "Seleziona un task",
"finally": "Seleziona un task",
"inputs": "Seleziona un tipo di input field",
@@ -870,19 +870,10 @@
"adding": "+ {what}を追加",
"adding_default": "+ 新しいvalueを追加",
"clearSelection": "選択をクリア",
"close": {
"afterExecution": "実行後にタスクを閉じる",
"conditions": "クローズ条件",
"errors": "エラーハンドラーを閉じる",
"finally": "タスクを閉じる",
"input": "入力を閉じる",
"pluginDefaults": "プラグインのデフォルトを閉じる",
"tasks": "タスクを閉じる",
"triggers": "トリガーを閉じる"
},
"creation": {
"afterExecution": "実行後ブロックを追加",
"conditions": "条件を追加",
"default": "追加",
"errors": "エラーハンドラーを追加",
"finally": "finally ブロックを追加",
"inputs": "入力フィールドを追加",

@@ -916,6 +907,10 @@
"variable": "変数",
"yaml": "YAMLエディター"
},
"remove": {
"cases": "このケースを削除",
"default": "このエントリを削除"
},
"sections": {
"afterExecution": "実行後",
"connection": "接続プロパティ",

@@ -932,6 +927,7 @@
"select": {
"afterExecution": "タスクを選択",
"conditions": "条件を選択",
"default": "タイプを選択",
"errors": "タスクを選択",
"finally": "タスクを選択",
"inputs": "入力フィールドタイプを選択",
@@ -870,19 +870,10 @@
"adding": "+ {what} 추가",
"adding_default": "+ 새 value 추가",
"clearSelection": "선택 해제",
"close": {
"afterExecution": "실행 task 후 닫기",
"conditions": "닫기 조건",
"errors": "오류 처리기 닫기",
"finally": "작업 닫기",
"input": "입력 닫기",
"pluginDefaults": "플러그인 기본값 닫기",
"tasks": "작업 닫기",
"triggers": "트리거 닫기"
},
"creation": {
"afterExecution": "실행 후 블록 추가",
"conditions": "조건 추가",
"default": "추가",
"errors": "오류 처리기 추가",
"finally": "마지막 블록 추가",
"inputs": "입력 필드 추가",

@@ -916,6 +907,10 @@
"variable": "변수",
"yaml": "YAML 편집기"
},
"remove": {
"cases": "이 케이스를 제거하십시오",
"default": "이 항목 제거"
},
"sections": {
"afterExecution": "실행 후",
"connection": "연결 속성",

@@ -932,6 +927,7 @@
"select": {
"afterExecution": "작업 선택",
"conditions": "조건 선택",
"default": "유형 선택",
"errors": "작업 선택",
"finally": "작업 선택",
"inputs": "입력 필드 유형 선택",
@@ -870,19 +870,10 @@
"adding": "+ Dodaj {what}",
"adding_default": "+ Dodaj nową wartość",
"clearSelection": "Wyczyść zaznaczenie",
"close": {
"afterExecution": "Zamknij po wykonaniu task",
"conditions": "Zamknij warunek",
"errors": "Zamknij obsługę błędów",
"finally": "Zamknij task",
"input": "Zamknij input",
"pluginDefaults": "Zamknij domyślną wtyczkę",
"tasks": "Zamknij task",
"triggers": "Zamknij trigger"
},
"creation": {
"afterExecution": "Dodaj blok po wykonaniu",
"conditions": "Dodaj warunek",
"default": "Dodaj",
"errors": "Dodaj obsługę błędów",
"finally": "Dodaj blok finally",
"inputs": "Dodaj pole input",

@@ -916,6 +907,10 @@
"variable": "Zmienna",
"yaml": "Edytor YAML"
},
"remove": {
"cases": "Usuń ten przypadek",
"default": "Usuń ten wpis"
},
"sections": {
"afterExecution": "Po wykonaniu",
"connection": "Właściwości połączenia",

@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Wybierz task",
"conditions": "Wybierz warunek",
"default": "Wybierz typ",
"errors": "Wybierz task",
"finally": "Wybierz task",
"inputs": "Wybierz typ pola input",
@@ -870,19 +870,10 @@
"adding": "+ Adicionar um {what}",
"adding_default": "+ Adicionar um novo value",
"clearSelection": "Limpar seleção",
"close": {
"afterExecution": "Fechar após execução da task",
"conditions": "Condição de fechamento",
"errors": "Fechar manipulador de erro",
"finally": "Fechar task",
"input": "Fechar input",
"pluginDefaults": "Fechar plugin padrão",
"tasks": "Fechar task",
"triggers": "Fechar trigger"
},
"creation": {
"afterExecution": "Adicionar um bloco após a execução",
"conditions": "Adicionar uma condição",
"default": "Adicionar",
"errors": "Adicionar um manipulador de erro",
"finally": "Adicionar um bloco finally",
"inputs": "Adicionar um campo de input",

@@ -916,6 +907,10 @@
"variable": "Variável",
"yaml": "Editor YAML"
},
"remove": {
"cases": "Remover este caso",
"default": "Remover esta entrada"
},
"sections": {
"afterExecution": "Após a Execução",
"connection": "Propriedades de conexão",

@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Selecione uma task",
"conditions": "Selecione uma condição",
"default": "Selecione um tipo",
"errors": "Selecione uma task",
"finally": "Selecione uma task",
"inputs": "Selecione um tipo de campo de input",
ui/src/translations/pt_BR.json (new file, 1836 lines): file diff suppressed because it is too large.
@@ -870,19 +870,10 @@
"adding": "+ Добавить {what}",
"adding_default": "+ Добавить новое value",
"clearSelection": "Очистить выбор",
"close": {
"afterExecution": "Закрыть после выполнения task",
"conditions": "Закрыть условие",
"errors": "Закрыть обработчик ошибок",
"finally": "Закрыть task",
"input": "Закрыть input",
"pluginDefaults": "Закрыть плагин по умолчанию",
"tasks": "Закрыть task",
"triggers": "Закрыть trigger"
},
"creation": {
"afterExecution": "Добавить блок после выполнения",
"conditions": "Добавить условие",
"default": "Добавить",
"errors": "Добавить обработчик ошибок",
"finally": "Добавить блок finally",
"inputs": "Добавить поле input",

@@ -916,6 +907,10 @@
"variable": "Переменная",
"yaml": "Редактор YAML"
},
"remove": {
"cases": "Удалить этот кейс",
"default": "Удалить эту запись"
},
"sections": {
"afterExecution": "После выполнения",
"connection": "Свойства подключения",

@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Выберите task",
"conditions": "Выберите условие",
"default": "Выберите тип",
"errors": "Выберите task",
"finally": "Выберите task",
"inputs": "Выберите тип поля input",
Some files were not shown because too many files have changed in this diff.