mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
39 Commits
fix/sdk-ch
...
v1.0.13
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8567ff5490 | ||
|
|
050e22dd09 | ||
|
|
3552eeefbb | ||
|
|
2e47fb8285 | ||
|
|
b52a07e562 | ||
|
|
3f7c01db41 | ||
|
|
f5dbec96e0 | ||
|
|
fe7a6d9af9 | ||
|
|
06c8c35061 | ||
|
|
8f23e813f2 | ||
|
|
47b7c7cd2e | ||
|
|
aca7c2f694 | ||
|
|
a0f29b7d5d | ||
|
|
0176c8c101 | ||
|
|
b0036bbfca | ||
|
|
fad5edbde8 | ||
|
|
f125f63ae5 | ||
|
|
6db1bfb2ce | ||
|
|
0957e07c78 | ||
|
|
5a4a5e44df | ||
|
|
faee3f1827 | ||
|
|
3604762da0 | ||
|
|
6ceb0de1d5 | ||
|
|
4a62f9c818 | ||
|
|
d14f3e3317 | ||
|
|
7e9030dfcf | ||
|
|
2fce17a8a9 | ||
|
|
67d8509106 | ||
|
|
01e92a6d79 | ||
|
|
883b7c8610 | ||
|
|
11ef823567 | ||
|
|
771cca1441 | ||
|
|
53e8674dfc | ||
|
|
59016ae1af | ||
|
|
7503d6fa21 | ||
|
|
0234a4c64c | ||
|
|
98c9c4d21f | ||
|
|
8e54183a44 | ||
|
|
8aa332c629 |
@@ -62,7 +62,7 @@ public class KvUpdateCommand extends AbstractApiCommand {
|
||||
Duration ttl = expiration == null ? null : Duration.parse(expiration);
|
||||
MutableHttpRequest<String> request = HttpRequest
|
||||
.PUT(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/kv/" + key, value)
|
||||
.contentType(MediaType.APPLICATION_JSON_TYPE);
|
||||
.contentType(MediaType.TEXT_PLAIN);
|
||||
|
||||
if (ttl != null) {
|
||||
request.header("ttl", ttl.toString());
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package io.kestra.cli.commands.servers;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
|
||||
import io.kestra.core.runners.ExecutorInterface;
|
||||
import io.kestra.core.services.SkipExecutionService;
|
||||
import io.kestra.core.services.StartExecutorService;
|
||||
@@ -10,6 +12,8 @@ import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -19,6 +23,9 @@ import java.util.Map;
|
||||
description = "Start the Kestra executor"
|
||||
)
|
||||
public class ExecutorCommand extends AbstractServerCommand {
|
||||
@CommandLine.Spec
|
||||
CommandLine.Model.CommandSpec spec;
|
||||
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@@ -28,22 +35,28 @@ public class ExecutorCommand extends AbstractServerCommand {
|
||||
@Inject
|
||||
private StartExecutorService startExecutorService;
|
||||
|
||||
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "The list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
|
||||
@CommandLine.Option(names = {"-f", "--flow-path"}, description = "Tenant identifier required to load flows from the specified path")
|
||||
private File flowPath;
|
||||
|
||||
@CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path")
|
||||
private String tenantId;
|
||||
|
||||
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "List of execution IDs to skip, separated by commas; for troubleshooting only")
|
||||
private List<String> skipExecutions = Collections.emptyList();
|
||||
|
||||
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "The list of flow identifiers (tenant|namespace|flowId) to skip, separated by a coma; for troubleshooting purpose only")
|
||||
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "List of flow identifiers (tenant|namespace|flowId) to skip, separated by a coma; for troubleshooting only")
|
||||
private List<String> skipFlows = Collections.emptyList();
|
||||
|
||||
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "The list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting purpose only")
|
||||
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "List of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting only")
|
||||
private List<String> skipNamespaces = Collections.emptyList();
|
||||
|
||||
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "The list of tenants to skip, separated by a coma; for troubleshooting purpose only")
|
||||
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "List of tenants to skip, separated by a coma; for troubleshooting only")
|
||||
private List<String> skipTenants = Collections.emptyList();
|
||||
|
||||
@CommandLine.Option(names = {"--start-executors"}, split=",", description = "The list of Kafka Stream executors to start, separated by a command. Use it only with the Kafka queue, for debugging purpose.")
|
||||
@CommandLine.Option(names = {"--start-executors"}, split=",", description = "List of Kafka Stream executors to start, separated by a command. Use it only with the Kafka queue; for debugging only")
|
||||
private List<String> startExecutors = Collections.emptyList();
|
||||
|
||||
@CommandLine.Option(names = {"--not-start-executors"}, split=",", description = "The list of Kafka Stream executors to not start, separated by a command. Use it only with the Kafka queue, for debugging purpose.")
|
||||
@CommandLine.Option(names = {"--not-start-executors"}, split=",", description = "Lst of Kafka Stream executors to not start, separated by a command. Use it only with the Kafka queue; for debugging only")
|
||||
private List<String> notStartExecutors = Collections.emptyList();
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
@@ -64,6 +77,16 @@ public class ExecutorCommand extends AbstractServerCommand {
|
||||
|
||||
super.call();
|
||||
|
||||
if (flowPath != null) {
|
||||
try {
|
||||
LocalFlowRepositoryLoader localFlowRepositoryLoader = applicationContext.getBean(LocalFlowRepositoryLoader.class);
|
||||
TenantIdSelectorService tenantIdSelectorService = applicationContext.getBean(TenantIdSelectorService.class);
|
||||
localFlowRepositoryLoader.load(tenantIdSelectorService.getTenantId(this.tenantId), this.flowPath);
|
||||
} catch (IOException e) {
|
||||
throw new CommandLine.ParameterException(this.spec.commandLine(), "Invalid flow path", e);
|
||||
}
|
||||
}
|
||||
|
||||
ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
|
||||
executorService.run();
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ public class IndexerCommand extends AbstractServerCommand {
|
||||
@Inject
|
||||
private SkipExecutionService skipExecutionService;
|
||||
|
||||
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
|
||||
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting only")
|
||||
private List<String> skipIndexerRecords = Collections.emptyList();
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
|
||||
@@ -42,7 +42,7 @@ public class StandAloneCommand extends AbstractServerCommand {
|
||||
@Nullable
|
||||
private FileChangedEventListener fileWatcher;
|
||||
|
||||
@CommandLine.Option(names = {"-f", "--flow-path"}, description = "the flow path containing flow to inject at startup (when running with a memory flow repository)")
|
||||
@CommandLine.Option(names = {"-f", "--flow-path"}, description = "Tenant identifier required to load flows from the specified path")
|
||||
private File flowPath;
|
||||
|
||||
@CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path with the enterprise edition")
|
||||
@@ -51,19 +51,19 @@ public class StandAloneCommand extends AbstractServerCommand {
|
||||
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to eight times the number of available processors. Set it to 0 to avoid starting a worker.")
|
||||
private int workerThread = defaultWorkerThread();
|
||||
|
||||
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
|
||||
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting only")
|
||||
private List<String> skipExecutions = Collections.emptyList();
|
||||
|
||||
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (namespace.flowId) to skip, separated by a coma; for troubleshooting purpose only")
|
||||
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (namespace.flowId) to skip, separated by a coma; for troubleshooting only")
|
||||
private List<String> skipFlows = Collections.emptyList();
|
||||
|
||||
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "a list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting purpose only")
|
||||
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "a list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting only")
|
||||
private List<String> skipNamespaces = Collections.emptyList();
|
||||
|
||||
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting purpose only")
|
||||
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting only")
|
||||
private List<String> skipTenants = Collections.emptyList();
|
||||
|
||||
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
|
||||
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting only")
|
||||
private List<String> skipIndexerRecords = Collections.emptyList();
|
||||
|
||||
@CommandLine.Option(names = {"--no-tutorials"}, description = "Flag to disable auto-loading of tutorial flows.")
|
||||
|
||||
@@ -40,7 +40,7 @@ public class WebServerCommand extends AbstractServerCommand {
|
||||
@Option(names = {"--no-indexer"}, description = "Flag to disable starting an embedded indexer.")
|
||||
private boolean indexerDisabled = false;
|
||||
|
||||
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
|
||||
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting only")
|
||||
private List<String> skipIndexerRecords = Collections.emptyList();
|
||||
|
||||
@Override
|
||||
|
||||
@@ -30,15 +30,15 @@ micronaut:
|
||||
read-idle-timeout: 60m
|
||||
write-idle-timeout: 60m
|
||||
idle-timeout: 60m
|
||||
netty:
|
||||
max-zstd-encode-size: 67108864 # increased to 64MB from the default of 32MB
|
||||
max-chunk-size: 10MB
|
||||
max-header-size: 32768 # increased from the default of 8k
|
||||
responses:
|
||||
file:
|
||||
cache-seconds: 86400
|
||||
cache-control:
|
||||
public: true
|
||||
netty:
|
||||
max-zstd-encode-size: 67108864 # increased to 64MB from the default of 32MB
|
||||
max-chunk-size: 10MB
|
||||
max-header-size: 32768 # increased from the default of 8k
|
||||
|
||||
# Access log configuration, see https://docs.micronaut.io/latest/guide/index.html#accessLogger
|
||||
access-logger:
|
||||
|
||||
@@ -1,16 +1,33 @@
|
||||
package io.kestra.core.models;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Interface that can be implemented by classes supporting plugin versioning.
|
||||
*
|
||||
* @see Plugin
|
||||
*/
|
||||
public interface PluginVersioning {
|
||||
|
||||
String TITLE = "Plugin Version";
|
||||
String DESCRIPTION = """
|
||||
Defines the version of the plugin to use.
|
||||
|
||||
@Pattern(regexp="\\d+\\.\\d+\\.\\d+(-[a-zA-Z0-9-]+)?|([a-zA-Z0-9]+)")
|
||||
@Schema(title = "The version of the plugin to use.")
|
||||
The version must follow the Semantic Versioning (SemVer) specification:
|
||||
- A single-digit MAJOR version (e.g., `1`).
|
||||
- A MAJOR.MINOR version (e.g., `1.1`).
|
||||
- A MAJOR.MINOR.PATCH version, optionally with any qualifier
|
||||
(e.g., `1.1.2`, `1.1.0-SNAPSHOT`).
|
||||
""";
|
||||
|
||||
@Schema(
|
||||
title = TITLE,
|
||||
description = DESCRIPTION
|
||||
)
|
||||
String getVersion();
|
||||
}
|
||||
|
||||
@@ -5,6 +5,8 @@ import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.dashboards.filters.AbstractFilter;
|
||||
import io.kestra.core.repositories.QueryBuilderInterface;
|
||||
import io.kestra.plugin.core.dashboard.data.IData;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
@@ -36,6 +38,8 @@ public abstract class DataFilter<F extends Enum<F>, C extends ColumnDescriptor<F
|
||||
private Map<String, C> columns;
|
||||
|
||||
@Setter
|
||||
@Valid
|
||||
@Nullable
|
||||
private List<AbstractFilter<F>> where;
|
||||
|
||||
private List<OrderBy> orderBy;
|
||||
|
||||
@@ -28,6 +28,7 @@ import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.MapUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
@@ -77,10 +78,12 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
|
||||
@With
|
||||
@JsonInclude(JsonInclude.Include.NON_EMPTY)
|
||||
@Schema(implementation = Object.class)
|
||||
Map<String, Object> inputs;
|
||||
|
||||
@With
|
||||
@JsonInclude(JsonInclude.Include.NON_EMPTY)
|
||||
@Schema(implementation = Object.class)
|
||||
Map<String, Object> outputs;
|
||||
|
||||
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
|
||||
@@ -88,6 +91,7 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
List<Label> labels;
|
||||
|
||||
@With
|
||||
@Schema(implementation = Object.class)
|
||||
Map<String, Object> variables;
|
||||
|
||||
@NotNull
|
||||
|
||||
@@ -3,10 +3,13 @@ package io.kestra.core.models.executions;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import io.kestra.core.models.TenantInterface;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
@@ -53,6 +56,7 @@ public class TaskRun implements TenantInterface {
|
||||
@With
|
||||
@JsonInclude(JsonInclude.Include.ALWAYS)
|
||||
@Nullable
|
||||
@Schema(implementation = Object.class)
|
||||
Variables outputs;
|
||||
|
||||
@NotNull
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.plugins;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import lombok.Builder;
|
||||
|
||||
import java.io.File;
|
||||
@@ -33,7 +34,7 @@ public record PluginArtifact(
|
||||
String version,
|
||||
URI uri
|
||||
) implements Comparable<PluginArtifact> {
|
||||
|
||||
|
||||
private static final Pattern ARTIFACT_PATTERN = Pattern.compile(
|
||||
"([^: ]+):([^: ]+)(:([^: ]*)(:([^: ]+))?)?:([^: ]+)"
|
||||
);
|
||||
@@ -42,7 +43,8 @@ public record PluginArtifact(
|
||||
);
|
||||
|
||||
public static final String JAR_EXTENSION = "jar";
|
||||
|
||||
public static final String KESTRA_GROUP_ID = "io.kestra";
|
||||
|
||||
/**
|
||||
* Static helper method for constructing a new {@link PluginArtifact} from a JAR file.
|
||||
*
|
||||
@@ -135,6 +137,11 @@ public record PluginArtifact(
|
||||
public String toString() {
|
||||
return toCoordinates();
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public boolean isOfficial() {
|
||||
return groupId.startsWith(KESTRA_GROUP_ID);
|
||||
}
|
||||
|
||||
public String toCoordinates() {
|
||||
return Stream.of(groupId, artifactId, extension, classifier, version)
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
package io.kestra.core.plugins;
|
||||
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.Version;
|
||||
import io.micronaut.core.type.Argument;
|
||||
import io.micronaut.http.HttpMethod;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.HttpResponse;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.HttpClient;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -15,9 +19,12 @@ import java.util.Base64;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Services for retrieving available plugin artifacts for Kestra.
|
||||
@@ -39,6 +46,8 @@ public class PluginCatalogService {
|
||||
|
||||
private final boolean icons;
|
||||
private final boolean oss;
|
||||
|
||||
private final Version currentStableVersion;
|
||||
|
||||
/**
|
||||
* Creates a new {@link PluginCatalogService} instance.
|
||||
@@ -53,11 +62,55 @@ public class PluginCatalogService {
|
||||
this.httpClient = httpClient;
|
||||
this.icons = icons;
|
||||
this.oss = communityOnly;
|
||||
|
||||
|
||||
Version version = Version.of(KestraContext.getContext().getVersion());
|
||||
this.currentStableVersion = new Version(version.majorVersion(), version.minorVersion(), version.patchVersion(), null);
|
||||
|
||||
// Immediately trigger an async load of plugin artifacts.
|
||||
this.isLoaded.set(true);
|
||||
this.plugins = CompletableFuture.supplyAsync(this::load);
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves the version for the given artifacts.
|
||||
*
|
||||
* @param artifacts The list of artifacts to resolve.
|
||||
* @return The list of results.
|
||||
*/
|
||||
public List<PluginResolutionResult> resolveVersions(List<PluginArtifact> artifacts) {
|
||||
if (ListUtils.isEmpty(artifacts)) {
|
||||
return List.of();
|
||||
}
|
||||
|
||||
final Map<String, ApiPluginArtifact> pluginsByGroupAndArtifactId = getAllCompatiblePlugins().stream()
|
||||
.collect(Collectors.toMap(it -> it.groupId() + ":" + it.artifactId(), Function.identity()));
|
||||
|
||||
return artifacts.stream().map(it -> {
|
||||
// Get all compatible versions for current artifact
|
||||
List<String> versions = Optional
|
||||
.ofNullable(pluginsByGroupAndArtifactId.get(it.groupId() + ":" + it.artifactId()))
|
||||
.map(ApiPluginArtifact::versions)
|
||||
.orElse(List.of());
|
||||
|
||||
// Try to resolve the version
|
||||
String resolvedVersion = null;
|
||||
if (!versions.isEmpty()) {
|
||||
if (it.version().equalsIgnoreCase("LATEST")) {
|
||||
resolvedVersion = versions.getFirst();
|
||||
} else {
|
||||
resolvedVersion = versions.contains(it.version()) ? it.version() : null;
|
||||
}
|
||||
}
|
||||
|
||||
// Build the PluginResolutionResult
|
||||
return new PluginResolutionResult(
|
||||
it,
|
||||
resolvedVersion,
|
||||
versions,
|
||||
resolvedVersion != null
|
||||
);
|
||||
}).toList();
|
||||
}
|
||||
|
||||
public synchronized List<PluginManifest> get() {
|
||||
try {
|
||||
@@ -140,7 +193,27 @@ public class PluginCatalogService {
|
||||
isLoaded.set(false);
|
||||
}
|
||||
}
|
||||
|
||||
private List<ApiPluginArtifact> getAllCompatiblePlugins() {
|
||||
|
||||
MutableHttpRequest<Object> request = HttpRequest.create(
|
||||
HttpMethod.GET,
|
||||
"/v1/plugins/artifacts/core-compatibility/" + currentStableVersion
|
||||
);
|
||||
if (oss) {
|
||||
request.getParameters().add("license", "OPENSOURCE");
|
||||
}
|
||||
try {
|
||||
return httpClient
|
||||
.toBlocking()
|
||||
.exchange(request, Argument.listOf(ApiPluginArtifact.class))
|
||||
.body();
|
||||
} catch (Exception e) {
|
||||
log.debug("Failed to retrieve available plugins from Kestra API. Cause: ", e);
|
||||
return List.of();
|
||||
}
|
||||
}
|
||||
|
||||
public record PluginManifest(
|
||||
String title,
|
||||
String icon,
|
||||
@@ -153,4 +226,11 @@ public class PluginCatalogService {
|
||||
return groupId + ":" + artifactId + ":LATEST";
|
||||
}
|
||||
}
|
||||
|
||||
public record ApiPluginArtifact(
|
||||
String groupId,
|
||||
String artifactId,
|
||||
String license,
|
||||
List<String> versions
|
||||
) {}
|
||||
}
|
||||
|
||||
@@ -144,7 +144,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
|
||||
|
||||
static String extractPluginRawIdentifier(final JsonNode node, final boolean isVersioningSupported) {
|
||||
String type = Optional.ofNullable(node.get(TYPE)).map(JsonNode::textValue).orElse(null);
|
||||
String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::textValue).orElse(null);
|
||||
String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::asText).orElse(null);
|
||||
|
||||
if (type == null || type.isEmpty()) {
|
||||
return null;
|
||||
|
||||
@@ -82,8 +82,7 @@ public abstract class FilesService {
|
||||
}
|
||||
|
||||
private static String resolveUniqueNameForFile(final Path path) {
|
||||
String filename = path.getFileName().toString();
|
||||
String encodedFilename = java.net.URLEncoder.encode(filename, java.nio.charset.StandardCharsets.UTF_8);
|
||||
return IdUtils.from(path.toString()) + "-" + encodedFilename;
|
||||
String filename = path.getFileName().toString().replace(' ', '+');
|
||||
return IdUtils.from(path.toString()) + "-" + filename;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,10 +151,7 @@ abstract class AbstractFileFunction implements Function {
|
||||
// if there is a trigger of type execution, we also allow accessing a file from the parent execution
|
||||
Map<String, String> trigger = (Map<String, String>) context.getVariable(TRIGGER);
|
||||
|
||||
if (!isFileUriValid(trigger.get(NAMESPACE), trigger.get("flowId"), trigger.get("executionId"), path)) {
|
||||
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it didn't belong to the parent execution");
|
||||
}
|
||||
return true;
|
||||
return isFileUriValid(trigger.get(NAMESPACE), trigger.get("flowId"), trigger.get("executionId"), path);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -383,6 +383,7 @@ public class ExecutionService {
|
||||
if (!isFlowable || s.equals(taskRunId)) {
|
||||
TaskRun newTaskRun;
|
||||
|
||||
State.Type targetState = newState;
|
||||
if (task instanceof Pause pauseTask) {
|
||||
State.Type terminalState = newState == State.Type.RUNNING ? State.Type.SUCCESS : newState;
|
||||
Pause.Resumed _resumed = resumed != null ? resumed : Pause.Resumed.now(terminalState);
|
||||
@@ -392,23 +393,23 @@ public class ExecutionService {
|
||||
// if it's a Pause task with no subtask, we terminate the task
|
||||
if (ListUtils.isEmpty(pauseTask.getTasks()) && ListUtils.isEmpty(pauseTask.getErrors()) && ListUtils.isEmpty(pauseTask.getFinally())) {
|
||||
if (newState == State.Type.RUNNING) {
|
||||
newTaskRun = newTaskRun.withState(State.Type.SUCCESS);
|
||||
targetState = State.Type.SUCCESS;
|
||||
} else if (newState == State.Type.KILLING) {
|
||||
newTaskRun = newTaskRun.withState(State.Type.KILLED);
|
||||
} else {
|
||||
newTaskRun = newTaskRun.withState(newState);
|
||||
targetState = State.Type.KILLED;
|
||||
}
|
||||
} else {
|
||||
// we should set the state to RUNNING so that subtasks are executed
|
||||
newTaskRun = newTaskRun.withState(State.Type.RUNNING);
|
||||
targetState = State.Type.RUNNING;
|
||||
}
|
||||
newTaskRun = newTaskRun.withState(targetState);
|
||||
} else {
|
||||
newTaskRun = originalTaskRun.withState(newState);
|
||||
newTaskRun = originalTaskRun.withState(targetState);
|
||||
}
|
||||
|
||||
|
||||
if (originalTaskRun.getAttempts() != null && !originalTaskRun.getAttempts().isEmpty()) {
|
||||
ArrayList<TaskRunAttempt> attempts = new ArrayList<>(originalTaskRun.getAttempts());
|
||||
attempts.set(attempts.size() - 1, attempts.getLast().withState(newState));
|
||||
attempts.set(attempts.size() - 1, attempts.getLast().withState(targetState));
|
||||
newTaskRun = newTaskRun.withAttempts(attempts);
|
||||
}
|
||||
|
||||
|
||||
@@ -32,48 +32,84 @@ public class Version implements Comparable<Version> {
|
||||
* @param version the version.
|
||||
* @return a new {@link Version} instance.
|
||||
*/
|
||||
public static Version of(String version) {
|
||||
public static Version of(final Object version) {
|
||||
|
||||
if (version.startsWith("v")) {
|
||||
version = version.substring(1);
|
||||
if (Objects.isNull(version)) {
|
||||
throw new IllegalArgumentException("Invalid version, cannot parse null version");
|
||||
}
|
||||
|
||||
String strVersion = version.toString();
|
||||
|
||||
if (strVersion.startsWith("v")) {
|
||||
strVersion = strVersion.substring(1);
|
||||
}
|
||||
|
||||
int qualifier = version.indexOf("-");
|
||||
int qualifier = strVersion.indexOf("-");
|
||||
|
||||
final String[] versions = qualifier > 0 ?
|
||||
version.substring(0, qualifier).split("\\.") :
|
||||
version.split("\\.");
|
||||
strVersion.substring(0, qualifier).split("\\.") :
|
||||
strVersion.split("\\.");
|
||||
try {
|
||||
final int majorVersion = Integer.parseInt(versions[0]);
|
||||
final int minorVersion = versions.length > 1 ? Integer.parseInt(versions[1]) : 0;
|
||||
final int incrementalVersion = versions.length > 2 ? Integer.parseInt(versions[2]) : 0;
|
||||
final Integer minorVersion = versions.length > 1 ? Integer.parseInt(versions[1]) : null;
|
||||
final Integer incrementalVersion = versions.length > 2 ? Integer.parseInt(versions[2]) : null;
|
||||
|
||||
return new Version(
|
||||
majorVersion,
|
||||
minorVersion,
|
||||
incrementalVersion,
|
||||
qualifier > 0 ? version.substring(qualifier + 1) : null,
|
||||
version
|
||||
qualifier > 0 ? strVersion.substring(qualifier + 1) : null,
|
||||
strVersion
|
||||
);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("Invalid version, cannot parse '" + version + "'");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Static helper method for returning the most recent stable version for a current {@link Version}.
|
||||
* Resolves the most appropriate stable version from a collection, based on a given input version.
|
||||
* <p>
|
||||
* The matching rules are:
|
||||
* <ul>
|
||||
* <li>If {@code from} specifies only a major version (e.g. {@code 1}), return the latest stable version
|
||||
* with the same major (e.g. {@code 1.2.3}).</li>
|
||||
* <li>If {@code from} specifies a major and minor version only (e.g. {@code 1.2}), return the latest
|
||||
* stable version with the same major and minor (e.g. {@code 1.2.3}).</li>
|
||||
* <li>If {@code from} specifies a full version with major, minor, and patch (e.g. {@code 1.2.2}),
|
||||
* then only return it if it is exactly present (and stable) in {@code versions}.
|
||||
* No "upgrade" is performed in this case.</li>
|
||||
* <li>If no suitable version is found, returns {@code null}.</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param from the current version.
|
||||
* @param versions the list of version.
|
||||
*
|
||||
* @return the last stable version.
|
||||
* @param from the reference version (may specify only major, or major+minor, or major+minor+patch).
|
||||
* @param versions the collection of candidate versions to resolve against.
|
||||
* @return the best matching stable version, or {@code null} if none match.
|
||||
*/
|
||||
public static Version getStable(final Version from, final Collection<Version> versions) {
|
||||
List<Version> compatibleVersions = versions.stream()
|
||||
.filter(v -> v.majorVersion() == from.majorVersion() && v.minorVersion() == from.minorVersion())
|
||||
.toList();
|
||||
if (compatibleVersions.isEmpty()) return null;
|
||||
return Version.getLatest(compatibleVersions);
|
||||
// Case 1: "from" is only a major (e.g. 1)
|
||||
if (from.hasOnlyMajor()) {
|
||||
List<Version> sameMajor = versions.stream()
|
||||
.filter(v -> v.majorVersion() == from.majorVersion())
|
||||
.toList();
|
||||
return sameMajor.isEmpty() ? null : Version.getLatest(sameMajor);
|
||||
}
|
||||
|
||||
// Case 2: "from" is major+minor only (e.g. 1.2)
|
||||
if (from.hasMajorAndMinorOnly()) {
|
||||
List<Version> sameMinor = versions.stream()
|
||||
.filter(v -> v.majorVersion() == from.majorVersion()
|
||||
&& v.minorVersion() == from.minorVersion())
|
||||
.toList();
|
||||
return sameMinor.isEmpty() ? null : Version.getLatest(sameMinor);
|
||||
}
|
||||
|
||||
// Case 3: "from" is full version (major+minor+patch)
|
||||
if (versions.contains(from)) {
|
||||
return from;
|
||||
}
|
||||
|
||||
// No match
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -123,8 +159,8 @@ public class Version implements Comparable<Version> {
|
||||
}
|
||||
|
||||
private final int majorVersion;
|
||||
private final int minorVersion;
|
||||
private final int incrementalVersion;
|
||||
private final Integer minorVersion;
|
||||
private final Integer patchVersion;
|
||||
private final Qualifier qualifier;
|
||||
|
||||
private final String originalVersion;
|
||||
@@ -134,14 +170,14 @@ public class Version implements Comparable<Version> {
|
||||
*
|
||||
* @param majorVersion the major version (must be superior or equal to 0).
|
||||
* @param minorVersion the minor version (must be superior or equal to 0).
|
||||
* @param incrementalVersion the incremental version (must be superior or equal to 0).
|
||||
* @param patchVersion the incremental version (must be superior or equal to 0).
|
||||
* @param qualifier the qualifier.
|
||||
*/
|
||||
public Version(final int majorVersion,
|
||||
final int minorVersion,
|
||||
final int incrementalVersion,
|
||||
final int patchVersion,
|
||||
final String qualifier) {
|
||||
this(majorVersion, minorVersion, incrementalVersion, qualifier, null);
|
||||
this(majorVersion, minorVersion, patchVersion, qualifier, null);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -149,25 +185,25 @@ public class Version implements Comparable<Version> {
|
||||
*
|
||||
* @param majorVersion the major version (must be superior or equal to 0).
|
||||
* @param minorVersion the minor version (must be superior or equal to 0).
|
||||
* @param incrementalVersion the incremental version (must be superior or equal to 0).
|
||||
* @param patchVersion the incremental version (must be superior or equal to 0).
|
||||
* @param qualifier the qualifier.
|
||||
* @param originalVersion the original string version.
|
||||
*/
|
||||
private Version(final int majorVersion,
|
||||
final int minorVersion,
|
||||
final int incrementalVersion,
|
||||
private Version(final Integer majorVersion,
|
||||
final Integer minorVersion,
|
||||
final Integer patchVersion,
|
||||
final String qualifier,
|
||||
final String originalVersion) {
|
||||
this.majorVersion = requirePositive(majorVersion, "major");
|
||||
this.minorVersion = requirePositive(minorVersion, "minor");
|
||||
this.incrementalVersion = requirePositive(incrementalVersion, "incremental");
|
||||
this.patchVersion = requirePositive(patchVersion, "incremental");
|
||||
this.qualifier = qualifier != null ? new Qualifier(qualifier) : null;
|
||||
this.originalVersion = originalVersion;
|
||||
}
|
||||
|
||||
|
||||
private static int requirePositive(int version, final String message) {
|
||||
if (version < 0) {
|
||||
private static Integer requirePositive(Integer version, final String message) {
|
||||
if (version != null && version < 0) {
|
||||
throw new IllegalArgumentException(String.format("The '%s' version must super or equal to 0", message));
|
||||
}
|
||||
return version;
|
||||
@@ -178,11 +214,11 @@ public class Version implements Comparable<Version> {
|
||||
}
|
||||
|
||||
public int minorVersion() {
|
||||
return minorVersion;
|
||||
return minorVersion != null ? minorVersion : 0;
|
||||
}
|
||||
|
||||
public int incrementalVersion() {
|
||||
return incrementalVersion;
|
||||
public int patchVersion() {
|
||||
return patchVersion != null ? patchVersion : 0;
|
||||
}
|
||||
|
||||
public Qualifier qualifier() {
|
||||
@@ -197,9 +233,9 @@ public class Version implements Comparable<Version> {
|
||||
if (this == o) return true;
|
||||
if (!(o instanceof Version)) return false;
|
||||
Version version = (Version) o;
|
||||
return majorVersion == version.majorVersion &&
|
||||
minorVersion == version.minorVersion &&
|
||||
incrementalVersion == version.incrementalVersion &&
|
||||
return Objects.equals(majorVersion,version.majorVersion) &&
|
||||
Objects.equals(minorVersion, version.minorVersion) &&
|
||||
Objects.equals(patchVersion,version.patchVersion) &&
|
||||
Objects.equals(qualifier, version.qualifier);
|
||||
}
|
||||
|
||||
@@ -208,7 +244,7 @@ public class Version implements Comparable<Version> {
|
||||
*/
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(majorVersion, minorVersion, incrementalVersion, qualifier);
|
||||
return Objects.hash(majorVersion, minorVersion, patchVersion, qualifier);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -218,7 +254,7 @@ public class Version implements Comparable<Version> {
|
||||
public String toString() {
|
||||
if (originalVersion != null) return originalVersion;
|
||||
|
||||
String version = majorVersion + "." + minorVersion + "." + incrementalVersion;
|
||||
String version = majorVersion + "." + minorVersion + "." + patchVersion;
|
||||
return (qualifier != null) ? version +"-" + qualifier : version;
|
||||
}
|
||||
|
||||
@@ -238,7 +274,7 @@ public class Version implements Comparable<Version> {
|
||||
return compareMinor;
|
||||
}
|
||||
|
||||
int compareIncremental = Integer.compare(that.incrementalVersion, this.incrementalVersion);
|
||||
int compareIncremental = Integer.compare(that.patchVersion, this.patchVersion);
|
||||
if (compareIncremental != 0) {
|
||||
return compareIncremental;
|
||||
}
|
||||
@@ -253,6 +289,21 @@ public class Version implements Comparable<Version> {
|
||||
|
||||
return this.qualifier.compareTo(that.qualifier);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return true if only major is specified (e.g. "1")
|
||||
*/
|
||||
private boolean hasOnlyMajor() {
|
||||
return minorVersion == null && patchVersion == null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if major+minor are specified, but no patch (e.g. "1.2")
|
||||
*/
|
||||
private boolean hasMajorAndMinorOnly() {
|
||||
return minorVersion != null && patchVersion == null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether this version is before the given one.
|
||||
|
||||
@@ -33,11 +33,13 @@ public class ExecutionsDataFilterValidator implements ConstraintValidator<Execut
|
||||
}
|
||||
});
|
||||
|
||||
executionsDataFilter.getWhere().forEach(filter -> {
|
||||
if (filter.getField() == Executions.Fields.LABELS && filter.getLabelKey() == null) {
|
||||
violations.add("Label filters must have a `labelKey`.");
|
||||
}
|
||||
});
|
||||
if (executionsDataFilter.getWhere() != null) {
|
||||
executionsDataFilter.getWhere().forEach(filter -> {
|
||||
if (filter.getField() == Executions.Fields.LABELS && filter.getLabelKey() == null) {
|
||||
violations.add("Label filters must have a `labelKey`.");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (!violations.isEmpty()) {
|
||||
context.disableDefaultConstraintViolation();
|
||||
|
||||
@@ -20,8 +20,6 @@ import java.io.BufferedOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URLEncoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
@@ -60,7 +58,15 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
public class Download extends AbstractHttp implements RunnableTask<Download.Output> {
|
||||
@Schema(title = "Should the task fail when downloading an empty file.")
|
||||
@Builder.Default
|
||||
private final Property<Boolean> failOnEmptyResponse = Property.ofValue(true);
|
||||
private Property<Boolean> failOnEmptyResponse = Property.ofValue(true);
|
||||
|
||||
@Schema(
|
||||
title = "Name of the file inside the output.",
|
||||
description = """
|
||||
If not provided, the filename will be extracted from the `Content-Disposition` header.
|
||||
If no `Content-Disposition` header, a name would be generated."""
|
||||
)
|
||||
private Property<String> saveAs;
|
||||
|
||||
public Output run(RunContext runContext) throws Exception {
|
||||
Logger logger = runContext.logger();
|
||||
@@ -111,20 +117,22 @@ public class Download extends AbstractHttp implements RunnableTask<Download.Outp
|
||||
}
|
||||
}
|
||||
|
||||
String filename = null;
|
||||
if (response.getHeaders().firstValue("Content-Disposition").isPresent()) {
|
||||
String contentDisposition = response.getHeaders().firstValue("Content-Disposition").orElseThrow();
|
||||
filename = filenameFromHeader(runContext, contentDisposition);
|
||||
}
|
||||
if (filename != null) {
|
||||
filename = URLEncoder.encode(filename, StandardCharsets.UTF_8);
|
||||
String rFilename = runContext.render(this.saveAs).as(String.class).orElse(null);
|
||||
if (rFilename == null) {
|
||||
if (response.getHeaders().firstValue("Content-Disposition").isPresent()) {
|
||||
String contentDisposition = response.getHeaders().firstValue("Content-Disposition").orElseThrow();
|
||||
rFilename = filenameFromHeader(runContext, contentDisposition);
|
||||
if (rFilename != null) {
|
||||
rFilename = rFilename.replace(' ', '+');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug("File '{}' downloaded with size '{}'", from, size);
|
||||
|
||||
return Output.builder()
|
||||
.code(response.getStatus().getCode())
|
||||
.uri(runContext.storage().putFile(tempFile, filename))
|
||||
.uri(runContext.storage().putFile(tempFile, rFilename))
|
||||
.headers(response.getHeaders().map())
|
||||
.length(size.get())
|
||||
.build();
|
||||
|
||||
@@ -330,7 +330,7 @@ class ExecutionServiceTest {
|
||||
assertThat(restart.findTaskRunByTaskIdAndValue("1_each", List.of()).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
|
||||
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getState().getCurrent()).isEqualTo(State.Type.FAILED);
|
||||
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getState().getHistories()).hasSize(4);
|
||||
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getAttempts()).isNull();
|
||||
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
|
||||
|
||||
restart = executionService.markAs(execution, flow, execution.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getId(), State.Type.FAILED);
|
||||
|
||||
@@ -441,6 +441,7 @@ class ExecutionServiceTest {
|
||||
|
||||
assertThat(killed.getState().getCurrent()).isEqualTo(State.Type.CANCELLED);
|
||||
assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getState().getCurrent()).isEqualTo(State.Type.KILLED);
|
||||
assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.KILLED);
|
||||
assertThat(killed.getState().getHistories()).hasSize(5);
|
||||
}
|
||||
|
||||
|
||||
@@ -103,28 +103,28 @@ class FilesServiceTest {
|
||||
var runContext = runContextFactory.of();
|
||||
|
||||
Path fileWithSpace = tempDir.resolve("with space.txt");
|
||||
Path fileWithUnicode = tempDir.resolve("สวัสดี.txt");
|
||||
Path fileWithUnicode = tempDir.resolve("สวัสดี&.txt");
|
||||
|
||||
Files.writeString(fileWithSpace, "content");
|
||||
Files.writeString(fileWithUnicode, "content");
|
||||
|
||||
Path targetFileWithSpace = runContext.workingDir().path().resolve("with space.txt");
|
||||
Path targetFileWithUnicode = runContext.workingDir().path().resolve("สวัสดี.txt");
|
||||
Path targetFileWithUnicode = runContext.workingDir().path().resolve("สวัสดี&.txt");
|
||||
|
||||
Files.copy(fileWithSpace, targetFileWithSpace);
|
||||
Files.copy(fileWithUnicode, targetFileWithUnicode);
|
||||
|
||||
Map<String, URI> outputFiles = FilesService.outputFiles(
|
||||
runContext,
|
||||
List.of("with space.txt", "สวัสดี.txt")
|
||||
List.of("with space.txt", "สวัสดี&.txt")
|
||||
);
|
||||
|
||||
assertThat(outputFiles).hasSize(2);
|
||||
assertThat(outputFiles).containsKey("with space.txt");
|
||||
assertThat(outputFiles).containsKey("สวัสดี.txt");
|
||||
assertThat(outputFiles).containsKey("สวัสดี&.txt");
|
||||
|
||||
assertThat(runContext.storage().getFile(outputFiles.get("with space.txt"))).isNotNull();
|
||||
assertThat(runContext.storage().getFile(outputFiles.get("สวัสดี.txt"))).isNotNull();
|
||||
assertThat(runContext.storage().getFile(outputFiles.get("สวัสดี&.txt"))).isNotNull();
|
||||
}
|
||||
|
||||
private URI createFile() throws IOException {
|
||||
|
||||
@@ -109,33 +109,6 @@ public class FileSizeFunctionTest {
|
||||
assertThat(size).isEqualTo(FILE_SIZE);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldThrowIllegalArgumentException_givenTrigger_andParentExecution_andMissingNamespace() throws IOException {
|
||||
String executionId = IdUtils.create();
|
||||
URI internalStorageURI = getInternalStorageURI(executionId);
|
||||
URI internalStorageFile = getInternalStorageFile(internalStorageURI);
|
||||
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "subflow",
|
||||
"namespace", NAMESPACE,
|
||||
"tenantId", MAIN_TENANT),
|
||||
"execution", Map.of("id", IdUtils.create()),
|
||||
"trigger", Map.of(
|
||||
"flowId", FLOW,
|
||||
"executionId", executionId,
|
||||
"tenantId", MAIN_TENANT
|
||||
)
|
||||
);
|
||||
|
||||
Exception ex = assertThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> variableRenderer.render("{{ fileSize('" + internalStorageFile + "') }}", variables)
|
||||
);
|
||||
|
||||
assertTrue(ex.getMessage().startsWith("Unable to read the file"), "Exception message doesn't match expected one");
|
||||
}
|
||||
|
||||
@Test
|
||||
void returnsCorrectSize_givenUri_andCurrentExecution() throws IOException, IllegalVariableEvaluationException {
|
||||
String executionId = IdUtils.create();
|
||||
|
||||
@@ -256,6 +256,27 @@ class ReadFileFunctionTest {
|
||||
assertThat(variableRenderer.render("{{ read(nsfile) }}", variables)).isEqualTo("Hello World");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldReadChildFileEvenIfTrigger() throws IOException, IllegalVariableEvaluationException {
|
||||
String namespace = "my.namespace";
|
||||
String flowId = "flow";
|
||||
String executionId = IdUtils.create();
|
||||
URI internalStorageURI = URI.create("/" + namespace.replace(".", "/") + "/" + flowId + "/executions/" + executionId + "/tasks/task/" + IdUtils.create() + "/123456.ion");
|
||||
URI internalStorageFile = storageInterface.put(MAIN_TENANT, namespace, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes()));
|
||||
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "flow",
|
||||
"namespace", "notme",
|
||||
"tenantId", MAIN_TENANT),
|
||||
"execution", Map.of("id", "notme"),
|
||||
"trigger", Map.of("namespace", "notme", "flowId", "parent", "executionId", "parent")
|
||||
);
|
||||
|
||||
String render = variableRenderer.render("{{ read('" + internalStorageFile + "') }}", variables);
|
||||
assertThat(render).isEqualTo("Hello from a task output");
|
||||
}
|
||||
|
||||
private URI createFile() throws IOException {
|
||||
File tempFile = File.createTempFile("file", ".txt");
|
||||
Files.write(tempFile.toPath(), "Hello World".getBytes());
|
||||
|
||||
@@ -5,8 +5,17 @@ import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
class VersionTest {
|
||||
import static org.assertj.core.api.Assertions.assertThatObject;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
|
||||
class VersionTest {
|
||||
|
||||
@Test
|
||||
void shouldCreateVersionFromIntegerGivenMajorVersion() {
|
||||
Version version = Version.of(1);
|
||||
Assertions.assertEquals(1, version.majorVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateVersionFromStringGivenMajorVersion() {
|
||||
Version version = Version.of("1");
|
||||
@@ -21,27 +30,27 @@ class VersionTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateVersionFromStringGivenMajorMinorIncrementVersion() {
|
||||
void shouldCreateVersionFromStringGivenMajorMinorPatchVersion() {
|
||||
Version version = Version.of("1.2.3");
|
||||
Assertions.assertEquals(1, version.majorVersion());
|
||||
Assertions.assertEquals(2, version.minorVersion());
|
||||
Assertions.assertEquals(3, version.incrementalVersion());
|
||||
Assertions.assertEquals(3, version.patchVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateVersionFromPrefixedStringGivenMajorMinorIncrementVersion() {
|
||||
void shouldCreateVersionFromPrefixedStringGivenMajorMinorPatchVersion() {
|
||||
Version version = Version.of("v1.2.3");
|
||||
Assertions.assertEquals(1, version.majorVersion());
|
||||
Assertions.assertEquals(2, version.minorVersion());
|
||||
Assertions.assertEquals(3, version.incrementalVersion());
|
||||
Assertions.assertEquals(3, version.patchVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateVersionFromStringGivenMajorMinorIncrementAndQualifierVersion() {
|
||||
void shouldCreateVersionFromStringGivenMajorMinorPatchAndQualifierVersion() {
|
||||
Version version = Version.of("1.2.3-SNAPSHOT");
|
||||
Assertions.assertEquals(1, version.majorVersion());
|
||||
Assertions.assertEquals(2, version.minorVersion());
|
||||
Assertions.assertEquals(3, version.incrementalVersion());
|
||||
Assertions.assertEquals(3, version.patchVersion());
|
||||
Assertions.assertEquals("SNAPSHOT", version.qualifier().toString());
|
||||
}
|
||||
|
||||
@@ -50,7 +59,7 @@ class VersionTest {
|
||||
Version version = Version.of("1.2.3-RC0-SNAPSHOT");
|
||||
Assertions.assertEquals(1, version.majorVersion());
|
||||
Assertions.assertEquals(2, version.minorVersion());
|
||||
Assertions.assertEquals(3, version.incrementalVersion());
|
||||
Assertions.assertEquals(3, version.patchVersion());
|
||||
Assertions.assertEquals("RC0-SNAPSHOT", version.qualifier().toString());
|
||||
}
|
||||
|
||||
@@ -76,13 +85,13 @@ class VersionTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetLatestVersionGivenMajorMinorIncrementalVersions() {
|
||||
void shouldGetLatestVersionGivenMajorMinorPatchVersions() {
|
||||
Version result = Version.getLatest(Version.of("1.0.9"), Version.of("1.0.10"), Version.of("1.0.11"));
|
||||
Assertions.assertEquals(Version.of("1.0.11"), result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldGetOldestVersionGivenMajorMinorIncrementalVersions() {
|
||||
public void shouldGetOldestVersionGivenMajorMinorPatchVersions() {
|
||||
Version result = Version.getOldest(Version.of("1.0.9"), Version.of("1.0.10"), Version.of("1.0.11"));
|
||||
Assertions.assertEquals(Version.of("1.0.9"), result);
|
||||
}
|
||||
@@ -135,14 +144,50 @@ class VersionTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldGetStableVersionGivenMajorMinorVersions() {
|
||||
Version result = Version.getStable(Version.of("1.2.0"), List.of(Version.of("1.2.1"), Version.of("1.2.2"), Version.of("0.99.0")));
|
||||
Assertions.assertEquals(Version.of("1.2.2"), result);
|
||||
public void shouldGetStableVersionGivenMajorMinorPatchVersion() {
|
||||
// Given
|
||||
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));
|
||||
|
||||
// When - Then
|
||||
assertThatObject(Version.getStable(Version.of("1.2.1"), versions)).isEqualTo(Version.of("1.2.1"));
|
||||
assertThatObject(Version.getStable(Version.of("1.2.0"), versions)).isNull();
|
||||
assertThatObject(Version.getStable(Version.of("1.2.4"), versions)).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldGetStableGivenMajorAndMinorVersionOnly() {
|
||||
// Given
|
||||
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));
|
||||
|
||||
// When - Then
|
||||
assertThatObject(Version.getStable(Version.of("1.2"), versions)).isEqualTo(Version.of("1.2.3"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldGetStableGivenMajorVersionOnly() {
|
||||
// Given
|
||||
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));
|
||||
|
||||
// When - Then
|
||||
assertThatObject(Version.getStable(Version.of("1"), versions)).isEqualTo(Version.of("1.2.3"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldGetNullForStableVersionGivenNoCompatibleVersions() {
|
||||
Version result = Version.getStable(Version.of("1.2.0"), List.of(Version.of("1.3.0"), Version.of("2.0.0"), Version.of("0.99.0")));
|
||||
Assertions.assertNull(result);
|
||||
public void shouldGetNullForStableGivenMajorAndMinorVersionOnly() {
|
||||
// Given
|
||||
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));
|
||||
|
||||
// When - Then
|
||||
assertThatObject(Version.getStable(Version.of("2.0"), versions)).isNull();
|
||||
assertThatObject(Version.getStable(Version.of("0.1"), versions)).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldGetNullForStableGivenMajorVersionOnly() {
|
||||
// Given
|
||||
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));
|
||||
|
||||
// When - Then
|
||||
assertThatObject(Version.getStable(Version.of("2"), versions)).isNull();
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import io.kestra.core.junit.annotations.ExecuteFlow;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRunAttempt;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.runners.RunnerUtils;
|
||||
@@ -11,6 +12,7 @@ import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
@@ -28,11 +30,15 @@ class IfTest {
|
||||
void ifTruthy() throws TimeoutException, QueueException {
|
||||
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "if-condition", null,
|
||||
(f, e) -> Map.of("param", true) , Duration.ofSeconds(120));
|
||||
List<TaskRunAttempt> flowableAttempts=execution.findTaskRunsByTaskId("if").getFirst().getAttempts();
|
||||
|
||||
assertThat(execution.getTaskRunList()).hasSize(2);
|
||||
assertThat(execution.findTaskRunsByTaskId("when-true").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
|
||||
assertThat(flowableAttempts).isNotNull();
|
||||
assertThat(flowableAttempts.getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
|
||||
execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "if-condition", null,
|
||||
(f, e) -> Map.of("param", "true") , Duration.ofSeconds(120));
|
||||
|
||||
|
||||
@@ -5,22 +5,32 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
import io.kestra.core.junit.annotations.ExecuteFlow;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRunAttempt;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import java.util.List;
|
||||
|
||||
@KestraTest(startRunner = true)
|
||||
class SequentialTest {
|
||||
@Test
|
||||
@ExecuteFlow("flows/valids/sequential.yaml")
|
||||
void sequential(Execution execution) {
|
||||
List<TaskRunAttempt> flowableAttempts=execution.findTaskRunsByTaskId("1-seq").getFirst().getAttempts();
|
||||
|
||||
assertThat(execution.getTaskRunList()).hasSize(11);
|
||||
assertThat(flowableAttempts).isNotNull();
|
||||
assertThat(flowableAttempts.getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ExecuteFlow("flows/valids/sequential-with-global-errors.yaml")
|
||||
void sequentialWithGlobalErrors(Execution execution) {
|
||||
List<TaskRunAttempt> flowableAttempts=execution.findTaskRunsByTaskId("parent-seq").getFirst().getAttempts();
|
||||
|
||||
assertThat(execution.getTaskRunList()).hasSize(6);
|
||||
assertThat(flowableAttempts).isNotNull();
|
||||
assertThat(flowableAttempts.getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
|
||||
}
|
||||
|
||||
|
||||
@@ -156,6 +156,26 @@ class DownloadTest {
|
||||
assertThat(output.getUri().toString()).endsWith("filename.jpg");
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void fileNameShouldOverrideContentDisposition() throws Exception {
|
||||
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
Download task = Download.builder()
|
||||
.id(DownloadTest.class.getSimpleName())
|
||||
.type(DownloadTest.class.getName())
|
||||
.uri(Property.ofValue(embeddedServer.getURI() + "/content-disposition"))
|
||||
.saveAs(Property.ofValue("hardcoded-filename.jpg"))
|
||||
.build();
|
||||
|
||||
RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, task, ImmutableMap.of());
|
||||
|
||||
Download.Output output = task.run(runContext);
|
||||
|
||||
assertThat(output.getUri().toString()).endsWith("hardcoded-filename.jpg");
|
||||
}
|
||||
|
||||
@Test
|
||||
void contentDispositionWithPath() throws Exception {
|
||||
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
|
||||
|
||||
@@ -247,7 +247,7 @@ public class ExecutorService {
|
||||
// first find the normal ended child tasks and send result
|
||||
Optional<State.Type> state;
|
||||
try {
|
||||
state = flowableParent.resolveState(runContext, execution, parentTaskRun);
|
||||
state = flowableParent.resolveState(runContext, execution, parentTaskRun);
|
||||
} catch (Exception e) {
|
||||
// This will lead to the next task being still executed, but at least Kestra will not crash.
|
||||
// This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
|
||||
@@ -268,9 +268,17 @@ public class ExecutorService {
|
||||
Output outputs = flowableParent.outputs(runContext);
|
||||
Map<String, Object> outputMap = MapUtils.merge(workerTaskResult.getTaskRun().getOutputs(), outputs == null ? null : outputs.toMap());
|
||||
Variables variables = variablesService.of(StorageContext.forTask(workerTaskResult.getTaskRun()), outputMap);
|
||||
/// flowable attempt state transition to terminated
|
||||
List<TaskRunAttempt> attempts = Optional.ofNullable(parentTaskRun.getAttempts())
|
||||
.map(ArrayList::new)
|
||||
.orElseGet(ArrayList::new);
|
||||
State.Type endedState=endedTask.get().getTaskRun().getState().getCurrent();
|
||||
TaskRunAttempt updated = attempts.getLast().withState(endedState);
|
||||
attempts.set( attempts.size() - 1, updated);
|
||||
return Optional.of(new WorkerTaskResult(workerTaskResult
|
||||
.getTaskRun()
|
||||
.withOutputs(variables)
|
||||
.withAttempts(attempts)
|
||||
));
|
||||
} catch (Exception e) {
|
||||
runContext.logger().error("Unable to resolve outputs from the Flowable task: {}", e.getMessage(), e);
|
||||
@@ -320,7 +328,6 @@ public class ExecutorService {
|
||||
|
||||
private List<TaskRun> childNextsTaskRun(Executor executor, TaskRun parentTaskRun) throws InternalException {
|
||||
Task parent = executor.getFlow().findTaskByTaskId(parentTaskRun.getTaskId());
|
||||
|
||||
if (parent instanceof FlowableTask<?> flowableParent) {
|
||||
// Count the number of flowable tasks executions, some flowable are being called multiple times,
|
||||
// so this is not exactly the number of flowable taskruns but the number of times they are executed.
|
||||
@@ -375,6 +382,7 @@ public class ExecutorService {
|
||||
Output outputs = flowableTask.outputs(runContext);
|
||||
Variables variables = variablesService.of(StorageContext.forTask(taskRun), outputs);
|
||||
taskRun = taskRun.withOutputs(variables);
|
||||
|
||||
} catch (Exception e) {
|
||||
runContext.logger().warn("Unable to save output on taskRun '{}'", taskRun, e);
|
||||
}
|
||||
|
||||
@@ -50,16 +50,147 @@ public class FlowTriggerService {
|
||||
.map(io.kestra.plugin.core.trigger.Flow.class::cast);
|
||||
}
|
||||
|
||||
public List<Execution> computeExecutionsFromFlowTriggers(Execution execution, List<? extends Flow> allFlows, Optional<MultipleConditionStorageInterface> multipleConditionStorage) {
|
||||
List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
|
||||
/**
|
||||
* This method computes executions to trigger from flow triggers from a given execution.
|
||||
* It only computes those depending on standard (non-multiple / non-preconditions) conditions, so it must be used
|
||||
* in conjunction with {@link #computeExecutionsFromFlowTriggerPreconditions(Execution, Flow, MultipleConditionStorageInterface)}.
|
||||
*/
|
||||
public List<Execution> computeExecutionsFromFlowTriggerConditions(Execution execution, Flow flow) {
|
||||
List<FlowWithFlowTrigger> flowWithFlowTriggers = computeFlowTriggers(execution, flow)
|
||||
.stream()
|
||||
// we must filter on no multiple conditions and no preconditions to avoid evaluating two times triggers that have standard conditions and multiple conditions
|
||||
.filter(it -> it.getTrigger().getPreconditions() == null && ListUtils.emptyOnNull(it.getTrigger().getConditions()).stream().noneMatch(MultipleCondition.class::isInstance))
|
||||
.toList();
|
||||
|
||||
// short-circuit empty triggers to evaluate
|
||||
if (flowWithFlowTriggers.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
// compute all executions to create from flow triggers without taken into account multiple conditions
|
||||
return flowWithFlowTriggers.stream()
|
||||
.map(f -> f.getTrigger().evaluate(
|
||||
Optional.empty(),
|
||||
runContextFactory.of(f.getFlow(), execution),
|
||||
f.getFlow(),
|
||||
execution
|
||||
))
|
||||
.filter(Optional::isPresent)
|
||||
.map(Optional::get)
|
||||
.toList();
|
||||
}
|
||||
|
||||
/**
|
||||
* This method computes executions to trigger from flow triggers from a given execution.
|
||||
* It only computes those depending on multiple conditions and preconditions, so it must be used
|
||||
* in conjunction with {@link #computeExecutionsFromFlowTriggerConditions(Execution, Flow)}.
|
||||
*/
|
||||
public List<Execution> computeExecutionsFromFlowTriggerPreconditions(Execution execution, Flow flow, MultipleConditionStorageInterface multipleConditionStorage) {
|
||||
List<FlowWithFlowTrigger> flowWithFlowTriggers = computeFlowTriggers(execution, flow)
|
||||
.stream()
|
||||
// we must filter on multiple conditions or preconditions to avoid evaluating two times triggers that only have standard conditions
|
||||
.filter(flowWithFlowTrigger -> flowWithFlowTrigger.getTrigger().getPreconditions() != null || ListUtils.emptyOnNull(flowWithFlowTrigger.getTrigger().getConditions()).stream().anyMatch(MultipleCondition.class::isInstance))
|
||||
.toList();
|
||||
|
||||
// short-circuit empty triggers to evaluate
|
||||
if (flowWithFlowTriggers.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = flowWithFlowTriggers.stream()
|
||||
.flatMap(flowWithFlowTrigger -> flowTriggerMultipleConditions(flowWithFlowTrigger)
|
||||
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
|
||||
flowWithFlowTrigger.getFlow(),
|
||||
multipleConditionStorage.getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition, execution.getOutputs()),
|
||||
flowWithFlowTrigger.getTrigger(),
|
||||
multipleCondition
|
||||
)
|
||||
)
|
||||
)
|
||||
// avoid evaluating expired windows (for ex for daily time window or deadline)
|
||||
.filter(flowWithFlowTriggerAndMultipleCondition -> flowWithFlowTriggerAndMultipleCondition.getMultipleConditionWindow().isValid(ZonedDateTime.now()))
|
||||
.toList();
|
||||
|
||||
// evaluate multiple conditions
|
||||
Map<FlowWithFlowTriggerAndMultipleCondition, MultipleConditionWindow> multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> {
|
||||
Map<String, Boolean> results = f.getMultipleCondition()
|
||||
.getConditions()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.map(e -> new AbstractMap.SimpleEntry<>(
|
||||
e.getKey(),
|
||||
conditionService.isValid(e.getValue(), f.getFlow(), execution)
|
||||
))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
|
||||
return Map.entry(f, f.getMultipleConditionWindow().with(results));
|
||||
})
|
||||
.filter(e -> !e.getValue().getResults().isEmpty())
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
|
||||
// persist results
|
||||
multipleConditionStorage.save(new ArrayList<>(multipleConditionWindowsByFlow.values()));
|
||||
|
||||
// compute all executions to create from flow triggers now that multiple conditions storage is populated
|
||||
List<Execution> executions = flowWithFlowTriggers.stream()
|
||||
// will evaluate conditions
|
||||
.filter(flowWithFlowTrigger ->
|
||||
conditionService.isValid(
|
||||
flowWithFlowTrigger.getTrigger(),
|
||||
flowWithFlowTrigger.getFlow(),
|
||||
execution,
|
||||
multipleConditionStorage
|
||||
)
|
||||
)
|
||||
// will evaluate preconditions
|
||||
.filter(flowWithFlowTrigger ->
|
||||
conditionService.isValid(
|
||||
flowWithFlowTrigger.getTrigger().getPreconditions(),
|
||||
flowWithFlowTrigger.getFlow(),
|
||||
execution,
|
||||
multipleConditionStorage
|
||||
)
|
||||
)
|
||||
.map(f -> f.getTrigger().evaluate(
|
||||
Optional.of(multipleConditionStorage),
|
||||
runContextFactory.of(f.getFlow(), execution),
|
||||
f.getFlow(),
|
||||
execution
|
||||
))
|
||||
.filter(Optional::isPresent)
|
||||
.map(Optional::get)
|
||||
.toList();
|
||||
|
||||
// purge fulfilled or expired multiple condition windows
|
||||
Stream.concat(
|
||||
multipleConditionWindowsByFlow.entrySet().stream()
|
||||
.map(e -> Map.entry(
|
||||
e.getKey().getMultipleCondition(),
|
||||
e.getValue()
|
||||
))
|
||||
.filter(e -> !Boolean.FALSE.equals(e.getKey().getResetOnSuccess()) &&
|
||||
e.getKey().getConditions().size() == Optional.ofNullable(e.getValue().getResults()).map(Map::size).orElse(0)
|
||||
)
|
||||
.map(Map.Entry::getValue),
|
||||
multipleConditionStorage.expired(execution.getTenantId()).stream()
|
||||
).forEach(multipleConditionStorage::delete);
|
||||
|
||||
return executions;
|
||||
}
|
||||
|
||||
private List<FlowWithFlowTrigger> computeFlowTriggers(Execution execution, Flow flow) {
|
||||
if (
|
||||
// prevent recursive flow triggers
|
||||
.filter(flow -> flowService.removeUnwanted(flow, execution))
|
||||
// filter out Test Executions
|
||||
.filter(flow -> execution.getKind() == null)
|
||||
// ensure flow & triggers are enabled
|
||||
.filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
|
||||
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
|
||||
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)))
|
||||
!flowService.removeUnwanted(flow, execution) ||
|
||||
// filter out Test Executions
|
||||
execution.getKind() != null ||
|
||||
// ensure flow & triggers are enabled
|
||||
flow.isDisabled() || flow instanceof FlowWithException ||
|
||||
flow.getTriggers() == null || flow.getTriggers().isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
return flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger))
|
||||
// filter on the execution state the flow listen to
|
||||
.filter(flowWithFlowTrigger -> flowWithFlowTrigger.getTrigger().getStates().contains(execution.getState().getCurrent()))
|
||||
// validate flow triggers conditions excluding multiple conditions
|
||||
@@ -74,96 +205,6 @@ public class FlowTriggerService {
|
||||
execution
|
||||
)
|
||||
)).toList();
|
||||
|
||||
// short-circuit empty triggers to evaluate
|
||||
if (validTriggersBeforeMultipleConditionEval.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
Map<FlowWithFlowTriggerAndMultipleCondition, MultipleConditionWindow> multipleConditionWindowsByFlow = null;
|
||||
if (multipleConditionStorage.isPresent()) {
|
||||
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream()
|
||||
.flatMap(flowWithFlowTrigger -> flowTriggerMultipleConditions(flowWithFlowTrigger)
|
||||
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
|
||||
flowWithFlowTrigger.getFlow(),
|
||||
multipleConditionStorage.get().getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition, execution.getOutputs()),
|
||||
flowWithFlowTrigger.getTrigger(),
|
||||
multipleCondition
|
||||
)
|
||||
)
|
||||
)
|
||||
// avoid evaluating expired windows (for ex for daily time window or deadline)
|
||||
.filter(flowWithFlowTriggerAndMultipleCondition -> flowWithFlowTriggerAndMultipleCondition.getMultipleConditionWindow().isValid(ZonedDateTime.now()))
|
||||
.toList();
|
||||
|
||||
// evaluate multiple conditions
|
||||
multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> {
|
||||
Map<String, Boolean> results = f.getMultipleCondition()
|
||||
.getConditions()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.map(e -> new AbstractMap.SimpleEntry<>(
|
||||
e.getKey(),
|
||||
conditionService.isValid(e.getValue(), f.getFlow(), execution)
|
||||
))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
|
||||
return Map.entry(f, f.getMultipleConditionWindow().with(results));
|
||||
})
|
||||
.filter(e -> !e.getValue().getResults().isEmpty())
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
|
||||
// persist results
|
||||
multipleConditionStorage.get().save(new ArrayList<>(multipleConditionWindowsByFlow.values()));
|
||||
}
|
||||
|
||||
// compute all executions to create from flow triggers now that multiple conditions storage is populated
|
||||
List<Execution> executions = validTriggersBeforeMultipleConditionEval.stream()
|
||||
// will evaluate conditions
|
||||
.filter(flowWithFlowTrigger ->
|
||||
conditionService.isValid(
|
||||
flowWithFlowTrigger.getTrigger(),
|
||||
flowWithFlowTrigger.getFlow(),
|
||||
execution,
|
||||
multipleConditionStorage.orElse(null)
|
||||
)
|
||||
)
|
||||
// will evaluate preconditions
|
||||
.filter(flowWithFlowTrigger ->
|
||||
conditionService.isValid(
|
||||
flowWithFlowTrigger.getTrigger().getPreconditions(),
|
||||
flowWithFlowTrigger.getFlow(),
|
||||
execution,
|
||||
multipleConditionStorage.orElse(null)
|
||||
)
|
||||
)
|
||||
.map(f -> f.getTrigger().evaluate(
|
||||
multipleConditionStorage,
|
||||
runContextFactory.of(f.getFlow(), execution),
|
||||
f.getFlow(),
|
||||
execution
|
||||
))
|
||||
.filter(Optional::isPresent)
|
||||
.map(Optional::get)
|
||||
.toList();
|
||||
|
||||
if (multipleConditionStorage.isPresent()) {
|
||||
// purge fulfilled or expired multiple condition windows
|
||||
Stream.concat(
|
||||
multipleConditionWindowsByFlow.entrySet().stream()
|
||||
.map(e -> Map.entry(
|
||||
e.getKey().getMultipleCondition(),
|
||||
e.getValue()
|
||||
))
|
||||
.filter(e -> !Boolean.FALSE.equals(e.getKey().getResetOnSuccess()) &&
|
||||
e.getKey().getConditions().size() == Optional.ofNullable(e.getValue().getResults()).map(Map::size).orElse(0)
|
||||
)
|
||||
.map(Map.Entry::getValue),
|
||||
multipleConditionStorage.get().expired(execution.getTenantId()).stream()
|
||||
).forEach(multipleConditionStorage.get()::delete);
|
||||
}
|
||||
|
||||
return executions;
|
||||
}
|
||||
|
||||
private Stream<MultipleCondition> flowTriggerMultipleConditions(FlowWithFlowTrigger flowWithFlowTrigger) {
|
||||
|
||||
@@ -26,8 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
@KestraTest
|
||||
class FlowTriggerServiceTest {
|
||||
public static final List<Label> EMPTY_LABELS = List.of();
|
||||
public static final Optional<MultipleConditionStorageInterface> EMPTY_MULTIPLE_CONDITION_STORAGE = Optional.empty();
|
||||
|
||||
|
||||
@Inject
|
||||
private TestRunContextFactory runContextFactory;
|
||||
@Inject
|
||||
@@ -56,10 +55,9 @@ class FlowTriggerServiceTest {
|
||||
|
||||
var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.SUCCESS);
|
||||
|
||||
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
|
||||
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
|
||||
simpleFlowExecution,
|
||||
List.of(simpleFlow, flowWithFlowTrigger),
|
||||
EMPTY_MULTIPLE_CONDITION_STORAGE
|
||||
flowWithFlowTrigger
|
||||
);
|
||||
|
||||
assertThat(resultingExecutionsToRun).size().isEqualTo(1);
|
||||
@@ -81,10 +79,9 @@ class FlowTriggerServiceTest {
|
||||
|
||||
var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.CREATED);
|
||||
|
||||
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
|
||||
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
|
||||
simpleFlowExecution,
|
||||
List.of(simpleFlow, flowWithFlowTrigger),
|
||||
EMPTY_MULTIPLE_CONDITION_STORAGE
|
||||
flowWithFlowTrigger
|
||||
);
|
||||
|
||||
assertThat(resultingExecutionsToRun).size().isEqualTo(0);
|
||||
@@ -109,10 +106,9 @@ class FlowTriggerServiceTest {
|
||||
.kind(ExecutionKind.TEST)
|
||||
.build();
|
||||
|
||||
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
|
||||
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
|
||||
simpleFlowExecutionComingFromATest,
|
||||
List.of(simpleFlow, flowWithFlowTrigger),
|
||||
EMPTY_MULTIPLE_CONDITION_STORAGE
|
||||
flowWithFlowTrigger
|
||||
);
|
||||
|
||||
assertThat(resultingExecutionsToRun).size().isEqualTo(0);
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
version=1.0.8
|
||||
version=1.0.13
|
||||
|
||||
org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError
|
||||
org.gradle.parallel=true
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
package io.kestra.repository.mysql;
|
||||
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.runners.ScheduleContextInterface;
|
||||
import io.kestra.core.utils.DateUtils;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
|
||||
import io.kestra.jdbc.runner.JdbcSchedulerContext;
|
||||
import io.kestra.jdbc.services.JdbcFilterService;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
@@ -11,6 +13,10 @@ import org.jooq.Condition;
|
||||
import org.jooq.Field;
|
||||
import org.jooq.impl.DSL;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.temporal.Temporal;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
@@ -45,4 +51,11 @@ public class MysqlTriggerRepository extends AbstractJdbcTriggerRepository {
|
||||
throw new IllegalArgumentException("Unsupported GroupType: " + groupType);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Temporal toNextExecutionTime(ZonedDateTime now) {
|
||||
// next_execution_date in the table is stored in UTC
|
||||
// convert 'now' to UTC LocalDateTime to avoid any timezone/offset interpretation by the database.
|
||||
return now.withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.temporal.Temporal;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -151,7 +152,7 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
|
||||
.select(field("value"))
|
||||
.from(this.jdbcRepository.getTable())
|
||||
.where(
|
||||
(field("next_execution_date").lessThan(now.toOffsetDateTime())
|
||||
(field("next_execution_date").lessThan(toNextExecutionTime(now))
|
||||
// we check for null for backwards compatibility
|
||||
.or(field("next_execution_date").isNull()))
|
||||
.and(field("execution_id").isNull())
|
||||
@@ -162,14 +163,14 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
|
||||
.fetch()
|
||||
.map(r -> this.jdbcRepository.deserialize(r.get("value", String.class)));
|
||||
}
|
||||
|
||||
|
||||
public List<Trigger> findByNextExecutionDateReadyButLockedTriggers(ZonedDateTime now) {
|
||||
return this.jdbcRepository.getDslContextWrapper()
|
||||
.transactionResult(configuration -> DSL.using(configuration)
|
||||
.select(field("value"))
|
||||
.from(this.jdbcRepository.getTable())
|
||||
.where(
|
||||
(field("next_execution_date").lessThan(now.toOffsetDateTime())
|
||||
(field("next_execution_date").lessThan(toNextExecutionTime(now))
|
||||
// we check for null for backwards compatibility
|
||||
.or(field("next_execution_date").isNull()))
|
||||
.and(field("execution_id").isNotNull())
|
||||
@@ -178,6 +179,10 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
|
||||
.fetch()
|
||||
.map(r -> this.jdbcRepository.deserialize(r.get("value", String.class))));
|
||||
}
|
||||
|
||||
protected Temporal toNextExecutionTime(ZonedDateTime now) {
|
||||
return now.toOffsetDateTime();
|
||||
}
|
||||
|
||||
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface) {
|
||||
JdbcSchedulerContext jdbcSchedulerContext = (JdbcSchedulerContext) scheduleContextInterface;
|
||||
|
||||
@@ -24,10 +24,10 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the concurrency limit counter then process the count using the consumer function.
|
||||
* It locked the raw and is wrapped in a transaction so the consumer should use the provided dslContext for any database access.
|
||||
* Fetch the concurrency limit counter, then process the count using the consumer function.
|
||||
* It locked the raw and is wrapped in a transaction, so the consumer should use the provided dslContext for any database access.
|
||||
* <p>
|
||||
* Note that to avoid a race when no concurrency limit counter exists, it first always try to insert a 0 counter.
|
||||
* Note that to avoid a race when no concurrency limit counter exists, it first always tries to insert a 0 counter.
|
||||
*/
|
||||
public ExecutionRunning countThenProcess(FlowInterface flow, BiFunction<DSLContext, ConcurrencyLimit, Pair<ExecutionRunning, ConcurrencyLimit>> consumer) {
|
||||
return this.jdbcRepository
|
||||
@@ -106,8 +106,7 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
|
||||
.and(field("namespace").eq(flow.getNamespace()))
|
||||
.and(field("flow_id").eq(flow.getId()));
|
||||
|
||||
return Optional.ofNullable(select.forUpdate().fetchOne())
|
||||
.map(record -> this.jdbcRepository.map(record));
|
||||
return this.jdbcRepository.fetchOne(select.forUpdate());
|
||||
}
|
||||
|
||||
private void save(DSLContext dslContext, ConcurrencyLimit concurrencyLimit) {
|
||||
|
||||
@@ -12,7 +12,6 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public abstract class AbstractJdbcExecutionQueuedStorage extends AbstractJdbcRepository {
|
||||
protected io.kestra.jdbc.AbstractJdbcRepository<ExecutionQueued> jdbcRepository;
|
||||
@@ -70,18 +69,12 @@ public abstract class AbstractJdbcExecutionQueuedStorage extends AbstractJdbcRep
|
||||
this.jdbcRepository
|
||||
.getDslContextWrapper()
|
||||
.transaction(configuration -> {
|
||||
var select = DSL
|
||||
.using(configuration)
|
||||
.select(AbstractJdbcRepository.field("value"))
|
||||
.from(this.jdbcRepository.getTable())
|
||||
.where(buildTenantCondition(execution.getTenantId()))
|
||||
.and(field("key").eq(IdUtils.fromParts(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId())))
|
||||
.forUpdate();
|
||||
|
||||
Optional<ExecutionQueued> maybeExecution = this.jdbcRepository.fetchOne(select);
|
||||
if (maybeExecution.isPresent()) {
|
||||
this.jdbcRepository.delete(maybeExecution.get());
|
||||
}
|
||||
DSL
|
||||
.using(configuration)
|
||||
.deleteFrom(this.jdbcRepository.getTable())
|
||||
.where(buildTenantCondition(execution.getTenantId()))
|
||||
.and(field("key").eq(IdUtils.fromParts(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId())))
|
||||
.execute();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -423,7 +423,7 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
|
||||
MultipleConditionEvent multipleConditionEvent = either.getLeft();
|
||||
|
||||
flowTriggerService.computeExecutionsFromFlowTriggers(multipleConditionEvent.execution(), List.of(multipleConditionEvent.flow()), Optional.of(multipleConditionStorage))
|
||||
flowTriggerService.computeExecutionsFromFlowTriggerPreconditions(multipleConditionEvent.execution(), multipleConditionEvent.flow(), multipleConditionStorage)
|
||||
.forEach(exec -> {
|
||||
try {
|
||||
executionQueue.emit(exec);
|
||||
@@ -650,8 +650,24 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
.orElse(null);
|
||||
workerJobQueue.emit(workerGroupKey, workerTask);
|
||||
}
|
||||
/// flowable attempt state transition to running
|
||||
if (workerTask.getTask().isFlowable()) {
|
||||
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.RUNNING)));
|
||||
List<TaskRunAttempt> attempts = Optional.ofNullable(workerTask.getTaskRun().getAttempts())
|
||||
.map(ArrayList::new)
|
||||
.orElseGet(ArrayList::new);
|
||||
|
||||
|
||||
attempts.add(
|
||||
TaskRunAttempt.builder()
|
||||
.state(new State().withState(State.Type.RUNNING))
|
||||
.build()
|
||||
);
|
||||
|
||||
TaskRun updatedTaskRun = workerTask.getTaskRun()
|
||||
.withAttempts(attempts)
|
||||
.withState(State.Type.RUNNING);
|
||||
|
||||
workerTaskResults.add(new WorkerTaskResult(updatedTaskRun));
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
@@ -1196,8 +1212,10 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
private void processFlowTriggers(Execution execution) throws QueueException {
|
||||
// directly process simple conditions
|
||||
flowTriggerService.withFlowTriggersOnly(allFlows.stream())
|
||||
.filter(f ->ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().noneMatch(c -> c instanceof MultipleCondition) && f.getTrigger().getPreconditions() == null)
|
||||
.flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggers(execution, List.of(f.getFlow()), Optional.empty()).stream())
|
||||
.filter(f -> ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().noneMatch(c -> c instanceof MultipleCondition) && f.getTrigger().getPreconditions() == null)
|
||||
.map(f -> f.getFlow())
|
||||
.distinct() // as computeExecutionsFromFlowTriggers is based on flow, we must map FlowWithFlowTrigger to a flow and distinct to avoid multiple execution for the same flow
|
||||
.flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggerConditions(execution, f).stream())
|
||||
.forEach(throwConsumer(exec -> executionQueue.emit(exec)));
|
||||
|
||||
// send multiple conditions to the multiple condition queue for later processing
|
||||
|
||||
@@ -4,6 +4,7 @@ import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.repositories.TriggerRepositoryInterface;
|
||||
import io.kestra.core.runners.ScheduleContextInterface;
|
||||
import io.kestra.core.runners.Scheduler;
|
||||
import io.kestra.core.runners.SchedulerTriggerStateInterface;
|
||||
import io.kestra.core.services.FlowListenersInterface;
|
||||
import io.kestra.core.services.FlowService;
|
||||
@@ -56,6 +57,9 @@ public class JdbcScheduler extends AbstractScheduler {
|
||||
.forEach(abstractTrigger -> triggerRepository.delete(Trigger.of(flow, abstractTrigger)));
|
||||
}
|
||||
});
|
||||
|
||||
// No-op consumption of the trigger queue, so the events are purged from the queue
|
||||
this.triggerQueue.receive(Scheduler.class, trigger -> { });
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.jdbc.runner;
|
||||
|
||||
import io.kestra.core.junit.annotations.ExecuteFlow;
|
||||
import io.kestra.core.junit.annotations.FlakyTest;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
@@ -138,6 +139,7 @@ public abstract class JdbcRunnerRetryTest {
|
||||
retryCaseTest.retryDynamicTask(execution);
|
||||
}
|
||||
|
||||
@FlakyTest(description = "it seems this flow sometimes stay stuck in RUNNING")
|
||||
@Test
|
||||
@ExecuteFlow("flows/valids/retry-with-flowable-errors.yaml")
|
||||
void retryWithFlowableErrors(Execution execution){
|
||||
|
||||
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||
@Slf4j
|
||||
@Singleton
|
||||
public class TestRunner implements Runnable, AutoCloseable {
|
||||
@Setter private int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors());
|
||||
@Setter private int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors()) * 16;
|
||||
@Setter private boolean schedulerEnabled = true;
|
||||
@Setter private boolean workerEnabled = true;
|
||||
|
||||
|
||||
@@ -272,6 +272,7 @@
|
||||
import Id from "../Id.vue";
|
||||
import SelectTableActions from "../../mixins/selectTableActions";
|
||||
import _merge from "lodash/merge";
|
||||
import moment from "moment";
|
||||
import LogsWrapper from "../logs/LogsWrapper.vue";
|
||||
import KestraFilter from "../filter/KestraFilter.vue"
|
||||
import {mapStores} from "pinia";
|
||||
@@ -500,6 +501,15 @@
|
||||
loadQuery(base) {
|
||||
let queryFilter = this.queryWithFilter();
|
||||
|
||||
const timeRange = queryFilter["filters[timeRange][EQUALS]"];
|
||||
if (timeRange) {
|
||||
const end = new Date();
|
||||
const start = new Date(end.getTime() - moment.duration(timeRange).asMilliseconds());
|
||||
queryFilter["filters[startDate][GREATER_THAN_OR_EQUAL_TO]"] = start.toISOString();
|
||||
queryFilter["filters[endDate][LESS_THAN_OR_EQUAL_TO]"] = end.toISOString();
|
||||
delete queryFilter["filters[timeRange][EQUALS]"];
|
||||
}
|
||||
|
||||
return _merge(base, queryFilter)
|
||||
},
|
||||
},
|
||||
|
||||
@@ -56,7 +56,7 @@
|
||||
this.follow();
|
||||
window.addEventListener("popstate", this.follow)
|
||||
|
||||
this.dependenciesCount = (await this.flowStore.loadDependencies({namespace: this.$route.params.namespace, id: this.$route.params.flowId})).count;
|
||||
this.dependenciesCount = (await this.flowStore.loadDependencies({namespace: this.$route.params.namespace, id: this.$route.params.flowId}, true)).count;
|
||||
},
|
||||
mounted() {
|
||||
this.previousExecutionId = this.$route.params.id
|
||||
|
||||
@@ -528,27 +528,25 @@
|
||||
}
|
||||
.content-container {
|
||||
height: calc(100vh - 0px);
|
||||
overflow-y: auto !important;
|
||||
overflow-y: scroll;
|
||||
overflow-x: hidden;
|
||||
scrollbar-gutter: stable;
|
||||
word-wrap: break-word;
|
||||
word-break: break-word;
|
||||
}
|
||||
|
||||
:deep(.el-collapse) {
|
||||
.el-collapse-item__wrap {
|
||||
overflow-y: auto !important;
|
||||
max-height: none !important;
|
||||
}
|
||||
|
||||
.el-collapse-item__content {
|
||||
overflow-y: auto !important;
|
||||
word-wrap: break-word;
|
||||
word-break: break-word;
|
||||
}
|
||||
}
|
||||
|
||||
:deep(.var-value) {
|
||||
overflow-y: auto !important;
|
||||
word-wrap: break-word;
|
||||
word-break: break-word;
|
||||
}
|
||||
|
||||
@@ -247,6 +247,7 @@
|
||||
const suggestWidgetResizeObserver = ref<MutationObserver>()
|
||||
const suggestWidgetObserver = ref<MutationObserver>()
|
||||
const suggestWidget = ref<HTMLElement>()
|
||||
const resizeObserver = ref<ResizeObserver>()
|
||||
|
||||
|
||||
|
||||
@@ -796,6 +797,20 @@
|
||||
setTimeout(() => monaco.editor.remeasureFonts(), 1)
|
||||
emit("editorDidMount", editorResolved.value);
|
||||
|
||||
/* Hhandle resizing. */
|
||||
resizeObserver.value = new ResizeObserver(() => {
|
||||
if (localEditor.value) {
|
||||
localEditor.value.layout();
|
||||
}
|
||||
if (localDiffEditor.value) {
|
||||
localDiffEditor.value.getModifiedEditor().layout();
|
||||
localDiffEditor.value.getOriginalEditor().layout();
|
||||
}
|
||||
});
|
||||
if (editorRef.value) {
|
||||
resizeObserver.value.observe(editorRef.value);
|
||||
}
|
||||
|
||||
highlightLine();
|
||||
}
|
||||
|
||||
@@ -853,6 +868,8 @@
|
||||
function destroy() {
|
||||
disposeObservers();
|
||||
disposeCompletions.value?.();
|
||||
resizeObserver.value?.disconnect();
|
||||
resizeObserver.value = undefined;
|
||||
if (localDiffEditor.value !== undefined) {
|
||||
localDiffEditor.value?.dispose();
|
||||
localDiffEditor.value?.getModel()?.modified?.dispose();
|
||||
|
||||
@@ -475,7 +475,7 @@
|
||||
return this.namespacesStore
|
||||
.createKv({
|
||||
...this.kv,
|
||||
contentType: ["DATE", "DATETIME"].includes(type) ? "text/plain" : "application/json",
|
||||
contentType: "text/plain",
|
||||
value
|
||||
})
|
||||
.then(() => {
|
||||
|
||||
@@ -100,7 +100,7 @@
|
||||
>
|
||||
<NamespaceSelect
|
||||
v-model="secret.namespace"
|
||||
:readonly="secret.update"
|
||||
:read-only="secret.update"
|
||||
:include-system-namespace="true"
|
||||
all
|
||||
/>
|
||||
|
||||
@@ -506,7 +506,7 @@ export const useFlowStore = defineStore("flow", () => {
|
||||
}
|
||||
|
||||
function loadDependencies(options: { namespace: string, id: string, subtype: "FLOW" | "EXECUTION" }, onlyCount = false) {
|
||||
return store.$http.get(`${apiUrl(store)}/flows/${options.namespace}/${options.id}/dependencies?expandAll=true`).then(response => {
|
||||
return store.$http.get(`${apiUrl(store)}/flows/${options.namespace}/${options.id}/dependencies?expandAll=${onlyCount ? false : true}`).then(response => {
|
||||
return {
|
||||
...(!onlyCount ? {data: transformResponse(response.data, options.subtype)} : {}),
|
||||
count: response.data.nodes ? [...new Set(response.data.nodes.map((r:{uid:string}) => r.uid))].length : 0
|
||||
|
||||
@@ -107,25 +107,11 @@
|
||||
|
||||
:deep(.alert-info) {
|
||||
display: flex;
|
||||
gap: 12px;
|
||||
padding: 16px 16px 0 16px;
|
||||
padding: .5rem !important;
|
||||
background-color: var(--ks-background-info);
|
||||
border: 1px solid var(--ks-border-info);
|
||||
border-left-width: 5px;
|
||||
border-radius: 8px;
|
||||
|
||||
&::before {
|
||||
content: '!';
|
||||
min-width: 20px;
|
||||
height: 20px;
|
||||
margin-top: 4px;
|
||||
border-radius: 50%;
|
||||
background: var(--ks-content-info);
|
||||
border: 1px solid var(--ks-border-info);
|
||||
color: var(--ks-content-inverse);
|
||||
font: 600 13px/20px sans-serif;
|
||||
text-align: center;
|
||||
}
|
||||
border-left-width: 0.25rem;
|
||||
border-radius: 0.5rem;
|
||||
|
||||
p { color: var(--ks-content-info); }
|
||||
}
|
||||
@@ -135,7 +121,7 @@
|
||||
color: var(--ks-content-info);
|
||||
border: 1px solid var(--ks-border-info);
|
||||
font-family: 'Courier New', Courier, monospace;
|
||||
white-space: nowrap; // Prevent button text from wrapping
|
||||
white-space: nowrap;
|
||||
|
||||
.material-design-icon {
|
||||
position: absolute;
|
||||
|
||||
@@ -870,19 +870,10 @@
|
||||
"adding": "+ {what} hinzufügen",
|
||||
"adding_default": "+ Neuen Wert hinzufügen",
|
||||
"clearSelection": "Auswahl aufheben",
|
||||
"close": {
|
||||
"afterExecution": "Nach Ausführung Task schließen",
|
||||
"conditions": "Bedingung schließen",
|
||||
"errors": "Fehlerbehandler schließen",
|
||||
"finally": "Task schließen",
|
||||
"input": "Eingabe schließen",
|
||||
"pluginDefaults": "Plugin-Standard schließen",
|
||||
"tasks": "Task schließen",
|
||||
"triggers": "Trigger schließen"
|
||||
},
|
||||
"creation": {
|
||||
"afterExecution": "Fügen Sie einen Block nach der Ausführung hinzu",
|
||||
"conditions": "Bedingung hinzufügen",
|
||||
"default": "Hinzufügen",
|
||||
"errors": "Einen Fehler-Handler hinzufügen",
|
||||
"finally": "Fügen Sie einen Finally-Block hinzu",
|
||||
"inputs": "Ein Eingabefeld hinzufügen",
|
||||
@@ -916,6 +907,10 @@
|
||||
"variable": "Variable",
|
||||
"yaml": "YAML-Editor"
|
||||
},
|
||||
"remove": {
|
||||
"cases": "Diesen Fall entfernen",
|
||||
"default": "Diesen Eintrag entfernen"
|
||||
},
|
||||
"sections": {
|
||||
"afterExecution": "Nach Ausführung",
|
||||
"connection": "Verbindungseigenschaften",
|
||||
@@ -932,6 +927,7 @@
|
||||
"select": {
|
||||
"afterExecution": "Wählen Sie eine Task aus",
|
||||
"conditions": "Wählen Sie eine Bedingung aus",
|
||||
"default": "Wählen Sie einen Typ aus",
|
||||
"errors": "Wählen Sie eine Task aus",
|
||||
"finally": "Wählen Sie eine Task aus",
|
||||
"inputs": "Wählen Sie einen Input-Feldtyp aus",
|
||||
|
||||
@@ -870,19 +870,10 @@
|
||||
"adding": "+ Agregar un {what}",
|
||||
"adding_default": "+ Añadir un nuevo value",
|
||||
"clearSelection": "Borrar selección",
|
||||
"close": {
|
||||
"afterExecution": "Cerrar después de la ejecución de la task",
|
||||
"conditions": "Condición de cierre",
|
||||
"errors": "Cerrar el manejador de errores",
|
||||
"finally": "Cerrar task",
|
||||
"input": "Cerrar input",
|
||||
"pluginDefaults": "Cerrar plugin predeterminado",
|
||||
"tasks": "Cerrar task",
|
||||
"triggers": "Cerrar trigger"
|
||||
},
|
||||
"creation": {
|
||||
"afterExecution": "Agregar un bloque después de la ejecución",
|
||||
"conditions": "Agregar una condición",
|
||||
"default": "Agregar",
|
||||
"errors": "Agregar un manejador de errores",
|
||||
"finally": "Agregar un bloque finally",
|
||||
"inputs": "Agregar un campo de input",
|
||||
@@ -916,6 +907,10 @@
|
||||
"variable": "Variable",
|
||||
"yaml": "Editor YAML"
|
||||
},
|
||||
"remove": {
|
||||
"cases": "Eliminar este caso",
|
||||
"default": "Eliminar esta entrada"
|
||||
},
|
||||
"sections": {
|
||||
"afterExecution": "Después de la Ejecución",
|
||||
"connection": "Propiedades de conexión",
|
||||
@@ -932,6 +927,7 @@
|
||||
"select": {
|
||||
"afterExecution": "Seleccionar una task",
|
||||
"conditions": "Seleccione una condición",
|
||||
"default": "Seleccionar un tipo",
|
||||
"errors": "Selecciona una task",
|
||||
"finally": "Seleccionar una task",
|
||||
"inputs": "Seleccione un tipo de campo de input",
|
||||
|
||||
@@ -870,19 +870,10 @@
|
||||
"adding": "+ Ajouter un {what}",
|
||||
"adding_default": "+ Ajouter une nouvelle valeur",
|
||||
"clearSelection": "Effacer la sélection",
|
||||
"close": {
|
||||
"afterExecution": "Fermer après l'exécution de la task",
|
||||
"conditions": "Condition de fermeture",
|
||||
"errors": "Fermer le gestionnaire d'erreurs",
|
||||
"finally": "Fermer la task",
|
||||
"input": "Fermer l'input",
|
||||
"pluginDefaults": "Fermer le plugin par défaut",
|
||||
"tasks": "Fermer la task",
|
||||
"triggers": "Fermer le trigger"
|
||||
},
|
||||
"creation": {
|
||||
"afterExecution": "Ajouter un bloc après l'exécution",
|
||||
"conditions": "Ajouter une condition",
|
||||
"default": "Ajouter",
|
||||
"errors": "Ajouter un gestionnaire d'erreurs",
|
||||
"finally": "Ajouter un bloc finally",
|
||||
"inputs": "Ajouter un champ d'input",
|
||||
@@ -916,6 +907,10 @@
|
||||
"variable": "Variable",
|
||||
"yaml": "Éditeur YAML"
|
||||
},
|
||||
"remove": {
|
||||
"cases": "Supprimer ce cas",
|
||||
"default": "Supprimer cette entrée"
|
||||
},
|
||||
"sections": {
|
||||
"afterExecution": "Après l'Exécution",
|
||||
"connection": "Propriétés de connexion",
|
||||
@@ -932,6 +927,7 @@
|
||||
"select": {
|
||||
"afterExecution": "Sélectionnez une task",
|
||||
"conditions": "Sélectionner une condition",
|
||||
"default": "Sélectionner un type",
|
||||
"errors": "Sélectionner une task",
|
||||
"finally": "Sélectionnez une task",
|
||||
"inputs": "Sélectionnez un type de champ input",
|
||||
|
||||
@@ -870,19 +870,10 @@
|
||||
"adding": "+ एक {what} जोड़ें",
|
||||
"adding_default": "+ एक नया value जोड़ें",
|
||||
"clearSelection": "चयन साफ़ करें",
|
||||
"close": {
|
||||
"afterExecution": "कार्य पूरा होने के बाद बंद करें",
|
||||
"conditions": "बंद करने की शर्त",
|
||||
"errors": "त्रुटि हैंडलर बंद करें",
|
||||
"finally": "टास्क बंद करें",
|
||||
"input": "इनपुट बंद करें",
|
||||
"pluginDefaults": "प्लगइन डिफ़ॉल्ट बंद करें",
|
||||
"tasks": "टास्क बंद करें",
|
||||
"triggers": "ट्रिगर बंद करें"
|
||||
},
|
||||
"creation": {
|
||||
"afterExecution": "एक निष्पादन के बाद ब्लॉक जोड़ें",
|
||||
"conditions": "एक शर्त जोड़ें",
|
||||
"default": "जोड़ें",
|
||||
"errors": "त्रुटि हैंडलर जोड़ें",
|
||||
"finally": "अंत में एक finally ब्लॉक जोड़ें",
|
||||
"inputs": "इनपुट फ़ील्ड जोड़ें",
|
||||
@@ -916,6 +907,10 @@
|
||||
"variable": "वेरिएबल",
|
||||
"yaml": "YAML संपादक"
|
||||
},
|
||||
"remove": {
|
||||
"cases": "इस केस को हटाएं",
|
||||
"default": "इस प्रविष्टि को हटाएं"
|
||||
},
|
||||
"sections": {
|
||||
"afterExecution": "Execution के बाद",
|
||||
"connection": "कनेक्शन गुणधर्म",
|
||||
@@ -932,6 +927,7 @@
|
||||
"select": {
|
||||
"afterExecution": "एक task चुनें",
|
||||
"conditions": "कंडीशन चुनें",
|
||||
"default": "एक प्रकार चुनें",
|
||||
"errors": "एक task चुनें",
|
||||
"finally": "एक task चुनें",
|
||||
"inputs": "इनपुट फ़ील्ड प्रकार चुनें",
|
||||
|
||||
@@ -870,19 +870,10 @@
|
||||
"adding": "+ Aggiungi un {what}",
|
||||
"adding_default": "+ Aggiungi un nuovo value",
|
||||
"clearSelection": "Cancella selezione",
|
||||
"close": {
|
||||
"afterExecution": "Chiudi dopo l'esecuzione del task",
|
||||
"conditions": "Condizione di chiusura",
|
||||
"errors": "Chiudi error handler",
|
||||
"finally": "Chiudi task",
|
||||
"input": "Chiudi input",
|
||||
"pluginDefaults": "Chiudi plugin predefinito",
|
||||
"tasks": "Chiudi task",
|
||||
"triggers": "Chiudi trigger"
|
||||
},
|
||||
"creation": {
|
||||
"afterExecution": "Aggiungi un blocco after execution",
|
||||
"conditions": "Aggiungi una condizione",
|
||||
"default": "Aggiungi",
|
||||
"errors": "Aggiungi un gestore degli errori",
|
||||
"finally": "Aggiungi un blocco finally",
|
||||
"inputs": "Aggiungi un campo di input",
|
||||
@@ -916,6 +907,10 @@
|
||||
"variable": "Variabile",
|
||||
"yaml": "Editor YAML"
|
||||
},
|
||||
"remove": {
|
||||
"cases": "Rimuovi questo caso",
|
||||
"default": "Rimuovi questa voce"
|
||||
},
|
||||
"sections": {
|
||||
"afterExecution": "Dopo l'Esecuzione",
|
||||
"connection": "Proprietà di connessione",
|
||||
@@ -932,6 +927,7 @@
|
||||
"select": {
|
||||
"afterExecution": "Seleziona un task",
|
||||
"conditions": "Seleziona una condizione",
|
||||
"default": "Seleziona un tipo",
|
||||
"errors": "Seleziona un task",
|
||||
"finally": "Seleziona un task",
|
||||
"inputs": "Seleziona un tipo di input field",
|
||||
|
||||
@@ -870,19 +870,10 @@
|
||||
"adding": "+ {what}を追加",
|
||||
"adding_default": "+ 新しいvalueを追加",
|
||||
"clearSelection": "選択をクリア",
|
||||
"close": {
|
||||
"afterExecution": "実行後にタスクを閉じる",
|
||||
"conditions": "クローズ条件",
|
||||
"errors": "エラーハンドラーを閉じる",
|
||||
"finally": "タスクを閉じる",
|
||||
"input": "入力を閉じる",
|
||||
"pluginDefaults": "プラグインのデフォルトを閉じる",
|
||||
"tasks": "タスクを閉じる",
|
||||
"triggers": "トリガーを閉じる"
|
||||
},
|
||||
"creation": {
|
||||
"afterExecution": "実行後ブロックを追加",
|
||||
"conditions": "条件を追加",
|
||||
"default": "追加",
|
||||
"errors": "エラーハンドラーを追加",
|
||||
"finally": "finally ブロックを追加",
|
||||
"inputs": "入力フィールドを追加",
|
||||
@@ -916,6 +907,10 @@
|
||||
"variable": "変数",
|
||||
"yaml": "YAMLエディター"
|
||||
},
|
||||
"remove": {
|
||||
"cases": "このケースを削除",
|
||||
"default": "このエントリを削除"
|
||||
},
|
||||
"sections": {
|
||||
"afterExecution": "実行後",
|
||||
"connection": "接続プロパティ",
|
||||
@@ -932,6 +927,7 @@
|
||||
"select": {
|
||||
"afterExecution": "タスクを選択",
|
||||
"conditions": "条件を選択",
|
||||
"default": "タイプを選択",
|
||||
"errors": "タスクを選択",
|
||||
"finally": "タスクを選択",
|
||||
"inputs": "入力フィールドタイプを選択",
|
||||
|
||||
@@ -870,19 +870,10 @@
|
||||
"adding": "+ {what} 추가",
|
||||
"adding_default": "+ 새 value 추가",
|
||||
"clearSelection": "선택 해제",
|
||||
"close": {
|
||||
"afterExecution": "실행 task 후 닫기",
|
||||
"conditions": "닫기 조건",
|
||||
"errors": "오류 처리기 닫기",
|
||||
"finally": "작업 닫기",
|
||||
"input": "입력 닫기",
|
||||
"pluginDefaults": "플러그인 기본값 닫기",
|
||||
"tasks": "작업 닫기",
|
||||
"triggers": "트리거 닫기"
|
||||
},
|
||||
"creation": {
|
||||
"afterExecution": "실행 후 블록 추가",
|
||||
"conditions": "조건 추가",
|
||||
"default": "추가",
|
||||
"errors": "오류 처리기 추가",
|
||||
"finally": "마지막 블록 추가",
|
||||
"inputs": "입력 필드 추가",
|
||||
@@ -916,6 +907,10 @@
|
||||
"variable": "변수",
|
||||
"yaml": "YAML 편집기"
|
||||
},
|
||||
"remove": {
|
||||
"cases": "이 케이스를 제거하십시오",
|
||||
"default": "이 항목 제거"
|
||||
},
|
||||
"sections": {
|
||||
"afterExecution": "실행 후",
|
||||
"connection": "연결 속성",
|
||||
@@ -932,6 +927,7 @@
|
||||
"select": {
|
||||
"afterExecution": "작업 선택",
|
||||
"conditions": "조건 선택",
|
||||
"default": "유형 선택",
|
||||
"errors": "작업 선택",
|
||||
"finally": "작업 선택",
|
||||
"inputs": "입력 필드 유형 선택",
|
||||
|
||||
@@ -870,19 +870,10 @@
|
||||
"adding": "+ Dodaj {what}",
|
||||
"adding_default": "+ Dodaj nową wartość",
|
||||
"clearSelection": "Wyczyść zaznaczenie",
|
||||
"close": {
|
||||
"afterExecution": "Zamknij po wykonaniu task",
|
||||
"conditions": "Zamknij warunek",
|
||||
"errors": "Zamknij obsługę błędów",
|
||||
"finally": "Zamknij task",
|
||||
"input": "Zamknij input",
|
||||
"pluginDefaults": "Zamknij domyślną wtyczkę",
|
||||
"tasks": "Zamknij task",
|
||||
"triggers": "Zamknij trigger"
|
||||
},
|
||||
"creation": {
|
||||
"afterExecution": "Dodaj blok po wykonaniu",
|
||||
"conditions": "Dodaj warunek",
|
||||
"default": "Dodaj",
|
||||
"errors": "Dodaj obsługę błędów",
|
||||
"finally": "Dodaj blok finally",
|
||||
"inputs": "Dodaj pole input",
|
||||
@@ -916,6 +907,10 @@
|
||||
"variable": "Zmienna",
|
||||
"yaml": "Edytor YAML"
|
||||
},
|
||||
"remove": {
|
||||
"cases": "Usuń ten przypadek",
|
||||
"default": "Usuń ten wpis"
|
||||
},
|
||||
"sections": {
|
||||
"afterExecution": "Po wykonaniu",
|
||||
"connection": "Właściwości połączenia",
|
||||
@@ -932,6 +927,7 @@
|
||||
"select": {
|
||||
"afterExecution": "Wybierz task",
|
||||
"conditions": "Wybierz warunek",
|
||||
"default": "Wybierz typ",
|
||||
"errors": "Wybierz task",
|
||||
"finally": "Wybierz task",
|
||||
"inputs": "Wybierz typ pola input",
|
||||
|
||||
@@ -870,19 +870,10 @@
|
||||
"adding": "+ Adicionar um {what}",
|
||||
"adding_default": "+ Adicionar um novo value",
|
||||
"clearSelection": "Limpar seleção",
|
||||
"close": {
|
||||
"afterExecution": "Fechar após execução da task",
|
||||
"conditions": "Condição de fechamento",
|
||||
"errors": "Fechar manipulador de erro",
|
||||
"finally": "Fechar task",
|
||||
"input": "Fechar input",
|
||||
"pluginDefaults": "Fechar plugin padrão",
|
||||
"tasks": "Fechar task",
|
||||
"triggers": "Fechar trigger"
|
||||
},
|
||||
"creation": {
|
||||
"afterExecution": "Adicionar um bloco após a execução",
|
||||
"conditions": "Adicionar uma condição",
|
||||
"default": "Adicionar",
|
||||
"errors": "Adicionar um manipulador de erro",
|
||||
"finally": "Adicionar um bloco finally",
|
||||
"inputs": "Adicionar um campo de input",
|
||||
@@ -916,6 +907,10 @@
|
||||
"variable": "Variável",
|
||||
"yaml": "Editor YAML"
|
||||
},
|
||||
"remove": {
|
||||
"cases": "Remover este caso",
|
||||
"default": "Remover esta entrada"
|
||||
},
|
||||
"sections": {
|
||||
"afterExecution": "Após a Execução",
|
||||
"connection": "Propriedades de conexão",
|
||||
@@ -932,6 +927,7 @@
|
||||
"select": {
|
||||
"afterExecution": "Selecione uma task",
|
||||
"conditions": "Selecione uma condição",
|
||||
"default": "Selecione um tipo",
|
||||
"errors": "Selecione uma task",
|
||||
"finally": "Selecione uma task",
|
||||
"inputs": "Selecione um tipo de campo de input",
|
||||
|
||||
1836
ui/src/translations/pt_BR.json
Normal file
1836
ui/src/translations/pt_BR.json
Normal file
File diff suppressed because it is too large
Load Diff
@@ -870,19 +870,10 @@
|
||||
"adding": "+ Добавить {what}",
|
||||
"adding_default": "+ Добавить новое value",
|
||||
"clearSelection": "Очистить выбор",
|
||||
"close": {
|
||||
"afterExecution": "Закрыть после выполнения task",
|
||||
"conditions": "Закрыть условие",
|
||||
"errors": "Закрыть обработчик ошибок",
|
||||
"finally": "Закрыть task",
|
||||
"input": "Закрыть input",
|
||||
"pluginDefaults": "Закрыть плагин по умолчанию",
|
||||
"tasks": "Закрыть task",
|
||||
"triggers": "Закрыть trigger"
|
||||
},
|
||||
"creation": {
|
||||
"afterExecution": "Добавить блок после выполнения",
|
||||
"conditions": "Добавить условие",
|
||||
"default": "Добавить",
|
||||
"errors": "Добавить обработчик ошибок",
|
||||
"finally": "Добавить блок finally",
|
||||
"inputs": "Добавить поле input",
|
||||
@@ -916,6 +907,10 @@
|
||||
"variable": "Переменная",
|
||||
"yaml": "Редактор YAML"
|
||||
},
|
||||
"remove": {
|
||||
"cases": "Удалить этот кейс",
|
||||
"default": "Удалить эту запись"
|
||||
},
|
||||
"sections": {
|
||||
"afterExecution": "После выполнения",
|
||||
"connection": "Свойства подключения",
|
||||
@@ -932,6 +927,7 @@
|
||||
"select": {
|
||||
"afterExecution": "Выберите task",
|
||||
"conditions": "Выберите условие",
|
||||
"default": "Выберите тип",
|
||||
"errors": "Выберите task",
|
||||
"finally": "Выберите task",
|
||||
"inputs": "Выберите тип поля input",
|
||||
|
||||
@@ -870,19 +870,10 @@
|
||||
"adding": "+ 添加{what}",
|
||||
"adding_default": "+ 添加新value",
|
||||
"clearSelection": "清除选择",
|
||||
"close": {
|
||||
"afterExecution": "执行任务后关闭",
|
||||
"conditions": "关闭条件",
|
||||
"errors": "关闭错误处理程序",
|
||||
"finally": "关闭 task",
|
||||
"input": "关闭输入",
|
||||
"pluginDefaults": "关闭插件默认",
|
||||
"tasks": "关闭task",
|
||||
"triggers": "关闭 trigger"
|
||||
},
|
||||
"creation": {
|
||||
"afterExecution": "添加执行后块",
|
||||
"conditions": "添加条件",
|
||||
"default": "添加",
|
||||
"errors": "添加错误处理程序",
|
||||
"finally": "添加一个finally块",
|
||||
"inputs": "添加一个input字段",
|
||||
@@ -916,6 +907,10 @@
|
||||
"variable": "变量",
|
||||
"yaml": "YAML编辑器"
|
||||
},
|
||||
"remove": {
|
||||
"cases": "删除此案例",
|
||||
"default": "删除此条目"
|
||||
},
|
||||
"sections": {
|
||||
"afterExecution": "执行后",
|
||||
"connection": "连接属性",
|
||||
@@ -932,6 +927,7 @@
|
||||
"select": {
|
||||
"afterExecution": "选择一个task",
|
||||
"conditions": "选择条件",
|
||||
"default": "选择类型",
|
||||
"errors": "选择一个task",
|
||||
"finally": "选择一个task",
|
||||
"inputs": "选择一个input字段类型",
|
||||
|
||||
@@ -71,6 +71,9 @@ import io.opentelemetry.context.propagation.ContextPropagators;
|
||||
import io.opentelemetry.context.propagation.TextMapPropagator;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import io.swagger.v3.oas.annotations.extensions.Extension;
|
||||
import io.swagger.v3.oas.annotations.extensions.ExtensionProperty;
|
||||
import io.swagger.v3.oas.annotations.enums.ParameterIn;
|
||||
import io.swagger.v3.oas.annotations.media.Content;
|
||||
import io.swagger.v3.oas.annotations.media.ExampleObject;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
@@ -121,7 +124,7 @@ public class ExecutionController {
|
||||
@Nullable
|
||||
@Value("${micronaut.server.context-path}")
|
||||
protected String basePath;
|
||||
|
||||
|
||||
@Inject
|
||||
private FlowRepositoryInterface flowRepository;
|
||||
|
||||
@@ -168,7 +171,7 @@ public class ExecutionController {
|
||||
|
||||
@Inject
|
||||
private ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
|
||||
|
||||
|
||||
@Inject
|
||||
private RunContextFactory runContextFactory;
|
||||
|
||||
@@ -186,7 +189,7 @@ public class ExecutionController {
|
||||
|
||||
@Inject
|
||||
private Optional<OpenTelemetry> openTelemetry;
|
||||
|
||||
|
||||
@Inject
|
||||
private ExecutionStreamingService executionStreamingService;
|
||||
|
||||
@@ -206,7 +209,7 @@ public class ExecutionController {
|
||||
@Parameter(description = "The current page") @QueryValue(defaultValue = "1") @Min(1) int page,
|
||||
@Parameter(description = "The current page size") @QueryValue(defaultValue = "10") @Min(1) int size,
|
||||
@Parameter(description = "The sort of current page") @Nullable @QueryValue List<String> sort,
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
//Deprecated params
|
||||
@Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@@ -355,9 +358,9 @@ public class ExecutionController {
|
||||
@ApiResponse(responseCode = "204", description = "On success")
|
||||
public HttpResponse<Void> deleteExecution(
|
||||
@Parameter(description = "The execution id") @PathVariable String executionId,
|
||||
@Parameter(description = "Whether to delete execution logs") @QueryValue(defaultValue = "true") Boolean deleteLogs,
|
||||
@Parameter(description = "Whether to delete execution metrics") @QueryValue(defaultValue = "true") Boolean deleteMetrics,
|
||||
@Parameter(description = "Whether to delete execution files in the internal storage") @QueryValue(defaultValue = "true") Boolean deleteStorage
|
||||
@Parameter(description = "Whether to delete execution logs", required = false) @QueryValue(defaultValue = "true") Boolean deleteLogs,
|
||||
@Parameter(description = "Whether to delete execution metrics", required = false) @QueryValue(defaultValue = "true") Boolean deleteMetrics,
|
||||
@Parameter(description = "Whether to delete execution files in the internal storage", required = false) @QueryValue(defaultValue = "true") Boolean deleteStorage
|
||||
) throws IOException {
|
||||
Optional<Execution> execution = executionRepository.findById(tenantService.resolveTenant(), executionId);
|
||||
if (execution.isPresent()) {
|
||||
@@ -376,9 +379,9 @@ public class ExecutionController {
|
||||
public MutableHttpResponse<?> deleteExecutionsByIds(
|
||||
@RequestBody(description = "The execution id") @Body List<String> executionsId,
|
||||
@Parameter(description = "Whether to delete non-terminated executions") @Nullable @QueryValue(defaultValue = "false") Boolean includeNonTerminated,
|
||||
@Parameter(description = "Whether to delete execution logs") @QueryValue(defaultValue = "true") Boolean deleteLogs,
|
||||
@Parameter(description = "Whether to delete execution metrics") @QueryValue(defaultValue = "true") Boolean deleteMetrics,
|
||||
@Parameter(description = "Whether to delete execution files in the internal storage") @QueryValue(defaultValue = "true") Boolean deleteStorage
|
||||
@Parameter(description = "Whether to delete execution logs", required = false) @QueryValue(defaultValue = "true") Boolean deleteLogs,
|
||||
@Parameter(description = "Whether to delete execution metrics", required = false) @QueryValue(defaultValue = "true") Boolean deleteMetrics,
|
||||
@Parameter(description = "Whether to delete execution files in the internal storage", required = false) @QueryValue(defaultValue = "true") Boolean deleteStorage
|
||||
) throws IOException {
|
||||
List<Execution> executions = new ArrayList<>();
|
||||
Set<ManualConstraintViolation<String>> invalids = new HashSet<>();
|
||||
@@ -417,27 +420,27 @@ public class ExecutionController {
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Operation(tags = {"Executions"}, summary = "Delete executions filter by query parameters")
|
||||
public HttpResponse<?> deleteExecutionsByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
|
||||
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
|
||||
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
|
||||
}) @Nullable @QueryValue Duration timeRange,
|
||||
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
|
||||
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
|
||||
|
||||
@Parameter(description = "Whether to delete non-terminated executions") @Nullable @QueryValue(defaultValue = "false") Boolean includeNonTerminated,
|
||||
@Parameter(description = "Whether to delete execution logs") @QueryValue(defaultValue = "true") Boolean deleteLogs,
|
||||
@Parameter(description = "Whether to delete execution metrics") @QueryValue(defaultValue = "true") Boolean deleteMetrics,
|
||||
@Parameter(description = "Whether to delete execution files in the internal storage") @QueryValue(defaultValue = "true") Boolean deleteStorage
|
||||
@Parameter(description = "Whether to delete execution logs", required = false) @QueryValue(defaultValue = "true") Boolean deleteLogs,
|
||||
@Parameter(description = "Whether to delete execution metrics", required = false) @QueryValue(defaultValue = "true") Boolean deleteMetrics,
|
||||
@Parameter(description = "Whether to delete execution files in the internal storage", required = false) @QueryValue(defaultValue = "true") Boolean deleteStorage
|
||||
) throws IOException {
|
||||
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
|
||||
filters,
|
||||
@@ -666,7 +669,16 @@ public class ExecutionController {
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Post(uri = "/{namespace}/{id}", consumes = MediaType.MULTIPART_FORM_DATA)
|
||||
@Operation(tags = {"Executions"}, summary = "Create a new execution for a flow")
|
||||
@Operation(
|
||||
tags = {"Executions"},
|
||||
summary = "Create a new execution for a flow",
|
||||
extensions = @Extension(
|
||||
name = "x-sdk-customization",
|
||||
properties = {
|
||||
@ExtensionProperty(name = "x-multipart", value = "true")
|
||||
}
|
||||
)
|
||||
)
|
||||
@ApiResponse(responseCode = "409", description = "if the flow is disabled")
|
||||
@ApiResponse(responseCode = "200", description = "On execution created", content = {@Content(schema = @Schema(implementation = ExecutionResponse.class))})
|
||||
@SingleResult
|
||||
@@ -996,22 +1008,22 @@ public class ExecutionController {
|
||||
@Post(uri = "/restart/by-query")
|
||||
@Operation(tags = {"Executions"}, summary = "Restart executions filter by query parameters")
|
||||
public HttpResponse<?> restartExecutionsByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
|
||||
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
|
||||
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
|
||||
}) @Nullable @QueryValue Duration timeRange,
|
||||
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
|
||||
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
|
||||
) throws Exception {
|
||||
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
|
||||
filters,
|
||||
@@ -1056,13 +1068,32 @@ public class ExecutionController {
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Post(uri = "/{executionId}/replay-with-inputs", consumes = MediaType.MULTIPART_FORM_DATA)
|
||||
@Operation(tags = {"Executions"}, summary = "Create a new execution from an old one and start it from a specified task run id")
|
||||
@Operation(
|
||||
tags = {"Executions"},
|
||||
summary = "Create a new execution from an old one and start it from a specified task run id",
|
||||
extensions = @Extension(
|
||||
name = "x-sdk-customization",
|
||||
properties = {
|
||||
@ExtensionProperty(name = "x-multipart", value = "true")
|
||||
}
|
||||
)
|
||||
)
|
||||
public Mono<Execution> replayExecutionWithinputs(
|
||||
@Parameter(description = "the original execution id to clone") @PathVariable String executionId,
|
||||
@Parameter(description = "The taskrun id") @Nullable @QueryValue String taskRunId,
|
||||
@Parameter(description = "The flow revision to use for new execution") @Nullable @QueryValue Integer revision,
|
||||
@Parameter(description = "Set a list of breakpoints at specific tasks 'id.value', separated by a coma.") @QueryValue Optional<String> breakpoints,
|
||||
@RequestBody(description = "The inputs") @Body MultipartBody inputs
|
||||
@RequestBody(
|
||||
description = "The inputs (multipart map)",
|
||||
content = @Content(
|
||||
mediaType = MediaType.MULTIPART_FORM_DATA,
|
||||
schema = @Schema(
|
||||
type = "object",
|
||||
additionalProperties = Schema.AdditionalPropertiesValue.TRUE,
|
||||
additionalPropertiesSchema = Object.class
|
||||
)
|
||||
)
|
||||
) @Body MultipartBody inputs
|
||||
) {
|
||||
Optional<Execution> execution = executionRepository.findById(tenantService.resolveTenant(), executionId);
|
||||
if (execution.isEmpty()) {
|
||||
@@ -1239,22 +1270,22 @@ public class ExecutionController {
|
||||
@ApiResponse(responseCode = "200", description = "On success", content = {@Content(schema = @Schema(implementation = BulkResponse.class))})
|
||||
@ApiResponse(responseCode = "422", description = "Changed state with errors", content = {@Content(schema = @Schema(implementation = BulkErrorResponse.class))})
|
||||
public HttpResponse<?> updateExecutionsStatusByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
|
||||
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
|
||||
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
|
||||
}) @Nullable @QueryValue Duration timeRange,
|
||||
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
|
||||
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
|
||||
@Parameter(description = "The new state of the executions") @NotNull @QueryValue State.Type newStatus
|
||||
) throws QueueException {
|
||||
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
|
||||
@@ -1302,7 +1333,7 @@ public class ExecutionController {
|
||||
if (execution.getState().isTerminated() && !isOnKillCascade) {
|
||||
throw new IllegalStateException("Execution is already finished, can't kill it");
|
||||
}
|
||||
|
||||
|
||||
eventPublisher.publishEvent(CrudEvent.of(execution, execution.withState(State.Type.KILLING)));
|
||||
killQueue.emit(ExecutionKilledExecution
|
||||
.builder()
|
||||
@@ -1512,22 +1543,22 @@ public class ExecutionController {
|
||||
@Post(uri = "/resume/by-query")
|
||||
@Operation(tags = {"Executions"}, summary = "Resume executions filter by query parameters")
|
||||
public HttpResponse<?> resumeExecutionsByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
|
||||
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
|
||||
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
|
||||
}) @Nullable @QueryValue Duration timeRange,
|
||||
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
|
||||
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
|
||||
) throws Exception {
|
||||
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
|
||||
filters,
|
||||
@@ -1621,22 +1652,22 @@ public class ExecutionController {
|
||||
@Post(uri = "/pause/by-query")
|
||||
@Operation(tags = {"Executions"}, summary = "Pause executions filter by query parameters")
|
||||
public HttpResponse<?> pauseExecutionsByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
|
||||
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
|
||||
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
|
||||
}) @Nullable @QueryValue Duration timeRange,
|
||||
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
|
||||
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
|
||||
) throws Exception {
|
||||
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
|
||||
filters,
|
||||
@@ -1665,22 +1696,22 @@ public class ExecutionController {
|
||||
@Delete(uri = "/kill/by-query")
|
||||
@Operation(tags = {"Executions"}, summary = "Kill executions filter by query parameters")
|
||||
public HttpResponse<?> killExecutionsByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
|
||||
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
|
||||
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
|
||||
}) @Nullable @QueryValue Duration timeRange,
|
||||
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
|
||||
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
|
||||
) throws QueueException {
|
||||
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
|
||||
filters,
|
||||
@@ -1709,22 +1740,22 @@ public class ExecutionController {
|
||||
@Post(uri = "/replay/by-query")
|
||||
@Operation(tags = {"Executions"}, summary = "Create new executions from old ones filter by query parameters. Keep the flow revision")
|
||||
public HttpResponse<?> replayExecutionsByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
|
||||
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
|
||||
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
|
||||
}) @Nullable @QueryValue Duration timeRange,
|
||||
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
|
||||
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
|
||||
|
||||
@Parameter(description = "If latest revision should be used") @Nullable @QueryValue(defaultValue = "false") Boolean latestRevision
|
||||
) throws Exception {
|
||||
@@ -1800,7 +1831,17 @@ public class ExecutionController {
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Get(uri = "/{executionId}/follow", produces = MediaType.TEXT_EVENT_STREAM)
|
||||
@Operation(tags = {"Executions"}, summary = "Follow an execution")
|
||||
@Operation(
|
||||
tags = {"Executions"},
|
||||
summary = "Follow an execution",
|
||||
extensions = @Extension(
|
||||
name = "x-sdk-customization",
|
||||
properties = {
|
||||
@ExtensionProperty(name = "x-replace-follow-execution", value = "true"),
|
||||
@ExtensionProperty(name = "x-skipped", value = "true")
|
||||
}
|
||||
)
|
||||
)
|
||||
public Flux<Event<Execution>> followExecution(
|
||||
@Parameter(description = "The execution id") @PathVariable String executionId
|
||||
) {
|
||||
@@ -2017,22 +2058,22 @@ public class ExecutionController {
|
||||
@Post(uri = "/labels/by-query")
|
||||
@Operation(tags = {"Executions"}, summary = "Set label on executions filter by query parameters")
|
||||
public HttpResponse<?> setLabelsOnTerminatedExecutionsByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
|
||||
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
|
||||
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
|
||||
}) @Nullable @QueryValue Duration timeRange,
|
||||
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
|
||||
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
|
||||
|
||||
@RequestBody(description = "The labels to add to the execution") @Body @NotNull @Valid List<Label> setLabels
|
||||
) {
|
||||
@@ -2134,22 +2175,22 @@ public class ExecutionController {
|
||||
@Post(uri = "/unqueue/by-query")
|
||||
@Operation(tags = {"Executions"}, summary = "Unqueue executions filter by query parameters")
|
||||
public HttpResponse<?> unqueueExecutionsByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
|
||||
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
|
||||
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
|
||||
}) @Nullable @QueryValue Duration timeRange,
|
||||
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
|
||||
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
|
||||
@Parameter(description = "The new state of the unqueued executions") @Nullable @QueryValue State.Type newState
|
||||
) throws Exception {
|
||||
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
|
||||
@@ -2248,22 +2289,22 @@ public class ExecutionController {
|
||||
@Post(uri = "/force-run/by-query")
|
||||
@Operation(tags = {"Executions"}, summary = "Force run executions filter by query parameters")
|
||||
public HttpResponse<?> forceRunExecutionsByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
|
||||
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
|
||||
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
|
||||
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
|
||||
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
|
||||
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
|
||||
}) @Nullable @QueryValue Duration timeRange,
|
||||
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
|
||||
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
|
||||
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
|
||||
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
|
||||
) throws Exception {
|
||||
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
|
||||
filters,
|
||||
@@ -2338,7 +2379,17 @@ public class ExecutionController {
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Get(uri = "/{executionId}/follow-dependencies", produces = MediaType.TEXT_EVENT_STREAM)
|
||||
@Operation(tags = {"Executions"}, summary = "Follow all execution dependencies executions")
|
||||
@Operation(
|
||||
tags = {"Executions"},
|
||||
summary = "Follow all execution dependencies executions",
|
||||
extensions = @Extension(
|
||||
name = "x-sdk-customization",
|
||||
properties = {
|
||||
@ExtensionProperty(name = "x-replace-follow-dependencies-execution", value = "true"),
|
||||
@ExtensionProperty(name = "x-skipped", value = "true")
|
||||
}
|
||||
)
|
||||
)
|
||||
public Flux<Event<ExecutionStatusEvent>> followDependenciesExecutions(
|
||||
@Parameter(description = "The execution id") @PathVariable String executionId,
|
||||
@Parameter(description = "If true, list only destination dependencies, otherwise list also source dependencies") @QueryValue(defaultValue = "false") boolean destinationOnly,
|
||||
|
||||
@@ -46,6 +46,7 @@ import io.micronaut.validation.Validated;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import io.swagger.v3.oas.annotations.enums.ParameterIn;
|
||||
import io.swagger.v3.oas.annotations.media.Content;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import io.swagger.v3.oas.annotations.parameters.RequestBody;
|
||||
@@ -163,7 +164,7 @@ public class FlowController {
|
||||
@Get(uri = "{namespace}/{id}")
|
||||
@Operation(tags = {"Flows"}, summary = "Get a flow")
|
||||
@Schema(
|
||||
oneOf = {FlowWithSource.class, Flow.class}
|
||||
implementation = FlowWithSource.class
|
||||
)
|
||||
//FIXME we return Object instead of Flow as Micronaut, since 4, has an issue with subtypes serialization, see https://github.com/micronaut-projects/micronaut-core/issues/10294.
|
||||
public Object getFlow(
|
||||
@@ -222,7 +223,7 @@ public class FlowController {
|
||||
@Parameter(description = "The current page") @QueryValue(defaultValue = "1") @Min(1) int page,
|
||||
@Parameter(description = "The current page size") @QueryValue(defaultValue = "10") @Min(1) int size,
|
||||
@Parameter(description = "The sort of current page") @Nullable @QueryValue List<String> sort,
|
||||
@Parameter(description = "Filters") @QueryFilterFormat() List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat() List<QueryFilter> filters,
|
||||
// Deprecated params
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the flows to include", deprecated = true) @Nullable @QueryValue List<FlowScope> scope,
|
||||
@@ -277,7 +278,7 @@ public class FlowController {
|
||||
*/
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Post(consumes = MediaType.ALL)
|
||||
@Operation(tags = {"Flows"}, summary = "Create a flow from json object", deprecated = true)
|
||||
@Operation(tags = {"Flows"}, summary = "Create a flow from json object", deprecated = true, hidden = true)
|
||||
@Deprecated(forRemoval = true, since = "0.18")
|
||||
@Hidden // we hide it otherwise this is the one that will be included in the OpenAPI spec instead of the YAML one.
|
||||
public HttpResponse<Flow> createFlowFromJson(
|
||||
@@ -334,7 +335,8 @@ public class FlowController {
|
||||
summary = "Update a complete namespace from json object",
|
||||
description = "All flow will be created / updated for this namespace.\n" +
|
||||
"Flow that already created but not in `flows` will be deleted if the query delete is `true`",
|
||||
deprecated = true
|
||||
deprecated = true,
|
||||
hidden = true
|
||||
)
|
||||
@Deprecated(forRemoval = true, since = "0.18")
|
||||
@Hidden // we hide it otherwise this is the one that will be included in the OpenAPI spec instead of the YAML one.
|
||||
@@ -437,7 +439,7 @@ public class FlowController {
|
||||
|
||||
@Put(uri = "{namespace}/{id}", consumes = MediaType.APPLICATION_YAML)
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Operation(tags = {"Flows"}, summary = "Update a flow")
|
||||
@Operation(tags = {"Flows"}, summary = "Update a flow")// force deprecated = false otherwise it is marked as deprecated, dont know why
|
||||
@ApiResponse(responseCode = "200", description = "On success", content = {@Content(schema = @Schema(implementation = FlowWithSource.class))})
|
||||
public HttpResponse<FlowWithSource> updateFlow(
|
||||
@Parameter(description = "The flow namespace") @PathVariable String namespace,
|
||||
@@ -476,9 +478,9 @@ public class FlowController {
|
||||
/**
|
||||
* @deprecated use {@link #updateFlow(String, String, String)} instead
|
||||
*/
|
||||
@Put(uri = "{namespace}/{id}", consumes = MediaType.ALL)
|
||||
@Put(uri = "{namespace}/{id}", consumes = MediaType.APPLICATION_JSON)
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Operation(tags = {"Flows"}, summary = "Update a flow", deprecated = true)
|
||||
@Operation(tags = {"Flows"}, operationId = "updateFlowFromJson", summary = "Update a flow", deprecated = true, hidden = true)
|
||||
@Deprecated(forRemoval = true, since = "0.18")
|
||||
@Hidden // we hide it otherwise this is the one that will be included in the OpenAPI spec instead of the JSON one.
|
||||
public HttpResponse<Flow> updateFlowFromJson(
|
||||
@@ -666,7 +668,7 @@ public class FlowController {
|
||||
@Post(uri = "/validate/task", consumes = MediaType.APPLICATION_YAML)
|
||||
@Operation(tags = {"Flows"}, summary = "Validate a task")
|
||||
public ValidateConstraintViolation validateTask(
|
||||
@RequestBody(description = "A task definition that can be from tasks or triggers") @Body String task,
|
||||
@RequestBody(description = "A task definition that can be from tasks or triggers") @Schema(implementation = Object.class) @Body String task,
|
||||
@Parameter(description = "The type of task") @QueryValue TaskValidationType section
|
||||
) {
|
||||
ValidateConstraintViolation.ValidateConstraintViolationBuilder<?, ?> validateConstraintViolationBuilder = ValidateConstraintViolation.builder();
|
||||
@@ -703,12 +705,12 @@ public class FlowController {
|
||||
summary = "Export flows as a ZIP archive of yaml sources."
|
||||
)
|
||||
public HttpResponse<byte[]> exportFlowsByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat() List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat() List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the flows to include") @Nullable @QueryValue List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the flows to include", deprecated = true) @Nullable @QueryValue List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels
|
||||
) throws IOException {
|
||||
filters = mapLegacyQueryParamsToNewFilters(filters, query, scope, namespace, labels);
|
||||
|
||||
@@ -741,12 +743,12 @@ public class FlowController {
|
||||
summary = "Delete flows returned by the query parameters."
|
||||
)
|
||||
public HttpResponse<BulkResponse> deleteFlowsByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat() List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the flows to include") @Nullable @QueryValue List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the flows to include", deprecated = true) @Nullable @QueryValue List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels
|
||||
) {
|
||||
filters = mapLegacyQueryParamsToNewFilters(filters, query, scope, namespace, labels);
|
||||
|
||||
@@ -784,12 +786,12 @@ public class FlowController {
|
||||
summary = "Disable flows returned by the query parameters."
|
||||
)
|
||||
public HttpResponse<BulkResponse> disableFlowsByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat() List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat() List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the flows to include") @Nullable @QueryValue List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the flows to include", deprecated = true) @Nullable @QueryValue List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels
|
||||
) {
|
||||
filters = mapLegacyQueryParamsToNewFilters(filters, query, scope, namespace, labels);
|
||||
|
||||
@@ -816,12 +818,12 @@ public class FlowController {
|
||||
summary = "Enable flows returned by the query parameters."
|
||||
)
|
||||
public HttpResponse<BulkResponse> enableFlowsByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat() List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat() List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the flows to include") @Nullable @QueryValue List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "The scope of the flows to include", deprecated = true) @Nullable @QueryValue List<FlowScope> scope,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels
|
||||
) {
|
||||
filters = mapLegacyQueryParamsToNewFilters(filters, query, scope, namespace, labels);
|
||||
|
||||
|
||||
@@ -22,8 +22,8 @@ import io.swagger.v3.oas.annotations.Parameter;
|
||||
import io.swagger.v3.oas.annotations.parameters.RequestBody;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
import java.io.*;
|
||||
import java.time.*;
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
|
||||
@Validated
|
||||
@@ -91,7 +91,7 @@ public class KVController {
|
||||
}
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Put(uri = "{key}", consumes = {MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN})
|
||||
@Put(uri = "{key}", consumes = {MediaType.TEXT_PLAIN})
|
||||
@Operation(tags = {"KV"}, summary = "Puts a key-value pair in store")
|
||||
public void setKeyValue(
|
||||
HttpHeaders httpHeaders,
|
||||
|
||||
@@ -27,6 +27,7 @@ import io.micronaut.scheduling.annotation.ExecuteOn;
|
||||
import io.micronaut.validation.Validated;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import io.swagger.v3.oas.annotations.enums.ParameterIn;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.validation.constraints.Min;
|
||||
import org.slf4j.event.Level;
|
||||
@@ -66,7 +67,7 @@ public class LogController {
|
||||
@Parameter(description = "The current page") @QueryValue(defaultValue = "1") @Min(1) int page,
|
||||
@Parameter(description = "The current page size") @QueryValue(defaultValue = "10") @Min(1) int size,
|
||||
@Parameter(description = "The sort of current page") @Nullable @QueryValue List<String> sort,
|
||||
@Parameter(description = "Filters") @Nullable @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @Nullable @QueryFilterFormat List<QueryFilter> filters,
|
||||
// Deprecated params
|
||||
@Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Parameter(description = "A namespace filter prefix",deprecated = true) @Nullable @QueryValue String namespace,
|
||||
|
||||
@@ -19,6 +19,7 @@ import io.micronaut.scheduling.annotation.ExecuteOn;
|
||||
import io.micronaut.validation.Validated;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import io.swagger.v3.oas.annotations.enums.ParameterIn;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -44,7 +45,7 @@ public class NamespaceSecretController {
|
||||
@Parameter(description = "The current page") @QueryValue(value = "page", defaultValue = "1") int page,
|
||||
@Parameter(description = "The current page size") @QueryValue(value = "size", defaultValue = "10") int size,
|
||||
@Parameter(description = "The sort of current page") @Nullable @QueryValue(value = "sort") List<String> sort,
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters
|
||||
) throws IllegalArgumentException, IOException {
|
||||
final String tenantId = this.tenantService.resolveTenant();
|
||||
List<String> items = secretService.inheritedSecrets(tenantId, namespace).get(namespace).stream().toList();
|
||||
|
||||
@@ -20,6 +20,7 @@ import io.micronaut.scheduling.TaskExecutors;
|
||||
import io.micronaut.scheduling.annotation.ExecuteOn;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import io.swagger.v3.oas.annotations.enums.ParameterIn;
|
||||
import io.swagger.v3.oas.annotations.media.ExampleObject;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.validation.constraints.Min;
|
||||
@@ -44,7 +45,7 @@ public class TaskRunController {
|
||||
@Parameter(description = "The current page") @QueryValue(defaultValue = "1") @Min(1) int page,
|
||||
@Parameter(description = "The current page size") @QueryValue(defaultValue = "10") @Min(1) int size,
|
||||
@Parameter(description = "The sort of current page") @Nullable @QueryValue List<String> sort,
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
// Deprecated params
|
||||
@Parameter(description = "A string filter",deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
|
||||
@@ -32,6 +32,7 @@ import io.micronaut.scheduling.annotation.ExecuteOn;
|
||||
import io.micronaut.validation.Validated;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import io.swagger.v3.oas.annotations.enums.ParameterIn;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.Min;
|
||||
@@ -82,7 +83,7 @@ public class TriggerController {
|
||||
@Parameter(description = "The current page") @QueryValue(defaultValue = "1") @Min(1) int page,
|
||||
@Parameter(description = "The current page size") @QueryValue(defaultValue = "10") @Min(1) int size,
|
||||
@Parameter(description = "The sort of current page") @Nullable @QueryValue List<String> sort,
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
// Deprecated params
|
||||
@Parameter(description = "A string filter",deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
@@ -205,10 +206,10 @@ public class TriggerController {
|
||||
@Post(uri = "/unlock/by-query")
|
||||
@Operation(tags = {"Triggers"}, summary = "Unlock triggers by query parameters")
|
||||
public MutableHttpResponse<?> unlockTriggersByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace
|
||||
) {
|
||||
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
|
||||
filters,
|
||||
@@ -280,13 +281,13 @@ public class TriggerController {
|
||||
if (abstractTrigger == null) {
|
||||
throw new HttpStatusException(HttpStatus.NOT_FOUND, String.format("Flow %s has no trigger %s", newTrigger.getFlowId(), newTrigger.getTriggerId()));
|
||||
}
|
||||
|
||||
|
||||
if (abstractTrigger instanceof RealtimeTriggerInterface) {
|
||||
throw new IllegalArgumentException("Realtime triggers can not be updated through the API, please edit the trigger from the flow.");
|
||||
}
|
||||
|
||||
|
||||
Trigger updatedTrigger;
|
||||
|
||||
|
||||
if (newTrigger.getBackfill() != null) {
|
||||
try {
|
||||
updatedTrigger = setTriggerBackfill(newTrigger, maybeFlow.get(), abstractTrigger);
|
||||
@@ -296,13 +297,13 @@ public class TriggerController {
|
||||
} else {
|
||||
updatedTrigger = setTriggerDisabled(newTrigger.uid(), newTrigger.getDisabled(), abstractTrigger, maybeFlow.get());
|
||||
}
|
||||
|
||||
|
||||
if (updatedTrigger == null) {
|
||||
return HttpResponse.notFound();
|
||||
}
|
||||
return HttpResponse.ok(updatedTrigger);
|
||||
}
|
||||
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Post(uri = "/{namespace}/{flowId}/{triggerId}/restart")
|
||||
@Operation(tags = {"Triggers"}, summary = "Restart a trigger")
|
||||
@@ -369,10 +370,10 @@ public class TriggerController {
|
||||
@Post(uri = "/backfill/pause/by-query")
|
||||
@Operation(tags = {"Triggers"}, summary = "Pause backfill for given triggers")
|
||||
public MutableHttpResponse<?> pauseBackfillByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace
|
||||
) throws QueueException {
|
||||
// Updating the backfill within the flux does not works
|
||||
List<Trigger> triggers = triggerRepository
|
||||
@@ -408,10 +409,10 @@ public class TriggerController {
|
||||
@Post(uri = "/backfill/unpause/by-query")
|
||||
@Operation(tags = {"Triggers"}, summary = "Unpause backfill for given triggers")
|
||||
public MutableHttpResponse<?> unpauseBackfillByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace
|
||||
) throws QueueException {
|
||||
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
|
||||
filters,
|
||||
@@ -477,10 +478,10 @@ public class TriggerController {
|
||||
@Post(uri = "/backfill/delete/by-query")
|
||||
@Operation(tags = {"Triggers"}, summary = "Delete backfill for given triggers")
|
||||
public MutableHttpResponse<?> deleteBackfillByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace
|
||||
) throws QueueException {
|
||||
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
|
||||
filters,
|
||||
@@ -521,10 +522,10 @@ public class TriggerController {
|
||||
@Post(uri = "/set-disabled/by-query")
|
||||
@Operation(tags = {"Triggers"}, summary = "Disable/enable triggers by query parameters")
|
||||
public MutableHttpResponse<?> disabledTriggersByQuery(
|
||||
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
|
||||
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
|
||||
|
||||
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
|
||||
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
|
||||
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
|
||||
|
||||
@Parameter(description = "The disabled state") @QueryValue(defaultValue = "true") Boolean disabled
|
||||
) throws QueueException {
|
||||
@@ -557,24 +558,24 @@ public class TriggerController {
|
||||
|
||||
public void setTriggerDisabled(Trigger trigger, Boolean disabled) throws QueueException {
|
||||
Optional<Flow> maybeFlow = this.flowRepository.findById(this.tenantService.resolveTenant(), trigger.getNamespace(), trigger.getFlowId());
|
||||
|
||||
|
||||
if (maybeFlow.isEmpty()) {
|
||||
return; // Flow doesn't exist
|
||||
}
|
||||
|
||||
|
||||
Optional<AbstractTrigger> maybeAbstractTrigger = maybeFlow.flatMap(flow -> flow.getTriggers().stream().filter(t -> t.getId().equals(trigger.getTriggerId())).findFirst());
|
||||
|
||||
|
||||
if (maybeAbstractTrigger.isEmpty()) {
|
||||
return; // Trigger doesn't exist
|
||||
}
|
||||
|
||||
|
||||
if (maybeAbstractTrigger.get() instanceof RealtimeTriggerInterface) {
|
||||
return; // RealTimeTriggers can't be disabled/enabled through API.
|
||||
}
|
||||
|
||||
|
||||
setTriggerDisabled(trigger.uid(), disabled, maybeAbstractTrigger.get(), maybeFlow.get());
|
||||
}
|
||||
|
||||
|
||||
private Trigger setTriggerDisabled(String triggerUID, Boolean disabled, AbstractTrigger triggerDefinition, Flow flow) throws QueueException {
|
||||
return this.triggerRepository.lock(triggerUID, throwFunction(current -> {
|
||||
if (disabled.equals(current.getDisabled())) {
|
||||
@@ -583,46 +584,46 @@ public class TriggerController {
|
||||
return doSetTriggerDisabled(current, disabled, flow, triggerDefinition);
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
private Trigger setTriggerBackfill(Trigger newTrigger, Flow flow, AbstractTrigger abstractTrigger) throws Exception {
|
||||
return this.triggerRepository.lock(newTrigger.uid(), throwFunction(current -> doSetTriggerBackfill(current, newTrigger.getBackfill(), flow, abstractTrigger)));
|
||||
}
|
||||
|
||||
|
||||
protected Trigger doSetTriggerDisabled(Trigger currentState, Boolean disabled, Flow flow, AbstractTrigger trigger) throws QueueException {
|
||||
Trigger.TriggerBuilder<?, ?> builder = currentState.toBuilder().disabled(disabled);
|
||||
|
||||
|
||||
if (disabled) {
|
||||
builder = builder.nextExecutionDate(null);
|
||||
}
|
||||
|
||||
|
||||
Trigger updated = builder.build();
|
||||
triggerQueue.emit(updated);
|
||||
return updated;
|
||||
}
|
||||
|
||||
|
||||
protected Trigger doSetTriggerBackfill(Trigger currentState, Backfill backfill, Flow flow, AbstractTrigger trigger) throws Exception {
|
||||
Trigger updated;
|
||||
ZonedDateTime nextExecutionDate = null;
|
||||
|
||||
|
||||
RunContext runContext = runContextFactory.of(flow, trigger);
|
||||
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
|
||||
|
||||
|
||||
// We must set up the backfill before the update to calculate the next execution date
|
||||
updated = currentState.withBackfill(backfill);
|
||||
|
||||
|
||||
if (trigger instanceof PollingTriggerInterface pollingTriggerInterface) {
|
||||
nextExecutionDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, Optional.of(updated));
|
||||
}
|
||||
|
||||
|
||||
updated = updated
|
||||
.toBuilder()
|
||||
.nextExecutionDate(nextExecutionDate)
|
||||
.build();
|
||||
|
||||
|
||||
triggerQueue.emit(updated);
|
||||
return updated;
|
||||
}
|
||||
|
||||
|
||||
public int backfillsAction(List<Trigger> triggers, BACKFILL_ACTION action) throws QueueException {
|
||||
AtomicInteger count = new AtomicInteger();
|
||||
triggers.forEach(throwConsumer(trigger -> {
|
||||
|
||||
@@ -686,7 +686,7 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
assertThat(restartedExec.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
|
||||
assertThat(restartedExec.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
assertThat(restartedExec.getTaskRunList().get(2).getAttempts()).isNull();
|
||||
assertThat(restartedExec.getTaskRunList().get(2).getAttempts()).isNotNull();
|
||||
assertThat(restartedExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
|
||||
});
|
||||
},
|
||||
@@ -700,7 +700,7 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
assertThat(finishedRestartedExecution.getTaskRunList().getFirst().getAttempts().size()).isEqualTo(1);
|
||||
assertThat(finishedRestartedExecution.getTaskRunList().get(1).getAttempts().size()).isEqualTo(1);
|
||||
assertThat(finishedRestartedExecution.getTaskRunList().get(2).getAttempts()).isNull();
|
||||
assertThat(finishedRestartedExecution.getTaskRunList().get(2).getAttempts()).isNotNull();
|
||||
assertThat(finishedRestartedExecution.getTaskRunList().get(2).getState().getHistories().stream().filter(state -> state.getState() == State.Type.PAUSED).count()).isEqualTo(1L);
|
||||
assertThat(finishedRestartedExecution.getTaskRunList().get(3).getAttempts().size()).isEqualTo(2);
|
||||
assertThat(finishedRestartedExecution.getTaskRunList().get(4).getAttempts().size()).isEqualTo(1);
|
||||
|
||||
@@ -163,16 +163,16 @@ class KVControllerTest {
|
||||
|
||||
static Stream<Arguments> kvSetKeyValueArgs() {
|
||||
return Stream.of(
|
||||
Arguments.of(MediaType.APPLICATION_JSON, "{\"hello\":\"world\"}", Map.class),
|
||||
Arguments.of(MediaType.APPLICATION_JSON, "[\"hello\",\"world\"]", List.class),
|
||||
Arguments.of(MediaType.APPLICATION_JSON, "\"hello\"", String.class),
|
||||
Arguments.of(MediaType.APPLICATION_JSON, "1", Integer.class),
|
||||
Arguments.of(MediaType.APPLICATION_JSON, "1.0", BigDecimal.class),
|
||||
Arguments.of(MediaType.APPLICATION_JSON, "true", Boolean.class),
|
||||
Arguments.of(MediaType.APPLICATION_JSON, "false", Boolean.class),
|
||||
Arguments.of(MediaType.APPLICATION_JSON, "2021-09-01", LocalDate.class),
|
||||
Arguments.of(MediaType.APPLICATION_JSON, "2021-09-01T01:02:03Z", Instant.class),
|
||||
Arguments.of(MediaType.APPLICATION_JSON, "\"PT5S\"", Duration.class)
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "{\"hello\":\"world\"}", Map.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "[\"hello\",\"world\"]", List.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "\"hello\"", String.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "1", Integer.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "1.0", BigDecimal.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "true", Boolean.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "false", Boolean.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "2021-09-01", LocalDate.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "2021-09-01T01:02:03Z", Instant.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "\"PT5S\"", Duration.class)
|
||||
);
|
||||
}
|
||||
|
||||
@@ -256,7 +256,7 @@ class KVControllerTest {
|
||||
assertThat(httpClientResponseException.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
|
||||
assertThat(httpClientResponseException.getMessage()).isEqualTo(expectedErrorMessage);
|
||||
|
||||
httpClientResponseException = Assertions.assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + NAMESPACE + "/kv/bad$key", "\"content\"").contentType(MediaType.APPLICATION_JSON)));
|
||||
httpClientResponseException = Assertions.assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + NAMESPACE + "/kv/bad$key", "\"content\"").contentType(MediaType.TEXT_PLAIN)));
|
||||
assertThat(httpClientResponseException.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
|
||||
assertThat(httpClientResponseException.getMessage()).isEqualTo(expectedErrorMessage);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user