Compare commits

...

39 Commits

Author SHA1 Message Date
github-actions[bot]
8567ff5490 chore(version): update to version '1.0.13' 2025-11-18 13:10:22 +00:00
Loïc Mathieu
050e22dd09 fix(execution): use jdbcRepository.findOne to be tolerant of multiple results
It uses findAny() under the cover which does not throw if more than one result is returned.

Fixes #12943
2025-11-18 10:23:39 +01:00
Florian Hussonnois
3552eeefbb fix(scheduler): mysql convert 'now' to UTC to avoid any offset error on next_execution_date
Fixed a previous commit to only apply the change for MySQL

Related-to: kestra-io/kestra-ee#5611
2025-11-18 09:58:01 +01:00
Loïc Mathieu
2e47fb8285 fix(core): compilation issue 2025-11-17 15:07:33 +01:00
Piyush Bhaskar
b52a07e562 fix(core): add resize observer for editor container (#12991) 2025-11-17 14:00:19 +05:30
Loïc Mathieu
3f7c01db41 fix(flow): flow trigger with both conditions and preconditions
When a flow have both a condition and a precondition, the condition was evaluated twice which lead to double execution triggered.

Fixes
2025-11-14 18:11:24 +01:00
MilosPaunovic
f5dbec96e0 chore(core): count only direct dependencies for badge number
Closes https://github.com/kestra-io/kestra/issues/12817.
2025-11-14 08:20:14 +01:00
github-actions[bot]
fe7a6d9af9 chore(version): update to version '1.0.12' 2025-11-13 13:42:02 +00:00
Loïc Mathieu
06c8c35061 fix(flow): don't URLEncode the fileName inside the Download task
Also provide a `fileName` property that when set would override any filename from the content disposition in case it causes issues.
2025-11-13 11:12:18 +01:00
Loïc Mathieu
8f23e813f2 fix(system): consume the trigger queue so it is properly cleaned
Fixes https://github.com/kestra-io/kestra/issues/11671
2025-11-13 11:12:02 +01:00
Piyush Bhaskar
47b7c7cd2e fix(core): adjust overflow behavior (#12879) 2025-11-13 13:59:42 +05:30
Loïc Mathieu
aca7c2f694 fix(system): access log configuration
Due to a change in the configuration file, access log configuration was in the wrong sub-document.

Fixes https://github.com/kestra-io/kestra-ee/issues/5670
2025-11-12 15:09:06 +01:00
mustafatarek
a0f29b7d5d feat(core): add attempts for flowable tasks 2025-11-12 09:35:31 +01:00
Piyush Bhaskar
0176c8c101 fix(secrets): NS update for a secret should be disabled properly with correct prop (#12834) 2025-11-12 12:41:55 +05:30
YannC
b0036bbfca fix: where prop can be null (#12828) 2025-11-10 18:41:22 +01:00
github-actions[bot]
fad5edbde8 chore(version): update to version '1.0.11' 2025-11-10 14:35:23 +00:00
Loïc Mathieu
f125f63ae5 fix(executions): allow reading from subflow even if we have a parent
This fixes an issue where you cannot read from a Subflow file if the execution has iteself be triggered by another Subflow task.
It was caused by the trigger check beeing too aggressive, if it didn't pass the check it fail instead of return false so the other check would not be processed.

Fixes #12629
2025-11-10 13:27:06 +01:00
Florian Hussonnois
6db1bfb2ce fix(core): fix plugin stable version resolution (kestra-io/kestra-ee#5129)
Rename incremental field to patch

Fixes: kestra-io/kestra-ee#5129
2025-11-10 11:05:40 +01:00
Florian Hussonnois
0957e07c78 fix(plugins): remove regex validation on version property
Changes:
* Fixes stable method in Version class
* Remove regex validation on 'version' property

Related-to: kestra-io/kestra-ee#5090
2025-11-10 11:05:39 +01:00
Florian Hussonnois
5a4a5e44df fix(core): resolution of plugin must be done with a stable version 2025-11-10 11:05:39 +01:00
Florian Hussonnois
faee3f1827 fix(core): fix PluginCatalogService resolve method 2025-11-10 11:05:39 +01:00
Florian Hussonnois
3604762da0 fix(system): add resolveVersions method to PluginCatalogService
Related-to: kestra-io/kestra-ee#5171
2025-11-10 11:05:38 +01:00
YannC
6ceb0de1d5 fix: when removing a queued execution, directly delete instead of fetching then delete to reduce deadlock (#12789) 2025-11-10 10:32:23 +01:00
Loïc Mathieu
4a62f9c818 fix(executions): don't urlencode files as they would already be inside the storage 2025-11-10 09:28:30 +01:00
brian-mulier-p
d14f3e3317 fix(tests): bump amount of threads on tests (#12777) 2025-11-07 09:44:44 +01:00
Piyush Bhaskar
7e9030dfcf refactor(core): properly do trigger filter (#12780) 2025-11-07 11:46:23 +05:30
Ludovic DEHON
2fce17a8a9 feat(cli): add --flow-path on executor to preload some flows
close kestra-io/kestra-ee#5721
2025-11-06 19:26:04 +01:00
Loïc Mathieu
67d8509106 fix(system): killing a paused flow should kill the Pause task attempt
Fixes #12421
2025-11-06 15:34:19 +01:00
Piyush Bhaskar
01e92a6d79 Revert "fix(core): apply timeRange filter in triggers (#12721)" 2025-11-06 19:07:27 +05:30
Piyush Bhaskar
883b7c8610 fix(core): apply timeRange filter in triggers (#12721) 2025-11-06 16:31:48 +05:30
Piyush Bhaskar
11ef823567 fix(core): remove double info icon (#12623) 2025-11-06 11:54:07 +05:30
Loïc Mathieu
771cca1441 fix(system): trigger an execution once per condition on flow triggers
Fixes #12560
2025-11-05 15:33:44 +01:00
YannC.
53e8674dfc fix: set FlowWithSource as implementation for getFlow method 2025-11-04 16:14:51 +01:00
github-actions[bot]
59016ae1af chore(version): update to version '1.0.10' 2025-11-04 13:52:28 +00:00
Roman Acevedo
7503d6fa21 test: set retryWithFlowableErrors as FlakyTest 2025-11-04 13:46:49 +01:00
Roman Acevedo
0234a4c64c test(kv): only plain text header is sent now 2025-11-04 13:15:36 +01:00
Roman Acevedo
98c9c4d21f Fix/sdk changes (#12411)
* fix: kv controller remove namespace check

* clean(API): add query to filter parameter

* fix: flow update not deprecated

* clean(API): add deprecated on open api

* feat: executions annotations for skipping, follow method generation in sdk

* feat: add typing indication to validateTask

* fix(flowController): set correct hidden for json method in

* fix: optional params in delete executions endpoints

* fix: inputs/outputs as object

* change KV schema type to be object

* add back , deprecated = false on flow update, otherwise its marked as deprecated

* Revert "add back , deprecated = false on flow update, otherwise its marked as deprecated"

This reverts commit 3772404b68f14f0a80af9e0adb9952d58e9102b4.

* feat(API): add multipart to openAPI

* feat(API): add multipart to openAPI

* fix: only use plain-text for setKeyValue endpoint

* fix: KV command test

* chore: add multipart vendor annotations for custom generation on SDK

---------

Co-authored-by: YannC. <ycoornaert@kestra.io>
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-11-03 18:05:46 +01:00
github-actions[bot]
8e54183a44 chore(version): update to version '1.0.9' 2025-11-03 11:11:56 +00:00
github-actions[bot]
8aa332c629 chore(core): localize to languages other than english (#12550)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-11-03 10:22:11 +01:00
68 changed files with 2857 additions and 645 deletions

View File

@@ -62,7 +62,7 @@ public class KvUpdateCommand extends AbstractApiCommand {
Duration ttl = expiration == null ? null : Duration.parse(expiration);
MutableHttpRequest<String> request = HttpRequest
.PUT(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/kv/" + key, value)
.contentType(MediaType.APPLICATION_JSON_TYPE);
.contentType(MediaType.TEXT_PLAIN);
if (ttl != null) {
request.header("ttl", ttl.toString());

View File

@@ -1,7 +1,9 @@
package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
@@ -10,6 +12,8 @@ import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -19,6 +23,9 @@ import java.util.Map;
description = "Start the Kestra executor"
)
public class ExecutorCommand extends AbstractServerCommand {
@CommandLine.Spec
CommandLine.Model.CommandSpec spec;
@Inject
private ApplicationContext applicationContext;
@@ -28,22 +35,28 @@ public class ExecutorCommand extends AbstractServerCommand {
@Inject
private StartExecutorService startExecutorService;
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "The list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"-f", "--flow-path"}, description = "Tenant identifier required to load flows from the specified path")
private File flowPath;
@CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path")
private String tenantId;
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "List of execution IDs to skip, separated by commas; for troubleshooting only")
private List<String> skipExecutions = Collections.emptyList();
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "The list of flow identifiers (tenant|namespace|flowId) to skip, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "List of flow identifiers (tenant|namespace|flowId) to skip, separated by a coma; for troubleshooting only")
private List<String> skipFlows = Collections.emptyList();
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "The list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "List of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting only")
private List<String> skipNamespaces = Collections.emptyList();
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "The list of tenants to skip, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "List of tenants to skip, separated by a coma; for troubleshooting only")
private List<String> skipTenants = Collections.emptyList();
@CommandLine.Option(names = {"--start-executors"}, split=",", description = "The list of Kafka Stream executors to start, separated by a command. Use it only with the Kafka queue, for debugging purpose.")
@CommandLine.Option(names = {"--start-executors"}, split=",", description = "List of Kafka Stream executors to start, separated by a command. Use it only with the Kafka queue; for debugging only")
private List<String> startExecutors = Collections.emptyList();
@CommandLine.Option(names = {"--not-start-executors"}, split=",", description = "The list of Kafka Stream executors to not start, separated by a command. Use it only with the Kafka queue, for debugging purpose.")
@CommandLine.Option(names = {"--not-start-executors"}, split=",", description = "Lst of Kafka Stream executors to not start, separated by a command. Use it only with the Kafka queue; for debugging only")
private List<String> notStartExecutors = Collections.emptyList();
@SuppressWarnings("unused")
@@ -64,6 +77,16 @@ public class ExecutorCommand extends AbstractServerCommand {
super.call();
if (flowPath != null) {
try {
LocalFlowRepositoryLoader localFlowRepositoryLoader = applicationContext.getBean(LocalFlowRepositoryLoader.class);
TenantIdSelectorService tenantIdSelectorService = applicationContext.getBean(TenantIdSelectorService.class);
localFlowRepositoryLoader.load(tenantIdSelectorService.getTenantId(this.tenantId), this.flowPath);
} catch (IOException e) {
throw new CommandLine.ParameterException(this.spec.commandLine(), "Invalid flow path", e);
}
}
ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
executorService.run();

View File

@@ -23,7 +23,7 @@ public class IndexerCommand extends AbstractServerCommand {
@Inject
private SkipExecutionService skipExecutionService;
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting only")
private List<String> skipIndexerRecords = Collections.emptyList();
@SuppressWarnings("unused")

View File

@@ -42,7 +42,7 @@ public class StandAloneCommand extends AbstractServerCommand {
@Nullable
private FileChangedEventListener fileWatcher;
@CommandLine.Option(names = {"-f", "--flow-path"}, description = "the flow path containing flow to inject at startup (when running with a memory flow repository)")
@CommandLine.Option(names = {"-f", "--flow-path"}, description = "Tenant identifier required to load flows from the specified path")
private File flowPath;
@CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path with the enterprise edition")
@@ -51,19 +51,19 @@ public class StandAloneCommand extends AbstractServerCommand {
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to eight times the number of available processors. Set it to 0 to avoid starting a worker.")
private int workerThread = defaultWorkerThread();
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting only")
private List<String> skipExecutions = Collections.emptyList();
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (namespace.flowId) to skip, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (namespace.flowId) to skip, separated by a coma; for troubleshooting only")
private List<String> skipFlows = Collections.emptyList();
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "a list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "a list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting only")
private List<String> skipNamespaces = Collections.emptyList();
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting only")
private List<String> skipTenants = Collections.emptyList();
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting only")
private List<String> skipIndexerRecords = Collections.emptyList();
@CommandLine.Option(names = {"--no-tutorials"}, description = "Flag to disable auto-loading of tutorial flows.")

View File

@@ -40,7 +40,7 @@ public class WebServerCommand extends AbstractServerCommand {
@Option(names = {"--no-indexer"}, description = "Flag to disable starting an embedded indexer.")
private boolean indexerDisabled = false;
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting only")
private List<String> skipIndexerRecords = Collections.emptyList();
@Override

View File

@@ -30,15 +30,15 @@ micronaut:
read-idle-timeout: 60m
write-idle-timeout: 60m
idle-timeout: 60m
netty:
max-zstd-encode-size: 67108864 # increased to 64MB from the default of 32MB
max-chunk-size: 10MB
max-header-size: 32768 # increased from the default of 8k
responses:
file:
cache-seconds: 86400
cache-control:
public: true
netty:
max-zstd-encode-size: 67108864 # increased to 64MB from the default of 32MB
max-chunk-size: 10MB
max-header-size: 32768 # increased from the default of 8k
# Access log configuration, see https://docs.micronaut.io/latest/guide/index.html#accessLogger
access-logger:

View File

@@ -1,16 +1,33 @@
package io.kestra.core.models;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Pattern;
import java.util.List;
import java.util.Map;
/**
* Interface that can be implemented by classes supporting plugin versioning.
*
* @see Plugin
*/
public interface PluginVersioning {
String TITLE = "Plugin Version";
String DESCRIPTION = """
Defines the version of the plugin to use.
@Pattern(regexp="\\d+\\.\\d+\\.\\d+(-[a-zA-Z0-9-]+)?|([a-zA-Z0-9]+)")
@Schema(title = "The version of the plugin to use.")
The version must follow the Semantic Versioning (SemVer) specification:
- A single-digit MAJOR version (e.g., `1`).
- A MAJOR.MINOR version (e.g., `1.1`).
- A MAJOR.MINOR.PATCH version, optionally with any qualifier
(e.g., `1.1.2`, `1.1.0-SNAPSHOT`).
""";
@Schema(
title = TITLE,
description = DESCRIPTION
)
String getVersion();
}

View File

@@ -5,6 +5,8 @@ import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.dashboards.filters.AbstractFilter;
import io.kestra.core.repositories.QueryBuilderInterface;
import io.kestra.plugin.core.dashboard.data.IData;
import jakarta.annotation.Nullable;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@@ -36,6 +38,8 @@ public abstract class DataFilter<F extends Enum<F>, C extends ColumnDescriptor<F
private Map<String, C> columns;
@Setter
@Valid
@Nullable
private List<AbstractFilter<F>> where;
private List<OrderBy> orderBy;

View File

@@ -28,6 +28,7 @@ import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@@ -77,10 +78,12 @@ public class Execution implements DeletedInterface, TenantInterface {
@With
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Schema(implementation = Object.class)
Map<String, Object> inputs;
@With
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Schema(implementation = Object.class)
Map<String, Object> outputs;
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@@ -88,6 +91,7 @@ public class Execution implements DeletedInterface, TenantInterface {
List<Label> labels;
@With
@Schema(implementation = Object.class)
Map<String, Object> variables;
@NotNull

View File

@@ -3,10 +3,13 @@ package io.kestra.core.models.executions;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@@ -53,6 +56,7 @@ public class TaskRun implements TenantInterface {
@With
@JsonInclude(JsonInclude.Include.ALWAYS)
@Nullable
@Schema(implementation = Object.class)
Variables outputs;
@NotNull

View File

@@ -1,5 +1,6 @@
package io.kestra.core.plugins;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Builder;
import java.io.File;
@@ -33,7 +34,7 @@ public record PluginArtifact(
String version,
URI uri
) implements Comparable<PluginArtifact> {
private static final Pattern ARTIFACT_PATTERN = Pattern.compile(
"([^: ]+):([^: ]+)(:([^: ]*)(:([^: ]+))?)?:([^: ]+)"
);
@@ -42,7 +43,8 @@ public record PluginArtifact(
);
public static final String JAR_EXTENSION = "jar";
public static final String KESTRA_GROUP_ID = "io.kestra";
/**
* Static helper method for constructing a new {@link PluginArtifact} from a JAR file.
*
@@ -135,6 +137,11 @@ public record PluginArtifact(
public String toString() {
return toCoordinates();
}
@JsonIgnore
public boolean isOfficial() {
return groupId.startsWith(KESTRA_GROUP_ID);
}
public String toCoordinates() {
return Stream.of(groupId, artifactId, extension, classifier, version)

View File

@@ -1,9 +1,13 @@
package io.kestra.core.plugins;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.Version;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.HttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -15,9 +19,12 @@ import java.util.Base64;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Services for retrieving available plugin artifacts for Kestra.
@@ -39,6 +46,8 @@ public class PluginCatalogService {
private final boolean icons;
private final boolean oss;
private final Version currentStableVersion;
/**
* Creates a new {@link PluginCatalogService} instance.
@@ -53,11 +62,55 @@ public class PluginCatalogService {
this.httpClient = httpClient;
this.icons = icons;
this.oss = communityOnly;
Version version = Version.of(KestraContext.getContext().getVersion());
this.currentStableVersion = new Version(version.majorVersion(), version.minorVersion(), version.patchVersion(), null);
// Immediately trigger an async load of plugin artifacts.
this.isLoaded.set(true);
this.plugins = CompletableFuture.supplyAsync(this::load);
}
/**
* Resolves the version for the given artifacts.
*
* @param artifacts The list of artifacts to resolve.
* @return The list of results.
*/
public List<PluginResolutionResult> resolveVersions(List<PluginArtifact> artifacts) {
if (ListUtils.isEmpty(artifacts)) {
return List.of();
}
final Map<String, ApiPluginArtifact> pluginsByGroupAndArtifactId = getAllCompatiblePlugins().stream()
.collect(Collectors.toMap(it -> it.groupId() + ":" + it.artifactId(), Function.identity()));
return artifacts.stream().map(it -> {
// Get all compatible versions for current artifact
List<String> versions = Optional
.ofNullable(pluginsByGroupAndArtifactId.get(it.groupId() + ":" + it.artifactId()))
.map(ApiPluginArtifact::versions)
.orElse(List.of());
// Try to resolve the version
String resolvedVersion = null;
if (!versions.isEmpty()) {
if (it.version().equalsIgnoreCase("LATEST")) {
resolvedVersion = versions.getFirst();
} else {
resolvedVersion = versions.contains(it.version()) ? it.version() : null;
}
}
// Build the PluginResolutionResult
return new PluginResolutionResult(
it,
resolvedVersion,
versions,
resolvedVersion != null
);
}).toList();
}
public synchronized List<PluginManifest> get() {
try {
@@ -140,7 +193,27 @@ public class PluginCatalogService {
isLoaded.set(false);
}
}
private List<ApiPluginArtifact> getAllCompatiblePlugins() {
MutableHttpRequest<Object> request = HttpRequest.create(
HttpMethod.GET,
"/v1/plugins/artifacts/core-compatibility/" + currentStableVersion
);
if (oss) {
request.getParameters().add("license", "OPENSOURCE");
}
try {
return httpClient
.toBlocking()
.exchange(request, Argument.listOf(ApiPluginArtifact.class))
.body();
} catch (Exception e) {
log.debug("Failed to retrieve available plugins from Kestra API. Cause: ", e);
return List.of();
}
}
public record PluginManifest(
String title,
String icon,
@@ -153,4 +226,11 @@ public class PluginCatalogService {
return groupId + ":" + artifactId + ":LATEST";
}
}
public record ApiPluginArtifact(
String groupId,
String artifactId,
String license,
List<String> versions
) {}
}

View File

@@ -144,7 +144,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
static String extractPluginRawIdentifier(final JsonNode node, final boolean isVersioningSupported) {
String type = Optional.ofNullable(node.get(TYPE)).map(JsonNode::textValue).orElse(null);
String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::textValue).orElse(null);
String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::asText).orElse(null);
if (type == null || type.isEmpty()) {
return null;

View File

@@ -82,8 +82,7 @@ public abstract class FilesService {
}
private static String resolveUniqueNameForFile(final Path path) {
String filename = path.getFileName().toString();
String encodedFilename = java.net.URLEncoder.encode(filename, java.nio.charset.StandardCharsets.UTF_8);
return IdUtils.from(path.toString()) + "-" + encodedFilename;
String filename = path.getFileName().toString().replace(' ', '+');
return IdUtils.from(path.toString()) + "-" + filename;
}
}

View File

@@ -151,10 +151,7 @@ abstract class AbstractFileFunction implements Function {
// if there is a trigger of type execution, we also allow accessing a file from the parent execution
Map<String, String> trigger = (Map<String, String>) context.getVariable(TRIGGER);
if (!isFileUriValid(trigger.get(NAMESPACE), trigger.get("flowId"), trigger.get("executionId"), path)) {
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it didn't belong to the parent execution");
}
return true;
return isFileUriValid(trigger.get(NAMESPACE), trigger.get("flowId"), trigger.get("executionId"), path);
}
return false;
}

View File

@@ -383,6 +383,7 @@ public class ExecutionService {
if (!isFlowable || s.equals(taskRunId)) {
TaskRun newTaskRun;
State.Type targetState = newState;
if (task instanceof Pause pauseTask) {
State.Type terminalState = newState == State.Type.RUNNING ? State.Type.SUCCESS : newState;
Pause.Resumed _resumed = resumed != null ? resumed : Pause.Resumed.now(terminalState);
@@ -392,23 +393,23 @@ public class ExecutionService {
// if it's a Pause task with no subtask, we terminate the task
if (ListUtils.isEmpty(pauseTask.getTasks()) && ListUtils.isEmpty(pauseTask.getErrors()) && ListUtils.isEmpty(pauseTask.getFinally())) {
if (newState == State.Type.RUNNING) {
newTaskRun = newTaskRun.withState(State.Type.SUCCESS);
targetState = State.Type.SUCCESS;
} else if (newState == State.Type.KILLING) {
newTaskRun = newTaskRun.withState(State.Type.KILLED);
} else {
newTaskRun = newTaskRun.withState(newState);
targetState = State.Type.KILLED;
}
} else {
// we should set the state to RUNNING so that subtasks are executed
newTaskRun = newTaskRun.withState(State.Type.RUNNING);
targetState = State.Type.RUNNING;
}
newTaskRun = newTaskRun.withState(targetState);
} else {
newTaskRun = originalTaskRun.withState(newState);
newTaskRun = originalTaskRun.withState(targetState);
}
if (originalTaskRun.getAttempts() != null && !originalTaskRun.getAttempts().isEmpty()) {
ArrayList<TaskRunAttempt> attempts = new ArrayList<>(originalTaskRun.getAttempts());
attempts.set(attempts.size() - 1, attempts.getLast().withState(newState));
attempts.set(attempts.size() - 1, attempts.getLast().withState(targetState));
newTaskRun = newTaskRun.withAttempts(attempts);
}

View File

@@ -32,48 +32,84 @@ public class Version implements Comparable<Version> {
* @param version the version.
* @return a new {@link Version} instance.
*/
public static Version of(String version) {
public static Version of(final Object version) {
if (version.startsWith("v")) {
version = version.substring(1);
if (Objects.isNull(version)) {
throw new IllegalArgumentException("Invalid version, cannot parse null version");
}
String strVersion = version.toString();
if (strVersion.startsWith("v")) {
strVersion = strVersion.substring(1);
}
int qualifier = version.indexOf("-");
int qualifier = strVersion.indexOf("-");
final String[] versions = qualifier > 0 ?
version.substring(0, qualifier).split("\\.") :
version.split("\\.");
strVersion.substring(0, qualifier).split("\\.") :
strVersion.split("\\.");
try {
final int majorVersion = Integer.parseInt(versions[0]);
final int minorVersion = versions.length > 1 ? Integer.parseInt(versions[1]) : 0;
final int incrementalVersion = versions.length > 2 ? Integer.parseInt(versions[2]) : 0;
final Integer minorVersion = versions.length > 1 ? Integer.parseInt(versions[1]) : null;
final Integer incrementalVersion = versions.length > 2 ? Integer.parseInt(versions[2]) : null;
return new Version(
majorVersion,
minorVersion,
incrementalVersion,
qualifier > 0 ? version.substring(qualifier + 1) : null,
version
qualifier > 0 ? strVersion.substring(qualifier + 1) : null,
strVersion
);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid version, cannot parse '" + version + "'");
}
}
/**
* Static helper method for returning the most recent stable version for a current {@link Version}.
* Resolves the most appropriate stable version from a collection, based on a given input version.
* <p>
* The matching rules are:
* <ul>
* <li>If {@code from} specifies only a major version (e.g. {@code 1}), return the latest stable version
* with the same major (e.g. {@code 1.2.3}).</li>
* <li>If {@code from} specifies a major and minor version only (e.g. {@code 1.2}), return the latest
* stable version with the same major and minor (e.g. {@code 1.2.3}).</li>
* <li>If {@code from} specifies a full version with major, minor, and patch (e.g. {@code 1.2.2}),
* then only return it if it is exactly present (and stable) in {@code versions}.
* No "upgrade" is performed in this case.</li>
* <li>If no suitable version is found, returns {@code null}.</li>
* </ul>
*
* @param from the current version.
* @param versions the list of version.
*
* @return the last stable version.
* @param from the reference version (may specify only major, or major+minor, or major+minor+patch).
* @param versions the collection of candidate versions to resolve against.
* @return the best matching stable version, or {@code null} if none match.
*/
public static Version getStable(final Version from, final Collection<Version> versions) {
List<Version> compatibleVersions = versions.stream()
.filter(v -> v.majorVersion() == from.majorVersion() && v.minorVersion() == from.minorVersion())
.toList();
if (compatibleVersions.isEmpty()) return null;
return Version.getLatest(compatibleVersions);
// Case 1: "from" is only a major (e.g. 1)
if (from.hasOnlyMajor()) {
List<Version> sameMajor = versions.stream()
.filter(v -> v.majorVersion() == from.majorVersion())
.toList();
return sameMajor.isEmpty() ? null : Version.getLatest(sameMajor);
}
// Case 2: "from" is major+minor only (e.g. 1.2)
if (from.hasMajorAndMinorOnly()) {
List<Version> sameMinor = versions.stream()
.filter(v -> v.majorVersion() == from.majorVersion()
&& v.minorVersion() == from.minorVersion())
.toList();
return sameMinor.isEmpty() ? null : Version.getLatest(sameMinor);
}
// Case 3: "from" is full version (major+minor+patch)
if (versions.contains(from)) {
return from;
}
// No match
return null;
}
/**
@@ -123,8 +159,8 @@ public class Version implements Comparable<Version> {
}
private final int majorVersion;
private final int minorVersion;
private final int incrementalVersion;
private final Integer minorVersion;
private final Integer patchVersion;
private final Qualifier qualifier;
private final String originalVersion;
@@ -134,14 +170,14 @@ public class Version implements Comparable<Version> {
*
* @param majorVersion the major version (must be superior or equal to 0).
* @param minorVersion the minor version (must be superior or equal to 0).
* @param incrementalVersion the incremental version (must be superior or equal to 0).
* @param patchVersion the incremental version (must be superior or equal to 0).
* @param qualifier the qualifier.
*/
public Version(final int majorVersion,
final int minorVersion,
final int incrementalVersion,
final int patchVersion,
final String qualifier) {
this(majorVersion, minorVersion, incrementalVersion, qualifier, null);
this(majorVersion, minorVersion, patchVersion, qualifier, null);
}
/**
@@ -149,25 +185,25 @@ public class Version implements Comparable<Version> {
*
* @param majorVersion the major version (must be superior or equal to 0).
* @param minorVersion the minor version (must be superior or equal to 0).
* @param incrementalVersion the incremental version (must be superior or equal to 0).
* @param patchVersion the incremental version (must be superior or equal to 0).
* @param qualifier the qualifier.
* @param originalVersion the original string version.
*/
private Version(final int majorVersion,
final int minorVersion,
final int incrementalVersion,
private Version(final Integer majorVersion,
final Integer minorVersion,
final Integer patchVersion,
final String qualifier,
final String originalVersion) {
this.majorVersion = requirePositive(majorVersion, "major");
this.minorVersion = requirePositive(minorVersion, "minor");
this.incrementalVersion = requirePositive(incrementalVersion, "incremental");
this.patchVersion = requirePositive(patchVersion, "incremental");
this.qualifier = qualifier != null ? new Qualifier(qualifier) : null;
this.originalVersion = originalVersion;
}
private static int requirePositive(int version, final String message) {
if (version < 0) {
private static Integer requirePositive(Integer version, final String message) {
if (version != null && version < 0) {
throw new IllegalArgumentException(String.format("The '%s' version must super or equal to 0", message));
}
return version;
@@ -178,11 +214,11 @@ public class Version implements Comparable<Version> {
}
public int minorVersion() {
return minorVersion;
return minorVersion != null ? minorVersion : 0;
}
public int incrementalVersion() {
return incrementalVersion;
public int patchVersion() {
return patchVersion != null ? patchVersion : 0;
}
public Qualifier qualifier() {
@@ -197,9 +233,9 @@ public class Version implements Comparable<Version> {
if (this == o) return true;
if (!(o instanceof Version)) return false;
Version version = (Version) o;
return majorVersion == version.majorVersion &&
minorVersion == version.minorVersion &&
incrementalVersion == version.incrementalVersion &&
return Objects.equals(majorVersion,version.majorVersion) &&
Objects.equals(minorVersion, version.minorVersion) &&
Objects.equals(patchVersion,version.patchVersion) &&
Objects.equals(qualifier, version.qualifier);
}
@@ -208,7 +244,7 @@ public class Version implements Comparable<Version> {
*/
@Override
public int hashCode() {
return Objects.hash(majorVersion, minorVersion, incrementalVersion, qualifier);
return Objects.hash(majorVersion, minorVersion, patchVersion, qualifier);
}
/**
@@ -218,7 +254,7 @@ public class Version implements Comparable<Version> {
public String toString() {
if (originalVersion != null) return originalVersion;
String version = majorVersion + "." + minorVersion + "." + incrementalVersion;
String version = majorVersion + "." + minorVersion + "." + patchVersion;
return (qualifier != null) ? version +"-" + qualifier : version;
}
@@ -238,7 +274,7 @@ public class Version implements Comparable<Version> {
return compareMinor;
}
int compareIncremental = Integer.compare(that.incrementalVersion, this.incrementalVersion);
int compareIncremental = Integer.compare(that.patchVersion, this.patchVersion);
if (compareIncremental != 0) {
return compareIncremental;
}
@@ -253,6 +289,21 @@ public class Version implements Comparable<Version> {
return this.qualifier.compareTo(that.qualifier);
}
/**
* @return true if only major is specified (e.g. "1")
*/
private boolean hasOnlyMajor() {
return minorVersion == null && patchVersion == null;
}
/**
* @return true if major+minor are specified, but no patch (e.g. "1.2")
*/
private boolean hasMajorAndMinorOnly() {
return minorVersion != null && patchVersion == null;
}
/**
* Checks whether this version is before the given one.

View File

@@ -33,11 +33,13 @@ public class ExecutionsDataFilterValidator implements ConstraintValidator<Execut
}
});
executionsDataFilter.getWhere().forEach(filter -> {
if (filter.getField() == Executions.Fields.LABELS && filter.getLabelKey() == null) {
violations.add("Label filters must have a `labelKey`.");
}
});
if (executionsDataFilter.getWhere() != null) {
executionsDataFilter.getWhere().forEach(filter -> {
if (filter.getField() == Executions.Fields.LABELS && filter.getLabelKey() == null) {
violations.add("Label filters must have a `labelKey`.");
}
});
}
if (!violations.isEmpty()) {
context.disableDefaultConstraintViolation();

View File

@@ -20,8 +20,6 @@ import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@@ -60,7 +58,15 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
public class Download extends AbstractHttp implements RunnableTask<Download.Output> {
@Schema(title = "Should the task fail when downloading an empty file.")
@Builder.Default
private final Property<Boolean> failOnEmptyResponse = Property.ofValue(true);
private Property<Boolean> failOnEmptyResponse = Property.ofValue(true);
@Schema(
title = "Name of the file inside the output.",
description = """
If not provided, the filename will be extracted from the `Content-Disposition` header.
If no `Content-Disposition` header, a name would be generated."""
)
private Property<String> saveAs;
public Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
@@ -111,20 +117,22 @@ public class Download extends AbstractHttp implements RunnableTask<Download.Outp
}
}
String filename = null;
if (response.getHeaders().firstValue("Content-Disposition").isPresent()) {
String contentDisposition = response.getHeaders().firstValue("Content-Disposition").orElseThrow();
filename = filenameFromHeader(runContext, contentDisposition);
}
if (filename != null) {
filename = URLEncoder.encode(filename, StandardCharsets.UTF_8);
String rFilename = runContext.render(this.saveAs).as(String.class).orElse(null);
if (rFilename == null) {
if (response.getHeaders().firstValue("Content-Disposition").isPresent()) {
String contentDisposition = response.getHeaders().firstValue("Content-Disposition").orElseThrow();
rFilename = filenameFromHeader(runContext, contentDisposition);
if (rFilename != null) {
rFilename = rFilename.replace(' ', '+');
}
}
}
logger.debug("File '{}' downloaded with size '{}'", from, size);
return Output.builder()
.code(response.getStatus().getCode())
.uri(runContext.storage().putFile(tempFile, filename))
.uri(runContext.storage().putFile(tempFile, rFilename))
.headers(response.getHeaders().map())
.length(size.get())
.build();

View File

@@ -330,7 +330,7 @@ class ExecutionServiceTest {
assertThat(restart.findTaskRunByTaskIdAndValue("1_each", List.of()).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getState().getHistories()).hasSize(4);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getAttempts()).isNull();
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
restart = executionService.markAs(execution, flow, execution.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getId(), State.Type.FAILED);
@@ -441,6 +441,7 @@ class ExecutionServiceTest {
assertThat(killed.getState().getCurrent()).isEqualTo(State.Type.CANCELLED);
assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getState().getCurrent()).isEqualTo(State.Type.KILLED);
assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.KILLED);
assertThat(killed.getState().getHistories()).hasSize(5);
}

View File

@@ -103,28 +103,28 @@ class FilesServiceTest {
var runContext = runContextFactory.of();
Path fileWithSpace = tempDir.resolve("with space.txt");
Path fileWithUnicode = tempDir.resolve("สวัสดี.txt");
Path fileWithUnicode = tempDir.resolve("สวัสดี&.txt");
Files.writeString(fileWithSpace, "content");
Files.writeString(fileWithUnicode, "content");
Path targetFileWithSpace = runContext.workingDir().path().resolve("with space.txt");
Path targetFileWithUnicode = runContext.workingDir().path().resolve("สวัสดี.txt");
Path targetFileWithUnicode = runContext.workingDir().path().resolve("สวัสดี&.txt");
Files.copy(fileWithSpace, targetFileWithSpace);
Files.copy(fileWithUnicode, targetFileWithUnicode);
Map<String, URI> outputFiles = FilesService.outputFiles(
runContext,
List.of("with space.txt", "สวัสดี.txt")
List.of("with space.txt", "สวัสดี&.txt")
);
assertThat(outputFiles).hasSize(2);
assertThat(outputFiles).containsKey("with space.txt");
assertThat(outputFiles).containsKey("สวัสดี.txt");
assertThat(outputFiles).containsKey("สวัสดี&.txt");
assertThat(runContext.storage().getFile(outputFiles.get("with space.txt"))).isNotNull();
assertThat(runContext.storage().getFile(outputFiles.get("สวัสดี.txt"))).isNotNull();
assertThat(runContext.storage().getFile(outputFiles.get("สวัสดี&.txt"))).isNotNull();
}
private URI createFile() throws IOException {

View File

@@ -109,33 +109,6 @@ public class FileSizeFunctionTest {
assertThat(size).isEqualTo(FILE_SIZE);
}
@Test
void shouldThrowIllegalArgumentException_givenTrigger_andParentExecution_andMissingNamespace() throws IOException {
String executionId = IdUtils.create();
URI internalStorageURI = getInternalStorageURI(executionId);
URI internalStorageFile = getInternalStorageFile(internalStorageURI);
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "subflow",
"namespace", NAMESPACE,
"tenantId", MAIN_TENANT),
"execution", Map.of("id", IdUtils.create()),
"trigger", Map.of(
"flowId", FLOW,
"executionId", executionId,
"tenantId", MAIN_TENANT
)
);
Exception ex = assertThrows(
IllegalArgumentException.class,
() -> variableRenderer.render("{{ fileSize('" + internalStorageFile + "') }}", variables)
);
assertTrue(ex.getMessage().startsWith("Unable to read the file"), "Exception message doesn't match expected one");
}
@Test
void returnsCorrectSize_givenUri_andCurrentExecution() throws IOException, IllegalVariableEvaluationException {
String executionId = IdUtils.create();

View File

@@ -256,6 +256,27 @@ class ReadFileFunctionTest {
assertThat(variableRenderer.render("{{ read(nsfile) }}", variables)).isEqualTo("Hello World");
}
@Test
void shouldReadChildFileEvenIfTrigger() throws IOException, IllegalVariableEvaluationException {
String namespace = "my.namespace";
String flowId = "flow";
String executionId = IdUtils.create();
URI internalStorageURI = URI.create("/" + namespace.replace(".", "/") + "/" + flowId + "/executions/" + executionId + "/tasks/task/" + IdUtils.create() + "/123456.ion");
URI internalStorageFile = storageInterface.put(MAIN_TENANT, namespace, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes()));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "flow",
"namespace", "notme",
"tenantId", MAIN_TENANT),
"execution", Map.of("id", "notme"),
"trigger", Map.of("namespace", "notme", "flowId", "parent", "executionId", "parent")
);
String render = variableRenderer.render("{{ read('" + internalStorageFile + "') }}", variables);
assertThat(render).isEqualTo("Hello from a task output");
}
private URI createFile() throws IOException {
File tempFile = File.createTempFile("file", ".txt");
Files.write(tempFile.toPath(), "Hello World".getBytes());

View File

@@ -5,8 +5,17 @@ import org.junit.jupiter.api.Test;
import java.util.List;
class VersionTest {
import static org.assertj.core.api.Assertions.assertThatObject;
import static org.hamcrest.MatcherAssert.assertThat;
class VersionTest {
@Test
void shouldCreateVersionFromIntegerGivenMajorVersion() {
Version version = Version.of(1);
Assertions.assertEquals(1, version.majorVersion());
}
@Test
void shouldCreateVersionFromStringGivenMajorVersion() {
Version version = Version.of("1");
@@ -21,27 +30,27 @@ class VersionTest {
}
@Test
void shouldCreateVersionFromStringGivenMajorMinorIncrementVersion() {
void shouldCreateVersionFromStringGivenMajorMinorPatchVersion() {
Version version = Version.of("1.2.3");
Assertions.assertEquals(1, version.majorVersion());
Assertions.assertEquals(2, version.minorVersion());
Assertions.assertEquals(3, version.incrementalVersion());
Assertions.assertEquals(3, version.patchVersion());
}
@Test
void shouldCreateVersionFromPrefixedStringGivenMajorMinorIncrementVersion() {
void shouldCreateVersionFromPrefixedStringGivenMajorMinorPatchVersion() {
Version version = Version.of("v1.2.3");
Assertions.assertEquals(1, version.majorVersion());
Assertions.assertEquals(2, version.minorVersion());
Assertions.assertEquals(3, version.incrementalVersion());
Assertions.assertEquals(3, version.patchVersion());
}
@Test
void shouldCreateVersionFromStringGivenMajorMinorIncrementAndQualifierVersion() {
void shouldCreateVersionFromStringGivenMajorMinorPatchAndQualifierVersion() {
Version version = Version.of("1.2.3-SNAPSHOT");
Assertions.assertEquals(1, version.majorVersion());
Assertions.assertEquals(2, version.minorVersion());
Assertions.assertEquals(3, version.incrementalVersion());
Assertions.assertEquals(3, version.patchVersion());
Assertions.assertEquals("SNAPSHOT", version.qualifier().toString());
}
@@ -50,7 +59,7 @@ class VersionTest {
Version version = Version.of("1.2.3-RC0-SNAPSHOT");
Assertions.assertEquals(1, version.majorVersion());
Assertions.assertEquals(2, version.minorVersion());
Assertions.assertEquals(3, version.incrementalVersion());
Assertions.assertEquals(3, version.patchVersion());
Assertions.assertEquals("RC0-SNAPSHOT", version.qualifier().toString());
}
@@ -76,13 +85,13 @@ class VersionTest {
}
@Test
void shouldGetLatestVersionGivenMajorMinorIncrementalVersions() {
void shouldGetLatestVersionGivenMajorMinorPatchVersions() {
Version result = Version.getLatest(Version.of("1.0.9"), Version.of("1.0.10"), Version.of("1.0.11"));
Assertions.assertEquals(Version.of("1.0.11"), result);
}
@Test
public void shouldGetOldestVersionGivenMajorMinorIncrementalVersions() {
public void shouldGetOldestVersionGivenMajorMinorPatchVersions() {
Version result = Version.getOldest(Version.of("1.0.9"), Version.of("1.0.10"), Version.of("1.0.11"));
Assertions.assertEquals(Version.of("1.0.9"), result);
}
@@ -135,14 +144,50 @@ class VersionTest {
}
@Test
public void shouldGetStableVersionGivenMajorMinorVersions() {
Version result = Version.getStable(Version.of("1.2.0"), List.of(Version.of("1.2.1"), Version.of("1.2.2"), Version.of("0.99.0")));
Assertions.assertEquals(Version.of("1.2.2"), result);
public void shouldGetStableVersionGivenMajorMinorPatchVersion() {
// Given
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));
// When - Then
assertThatObject(Version.getStable(Version.of("1.2.1"), versions)).isEqualTo(Version.of("1.2.1"));
assertThatObject(Version.getStable(Version.of("1.2.0"), versions)).isNull();
assertThatObject(Version.getStable(Version.of("1.2.4"), versions)).isNull();
}
@Test
public void shouldGetStableGivenMajorAndMinorVersionOnly() {
// Given
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));
// When - Then
assertThatObject(Version.getStable(Version.of("1.2"), versions)).isEqualTo(Version.of("1.2.3"));
}
@Test
public void shouldGetStableGivenMajorVersionOnly() {
// Given
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));
// When - Then
assertThatObject(Version.getStable(Version.of("1"), versions)).isEqualTo(Version.of("1.2.3"));
}
@Test
public void shouldGetNullForStableVersionGivenNoCompatibleVersions() {
Version result = Version.getStable(Version.of("1.2.0"), List.of(Version.of("1.3.0"), Version.of("2.0.0"), Version.of("0.99.0")));
Assertions.assertNull(result);
public void shouldGetNullForStableGivenMajorAndMinorVersionOnly() {
// Given
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));
// When - Then
assertThatObject(Version.getStable(Version.of("2.0"), versions)).isNull();
assertThatObject(Version.getStable(Version.of("0.1"), versions)).isNull();
}
@Test
public void shouldGetNullForStableGivenMajorVersionOnly() {
// Given
List<Version> versions = List.of(Version.of("1.2.1"), Version.of("1.2.3"), Version.of("0.99.0"));
// When - Then
assertThatObject(Version.getStable(Version.of("2"), versions)).isNull();
}
}

View File

@@ -4,6 +4,7 @@ import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.RunnerUtils;
@@ -11,6 +12,7 @@ import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@@ -28,11 +30,15 @@ class IfTest {
void ifTruthy() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", true) , Duration.ofSeconds(120));
List<TaskRunAttempt> flowableAttempts=execution.findTaskRunsByTaskId("if").getFirst().getAttempts();
assertThat(execution.getTaskRunList()).hasSize(2);
assertThat(execution.findTaskRunsByTaskId("when-true").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(flowableAttempts).isNotNull();
assertThat(flowableAttempts.getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", "true") , Duration.ofSeconds(120));

View File

@@ -5,22 +5,32 @@ import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.State;
import org.junit.jupiter.api.Test;
import java.util.List;
@KestraTest(startRunner = true)
class SequentialTest {
@Test
@ExecuteFlow("flows/valids/sequential.yaml")
void sequential(Execution execution) {
List<TaskRunAttempt> flowableAttempts=execution.findTaskRunsByTaskId("1-seq").getFirst().getAttempts();
assertThat(execution.getTaskRunList()).hasSize(11);
assertThat(flowableAttempts).isNotNull();
assertThat(flowableAttempts.getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
@Test
@ExecuteFlow("flows/valids/sequential-with-global-errors.yaml")
void sequentialWithGlobalErrors(Execution execution) {
List<TaskRunAttempt> flowableAttempts=execution.findTaskRunsByTaskId("parent-seq").getFirst().getAttempts();
assertThat(execution.getTaskRunList()).hasSize(6);
assertThat(flowableAttempts).isNotNull();
assertThat(flowableAttempts.getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
}

View File

@@ -156,6 +156,26 @@ class DownloadTest {
assertThat(output.getUri().toString()).endsWith("filename.jpg");
}
@Test
void fileNameShouldOverrideContentDisposition() throws Exception {
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
embeddedServer.start();
Download task = Download.builder()
.id(DownloadTest.class.getSimpleName())
.type(DownloadTest.class.getName())
.uri(Property.ofValue(embeddedServer.getURI() + "/content-disposition"))
.saveAs(Property.ofValue("hardcoded-filename.jpg"))
.build();
RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, task, ImmutableMap.of());
Download.Output output = task.run(runContext);
assertThat(output.getUri().toString()).endsWith("hardcoded-filename.jpg");
}
@Test
void contentDispositionWithPath() throws Exception {
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);

View File

@@ -247,7 +247,7 @@ public class ExecutorService {
// first find the normal ended child tasks and send result
Optional<State.Type> state;
try {
state = flowableParent.resolveState(runContext, execution, parentTaskRun);
state = flowableParent.resolveState(runContext, execution, parentTaskRun);
} catch (Exception e) {
// This will lead to the next task being still executed, but at least Kestra will not crash.
// This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
@@ -268,9 +268,17 @@ public class ExecutorService {
Output outputs = flowableParent.outputs(runContext);
Map<String, Object> outputMap = MapUtils.merge(workerTaskResult.getTaskRun().getOutputs(), outputs == null ? null : outputs.toMap());
Variables variables = variablesService.of(StorageContext.forTask(workerTaskResult.getTaskRun()), outputMap);
/// flowable attempt state transition to terminated
List<TaskRunAttempt> attempts = Optional.ofNullable(parentTaskRun.getAttempts())
.map(ArrayList::new)
.orElseGet(ArrayList::new);
State.Type endedState=endedTask.get().getTaskRun().getState().getCurrent();
TaskRunAttempt updated = attempts.getLast().withState(endedState);
attempts.set( attempts.size() - 1, updated);
return Optional.of(new WorkerTaskResult(workerTaskResult
.getTaskRun()
.withOutputs(variables)
.withAttempts(attempts)
));
} catch (Exception e) {
runContext.logger().error("Unable to resolve outputs from the Flowable task: {}", e.getMessage(), e);
@@ -320,7 +328,6 @@ public class ExecutorService {
private List<TaskRun> childNextsTaskRun(Executor executor, TaskRun parentTaskRun) throws InternalException {
Task parent = executor.getFlow().findTaskByTaskId(parentTaskRun.getTaskId());
if (parent instanceof FlowableTask<?> flowableParent) {
// Count the number of flowable tasks executions, some flowable are being called multiple times,
// so this is not exactly the number of flowable taskruns but the number of times they are executed.
@@ -375,6 +382,7 @@ public class ExecutorService {
Output outputs = flowableTask.outputs(runContext);
Variables variables = variablesService.of(StorageContext.forTask(taskRun), outputs);
taskRun = taskRun.withOutputs(variables);
} catch (Exception e) {
runContext.logger().warn("Unable to save output on taskRun '{}'", taskRun, e);
}

View File

@@ -50,16 +50,147 @@ public class FlowTriggerService {
.map(io.kestra.plugin.core.trigger.Flow.class::cast);
}
public List<Execution> computeExecutionsFromFlowTriggers(Execution execution, List<? extends Flow> allFlows, Optional<MultipleConditionStorageInterface> multipleConditionStorage) {
List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
/**
* This method computes executions to trigger from flow triggers from a given execution.
* It only computes those depending on standard (non-multiple / non-preconditions) conditions, so it must be used
* in conjunction with {@link #computeExecutionsFromFlowTriggerPreconditions(Execution, Flow, MultipleConditionStorageInterface)}.
*/
public List<Execution> computeExecutionsFromFlowTriggerConditions(Execution execution, Flow flow) {
List<FlowWithFlowTrigger> flowWithFlowTriggers = computeFlowTriggers(execution, flow)
.stream()
// we must filter on no multiple conditions and no preconditions to avoid evaluating two times triggers that have standard conditions and multiple conditions
.filter(it -> it.getTrigger().getPreconditions() == null && ListUtils.emptyOnNull(it.getTrigger().getConditions()).stream().noneMatch(MultipleCondition.class::isInstance))
.toList();
// short-circuit empty triggers to evaluate
if (flowWithFlowTriggers.isEmpty()) {
return Collections.emptyList();
}
// compute all executions to create from flow triggers without taken into account multiple conditions
return flowWithFlowTriggers.stream()
.map(f -> f.getTrigger().evaluate(
Optional.empty(),
runContextFactory.of(f.getFlow(), execution),
f.getFlow(),
execution
))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
}
/**
* This method computes executions to trigger from flow triggers from a given execution.
* It only computes those depending on multiple conditions and preconditions, so it must be used
* in conjunction with {@link #computeExecutionsFromFlowTriggerConditions(Execution, Flow)}.
*/
public List<Execution> computeExecutionsFromFlowTriggerPreconditions(Execution execution, Flow flow, MultipleConditionStorageInterface multipleConditionStorage) {
List<FlowWithFlowTrigger> flowWithFlowTriggers = computeFlowTriggers(execution, flow)
.stream()
// we must filter on multiple conditions or preconditions to avoid evaluating two times triggers that only have standard conditions
.filter(flowWithFlowTrigger -> flowWithFlowTrigger.getTrigger().getPreconditions() != null || ListUtils.emptyOnNull(flowWithFlowTrigger.getTrigger().getConditions()).stream().anyMatch(MultipleCondition.class::isInstance))
.toList();
// short-circuit empty triggers to evaluate
if (flowWithFlowTriggers.isEmpty()) {
return Collections.emptyList();
}
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = flowWithFlowTriggers.stream()
.flatMap(flowWithFlowTrigger -> flowTriggerMultipleConditions(flowWithFlowTrigger)
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
flowWithFlowTrigger.getFlow(),
multipleConditionStorage.getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition, execution.getOutputs()),
flowWithFlowTrigger.getTrigger(),
multipleCondition
)
)
)
// avoid evaluating expired windows (for ex for daily time window or deadline)
.filter(flowWithFlowTriggerAndMultipleCondition -> flowWithFlowTriggerAndMultipleCondition.getMultipleConditionWindow().isValid(ZonedDateTime.now()))
.toList();
// evaluate multiple conditions
Map<FlowWithFlowTriggerAndMultipleCondition, MultipleConditionWindow> multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> {
Map<String, Boolean> results = f.getMultipleCondition()
.getConditions()
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
conditionService.isValid(e.getValue(), f.getFlow(), execution)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return Map.entry(f, f.getMultipleConditionWindow().with(results));
})
.filter(e -> !e.getValue().getResults().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// persist results
multipleConditionStorage.save(new ArrayList<>(multipleConditionWindowsByFlow.values()));
// compute all executions to create from flow triggers now that multiple conditions storage is populated
List<Execution> executions = flowWithFlowTriggers.stream()
// will evaluate conditions
.filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage
)
)
// will evaluate preconditions
.filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger().getPreconditions(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage
)
)
.map(f -> f.getTrigger().evaluate(
Optional.of(multipleConditionStorage),
runContextFactory.of(f.getFlow(), execution),
f.getFlow(),
execution
))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
// purge fulfilled or expired multiple condition windows
Stream.concat(
multipleConditionWindowsByFlow.entrySet().stream()
.map(e -> Map.entry(
e.getKey().getMultipleCondition(),
e.getValue()
))
.filter(e -> !Boolean.FALSE.equals(e.getKey().getResetOnSuccess()) &&
e.getKey().getConditions().size() == Optional.ofNullable(e.getValue().getResults()).map(Map::size).orElse(0)
)
.map(Map.Entry::getValue),
multipleConditionStorage.expired(execution.getTenantId()).stream()
).forEach(multipleConditionStorage::delete);
return executions;
}
private List<FlowWithFlowTrigger> computeFlowTriggers(Execution execution, Flow flow) {
if (
// prevent recursive flow triggers
.filter(flow -> flowService.removeUnwanted(flow, execution))
// filter out Test Executions
.filter(flow -> execution.getKind() == null)
// ensure flow & triggers are enabled
.filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)))
!flowService.removeUnwanted(flow, execution) ||
// filter out Test Executions
execution.getKind() != null ||
// ensure flow & triggers are enabled
flow.isDisabled() || flow instanceof FlowWithException ||
flow.getTriggers() == null || flow.getTriggers().isEmpty()) {
return Collections.emptyList();
}
return flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger))
// filter on the execution state the flow listen to
.filter(flowWithFlowTrigger -> flowWithFlowTrigger.getTrigger().getStates().contains(execution.getState().getCurrent()))
// validate flow triggers conditions excluding multiple conditions
@@ -74,96 +205,6 @@ public class FlowTriggerService {
execution
)
)).toList();
// short-circuit empty triggers to evaluate
if (validTriggersBeforeMultipleConditionEval.isEmpty()) {
return Collections.emptyList();
}
Map<FlowWithFlowTriggerAndMultipleCondition, MultipleConditionWindow> multipleConditionWindowsByFlow = null;
if (multipleConditionStorage.isPresent()) {
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream()
.flatMap(flowWithFlowTrigger -> flowTriggerMultipleConditions(flowWithFlowTrigger)
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
flowWithFlowTrigger.getFlow(),
multipleConditionStorage.get().getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition, execution.getOutputs()),
flowWithFlowTrigger.getTrigger(),
multipleCondition
)
)
)
// avoid evaluating expired windows (for ex for daily time window or deadline)
.filter(flowWithFlowTriggerAndMultipleCondition -> flowWithFlowTriggerAndMultipleCondition.getMultipleConditionWindow().isValid(ZonedDateTime.now()))
.toList();
// evaluate multiple conditions
multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> {
Map<String, Boolean> results = f.getMultipleCondition()
.getConditions()
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
conditionService.isValid(e.getValue(), f.getFlow(), execution)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return Map.entry(f, f.getMultipleConditionWindow().with(results));
})
.filter(e -> !e.getValue().getResults().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// persist results
multipleConditionStorage.get().save(new ArrayList<>(multipleConditionWindowsByFlow.values()));
}
// compute all executions to create from flow triggers now that multiple conditions storage is populated
List<Execution> executions = validTriggersBeforeMultipleConditionEval.stream()
// will evaluate conditions
.filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage.orElse(null)
)
)
// will evaluate preconditions
.filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger().getPreconditions(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage.orElse(null)
)
)
.map(f -> f.getTrigger().evaluate(
multipleConditionStorage,
runContextFactory.of(f.getFlow(), execution),
f.getFlow(),
execution
))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
if (multipleConditionStorage.isPresent()) {
// purge fulfilled or expired multiple condition windows
Stream.concat(
multipleConditionWindowsByFlow.entrySet().stream()
.map(e -> Map.entry(
e.getKey().getMultipleCondition(),
e.getValue()
))
.filter(e -> !Boolean.FALSE.equals(e.getKey().getResetOnSuccess()) &&
e.getKey().getConditions().size() == Optional.ofNullable(e.getValue().getResults()).map(Map::size).orElse(0)
)
.map(Map.Entry::getValue),
multipleConditionStorage.get().expired(execution.getTenantId()).stream()
).forEach(multipleConditionStorage.get()::delete);
}
return executions;
}
private Stream<MultipleCondition> flowTriggerMultipleConditions(FlowWithFlowTrigger flowWithFlowTrigger) {

View File

@@ -26,8 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class FlowTriggerServiceTest {
public static final List<Label> EMPTY_LABELS = List.of();
public static final Optional<MultipleConditionStorageInterface> EMPTY_MULTIPLE_CONDITION_STORAGE = Optional.empty();
@Inject
private TestRunContextFactory runContextFactory;
@Inject
@@ -56,10 +55,9 @@ class FlowTriggerServiceTest {
var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.SUCCESS);
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
simpleFlowExecution,
List.of(simpleFlow, flowWithFlowTrigger),
EMPTY_MULTIPLE_CONDITION_STORAGE
flowWithFlowTrigger
);
assertThat(resultingExecutionsToRun).size().isEqualTo(1);
@@ -81,10 +79,9 @@ class FlowTriggerServiceTest {
var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.CREATED);
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
simpleFlowExecution,
List.of(simpleFlow, flowWithFlowTrigger),
EMPTY_MULTIPLE_CONDITION_STORAGE
flowWithFlowTrigger
);
assertThat(resultingExecutionsToRun).size().isEqualTo(0);
@@ -109,10 +106,9 @@ class FlowTriggerServiceTest {
.kind(ExecutionKind.TEST)
.build();
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
simpleFlowExecutionComingFromATest,
List.of(simpleFlow, flowWithFlowTrigger),
EMPTY_MULTIPLE_CONDITION_STORAGE
flowWithFlowTrigger
);
assertThat(resultingExecutionsToRun).size().isEqualTo(0);

View File

@@ -1,4 +1,4 @@
version=1.0.8
version=1.0.13
org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError
org.gradle.parallel=true

View File

@@ -1,8 +1,10 @@
package io.kestra.repository.mysql;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.runners.ScheduleContextInterface;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.kestra.jdbc.runner.JdbcSchedulerContext;
import io.kestra.jdbc.services.JdbcFilterService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -11,6 +13,10 @@ import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.Temporal;
import java.util.Date;
import java.util.List;
@@ -45,4 +51,11 @@ public class MysqlTriggerRepository extends AbstractJdbcTriggerRepository {
throw new IllegalArgumentException("Unsupported GroupType: " + groupType);
}
}
@Override
protected Temporal toNextExecutionTime(ZonedDateTime now) {
// next_execution_date in the table is stored in UTC
// convert 'now' to UTC LocalDateTime to avoid any timezone/offset interpretation by the database.
return now.withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime();
}
}

View File

@@ -32,6 +32,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.time.ZonedDateTime;
import java.time.temporal.Temporal;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -151,7 +152,7 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(
(field("next_execution_date").lessThan(now.toOffsetDateTime())
(field("next_execution_date").lessThan(toNextExecutionTime(now))
// we check for null for backwards compatibility
.or(field("next_execution_date").isNull()))
.and(field("execution_id").isNull())
@@ -162,14 +163,14 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
.fetch()
.map(r -> this.jdbcRepository.deserialize(r.get("value", String.class)));
}
public List<Trigger> findByNextExecutionDateReadyButLockedTriggers(ZonedDateTime now) {
return this.jdbcRepository.getDslContextWrapper()
.transactionResult(configuration -> DSL.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(
(field("next_execution_date").lessThan(now.toOffsetDateTime())
(field("next_execution_date").lessThan(toNextExecutionTime(now))
// we check for null for backwards compatibility
.or(field("next_execution_date").isNull()))
.and(field("execution_id").isNotNull())
@@ -178,6 +179,10 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
.fetch()
.map(r -> this.jdbcRepository.deserialize(r.get("value", String.class))));
}
protected Temporal toNextExecutionTime(ZonedDateTime now) {
return now.toOffsetDateTime();
}
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface) {
JdbcSchedulerContext jdbcSchedulerContext = (JdbcSchedulerContext) scheduleContextInterface;

View File

@@ -24,10 +24,10 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
}
/**
* Fetch the concurrency limit counter then process the count using the consumer function.
* It locked the raw and is wrapped in a transaction so the consumer should use the provided dslContext for any database access.
* Fetch the concurrency limit counter, then process the count using the consumer function.
* It locked the raw and is wrapped in a transaction, so the consumer should use the provided dslContext for any database access.
* <p>
* Note that to avoid a race when no concurrency limit counter exists, it first always try to insert a 0 counter.
* Note that to avoid a race when no concurrency limit counter exists, it first always tries to insert a 0 counter.
*/
public ExecutionRunning countThenProcess(FlowInterface flow, BiFunction<DSLContext, ConcurrencyLimit, Pair<ExecutionRunning, ConcurrencyLimit>> consumer) {
return this.jdbcRepository
@@ -106,8 +106,7 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
.and(field("namespace").eq(flow.getNamespace()))
.and(field("flow_id").eq(flow.getId()));
return Optional.ofNullable(select.forUpdate().fetchOne())
.map(record -> this.jdbcRepository.map(record));
return this.jdbcRepository.fetchOne(select.forUpdate());
}
private void save(DSLContext dslContext, ConcurrencyLimit concurrencyLimit) {

View File

@@ -12,7 +12,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public abstract class AbstractJdbcExecutionQueuedStorage extends AbstractJdbcRepository {
protected io.kestra.jdbc.AbstractJdbcRepository<ExecutionQueued> jdbcRepository;
@@ -70,18 +69,12 @@ public abstract class AbstractJdbcExecutionQueuedStorage extends AbstractJdbcRep
this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
var select = DSL
.using(configuration)
.select(AbstractJdbcRepository.field("value"))
.from(this.jdbcRepository.getTable())
.where(buildTenantCondition(execution.getTenantId()))
.and(field("key").eq(IdUtils.fromParts(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId())))
.forUpdate();
Optional<ExecutionQueued> maybeExecution = this.jdbcRepository.fetchOne(select);
if (maybeExecution.isPresent()) {
this.jdbcRepository.delete(maybeExecution.get());
}
DSL
.using(configuration)
.deleteFrom(this.jdbcRepository.getTable())
.where(buildTenantCondition(execution.getTenantId()))
.and(field("key").eq(IdUtils.fromParts(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId())))
.execute();
});
}
}

View File

@@ -423,7 +423,7 @@ public class JdbcExecutor implements ExecutorInterface {
MultipleConditionEvent multipleConditionEvent = either.getLeft();
flowTriggerService.computeExecutionsFromFlowTriggers(multipleConditionEvent.execution(), List.of(multipleConditionEvent.flow()), Optional.of(multipleConditionStorage))
flowTriggerService.computeExecutionsFromFlowTriggerPreconditions(multipleConditionEvent.execution(), multipleConditionEvent.flow(), multipleConditionStorage)
.forEach(exec -> {
try {
executionQueue.emit(exec);
@@ -650,8 +650,24 @@ public class JdbcExecutor implements ExecutorInterface {
.orElse(null);
workerJobQueue.emit(workerGroupKey, workerTask);
}
/// flowable attempt state transition to running
if (workerTask.getTask().isFlowable()) {
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.RUNNING)));
List<TaskRunAttempt> attempts = Optional.ofNullable(workerTask.getTaskRun().getAttempts())
.map(ArrayList::new)
.orElseGet(ArrayList::new);
attempts.add(
TaskRunAttempt.builder()
.state(new State().withState(State.Type.RUNNING))
.build()
);
TaskRun updatedTaskRun = workerTask.getTaskRun()
.withAttempts(attempts)
.withState(State.Type.RUNNING);
workerTaskResults.add(new WorkerTaskResult(updatedTaskRun));
}
}
} catch (Exception e) {
@@ -1196,8 +1212,10 @@ public class JdbcExecutor implements ExecutorInterface {
private void processFlowTriggers(Execution execution) throws QueueException {
// directly process simple conditions
flowTriggerService.withFlowTriggersOnly(allFlows.stream())
.filter(f ->ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().noneMatch(c -> c instanceof MultipleCondition) && f.getTrigger().getPreconditions() == null)
.flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggers(execution, List.of(f.getFlow()), Optional.empty()).stream())
.filter(f -> ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().noneMatch(c -> c instanceof MultipleCondition) && f.getTrigger().getPreconditions() == null)
.map(f -> f.getFlow())
.distinct() // as computeExecutionsFromFlowTriggers is based on flow, we must map FlowWithFlowTrigger to a flow and distinct to avoid multiple execution for the same flow
.flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggerConditions(execution, f).stream())
.forEach(throwConsumer(exec -> executionQueue.emit(exec)));
// send multiple conditions to the multiple condition queue for later processing

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.runners.ScheduleContextInterface;
import io.kestra.core.runners.Scheduler;
import io.kestra.core.runners.SchedulerTriggerStateInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.FlowService;
@@ -56,6 +57,9 @@ public class JdbcScheduler extends AbstractScheduler {
.forEach(abstractTrigger -> triggerRepository.delete(Trigger.of(flow, abstractTrigger)));
}
});
// No-op consumption of the trigger queue, so the events are purged from the queue
this.triggerQueue.receive(Scheduler.class, trigger -> { });
}
@Override

View File

@@ -1,6 +1,7 @@
package io.kestra.jdbc.runner;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
@@ -138,6 +139,7 @@ public abstract class JdbcRunnerRetryTest {
retryCaseTest.retryDynamicTask(execution);
}
@FlakyTest(description = "it seems this flow sometimes stay stuck in RUNNING")
@Test
@ExecuteFlow("flows/valids/retry-with-flowable-errors.yaml")
void retryWithFlowableErrors(Execution execution){

View File

@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Singleton
public class TestRunner implements Runnable, AutoCloseable {
@Setter private int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors());
@Setter private int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors()) * 16;
@Setter private boolean schedulerEnabled = true;
@Setter private boolean workerEnabled = true;

View File

@@ -272,6 +272,7 @@
import Id from "../Id.vue";
import SelectTableActions from "../../mixins/selectTableActions";
import _merge from "lodash/merge";
import moment from "moment";
import LogsWrapper from "../logs/LogsWrapper.vue";
import KestraFilter from "../filter/KestraFilter.vue"
import {mapStores} from "pinia";
@@ -500,6 +501,15 @@
loadQuery(base) {
let queryFilter = this.queryWithFilter();
const timeRange = queryFilter["filters[timeRange][EQUALS]"];
if (timeRange) {
const end = new Date();
const start = new Date(end.getTime() - moment.duration(timeRange).asMilliseconds());
queryFilter["filters[startDate][GREATER_THAN_OR_EQUAL_TO]"] = start.toISOString();
queryFilter["filters[endDate][LESS_THAN_OR_EQUAL_TO]"] = end.toISOString();
delete queryFilter["filters[timeRange][EQUALS]"];
}
return _merge(base, queryFilter)
},
},

View File

@@ -56,7 +56,7 @@
this.follow();
window.addEventListener("popstate", this.follow)
this.dependenciesCount = (await this.flowStore.loadDependencies({namespace: this.$route.params.namespace, id: this.$route.params.flowId})).count;
this.dependenciesCount = (await this.flowStore.loadDependencies({namespace: this.$route.params.namespace, id: this.$route.params.flowId}, true)).count;
},
mounted() {
this.previousExecutionId = this.$route.params.id

View File

@@ -528,27 +528,25 @@
}
.content-container {
height: calc(100vh - 0px);
overflow-y: auto !important;
overflow-y: scroll;
overflow-x: hidden;
scrollbar-gutter: stable;
word-wrap: break-word;
word-break: break-word;
}
:deep(.el-collapse) {
.el-collapse-item__wrap {
overflow-y: auto !important;
max-height: none !important;
}
.el-collapse-item__content {
overflow-y: auto !important;
word-wrap: break-word;
word-break: break-word;
}
}
:deep(.var-value) {
overflow-y: auto !important;
word-wrap: break-word;
word-break: break-word;
}

View File

@@ -247,6 +247,7 @@
const suggestWidgetResizeObserver = ref<MutationObserver>()
const suggestWidgetObserver = ref<MutationObserver>()
const suggestWidget = ref<HTMLElement>()
const resizeObserver = ref<ResizeObserver>()
@@ -796,6 +797,20 @@
setTimeout(() => monaco.editor.remeasureFonts(), 1)
emit("editorDidMount", editorResolved.value);
/* Hhandle resizing. */
resizeObserver.value = new ResizeObserver(() => {
if (localEditor.value) {
localEditor.value.layout();
}
if (localDiffEditor.value) {
localDiffEditor.value.getModifiedEditor().layout();
localDiffEditor.value.getOriginalEditor().layout();
}
});
if (editorRef.value) {
resizeObserver.value.observe(editorRef.value);
}
highlightLine();
}
@@ -853,6 +868,8 @@
function destroy() {
disposeObservers();
disposeCompletions.value?.();
resizeObserver.value?.disconnect();
resizeObserver.value = undefined;
if (localDiffEditor.value !== undefined) {
localDiffEditor.value?.dispose();
localDiffEditor.value?.getModel()?.modified?.dispose();

View File

@@ -475,7 +475,7 @@
return this.namespacesStore
.createKv({
...this.kv,
contentType: ["DATE", "DATETIME"].includes(type) ? "text/plain" : "application/json",
contentType: "text/plain",
value
})
.then(() => {

View File

@@ -100,7 +100,7 @@
>
<NamespaceSelect
v-model="secret.namespace"
:readonly="secret.update"
:read-only="secret.update"
:include-system-namespace="true"
all
/>

View File

@@ -506,7 +506,7 @@ export const useFlowStore = defineStore("flow", () => {
}
function loadDependencies(options: { namespace: string, id: string, subtype: "FLOW" | "EXECUTION" }, onlyCount = false) {
return store.$http.get(`${apiUrl(store)}/flows/${options.namespace}/${options.id}/dependencies?expandAll=true`).then(response => {
return store.$http.get(`${apiUrl(store)}/flows/${options.namespace}/${options.id}/dependencies?expandAll=${onlyCount ? false : true}`).then(response => {
return {
...(!onlyCount ? {data: transformResponse(response.data, options.subtype)} : {}),
count: response.data.nodes ? [...new Set(response.data.nodes.map((r:{uid:string}) => r.uid))].length : 0

View File

@@ -107,25 +107,11 @@
:deep(.alert-info) {
display: flex;
gap: 12px;
padding: 16px 16px 0 16px;
padding: .5rem !important;
background-color: var(--ks-background-info);
border: 1px solid var(--ks-border-info);
border-left-width: 5px;
border-radius: 8px;
&::before {
content: '!';
min-width: 20px;
height: 20px;
margin-top: 4px;
border-radius: 50%;
background: var(--ks-content-info);
border: 1px solid var(--ks-border-info);
color: var(--ks-content-inverse);
font: 600 13px/20px sans-serif;
text-align: center;
}
border-left-width: 0.25rem;
border-radius: 0.5rem;
p { color: var(--ks-content-info); }
}
@@ -135,7 +121,7 @@
color: var(--ks-content-info);
border: 1px solid var(--ks-border-info);
font-family: 'Courier New', Courier, monospace;
white-space: nowrap; // Prevent button text from wrapping
white-space: nowrap;
.material-design-icon {
position: absolute;

View File

@@ -870,19 +870,10 @@
"adding": "+ {what} hinzufügen",
"adding_default": "+ Neuen Wert hinzufügen",
"clearSelection": "Auswahl aufheben",
"close": {
"afterExecution": "Nach Ausführung Task schließen",
"conditions": "Bedingung schließen",
"errors": "Fehlerbehandler schließen",
"finally": "Task schließen",
"input": "Eingabe schließen",
"pluginDefaults": "Plugin-Standard schließen",
"tasks": "Task schließen",
"triggers": "Trigger schließen"
},
"creation": {
"afterExecution": "Fügen Sie einen Block nach der Ausführung hinzu",
"conditions": "Bedingung hinzufügen",
"default": "Hinzufügen",
"errors": "Einen Fehler-Handler hinzufügen",
"finally": "Fügen Sie einen Finally-Block hinzu",
"inputs": "Ein Eingabefeld hinzufügen",
@@ -916,6 +907,10 @@
"variable": "Variable",
"yaml": "YAML-Editor"
},
"remove": {
"cases": "Diesen Fall entfernen",
"default": "Diesen Eintrag entfernen"
},
"sections": {
"afterExecution": "Nach Ausführung",
"connection": "Verbindungseigenschaften",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Wählen Sie eine Task aus",
"conditions": "Wählen Sie eine Bedingung aus",
"default": "Wählen Sie einen Typ aus",
"errors": "Wählen Sie eine Task aus",
"finally": "Wählen Sie eine Task aus",
"inputs": "Wählen Sie einen Input-Feldtyp aus",

View File

@@ -870,19 +870,10 @@
"adding": "+ Agregar un {what}",
"adding_default": "+ Añadir un nuevo value",
"clearSelection": "Borrar selección",
"close": {
"afterExecution": "Cerrar después de la ejecución de la task",
"conditions": "Condición de cierre",
"errors": "Cerrar el manejador de errores",
"finally": "Cerrar task",
"input": "Cerrar input",
"pluginDefaults": "Cerrar plugin predeterminado",
"tasks": "Cerrar task",
"triggers": "Cerrar trigger"
},
"creation": {
"afterExecution": "Agregar un bloque después de la ejecución",
"conditions": "Agregar una condición",
"default": "Agregar",
"errors": "Agregar un manejador de errores",
"finally": "Agregar un bloque finally",
"inputs": "Agregar un campo de input",
@@ -916,6 +907,10 @@
"variable": "Variable",
"yaml": "Editor YAML"
},
"remove": {
"cases": "Eliminar este caso",
"default": "Eliminar esta entrada"
},
"sections": {
"afterExecution": "Después de la Ejecución",
"connection": "Propiedades de conexión",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Seleccionar una task",
"conditions": "Seleccione una condición",
"default": "Seleccionar un tipo",
"errors": "Selecciona una task",
"finally": "Seleccionar una task",
"inputs": "Seleccione un tipo de campo de input",

View File

@@ -870,19 +870,10 @@
"adding": "+ Ajouter un {what}",
"adding_default": "+ Ajouter une nouvelle valeur",
"clearSelection": "Effacer la sélection",
"close": {
"afterExecution": "Fermer après l'exécution de la task",
"conditions": "Condition de fermeture",
"errors": "Fermer le gestionnaire d'erreurs",
"finally": "Fermer la task",
"input": "Fermer l'input",
"pluginDefaults": "Fermer le plugin par défaut",
"tasks": "Fermer la task",
"triggers": "Fermer le trigger"
},
"creation": {
"afterExecution": "Ajouter un bloc après l'exécution",
"conditions": "Ajouter une condition",
"default": "Ajouter",
"errors": "Ajouter un gestionnaire d'erreurs",
"finally": "Ajouter un bloc finally",
"inputs": "Ajouter un champ d'input",
@@ -916,6 +907,10 @@
"variable": "Variable",
"yaml": "Éditeur YAML"
},
"remove": {
"cases": "Supprimer ce cas",
"default": "Supprimer cette entrée"
},
"sections": {
"afterExecution": "Après l'Exécution",
"connection": "Propriétés de connexion",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Sélectionnez une task",
"conditions": "Sélectionner une condition",
"default": "Sélectionner un type",
"errors": "Sélectionner une task",
"finally": "Sélectionnez une task",
"inputs": "Sélectionnez un type de champ input",

View File

@@ -870,19 +870,10 @@
"adding": "+ एक {what} जोड़ें",
"adding_default": "+ एक नया value जोड़ें",
"clearSelection": "चयन साफ़ करें",
"close": {
"afterExecution": "कार्य पूरा होने के बाद बंद करें",
"conditions": "बंद करने की शर्त",
"errors": "त्रुटि हैंडलर बंद करें",
"finally": "टास्क बंद करें",
"input": "इनपुट बंद करें",
"pluginDefaults": "प्लगइन डिफ़ॉल्ट बंद करें",
"tasks": "टास्क बंद करें",
"triggers": "ट्रिगर बंद करें"
},
"creation": {
"afterExecution": "एक निष्पादन के बाद ब्लॉक जोड़ें",
"conditions": "एक शर्त जोड़ें",
"default": "जोड़ें",
"errors": "त्रुटि हैंडलर जोड़ें",
"finally": "अंत में एक finally ब्लॉक जोड़ें",
"inputs": "इनपुट फ़ील्ड जोड़ें",
@@ -916,6 +907,10 @@
"variable": "वेरिएबल",
"yaml": "YAML संपादक"
},
"remove": {
"cases": "इस केस को हटाएं",
"default": "इस प्रविष्टि को हटाएं"
},
"sections": {
"afterExecution": "Execution के बाद",
"connection": "कनेक्शन गुणधर्म",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "एक task चुनें",
"conditions": "कंडीशन चुनें",
"default": "एक प्रकार चुनें",
"errors": "एक task चुनें",
"finally": "एक task चुनें",
"inputs": "इनपुट फ़ील्ड प्रकार चुनें",

View File

@@ -870,19 +870,10 @@
"adding": "+ Aggiungi un {what}",
"adding_default": "+ Aggiungi un nuovo value",
"clearSelection": "Cancella selezione",
"close": {
"afterExecution": "Chiudi dopo l'esecuzione del task",
"conditions": "Condizione di chiusura",
"errors": "Chiudi error handler",
"finally": "Chiudi task",
"input": "Chiudi input",
"pluginDefaults": "Chiudi plugin predefinito",
"tasks": "Chiudi task",
"triggers": "Chiudi trigger"
},
"creation": {
"afterExecution": "Aggiungi un blocco after execution",
"conditions": "Aggiungi una condizione",
"default": "Aggiungi",
"errors": "Aggiungi un gestore degli errori",
"finally": "Aggiungi un blocco finally",
"inputs": "Aggiungi un campo di input",
@@ -916,6 +907,10 @@
"variable": "Variabile",
"yaml": "Editor YAML"
},
"remove": {
"cases": "Rimuovi questo caso",
"default": "Rimuovi questa voce"
},
"sections": {
"afterExecution": "Dopo l'Esecuzione",
"connection": "Proprietà di connessione",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Seleziona un task",
"conditions": "Seleziona una condizione",
"default": "Seleziona un tipo",
"errors": "Seleziona un task",
"finally": "Seleziona un task",
"inputs": "Seleziona un tipo di input field",

View File

@@ -870,19 +870,10 @@
"adding": "+ {what}を追加",
"adding_default": "+ 新しいvalueを追加",
"clearSelection": "選択をクリア",
"close": {
"afterExecution": "実行後にタスクを閉じる",
"conditions": "クローズ条件",
"errors": "エラーハンドラーを閉じる",
"finally": "タスクを閉じる",
"input": "入力を閉じる",
"pluginDefaults": "プラグインのデフォルトを閉じる",
"tasks": "タスクを閉じる",
"triggers": "トリガーを閉じる"
},
"creation": {
"afterExecution": "実行後ブロックを追加",
"conditions": "条件を追加",
"default": "追加",
"errors": "エラーハンドラーを追加",
"finally": "finally ブロックを追加",
"inputs": "入力フィールドを追加",
@@ -916,6 +907,10 @@
"variable": "変数",
"yaml": "YAMLエディター"
},
"remove": {
"cases": "このケースを削除",
"default": "このエントリを削除"
},
"sections": {
"afterExecution": "実行後",
"connection": "接続プロパティ",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "タスクを選択",
"conditions": "条件を選択",
"default": "タイプを選択",
"errors": "タスクを選択",
"finally": "タスクを選択",
"inputs": "入力フィールドタイプを選択",

View File

@@ -870,19 +870,10 @@
"adding": "+ {what} 추가",
"adding_default": "+ 새 value 추가",
"clearSelection": "선택 해제",
"close": {
"afterExecution": "실행 task 후 닫기",
"conditions": "닫기 조건",
"errors": "오류 처리기 닫기",
"finally": "작업 닫기",
"input": "입력 닫기",
"pluginDefaults": "플러그인 기본값 닫기",
"tasks": "작업 닫기",
"triggers": "트리거 닫기"
},
"creation": {
"afterExecution": "실행 후 블록 추가",
"conditions": "조건 추가",
"default": "추가",
"errors": "오류 처리기 추가",
"finally": "마지막 블록 추가",
"inputs": "입력 필드 추가",
@@ -916,6 +907,10 @@
"variable": "변수",
"yaml": "YAML 편집기"
},
"remove": {
"cases": "이 케이스를 제거하십시오",
"default": "이 항목 제거"
},
"sections": {
"afterExecution": "실행 후",
"connection": "연결 속성",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "작업 선택",
"conditions": "조건 선택",
"default": "유형 선택",
"errors": "작업 선택",
"finally": "작업 선택",
"inputs": "입력 필드 유형 선택",

View File

@@ -870,19 +870,10 @@
"adding": "+ Dodaj {what}",
"adding_default": "+ Dodaj nową wartość",
"clearSelection": "Wyczyść zaznaczenie",
"close": {
"afterExecution": "Zamknij po wykonaniu task",
"conditions": "Zamknij warunek",
"errors": "Zamknij obsługę błędów",
"finally": "Zamknij task",
"input": "Zamknij input",
"pluginDefaults": "Zamknij domyślną wtyczkę",
"tasks": "Zamknij task",
"triggers": "Zamknij trigger"
},
"creation": {
"afterExecution": "Dodaj blok po wykonaniu",
"conditions": "Dodaj warunek",
"default": "Dodaj",
"errors": "Dodaj obsługę błędów",
"finally": "Dodaj blok finally",
"inputs": "Dodaj pole input",
@@ -916,6 +907,10 @@
"variable": "Zmienna",
"yaml": "Edytor YAML"
},
"remove": {
"cases": "Usuń ten przypadek",
"default": "Usuń ten wpis"
},
"sections": {
"afterExecution": "Po wykonaniu",
"connection": "Właściwości połączenia",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Wybierz task",
"conditions": "Wybierz warunek",
"default": "Wybierz typ",
"errors": "Wybierz task",
"finally": "Wybierz task",
"inputs": "Wybierz typ pola input",

View File

@@ -870,19 +870,10 @@
"adding": "+ Adicionar um {what}",
"adding_default": "+ Adicionar um novo value",
"clearSelection": "Limpar seleção",
"close": {
"afterExecution": "Fechar após execução da task",
"conditions": "Condição de fechamento",
"errors": "Fechar manipulador de erro",
"finally": "Fechar task",
"input": "Fechar input",
"pluginDefaults": "Fechar plugin padrão",
"tasks": "Fechar task",
"triggers": "Fechar trigger"
},
"creation": {
"afterExecution": "Adicionar um bloco após a execução",
"conditions": "Adicionar uma condição",
"default": "Adicionar",
"errors": "Adicionar um manipulador de erro",
"finally": "Adicionar um bloco finally",
"inputs": "Adicionar um campo de input",
@@ -916,6 +907,10 @@
"variable": "Variável",
"yaml": "Editor YAML"
},
"remove": {
"cases": "Remover este caso",
"default": "Remover esta entrada"
},
"sections": {
"afterExecution": "Após a Execução",
"connection": "Propriedades de conexão",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Selecione uma task",
"conditions": "Selecione uma condição",
"default": "Selecione um tipo",
"errors": "Selecione uma task",
"finally": "Selecione uma task",
"inputs": "Selecione um tipo de campo de input",

File diff suppressed because it is too large Load Diff

View File

@@ -870,19 +870,10 @@
"adding": "+ Добавить {what}",
"adding_default": "+ Добавить новое value",
"clearSelection": "Очистить выбор",
"close": {
"afterExecution": "Закрыть после выполнения task",
"conditions": "Закрыть условие",
"errors": "Закрыть обработчик ошибок",
"finally": "Закрыть task",
"input": "Закрыть input",
"pluginDefaults": "Закрыть плагин по умолчанию",
"tasks": "Закрыть task",
"triggers": "Закрыть trigger"
},
"creation": {
"afterExecution": "Добавить блок после выполнения",
"conditions": "Добавить условие",
"default": "Добавить",
"errors": "Добавить обработчик ошибок",
"finally": "Добавить блок finally",
"inputs": "Добавить поле input",
@@ -916,6 +907,10 @@
"variable": "Переменная",
"yaml": "Редактор YAML"
},
"remove": {
"cases": "Удалить этот кейс",
"default": "Удалить эту запись"
},
"sections": {
"afterExecution": "После выполнения",
"connection": "Свойства подключения",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "Выберите task",
"conditions": "Выберите условие",
"default": "Выберите тип",
"errors": "Выберите task",
"finally": "Выберите task",
"inputs": "Выберите тип поля input",

View File

@@ -870,19 +870,10 @@
"adding": "+ 添加{what}",
"adding_default": "+ 添加新value",
"clearSelection": "清除选择",
"close": {
"afterExecution": "执行任务后关闭",
"conditions": "关闭条件",
"errors": "关闭错误处理程序",
"finally": "关闭 task",
"input": "关闭输入",
"pluginDefaults": "关闭插件默认",
"tasks": "关闭task",
"triggers": "关闭 trigger"
},
"creation": {
"afterExecution": "添加执行后块",
"conditions": "添加条件",
"default": "添加",
"errors": "添加错误处理程序",
"finally": "添加一个finally块",
"inputs": "添加一个input字段",
@@ -916,6 +907,10 @@
"variable": "变量",
"yaml": "YAML编辑器"
},
"remove": {
"cases": "删除此案例",
"default": "删除此条目"
},
"sections": {
"afterExecution": "执行后",
"connection": "连接属性",
@@ -932,6 +927,7 @@
"select": {
"afterExecution": "选择一个task",
"conditions": "选择条件",
"default": "选择类型",
"errors": "选择一个task",
"finally": "选择一个task",
"inputs": "选择一个input字段类型",

View File

@@ -71,6 +71,9 @@ import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.extensions.Extension;
import io.swagger.v3.oas.annotations.extensions.ExtensionProperty;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.ExampleObject;
import io.swagger.v3.oas.annotations.media.Schema;
@@ -121,7 +124,7 @@ public class ExecutionController {
@Nullable
@Value("${micronaut.server.context-path}")
protected String basePath;
@Inject
private FlowRepositoryInterface flowRepository;
@@ -168,7 +171,7 @@ public class ExecutionController {
@Inject
private ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
@Inject
private RunContextFactory runContextFactory;
@@ -186,7 +189,7 @@ public class ExecutionController {
@Inject
private Optional<OpenTelemetry> openTelemetry;
@Inject
private ExecutionStreamingService executionStreamingService;
@@ -206,7 +209,7 @@ public class ExecutionController {
@Parameter(description = "The current page") @QueryValue(defaultValue = "1") @Min(1) int page,
@Parameter(description = "The current page size") @QueryValue(defaultValue = "10") @Min(1) int size,
@Parameter(description = "The sort of current page") @Nullable @QueryValue List<String> sort,
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
//Deprecated params
@Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@@ -355,9 +358,9 @@ public class ExecutionController {
@ApiResponse(responseCode = "204", description = "On success")
public HttpResponse<Void> deleteExecution(
@Parameter(description = "The execution id") @PathVariable String executionId,
@Parameter(description = "Whether to delete execution logs") @QueryValue(defaultValue = "true") Boolean deleteLogs,
@Parameter(description = "Whether to delete execution metrics") @QueryValue(defaultValue = "true") Boolean deleteMetrics,
@Parameter(description = "Whether to delete execution files in the internal storage") @QueryValue(defaultValue = "true") Boolean deleteStorage
@Parameter(description = "Whether to delete execution logs", required = false) @QueryValue(defaultValue = "true") Boolean deleteLogs,
@Parameter(description = "Whether to delete execution metrics", required = false) @QueryValue(defaultValue = "true") Boolean deleteMetrics,
@Parameter(description = "Whether to delete execution files in the internal storage", required = false) @QueryValue(defaultValue = "true") Boolean deleteStorage
) throws IOException {
Optional<Execution> execution = executionRepository.findById(tenantService.resolveTenant(), executionId);
if (execution.isPresent()) {
@@ -376,9 +379,9 @@ public class ExecutionController {
public MutableHttpResponse<?> deleteExecutionsByIds(
@RequestBody(description = "The execution id") @Body List<String> executionsId,
@Parameter(description = "Whether to delete non-terminated executions") @Nullable @QueryValue(defaultValue = "false") Boolean includeNonTerminated,
@Parameter(description = "Whether to delete execution logs") @QueryValue(defaultValue = "true") Boolean deleteLogs,
@Parameter(description = "Whether to delete execution metrics") @QueryValue(defaultValue = "true") Boolean deleteMetrics,
@Parameter(description = "Whether to delete execution files in the internal storage") @QueryValue(defaultValue = "true") Boolean deleteStorage
@Parameter(description = "Whether to delete execution logs", required = false) @QueryValue(defaultValue = "true") Boolean deleteLogs,
@Parameter(description = "Whether to delete execution metrics", required = false) @QueryValue(defaultValue = "true") Boolean deleteMetrics,
@Parameter(description = "Whether to delete execution files in the internal storage", required = false) @QueryValue(defaultValue = "true") Boolean deleteStorage
) throws IOException {
List<Execution> executions = new ArrayList<>();
Set<ManualConstraintViolation<String>> invalids = new HashSet<>();
@@ -417,27 +420,27 @@ public class ExecutionController {
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Executions"}, summary = "Delete executions filter by query parameters")
public HttpResponse<?> deleteExecutionsByQuery(
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
}) @Nullable @QueryValue Duration timeRange,
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
@Parameter(description = "Whether to delete non-terminated executions") @Nullable @QueryValue(defaultValue = "false") Boolean includeNonTerminated,
@Parameter(description = "Whether to delete execution logs") @QueryValue(defaultValue = "true") Boolean deleteLogs,
@Parameter(description = "Whether to delete execution metrics") @QueryValue(defaultValue = "true") Boolean deleteMetrics,
@Parameter(description = "Whether to delete execution files in the internal storage") @QueryValue(defaultValue = "true") Boolean deleteStorage
@Parameter(description = "Whether to delete execution logs", required = false) @QueryValue(defaultValue = "true") Boolean deleteLogs,
@Parameter(description = "Whether to delete execution metrics", required = false) @QueryValue(defaultValue = "true") Boolean deleteMetrics,
@Parameter(description = "Whether to delete execution files in the internal storage", required = false) @QueryValue(defaultValue = "true") Boolean deleteStorage
) throws IOException {
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
filters,
@@ -666,7 +669,16 @@ public class ExecutionController {
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "/{namespace}/{id}", consumes = MediaType.MULTIPART_FORM_DATA)
@Operation(tags = {"Executions"}, summary = "Create a new execution for a flow")
@Operation(
tags = {"Executions"},
summary = "Create a new execution for a flow",
extensions = @Extension(
name = "x-sdk-customization",
properties = {
@ExtensionProperty(name = "x-multipart", value = "true")
}
)
)
@ApiResponse(responseCode = "409", description = "if the flow is disabled")
@ApiResponse(responseCode = "200", description = "On execution created", content = {@Content(schema = @Schema(implementation = ExecutionResponse.class))})
@SingleResult
@@ -996,22 +1008,22 @@ public class ExecutionController {
@Post(uri = "/restart/by-query")
@Operation(tags = {"Executions"}, summary = "Restart executions filter by query parameters")
public HttpResponse<?> restartExecutionsByQuery(
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
}) @Nullable @QueryValue Duration timeRange,
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
) throws Exception {
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
filters,
@@ -1056,13 +1068,32 @@ public class ExecutionController {
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "/{executionId}/replay-with-inputs", consumes = MediaType.MULTIPART_FORM_DATA)
@Operation(tags = {"Executions"}, summary = "Create a new execution from an old one and start it from a specified task run id")
@Operation(
tags = {"Executions"},
summary = "Create a new execution from an old one and start it from a specified task run id",
extensions = @Extension(
name = "x-sdk-customization",
properties = {
@ExtensionProperty(name = "x-multipart", value = "true")
}
)
)
public Mono<Execution> replayExecutionWithinputs(
@Parameter(description = "the original execution id to clone") @PathVariable String executionId,
@Parameter(description = "The taskrun id") @Nullable @QueryValue String taskRunId,
@Parameter(description = "The flow revision to use for new execution") @Nullable @QueryValue Integer revision,
@Parameter(description = "Set a list of breakpoints at specific tasks 'id.value', separated by a coma.") @QueryValue Optional<String> breakpoints,
@RequestBody(description = "The inputs") @Body MultipartBody inputs
@RequestBody(
description = "The inputs (multipart map)",
content = @Content(
mediaType = MediaType.MULTIPART_FORM_DATA,
schema = @Schema(
type = "object",
additionalProperties = Schema.AdditionalPropertiesValue.TRUE,
additionalPropertiesSchema = Object.class
)
)
) @Body MultipartBody inputs
) {
Optional<Execution> execution = executionRepository.findById(tenantService.resolveTenant(), executionId);
if (execution.isEmpty()) {
@@ -1239,22 +1270,22 @@ public class ExecutionController {
@ApiResponse(responseCode = "200", description = "On success", content = {@Content(schema = @Schema(implementation = BulkResponse.class))})
@ApiResponse(responseCode = "422", description = "Changed state with errors", content = {@Content(schema = @Schema(implementation = BulkErrorResponse.class))})
public HttpResponse<?> updateExecutionsStatusByQuery(
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
}) @Nullable @QueryValue Duration timeRange,
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
@Parameter(description = "The new state of the executions") @NotNull @QueryValue State.Type newStatus
) throws QueueException {
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
@@ -1302,7 +1333,7 @@ public class ExecutionController {
if (execution.getState().isTerminated() && !isOnKillCascade) {
throw new IllegalStateException("Execution is already finished, can't kill it");
}
eventPublisher.publishEvent(CrudEvent.of(execution, execution.withState(State.Type.KILLING)));
killQueue.emit(ExecutionKilledExecution
.builder()
@@ -1512,22 +1543,22 @@ public class ExecutionController {
@Post(uri = "/resume/by-query")
@Operation(tags = {"Executions"}, summary = "Resume executions filter by query parameters")
public HttpResponse<?> resumeExecutionsByQuery(
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
}) @Nullable @QueryValue Duration timeRange,
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
) throws Exception {
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
filters,
@@ -1621,22 +1652,22 @@ public class ExecutionController {
@Post(uri = "/pause/by-query")
@Operation(tags = {"Executions"}, summary = "Pause executions filter by query parameters")
public HttpResponse<?> pauseExecutionsByQuery(
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
}) @Nullable @QueryValue Duration timeRange,
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
) throws Exception {
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
filters,
@@ -1665,22 +1696,22 @@ public class ExecutionController {
@Delete(uri = "/kill/by-query")
@Operation(tags = {"Executions"}, summary = "Kill executions filter by query parameters")
public HttpResponse<?> killExecutionsByQuery(
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
}) @Nullable @QueryValue Duration timeRange,
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
) throws QueueException {
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
filters,
@@ -1709,22 +1740,22 @@ public class ExecutionController {
@Post(uri = "/replay/by-query")
@Operation(tags = {"Executions"}, summary = "Create new executions from old ones filter by query parameters. Keep the flow revision")
public HttpResponse<?> replayExecutionsByQuery(
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
}) @Nullable @QueryValue Duration timeRange,
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
@Parameter(description = "If latest revision should be used") @Nullable @QueryValue(defaultValue = "false") Boolean latestRevision
) throws Exception {
@@ -1800,7 +1831,17 @@ public class ExecutionController {
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "/{executionId}/follow", produces = MediaType.TEXT_EVENT_STREAM)
@Operation(tags = {"Executions"}, summary = "Follow an execution")
@Operation(
tags = {"Executions"},
summary = "Follow an execution",
extensions = @Extension(
name = "x-sdk-customization",
properties = {
@ExtensionProperty(name = "x-replace-follow-execution", value = "true"),
@ExtensionProperty(name = "x-skipped", value = "true")
}
)
)
public Flux<Event<Execution>> followExecution(
@Parameter(description = "The execution id") @PathVariable String executionId
) {
@@ -2017,22 +2058,22 @@ public class ExecutionController {
@Post(uri = "/labels/by-query")
@Operation(tags = {"Executions"}, summary = "Set label on executions filter by query parameters")
public HttpResponse<?> setLabelsOnTerminatedExecutionsByQuery(
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
}) @Nullable @QueryValue Duration timeRange,
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
@RequestBody(description = "The labels to add to the execution") @Body @NotNull @Valid List<Label> setLabels
) {
@@ -2134,22 +2175,22 @@ public class ExecutionController {
@Post(uri = "/unqueue/by-query")
@Operation(tags = {"Executions"}, summary = "Unqueue executions filter by query parameters")
public HttpResponse<?> unqueueExecutionsByQuery(
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
}) @Nullable @QueryValue Duration timeRange,
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter,
@Parameter(description = "The new state of the unqueued executions") @Nullable @QueryValue State.Type newState
) throws Exception {
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
@@ -2248,22 +2289,22 @@ public class ExecutionController {
@Post(uri = "/force-run/by-query")
@Operation(tags = {"Executions"}, summary = "Force run executions filter by query parameters")
public HttpResponse<?> forceRunExecutionsByQuery(
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include") @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", examples = {
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the executions to include", deprecated = true) @Nullable @QueryValue(value = "scope") List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A flow id filter", deprecated = true) @Nullable @QueryValue String flowId,
@Deprecated @Parameter(description = "The start datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate,
@Deprecated @Parameter(description = "The end datetime", deprecated = true) @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate,
@Deprecated @Parameter(description = "A time range filter relative to the current time", deprecated = true, examples = {
@ExampleObject(name = "Filter last 5 minutes", value = "PT5M"),
@ExampleObject(name = "Filter last 24 hours", value = "P1D")
}) @Nullable @QueryValue Duration timeRange,
@Deprecated @Parameter(description = "A state filter") @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id") @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter") @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
@Deprecated @Parameter(description = "A state filter", deprecated = true) @Nullable @QueryValue List<State.Type> state,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels,
@Deprecated @Parameter(description = "The trigger execution id", deprecated = true) @Nullable @QueryValue String triggerExecutionId,
@Deprecated @Parameter(description = "A execution child filter", deprecated = true) @Nullable @QueryValue ExecutionRepositoryInterface.ChildFilter childFilter
) throws Exception {
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
filters,
@@ -2338,7 +2379,17 @@ public class ExecutionController {
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "/{executionId}/follow-dependencies", produces = MediaType.TEXT_EVENT_STREAM)
@Operation(tags = {"Executions"}, summary = "Follow all execution dependencies executions")
@Operation(
tags = {"Executions"},
summary = "Follow all execution dependencies executions",
extensions = @Extension(
name = "x-sdk-customization",
properties = {
@ExtensionProperty(name = "x-replace-follow-dependencies-execution", value = "true"),
@ExtensionProperty(name = "x-skipped", value = "true")
}
)
)
public Flux<Event<ExecutionStatusEvent>> followDependenciesExecutions(
@Parameter(description = "The execution id") @PathVariable String executionId,
@Parameter(description = "If true, list only destination dependencies, otherwise list also source dependencies") @QueryValue(defaultValue = "false") boolean destinationOnly,

View File

@@ -46,6 +46,7 @@ import io.micronaut.validation.Validated;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.parameters.RequestBody;
@@ -163,7 +164,7 @@ public class FlowController {
@Get(uri = "{namespace}/{id}")
@Operation(tags = {"Flows"}, summary = "Get a flow")
@Schema(
oneOf = {FlowWithSource.class, Flow.class}
implementation = FlowWithSource.class
)
//FIXME we return Object instead of Flow as Micronaut, since 4, has an issue with subtypes serialization, see https://github.com/micronaut-projects/micronaut-core/issues/10294.
public Object getFlow(
@@ -222,7 +223,7 @@ public class FlowController {
@Parameter(description = "The current page") @QueryValue(defaultValue = "1") @Min(1) int page,
@Parameter(description = "The current page size") @QueryValue(defaultValue = "10") @Min(1) int size,
@Parameter(description = "The sort of current page") @Nullable @QueryValue List<String> sort,
@Parameter(description = "Filters") @QueryFilterFormat() List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat() List<QueryFilter> filters,
// Deprecated params
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the flows to include", deprecated = true) @Nullable @QueryValue List<FlowScope> scope,
@@ -277,7 +278,7 @@ public class FlowController {
*/
@ExecuteOn(TaskExecutors.IO)
@Post(consumes = MediaType.ALL)
@Operation(tags = {"Flows"}, summary = "Create a flow from json object", deprecated = true)
@Operation(tags = {"Flows"}, summary = "Create a flow from json object", deprecated = true, hidden = true)
@Deprecated(forRemoval = true, since = "0.18")
@Hidden // we hide it otherwise this is the one that will be included in the OpenAPI spec instead of the YAML one.
public HttpResponse<Flow> createFlowFromJson(
@@ -334,7 +335,8 @@ public class FlowController {
summary = "Update a complete namespace from json object",
description = "All flow will be created / updated for this namespace.\n" +
"Flow that already created but not in `flows` will be deleted if the query delete is `true`",
deprecated = true
deprecated = true,
hidden = true
)
@Deprecated(forRemoval = true, since = "0.18")
@Hidden // we hide it otherwise this is the one that will be included in the OpenAPI spec instead of the YAML one.
@@ -437,7 +439,7 @@ public class FlowController {
@Put(uri = "{namespace}/{id}", consumes = MediaType.APPLICATION_YAML)
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Flows"}, summary = "Update a flow")
@Operation(tags = {"Flows"}, summary = "Update a flow")// force deprecated = false otherwise it is marked as deprecated, dont know why
@ApiResponse(responseCode = "200", description = "On success", content = {@Content(schema = @Schema(implementation = FlowWithSource.class))})
public HttpResponse<FlowWithSource> updateFlow(
@Parameter(description = "The flow namespace") @PathVariable String namespace,
@@ -476,9 +478,9 @@ public class FlowController {
/**
* @deprecated use {@link #updateFlow(String, String, String)} instead
*/
@Put(uri = "{namespace}/{id}", consumes = MediaType.ALL)
@Put(uri = "{namespace}/{id}", consumes = MediaType.APPLICATION_JSON)
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Flows"}, summary = "Update a flow", deprecated = true)
@Operation(tags = {"Flows"}, operationId = "updateFlowFromJson", summary = "Update a flow", deprecated = true, hidden = true)
@Deprecated(forRemoval = true, since = "0.18")
@Hidden // we hide it otherwise this is the one that will be included in the OpenAPI spec instead of the JSON one.
public HttpResponse<Flow> updateFlowFromJson(
@@ -666,7 +668,7 @@ public class FlowController {
@Post(uri = "/validate/task", consumes = MediaType.APPLICATION_YAML)
@Operation(tags = {"Flows"}, summary = "Validate a task")
public ValidateConstraintViolation validateTask(
@RequestBody(description = "A task definition that can be from tasks or triggers") @Body String task,
@RequestBody(description = "A task definition that can be from tasks or triggers") @Schema(implementation = Object.class) @Body String task,
@Parameter(description = "The type of task") @QueryValue TaskValidationType section
) {
ValidateConstraintViolation.ValidateConstraintViolationBuilder<?, ?> validateConstraintViolationBuilder = ValidateConstraintViolation.builder();
@@ -703,12 +705,12 @@ public class FlowController {
summary = "Export flows as a ZIP archive of yaml sources."
)
public HttpResponse<byte[]> exportFlowsByQuery(
@Parameter(description = "Filters") @QueryFilterFormat() List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat() List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the flows to include") @Nullable @QueryValue List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the flows to include", deprecated = true) @Nullable @QueryValue List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels
) throws IOException {
filters = mapLegacyQueryParamsToNewFilters(filters, query, scope, namespace, labels);
@@ -741,12 +743,12 @@ public class FlowController {
summary = "Delete flows returned by the query parameters."
)
public HttpResponse<BulkResponse> deleteFlowsByQuery(
@Parameter(description = "Filters") @QueryFilterFormat() List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the flows to include") @Nullable @QueryValue List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the flows to include", deprecated = true) @Nullable @QueryValue List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels
) {
filters = mapLegacyQueryParamsToNewFilters(filters, query, scope, namespace, labels);
@@ -784,12 +786,12 @@ public class FlowController {
summary = "Disable flows returned by the query parameters."
)
public HttpResponse<BulkResponse> disableFlowsByQuery(
@Parameter(description = "Filters") @QueryFilterFormat() List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat() List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the flows to include") @Nullable @QueryValue List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the flows to include", deprecated = true) @Nullable @QueryValue List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels
) {
filters = mapLegacyQueryParamsToNewFilters(filters, query, scope, namespace, labels);
@@ -816,12 +818,12 @@ public class FlowController {
summary = "Enable flows returned by the query parameters."
)
public HttpResponse<BulkResponse> enableFlowsByQuery(
@Parameter(description = "Filters") @QueryFilterFormat() List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat() List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the flows to include") @Nullable @QueryValue List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'") @Nullable @QueryValue @Format("MULTI") List<String> labels
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "The scope of the flows to include", deprecated = true) @Nullable @QueryValue List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels
) {
filters = mapLegacyQueryParamsToNewFilters(filters, query, scope, namespace, labels);

View File

@@ -22,8 +22,8 @@ import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.parameters.RequestBody;
import jakarta.inject.Inject;
import java.io.*;
import java.time.*;
import java.io.IOException;
import java.time.Duration;
import java.util.*;
@Validated
@@ -91,7 +91,7 @@ public class KVController {
}
@ExecuteOn(TaskExecutors.IO)
@Put(uri = "{key}", consumes = {MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN})
@Put(uri = "{key}", consumes = {MediaType.TEXT_PLAIN})
@Operation(tags = {"KV"}, summary = "Puts a key-value pair in store")
public void setKeyValue(
HttpHeaders httpHeaders,

View File

@@ -27,6 +27,7 @@ import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.validation.Validated;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import jakarta.inject.Inject;
import jakarta.validation.constraints.Min;
import org.slf4j.event.Level;
@@ -66,7 +67,7 @@ public class LogController {
@Parameter(description = "The current page") @QueryValue(defaultValue = "1") @Min(1) int page,
@Parameter(description = "The current page size") @QueryValue(defaultValue = "10") @Min(1) int size,
@Parameter(description = "The sort of current page") @Nullable @QueryValue List<String> sort,
@Parameter(description = "Filters") @Nullable @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @Nullable @QueryFilterFormat List<QueryFilter> filters,
// Deprecated params
@Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix",deprecated = true) @Nullable @QueryValue String namespace,

View File

@@ -19,6 +19,7 @@ import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.validation.Validated;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import jakarta.inject.Inject;
import java.io.IOException;
@@ -44,7 +45,7 @@ public class NamespaceSecretController {
@Parameter(description = "The current page") @QueryValue(value = "page", defaultValue = "1") int page,
@Parameter(description = "The current page size") @QueryValue(value = "size", defaultValue = "10") int size,
@Parameter(description = "The sort of current page") @Nullable @QueryValue(value = "sort") List<String> sort,
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters
) throws IllegalArgumentException, IOException {
final String tenantId = this.tenantService.resolveTenant();
List<String> items = secretService.inheritedSecrets(tenantId, namespace).get(namespace).stream().toList();

View File

@@ -20,6 +20,7 @@ import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import io.swagger.v3.oas.annotations.media.ExampleObject;
import jakarta.inject.Inject;
import jakarta.validation.constraints.Min;
@@ -44,7 +45,7 @@ public class TaskRunController {
@Parameter(description = "The current page") @QueryValue(defaultValue = "1") @Min(1) int page,
@Parameter(description = "The current page size") @QueryValue(defaultValue = "10") @Min(1) int size,
@Parameter(description = "The sort of current page") @Nullable @QueryValue List<String> sort,
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
// Deprecated params
@Parameter(description = "A string filter",deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,

View File

@@ -32,6 +32,7 @@ import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.validation.Validated;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import jakarta.inject.Inject;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Min;
@@ -82,7 +83,7 @@ public class TriggerController {
@Parameter(description = "The current page") @QueryValue(defaultValue = "1") @Min(1) int page,
@Parameter(description = "The current page size") @QueryValue(defaultValue = "10") @Min(1) int size,
@Parameter(description = "The sort of current page") @Nullable @QueryValue List<String> sort,
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
// Deprecated params
@Parameter(description = "A string filter",deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@@ -205,10 +206,10 @@ public class TriggerController {
@Post(uri = "/unlock/by-query")
@Operation(tags = {"Triggers"}, summary = "Unlock triggers by query parameters")
public MutableHttpResponse<?> unlockTriggersByQuery(
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace
) {
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
filters,
@@ -280,13 +281,13 @@ public class TriggerController {
if (abstractTrigger == null) {
throw new HttpStatusException(HttpStatus.NOT_FOUND, String.format("Flow %s has no trigger %s", newTrigger.getFlowId(), newTrigger.getTriggerId()));
}
if (abstractTrigger instanceof RealtimeTriggerInterface) {
throw new IllegalArgumentException("Realtime triggers can not be updated through the API, please edit the trigger from the flow.");
}
Trigger updatedTrigger;
if (newTrigger.getBackfill() != null) {
try {
updatedTrigger = setTriggerBackfill(newTrigger, maybeFlow.get(), abstractTrigger);
@@ -296,13 +297,13 @@ public class TriggerController {
} else {
updatedTrigger = setTriggerDisabled(newTrigger.uid(), newTrigger.getDisabled(), abstractTrigger, maybeFlow.get());
}
if (updatedTrigger == null) {
return HttpResponse.notFound();
}
return HttpResponse.ok(updatedTrigger);
}
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "/{namespace}/{flowId}/{triggerId}/restart")
@Operation(tags = {"Triggers"}, summary = "Restart a trigger")
@@ -369,10 +370,10 @@ public class TriggerController {
@Post(uri = "/backfill/pause/by-query")
@Operation(tags = {"Triggers"}, summary = "Pause backfill for given triggers")
public MutableHttpResponse<?> pauseBackfillByQuery(
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace
) throws QueueException {
// Updating the backfill within the flux does not works
List<Trigger> triggers = triggerRepository
@@ -408,10 +409,10 @@ public class TriggerController {
@Post(uri = "/backfill/unpause/by-query")
@Operation(tags = {"Triggers"}, summary = "Unpause backfill for given triggers")
public MutableHttpResponse<?> unpauseBackfillByQuery(
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace
) throws QueueException {
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
filters,
@@ -477,10 +478,10 @@ public class TriggerController {
@Post(uri = "/backfill/delete/by-query")
@Operation(tags = {"Triggers"}, summary = "Delete backfill for given triggers")
public MutableHttpResponse<?> deleteBackfillByQuery(
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace
) throws QueueException {
filters = RequestUtils.getFiltersOrDefaultToLegacyMapping(
filters,
@@ -521,10 +522,10 @@ public class TriggerController {
@Post(uri = "/set-disabled/by-query")
@Operation(tags = {"Triggers"}, summary = "Disable/enable triggers by query parameters")
public MutableHttpResponse<?> disabledTriggersByQuery(
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters,
@Parameter(description = "Filters", in = ParameterIn.QUERY) @QueryFilterFormat List<QueryFilter> filters,
@Deprecated @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A string filter", deprecated = true) @Nullable @QueryValue(value = "q") String query,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Parameter(description = "The disabled state") @QueryValue(defaultValue = "true") Boolean disabled
) throws QueueException {
@@ -557,24 +558,24 @@ public class TriggerController {
public void setTriggerDisabled(Trigger trigger, Boolean disabled) throws QueueException {
Optional<Flow> maybeFlow = this.flowRepository.findById(this.tenantService.resolveTenant(), trigger.getNamespace(), trigger.getFlowId());
if (maybeFlow.isEmpty()) {
return; // Flow doesn't exist
}
Optional<AbstractTrigger> maybeAbstractTrigger = maybeFlow.flatMap(flow -> flow.getTriggers().stream().filter(t -> t.getId().equals(trigger.getTriggerId())).findFirst());
if (maybeAbstractTrigger.isEmpty()) {
return; // Trigger doesn't exist
}
if (maybeAbstractTrigger.get() instanceof RealtimeTriggerInterface) {
return; // RealTimeTriggers can't be disabled/enabled through API.
}
setTriggerDisabled(trigger.uid(), disabled, maybeAbstractTrigger.get(), maybeFlow.get());
}
private Trigger setTriggerDisabled(String triggerUID, Boolean disabled, AbstractTrigger triggerDefinition, Flow flow) throws QueueException {
return this.triggerRepository.lock(triggerUID, throwFunction(current -> {
if (disabled.equals(current.getDisabled())) {
@@ -583,46 +584,46 @@ public class TriggerController {
return doSetTriggerDisabled(current, disabled, flow, triggerDefinition);
}));
}
private Trigger setTriggerBackfill(Trigger newTrigger, Flow flow, AbstractTrigger abstractTrigger) throws Exception {
return this.triggerRepository.lock(newTrigger.uid(), throwFunction(current -> doSetTriggerBackfill(current, newTrigger.getBackfill(), flow, abstractTrigger)));
}
protected Trigger doSetTriggerDisabled(Trigger currentState, Boolean disabled, Flow flow, AbstractTrigger trigger) throws QueueException {
Trigger.TriggerBuilder<?, ?> builder = currentState.toBuilder().disabled(disabled);
if (disabled) {
builder = builder.nextExecutionDate(null);
}
Trigger updated = builder.build();
triggerQueue.emit(updated);
return updated;
}
protected Trigger doSetTriggerBackfill(Trigger currentState, Backfill backfill, Flow flow, AbstractTrigger trigger) throws Exception {
Trigger updated;
ZonedDateTime nextExecutionDate = null;
RunContext runContext = runContextFactory.of(flow, trigger);
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
// We must set up the backfill before the update to calculate the next execution date
updated = currentState.withBackfill(backfill);
if (trigger instanceof PollingTriggerInterface pollingTriggerInterface) {
nextExecutionDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, Optional.of(updated));
}
updated = updated
.toBuilder()
.nextExecutionDate(nextExecutionDate)
.build();
triggerQueue.emit(updated);
return updated;
}
public int backfillsAction(List<Trigger> triggers, BACKFILL_ACTION action) throws QueueException {
AtomicInteger count = new AtomicInteger();
triggers.forEach(throwConsumer(trigger -> {

View File

@@ -686,7 +686,7 @@ class ExecutionControllerRunnerTest {
assertThat(restartedExec.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
assertThat(restartedExec.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restartedExec.getTaskRunList().get(2).getAttempts()).isNull();
assertThat(restartedExec.getTaskRunList().get(2).getAttempts()).isNotNull();
assertThat(restartedExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
});
},
@@ -700,7 +700,7 @@ class ExecutionControllerRunnerTest {
assertThat(finishedRestartedExecution.getTaskRunList().getFirst().getAttempts().size()).isEqualTo(1);
assertThat(finishedRestartedExecution.getTaskRunList().get(1).getAttempts().size()).isEqualTo(1);
assertThat(finishedRestartedExecution.getTaskRunList().get(2).getAttempts()).isNull();
assertThat(finishedRestartedExecution.getTaskRunList().get(2).getAttempts()).isNotNull();
assertThat(finishedRestartedExecution.getTaskRunList().get(2).getState().getHistories().stream().filter(state -> state.getState() == State.Type.PAUSED).count()).isEqualTo(1L);
assertThat(finishedRestartedExecution.getTaskRunList().get(3).getAttempts().size()).isEqualTo(2);
assertThat(finishedRestartedExecution.getTaskRunList().get(4).getAttempts().size()).isEqualTo(1);

View File

@@ -163,16 +163,16 @@ class KVControllerTest {
static Stream<Arguments> kvSetKeyValueArgs() {
return Stream.of(
Arguments.of(MediaType.APPLICATION_JSON, "{\"hello\":\"world\"}", Map.class),
Arguments.of(MediaType.APPLICATION_JSON, "[\"hello\",\"world\"]", List.class),
Arguments.of(MediaType.APPLICATION_JSON, "\"hello\"", String.class),
Arguments.of(MediaType.APPLICATION_JSON, "1", Integer.class),
Arguments.of(MediaType.APPLICATION_JSON, "1.0", BigDecimal.class),
Arguments.of(MediaType.APPLICATION_JSON, "true", Boolean.class),
Arguments.of(MediaType.APPLICATION_JSON, "false", Boolean.class),
Arguments.of(MediaType.APPLICATION_JSON, "2021-09-01", LocalDate.class),
Arguments.of(MediaType.APPLICATION_JSON, "2021-09-01T01:02:03Z", Instant.class),
Arguments.of(MediaType.APPLICATION_JSON, "\"PT5S\"", Duration.class)
Arguments.of(MediaType.TEXT_PLAIN, "{\"hello\":\"world\"}", Map.class),
Arguments.of(MediaType.TEXT_PLAIN, "[\"hello\",\"world\"]", List.class),
Arguments.of(MediaType.TEXT_PLAIN, "\"hello\"", String.class),
Arguments.of(MediaType.TEXT_PLAIN, "1", Integer.class),
Arguments.of(MediaType.TEXT_PLAIN, "1.0", BigDecimal.class),
Arguments.of(MediaType.TEXT_PLAIN, "true", Boolean.class),
Arguments.of(MediaType.TEXT_PLAIN, "false", Boolean.class),
Arguments.of(MediaType.TEXT_PLAIN, "2021-09-01", LocalDate.class),
Arguments.of(MediaType.TEXT_PLAIN, "2021-09-01T01:02:03Z", Instant.class),
Arguments.of(MediaType.TEXT_PLAIN, "\"PT5S\"", Duration.class)
);
}
@@ -256,7 +256,7 @@ class KVControllerTest {
assertThat(httpClientResponseException.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
assertThat(httpClientResponseException.getMessage()).isEqualTo(expectedErrorMessage);
httpClientResponseException = Assertions.assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + NAMESPACE + "/kv/bad$key", "\"content\"").contentType(MediaType.APPLICATION_JSON)));
httpClientResponseException = Assertions.assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + NAMESPACE + "/kv/bad$key", "\"content\"").contentType(MediaType.TEXT_PLAIN)));
assertThat(httpClientResponseException.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
assertThat(httpClientResponseException.getMessage()).isEqualTo(expectedErrorMessage);