Compare commits

..

8 Commits

Author SHA1 Message Date
Loïc Mathieu
8746c24170 chore(system): extract queue consumers processing into message handlers 2025-11-19 11:40:56 +01:00
Loïc Mathieu
a09f61fcfd fix(flow): flow trigger with both conditions and preconditions
When a flow have both a condition and a precondition, the condition was evaluated twice which lead to double execution triggered.

Fixes
2025-11-14 18:16:32 +01:00
Loïc Mathieu
687ce00d33 fix(test): increase indexing waiting sleep 2025-11-13 18:08:39 +01:00
Loïc Mathieu
133828bdf1 feat(core): remove deprecated runner property in favor or taskRunner
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
94d0975b78 feat(core): remove Property deprecated methdso and constructors
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
b6e44954c6 feat(flow): remove FILE input extension
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
0167f5f806 feat(flow): remove JSON flow support
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
97769faba7 feat(flow): remove state store
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
107 changed files with 1121 additions and 3811 deletions

View File

@@ -31,8 +31,6 @@ dependencies {
implementation project(":jdbc-mysql")
implementation project(":jdbc-postgres")
implementation project(":queue")
implementation project(":storage-local")
// Kestra server components

View File

@@ -1,7 +1,6 @@
package io.kestra.cli.commands.sys;
import io.kestra.cli.commands.sys.database.DatabaseCommand;
import io.kestra.cli.commands.sys.statestore.StateStoreCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
@@ -16,7 +15,6 @@ import picocli.CommandLine;
ReindexCommand.class,
DatabaseCommand.class,
SubmitQueuedCommand.class,
StateStoreCommand.class
}
)
@Slf4j

View File

@@ -1,27 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import picocli.CommandLine;
@CommandLine.Command(
name = "state-store",
description = "Manage Kestra State Store",
mixinStandardHelpOptions = true,
subcommands = {
StateStoreMigrateCommand.class,
}
)
public class StateStoreCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "sys", "state-store", "--help");
return 0;
}
}

View File

@@ -1,81 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Slugify;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@CommandLine.Command(
name = "migrate",
description = "Migrate old state store files to use the new KV Store implementation.",
mixinStandardHelpOptions = true
)
@Slf4j
public class StateStoreMigrateCommand extends AbstractCommand {
@Inject
private ApplicationContext applicationContext;
@Override
public Integer call() throws Exception {
super.call();
FlowRepositoryInterface flowRepository = this.applicationContext.getBean(FlowRepositoryInterface.class);
StorageInterface storageInterface = this.applicationContext.getBean(StorageInterface.class);
RunContextFactory runContextFactory = this.applicationContext.getBean(RunContextFactory.class);
flowRepository.findAllForAllTenants().stream().map(flow -> Map.entry(flow, List.of(
URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of(flow.getId()) + "/states"),
URI.create("/" + flow.getNamespace().replace(".", "/") + "/states")
))).map(potentialStateStoreUrisForAFlow -> Map.entry(potentialStateStoreUrisForAFlow.getKey(), potentialStateStoreUrisForAFlow.getValue().stream().flatMap(uri -> {
try {
return storageInterface.allByPrefix(potentialStateStoreUrisForAFlow.getKey().getTenantId(), potentialStateStoreUrisForAFlow.getKey().getNamespace(), uri, false).stream();
} catch (IOException e) {
return Stream.empty();
}
}).toList())).forEach(stateStoreFileUrisForAFlow -> stateStoreFileUrisForAFlow.getValue().forEach(stateStoreFileUri -> {
Flow flow = stateStoreFileUrisForAFlow.getKey();
String[] flowQualifierWithStateQualifiers = stateStoreFileUri.getPath().split("/states/");
String[] statesUriPart = flowQualifierWithStateQualifiers[1].split("/");
String stateName = statesUriPart[0];
String taskRunValue = statesUriPart.length > 2 ? statesUriPart[1] : null;
String stateSubName = statesUriPart[statesUriPart.length - 1];
boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId());
StateStore stateStore = new StateStore(runContext(runContextFactory, flow), false);
try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) {
stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes());
storageInterface.delete(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
stdOut("Successfully ran the state-store migration.");
return 0;
}
private RunContext runContext(RunContextFactory runContextFactory, Flow flow) {
Map<String, String> flowVariables = new HashMap<>();
flowVariables.put("tenantId", flow.getTenantId());
flowVariables.put("id", flow.getId());
flowVariables.put("namespace", flow.getNamespace());
return runContextFactory.of(flow, Map.of("flow", flowVariables));
}
}

View File

@@ -1,27 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.commands.sys.database.DatabaseCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class StateStoreCommandTest {
@Test
void runWithNoParam() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(StateStoreCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra sys state-store");
}
}
}

View File

@@ -1,75 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.core.exceptions.MigrationRequiredException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.Slugify;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class StateStoreMigrateCommandTest {
@Test
void runMigration() throws IOException, ResourceExpiredException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).environments("test").start()) {
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
Flow flow = Flow.builder()
.tenantId("my-tenant")
.id("a-flow")
.namespace("some.valid.namespace." + ((int) (Math.random() * 1000000)))
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build();
flowRepository.create(GenericFlow.of(flow));
StorageInterface storage = ctx.getBean(StorageInterface.class);
String tenantId = flow.getTenantId();
URI oldStateStoreUri = URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of("a-flow") + "/states/my-state/" + Hashing.hashToString("my-taskrun-value") + "/sub-name");
storage.put(
tenantId,
flow.getNamespace(),
oldStateStoreUri,
new ByteArrayInputStream("my-value".getBytes())
);
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
"tenantId", tenantId,
"id", flow.getId(),
"namespace", flow.getNamespace()
)));
StateStore stateStore = new StateStore(runContext, true);
Assertions.assertThrows(MigrationRequiredException.class, () -> stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value"));
String[] args = {};
Integer call = PicocliRunner.call(StateStoreMigrateCommand.class, ctx, args);
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes())).isEqualTo("my-value");
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isFalse();
assertThat(call).isZero();
}
}
}

View File

@@ -127,8 +127,6 @@ public class MetricRegistry {
public static final String METRIC_QUEUE_BIG_MESSAGE_COUNT_DESCRIPTION = "Total number of big messages";
public static final String METRIC_QUEUE_PRODUCE_COUNT = "queue.produce.count";
public static final String METRIC_QUEUE_PRODUCE_COUNT_DESCRIPTION = "Total number of produced messages";
public static final String METRIC_QUEUE_RECEIVE_COUNT = "queue.receive.count";
public static final String METRIC_QUEUE_RECEIVE_COUNT_DESCRIPTION = "Total number of received messages";
public static final String METRIC_QUEUE_RECEIVE_DURATION = "queue.receive.duration";
public static final String METRIC_QUEUE_RECEIVE_DURATION_DESCRIPTION = "Queue duration to receive and consume a batch of messages";
public static final String METRIC_QUEUE_POLL_SIZE = "queue.poll.size";

View File

@@ -4,8 +4,6 @@ import java.util.Set;
import io.kestra.core.models.flows.Input;
import io.kestra.core.validations.FileInputValidation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@@ -19,17 +17,14 @@ import java.util.List;
@FileInputValidation
public class FileInput extends Input<URI> {
private static final String DEFAULT_EXTENSION = ".upl";
public static final String DEFAULT_EXTENSION = ".upl";
@Deprecated(since = "0.24", forRemoval = true)
public String extension;
/**
* List of allowed file extensions (e.g., [".csv", ".txt", ".pdf"]).
* Each extension must start with a dot.
*/
private List<String> allowedFileExtensions;
/**
* Gets the file extension from the URI's path
*/
@@ -53,15 +48,4 @@ public class FileInput extends Input<URI> {
);
}
}
public static String findFileInputExtension(@NotNull final List<Input<?>> inputs, @NotNull final String fileName) {
String res = inputs.stream()
.filter(in -> in instanceof FileInput)
.filter(in -> in.getId().equals(fileName))
.filter(flowInput -> ((FileInput) flowInput).getExtension() != null)
.map(flowInput -> ((FileInput) flowInput).getExtension())
.findFirst()
.orElse(FileInput.DEFAULT_EXTENSION);
return res.startsWith(".") ? res : "." + res;
}
}

View File

@@ -54,12 +54,7 @@ public class Property<T> {
private String expression;
private T value;
/**
* @deprecated use {@link #ofExpression(String)} instead.
*/
@Deprecated
// Note: when not used, this constructor would not be deleted but made private so it can only be used by ofExpression(String) and the deserializer
public Property(String expression) {
private Property(String expression) {
this.expression = expression;
}
@@ -123,14 +118,6 @@ public class Property<T> {
return p;
}
/**
* @deprecated use {@link #ofValue(Object)} instead.
*/
@Deprecated
public static <V> Property<V> of(V value) {
return ofValue(value);
}
/**
* Build a new Property object with a Pebble expression.<br>
* <p>

View File

@@ -1,9 +1,11 @@
package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.PluginDefaultService;
import jakarta.inject.Singleton;
import java.util.Objects;
import lombok.Setter;
@@ -15,12 +17,14 @@ import java.util.Optional;
@Singleton
public class DefaultFlowMetaStore implements FlowMetaStoreInterface {
private final FlowRepositoryInterface flowRepository;
private final PluginDefaultService pluginDefaultService;
@Setter
private List<FlowWithSource> allFlows;
public DefaultFlowMetaStore(FlowListenersInterface flowListeners, FlowRepositoryInterface flowRepository) {
public DefaultFlowMetaStore(FlowListenersInterface flowListeners, FlowRepositoryInterface flowRepository, PluginDefaultService pluginDefaultService) {
this.flowRepository = flowRepository;
this.pluginDefaultService = pluginDefaultService;
flowListeners.listen(flows -> allFlows = flows);
}
@@ -53,4 +57,9 @@ public class DefaultFlowMetaStore implements FlowMetaStoreInterface {
public Boolean isReady() {
return true;
}
@Override
public Optional<FlowWithSource> findByExecutionThenInjectDefaults(Execution execution) {
return findByExecution(execution).map(it -> pluginDefaultService.injectDefaults(it, execution));
}
}

View File

@@ -142,7 +142,7 @@ public class FlowInputOutput {
runContext.logger().warn("Using a deprecated way to upload a FILE input. You must set the input 'id' as part name and set the name of the file using the regular 'filename' part attribute.");
}
String inputId = oldStyleInput ? fileUpload.getFilename() : fileUpload.getName();
String fileName = oldStyleInput ? FileInput.findFileInputExtension(inputs, fileUpload.getFilename()) : fileUpload.getFilename();
String fileName = oldStyleInput ? FileInput.DEFAULT_EXTENSION : fileUpload.getFilename();
if (!uploadFiles) {
URI from = URI.create("kestra://" + StorageContext
@@ -153,7 +153,7 @@ public class FlowInputOutput {
sink.next(Map.entry(inputId, from.toString()));
} else {
try {
final String fileExtension = FileInput.findFileInputExtension(inputs, fileName);
final String fileExtension = FileInput.DEFAULT_EXTENSION;
String prefix = StringUtils.leftPad(fileName + "_", 3, "_");
File tempFile = File.createTempFile(prefix, fileExtension);

View File

@@ -49,4 +49,6 @@ public interface FlowMetaStoreInterface {
Optional.of(execution.getFlowRevision())
);
}
Optional<FlowWithSource> findByExecutionThenInjectDefaults(Execution execution);
}

View File

@@ -7,7 +7,6 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.Storage;
import io.kestra.core.storages.kv.KVStore;
import org.slf4j.Logger;
@@ -170,14 +169,6 @@ public abstract class RunContext implements PropertyContext {
*/
public abstract KVStore namespaceKv(String namespace);
/**
* @deprecated use #namespaceKv(String) instead
*/
@Deprecated(since = "1.1.0", forRemoval = true)
public StateStore stateStore() {
return new StateStore(this, true);
}
/**
* Get access to local paths of the host machine.
*/

View File

@@ -1,114 +0,0 @@
package io.kestra.core.storages;
import io.kestra.core.exceptions.MigrationRequiredException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.storages.kv.KVValue;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.Slugify;
import jakarta.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Objects;
import java.util.Optional;
/**
* @deprecated use KVStore instead
*/
@Deprecated(since = "1.1.0", forRemoval = true)
public record StateStore(RunContext runContext, boolean hashTaskRunValue) {
public InputStream getState(String stateName, @Nullable String stateSubName, String taskRunValue) throws IOException, ResourceExpiredException {
return this.getState(true, stateName, stateSubName, taskRunValue);
}
/**
* Gets the state for the given state name, sub-name and task run value.
*
* @param flowScoped if true, will scope it to flow, otherwise to namespace
* @param stateName state name
* @param stateSubName state sub-name (optional)
* @param taskRunValue task run value
* @return an InputStream of the state data
*/
public InputStream getState(boolean flowScoped, String stateName, @Nullable String stateSubName, String taskRunValue) throws IOException, ResourceExpiredException {
RunContext.FlowInfo flowInfo = runContext.flowInfo();
// We check if a file containing the state exists in the old state store
URI oldStateStoreUri = this.oldStateStoreUri(flowInfo.namespace(), flowScoped, flowInfo.id(), stateName, taskRunValue, stateSubName);
if (runContext.storage().isFileExist(oldStateStoreUri)) {
throw new MigrationRequiredException("State Store", "sys state-store migrate");
}
String key = this.statePrefix("_", flowScoped, flowInfo.id(), stateName + nameSuffix(stateSubName), taskRunValue);
Optional<KVValue> kvStateValue = runContext.namespaceKv(flowInfo.namespace()).getValue(key);
if (kvStateValue.isEmpty()) {
throw new FileNotFoundException("State " + key + " not found");
}
Object value = kvStateValue.get().value();
if (value instanceof String string) {
return new ByteArrayInputStream(string.getBytes());
} else {
return new ByteArrayInputStream(((byte[]) Objects.requireNonNull(value)));
}
}
public String putState(String stateName, String stateSubName, String taskRunValue, byte[] value) throws IOException {
return this.putState(true, stateName, stateSubName, taskRunValue, value);
}
/**
* Sets the state for the given state name, sub-name and task run value.
*
* @param flowScoped if true, will scope it to flow, otherwise to namespace
* @param stateName state name
* @param stateSubName state sub-name (optional)
* @param taskRunValue task run value
* @param value the state value to store
* @return the KV Store key at which the state is stored
*/
public String putState(boolean flowScoped, String stateName, String stateSubName, String taskRunValue, byte[] value) throws IOException {
RunContext.FlowInfo flowInfo = runContext.flowInfo();
String key = this.statePrefix("_", flowScoped, flowInfo.id(), stateName + nameSuffix(stateSubName), taskRunValue);
runContext.namespaceKv(flowInfo.namespace()).put(key, new KVValueAndMetadata(null, value));
return key;
}
public boolean deleteState(String stateName, String stateSubName, String taskRunValue) throws IOException {
return this.deleteState(true, stateName, stateSubName, taskRunValue);
}
/**
* Deletes the stateName for the given name, sub-name and task run value.
* @param flowScoped if true, will scope it to flow, otherwise to namespace
* @param stateName state name
* @param stateSubName state sub-name (optional)
* @param taskRunValue task run value
* @return true if the state exists and was deleted, false otherwise
*/
public boolean deleteState(boolean flowScoped, String stateName, String stateSubName, String taskRunValue) throws IOException {
RunContext.FlowInfo flowInfo = runContext.flowInfo();
return runContext.namespaceKv(flowInfo.namespace()).delete(
this.statePrefix("_", flowScoped, flowInfo.id(), stateName + nameSuffix(stateSubName), taskRunValue)
);
}
private URI oldStateStoreUri(String namespace, boolean flowScoped, String flowId, String stateName, @Nullable String taskRunValue, String name) {
return URI.create("kestra:/" + namespace.replace(".", "/") + "/" + this.statePrefix("/", flowScoped, flowId, stateName, taskRunValue) + (name == null ? "" : ("/" + name)));
}
private String statePrefix(String separator, boolean flowScoped, String flowId, String stateName, @Nullable String taskRunValue) {
String flowIdPrefix = (!flowScoped || flowId == null) ? "" : (Slugify.of(flowId) + separator);
return flowIdPrefix + "states" + separator + stateName + (taskRunValue == null ? "" : (separator + (hashTaskRunValue ? Hashing.hashToString(taskRunValue) : taskRunValue)));
}
private static String nameSuffix(String name) {
return Optional.ofNullable(name).map(n -> "_" + n).orElse("");
}
}

View File

@@ -1,98 +0,0 @@
package io.kestra.plugin.core.state;
import com.fasterxml.jackson.core.type.TypeReference;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
import jakarta.validation.constraints.NotNull;
import org.apache.commons.lang3.tuple.Pair;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Deprecated(since = "1.1.0", forRemoval = true)
public abstract class AbstractState extends Task {
private static final TypeReference<Map<String, Object>> TYPE_REFERENCE = new TypeReference<>() {};
public static final String TASKS_STATES = "tasks-states";
@Schema(
title = "The name of the state file"
)
@NotNull
@Builder.Default
protected Property<String> name = Property.ofValue("default");
@Schema(
title = "Share state for the current namespace.",
description = "By default, the state is isolated by namespace **and** flow, setting to `true` will share the state between the **same** namespace"
)
@Builder.Default
private final Property<Boolean> namespace = Property.ofValue(false);
@Schema(
title = "Isolate the state with `taskrun.value`.",
description = "By default, the state will be isolated with `taskrun.value` (during iteration with each). Setting to `false` will use the same state for every run of the iteration."
)
@Builder.Default
private final Property<Boolean> taskrunValue = Property.ofValue(true);
protected Map<String, Object> get(RunContext runContext) throws IllegalVariableEvaluationException, IOException, ResourceExpiredException {
return JacksonMapper.ofJson(false).readValue(runContext.stateStore().getState(
!runContext.render(this.namespace).as(Boolean.class).orElseThrow(),
TASKS_STATES,
runContext.render(this.name).as(String.class).orElse(null),
taskRunValue(runContext)
), TYPE_REFERENCE);
}
protected Pair<String, Map<String, Object>> merge(RunContext runContext, Map<String, Object> map) throws IllegalVariableEvaluationException, IOException, ResourceExpiredException {
Map<String, Object> current;
try {
current = this.get(runContext);
} catch (FileNotFoundException e) {
current = Map.of();
}
Map<String, Object> merge = MapUtils.deepMerge(current, runContext.render(map));
String key = runContext.stateStore().putState(
!runContext.render(this.namespace).as(Boolean.class).orElseThrow(),
TASKS_STATES,
runContext.render(this.name).as(String.class).orElse(null),
taskRunValue(runContext),
JacksonMapper.ofJson(false).writeValueAsBytes(merge)
);
return Pair.of(key, merge);
}
protected boolean delete(RunContext runContext) throws IllegalVariableEvaluationException, IOException {
return runContext.stateStore().deleteState(
!runContext.render(this.namespace).as(Boolean.class).orElseThrow(),
TASKS_STATES,
runContext.render(this.name).as(String.class).orElse(null),
taskRunValue(runContext)
);
}
private String taskRunValue(RunContext runContext) throws IllegalVariableEvaluationException {
return Boolean.TRUE.equals(runContext.render(this.taskrunValue).as(Boolean.class).orElseThrow()) ?
runContext.storage().getTaskStorageContext().map(StorageContext.Task::getTaskRunValue).orElse(null) : null;
}
}

View File

@@ -1,74 +0,0 @@
package io.kestra.plugin.core.state;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.io.FileNotFoundException;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Delete a state from the state store (Deprecated, use KV store instead)."
)
@Plugin(
examples = {
@Example(
title = "Delete the default state for the current flow.",
code = {
"id: delete_state",
"type: io.kestra.plugin.core.state.Delete",
},
full = true
),
@Example(
title = "Delete the `myState` state for the current flow.",
code = {
"id: delete_state",
"type: io.kestra.plugin.core.state.Delete",
"name: myState",
},
full = true
)
},
aliases = "io.kestra.core.tasks.states.Delete"
)
@Deprecated(since = "1.1.0", forRemoval = true)
public class Delete extends AbstractState implements RunnableTask<Delete.Output> {
@Schema(
title = "Raise an error if the state is not found."
)
@Builder.Default
private final Property<Boolean> errorOnMissing = Property.ofValue(false);
@Override
public Output run(RunContext runContext) throws Exception {
boolean delete = this.delete(runContext);
if (Boolean.TRUE.equals(runContext.render(errorOnMissing).as(Boolean.class).orElseThrow()) && !delete) {
throw new FileNotFoundException("Unable to find the state file '" + runContext.render(this.name).as(String.class).orElseThrow() + "'");
}
return Output.builder()
.deleted(delete)
.build();
}
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "Flag specifying whether the state file was deleted"
)
private final Boolean deleted;
}
}

View File

@@ -1,86 +0,0 @@
package io.kestra.plugin.core.state;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.io.FileNotFoundException;
import java.util.Map;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Get a state from the state store (Deprecated, use KV store instead)."
)
@Plugin(
examples = {
@Example(
title = "Get the default state file for the current flow.",
code = {
"id: get_state",
"type: io.kestra.plugin.core.state.Get",
},
full = true
),
@Example(
title = "Get the `myState` state for the current flow.",
code = {
"id: get_state",
"type: io.kestra.plugin.core.state.Get",
"name: myState",
},
full = true
)
},
aliases = "io.kestra.core.tasks.states.Get"
)
@Deprecated(since = "1.1.0", forRemoval = true)
public class Get extends AbstractState implements RunnableTask<Get.Output> {
@Schema(
title = "Raise an error if the state file is not found."
)
@Builder.Default
private final Property<Boolean> errorOnMissing = Property.ofValue(false);
@Override
public Output run(RunContext runContext) throws Exception {
Map<String, Object> data;
try {
data = this.get(runContext);
} catch (FileNotFoundException e) {
if (Boolean.TRUE.equals(runContext.render(this.errorOnMissing).as(Boolean.class).orElseThrow())) {
throw e;
}
data = Map.of();
}
return Output.builder()
.count(data.size())
.data(data)
.build();
}
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "The count of properties found in the state"
)
private final int count;
@Schema(
title = "The data extracted from the state"
)
private final Map<String, Object> data;
}
}

View File

@@ -1,89 +0,0 @@
package io.kestra.plugin.core.state;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.tuple.Pair;
import java.net.URI;
import java.util.Map;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Set a state in the state store (Deprecated, use KV store instead).",
description = "Values will be merged: \n" +
"* If you provide a new key, the new key will be added.\n" +
"* If you provide an existing key, the previous key will be overwrite.\n" +
"\n" +
"::alert{type=\"warning\"}\n" +
"This method is not concurrency safe. If many executions for the same flow are concurrent, there is no guarantee on isolation on the value.\n" +
"The value can be overwritten by other executions.\n" +
"::\n"
)
@Plugin(
examples = {
@Example(
title = "Set the default state for the current flow.",
code = {
"id: set_state",
"type: io.kestra.plugin.core.state.Set",
"data:",
" '{{ inputs.store }}': '{{ outputs.download.md5 }}'",
},
full = true
),
@Example(
title = "Set the `myState` state for the current flow.",
code = {
"id: set_state",
"type: io.kestra.plugin.core.state.Set",
"name: myState",
"data:",
" '{{ inputs.store }}': '{{ outputs.download.md5 }}'",
},
full = true
)
},
aliases = "io.kestra.core.tasks.states.Set"
)
@Deprecated(since = "1.1.0", forRemoval = true)
public class Set extends AbstractState implements RunnableTask<Set.Output> {
@Schema(
title = "The data to be stored in the state store"
)
private Property<Map<String, Object>> data;
@Override
public Output run(RunContext runContext) throws Exception {
Pair<String, Map<String, Object>> dataRendered = this.merge(runContext, runContext.render(this.data).asMap(String.class, Object.class));
return Output.builder()
.count(dataRendered.getRight().size())
.key(dataRendered.getLeft())
.build();
}
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "The count of properties found in the state"
)
private final int count;
@Schema(
title = "The key of the current state"
)
private final String key;
}
}

View File

@@ -1,4 +0,0 @@
@PluginSubGroup(categories = { PluginSubGroup.PluginCategory.STORAGE, PluginSubGroup.PluginCategory.CORE})
package io.kestra.plugin.core.state;
import io.kestra.core.models.annotations.PluginSubGroup;

View File

@@ -1,6 +1,10 @@
package io.kestra.core.docs;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.plugins.PluginClassAndMetadata;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.core.runner.Process;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.plugins.PluginScanner;
@@ -8,9 +12,14 @@ import io.kestra.core.plugins.RegisteredPlugin;
import io.kestra.plugin.core.debug.Return;
import io.kestra.plugin.core.flow.Dag;
import io.kestra.plugin.core.flow.Subflow;
import io.kestra.plugin.core.state.Set;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import jakarta.validation.constraints.NotBlank;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -126,17 +135,17 @@ class DocumentationGeneratorTest {
@SuppressWarnings("unchecked")
@Test
void state() throws IOException {
void deprecated() throws IOException {
PluginScanner pluginScanner = new PluginScanner(ClassPluginDocumentationTest.class.getClassLoader());
RegisteredPlugin scan = pluginScanner.scan();
Class<Set> set = scan.findClass(Set.class.getName()).orElseThrow();
Class<DeprecatedTask> set = scan.findClass(DeprecatedTask.class.getName()).orElseThrow();
PluginClassAndMetadata<Task> metadata = PluginClassAndMetadata.create(scan, set, Task.class, null);
ClassPluginDocumentation<? extends Task> doc = ClassPluginDocumentation.of(jsonSchemaGenerator, metadata, scan.version(), false);
String render = DocumentationGenerator.render(doc);
assertThat(render).contains("Set");
assertThat(render).contains("DeprecatedTask");
assertThat(render).contains("::alert{type=\"warning\"}\n");
}
@@ -178,4 +187,26 @@ class DocumentationGeneratorTest {
assertThat(render).contains("title: Process");
assertThat(render).contains("Task runner that executes a task as a subprocess on the Kestra host.");
}
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Deprecated
public static class DeprecatedTask extends Task implements RunnableTask<VoidOutput> {
@PluginProperty(dynamic = true)
@Deprecated
private String someProperty;
@NotBlank
@PluginProperty(dynamic = true)
@Deprecated
private String additionalProperty;
@Override
public VoidOutput run(RunContext runContext) {
return null;
}
}
}

View File

@@ -429,7 +429,7 @@ class HttpClientTest {
@Test
void shouldReturnResponseForAllowedResponseCode() throws IOException, IllegalVariableEvaluationException, HttpClientException {
try (HttpClient client = client(b -> b.configuration(HttpConfiguration.builder().allowedResponseCodes(Property.of(List.of(404))).build()))) {
try (HttpClient client = client(b -> b.configuration(HttpConfiguration.builder().allowedResponseCodes(Property.ofValue(List.of(404))).build()))) {
HttpResponse<Map<String, String>> response = client.request(HttpRequest.of(URI.create(embeddedServerUri + "/http/error?status=404")));
assertThat(response.getStatus().getCode()).isEqualTo(404);
@@ -438,7 +438,7 @@ class HttpClientTest {
@Test
void shouldThrowExceptionForNotAllowedResponseCode() throws IOException, IllegalVariableEvaluationException {
try (HttpClient client = client(b -> b.configuration(HttpConfiguration.builder().allowedResponseCodes(Property.of(List.of(404))).build()))) {
try (HttpClient client = client(b -> b.configuration(HttpConfiguration.builder().allowedResponseCodes(Property.ofValue(List.of(404))).build()))) {
URI uri = URI.create(embeddedServerUri + "/http/error?status=405");
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> {

View File

@@ -1,13 +1,10 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import jakarta.validation.ConstraintViolationException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.net.URI;
import java.util.List;
import java.util.Arrays;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
@@ -15,28 +12,6 @@ import static org.junit.jupiter.api.Assertions.*;
class FileInputTest {
@Test
void shouldGetExtensionWhenFindingFileExtensionForExistingFile() {
List<Input<?>> inputs = List.of(
FileInput.builder().id("test-file1").extension(".zip").build(),
FileInput.builder().id("test-file2").extension(".gz").build()
);
String result = FileInput.findFileInputExtension(inputs, "test-file1");
Assertions.assertEquals(".zip", result);
}
@Test
void shouldReturnDefaultExtensionWhenFindingExtensionForUnknownFile() {
List<Input<?>> inputs = List.of(
FileInput.builder().id("test-file1").extension(".zip").build(),
FileInput.builder().id("test-file2").extension(".gz").build()
);
String result = FileInput.findFileInputExtension(inputs, "???");
Assertions.assertEquals(".upl", result);
}
@Test
void validateValidFileTypes() {
final FileInput csvInput = FileInput.builder()

View File

@@ -74,7 +74,7 @@ public abstract class AbstractLockRepositoryTest {
// In Elasticsearch, as we search then delete we need to wait for the refresh period
// This would not be an issue in real-case scenario as when we delete by owner the service would be down for a certain amount of time
Thread.sleep(500);
Thread.sleep(1000);
List<Lock> deleted = lockRepository.deleteByOwner("me");
assertThat(deleted).hasSize(1);

View File

@@ -258,6 +258,12 @@ public abstract class AbstractRunnerTest {
multipleConditionTriggerCaseTest.flowTriggerMultipleConditions();
}
@Test
@LoadFlows({"flows/valids/flow-trigger-mixed-conditions-flow-a.yaml", "flows/valids/flow-trigger-mixed-conditions-flow-listen.yaml"})
void flowTriggerMixedConditions() throws Exception {
multipleConditionTriggerCaseTest.flowTriggerMixedConditions();
}
@Test
@LoadFlows({"flows/valids/each-null.yaml"})
void eachWithNull() throws Exception {

View File

@@ -170,4 +170,24 @@ public class MultipleConditionTriggerCaseTest {
e -> e.getState().getCurrent().equals(Type.SUCCESS),
MAIN_TENANT, "io.kestra.tests.trigger.multiple.conditions", "flow-trigger-multiple-conditions-flow-listen", Duration.ofSeconds(1)));
}
public void flowTriggerMixedConditions() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests.trigger.mixed.conditions",
"flow-trigger-mixed-conditions-flow-a");
assertThat(execution.getTaskRunList().size()).isEqualTo(1);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
// trigger is done
Execution triggerExecution = runnerUtils.awaitFlowExecution(
e -> e.getState().getCurrent().equals(Type.SUCCESS),
MAIN_TENANT, "io.kestra.tests.trigger.mixed.conditions", "flow-trigger-mixed-conditions-flow-listen");
executionRepository.delete(triggerExecution);
assertThat(triggerExecution.getTaskRunList().size()).isEqualTo(1);
assertThat(triggerExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
// we assert that we didn't have any other flow triggered
assertThrows(RuntimeException.class, () -> runnerUtils.awaitFlowExecution(
e -> e.getState().getCurrent().equals(Type.SUCCESS),
MAIN_TENANT, "io.kestra.tests.trigger.mixed.conditions", "flow-trigger-mixed-conditions-flow-listen", Duration.ofSeconds(1)));
}
}

View File

@@ -220,7 +220,7 @@ class YamlParserTest {
);
assertThat(exception.getConstraintViolations().size()).isEqualTo(1);
assertThat(new ArrayList<>(exception.getConstraintViolations()).getFirst().getMessage()).contains("Duplicate field 'variables.tf'");
assertThat(new ArrayList<>(exception.getConstraintViolations()).getFirst().getMessage()).contains("Duplicate field 'message'");
}
private Flow parse(String path) {

View File

@@ -1,94 +0,0 @@
package io.kestra.core.storages;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.exceptions.MigrationRequiredException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.log.Log;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import static org.assertj.core.api.Assertions.assertThat;
@MicronautTest
public class StateStoreTest {
@Inject
private TestRunContextFactory runContextFactory;
@Test
void all() throws IOException, ResourceExpiredException {
RunContext runContext = runContext();
String state = IdUtils.create();
runContext.stateStore().putState(state, "some-name", "my-taskrun-value", "my-value".getBytes());
assertThat(runContext.stateStore().getState(state, "some-name", "my-taskrun-value").readAllBytes()).isEqualTo("my-value".getBytes());
RunContext.FlowInfo flowInfo = runContext.flowInfo();
String key = flowInfo.id() + "_states_" + state + "_some-name_" + Hashing.hashToString("my-taskrun-value");
assertThat(runContext.namespaceKv(flowInfo.namespace()).getValue(key).get().value()).isEqualTo("my-value".getBytes());
runContext.stateStore().deleteState(state, "some-name", "my-taskrun-value");
FileNotFoundException fileNotFoundException = Assertions.assertThrows(FileNotFoundException.class, () -> runContext.stateStore().getState(state, "some-name", "my-taskrun-value"));
assertThat(fileNotFoundException.getMessage()).isEqualTo("State " + key + " not found");
}
@Test
void getState_WithOldStateStore_ShouldThrowMigrationException() throws IOException, ResourceExpiredException {
RunContext runContext = runContext();
String state = IdUtils.create();
RunContext.FlowInfo flowInfo = runContext.flowInfo();
URI oldStateStoreFileUri = URI.create("kestra:/" + flowInfo.namespace().replace(".", "/") + "/" + flowInfo.id() + "/states/" + state + "/" + Hashing.hashToString("my-taskrun-value") + "/some-name");
byte[] expectedContent = "from-old-state".getBytes();
runContext.storage().putFile(new ByteArrayInputStream(expectedContent), oldStateStoreFileUri);
String key = flowInfo.id() + "_states_" + state + "_some-name_" + Hashing.hashToString("my-taskrun-value");
assertThat(runContext.storage().getFile(oldStateStoreFileUri).readAllBytes()).isEqualTo(expectedContent);
MigrationRequiredException migrationRequiredException = Assertions.assertThrows(MigrationRequiredException.class, () -> runContext.stateStore().getState(state, "some-name", "my-taskrun-value"));
assertThat(migrationRequiredException.getMessage()).isEqualTo("It looks like the State Store migration hasn't been run, please run the `/app/kestra sys state-store migrate` command before.");
assertThat(runContext.namespaceKv(flowInfo.namespace()).getValue(key).isEmpty()).isTrue();
}
@Test
void subNameAndTaskrunValueOptional() throws IOException, ResourceExpiredException {
RunContext runContext = runContext();
String state = IdUtils.create();
runContext.stateStore().putState(state, "a-name", "a-taskrun-value", "aa-value".getBytes());
runContext.stateStore().putState(state, "a-name", "b-taskrun-value", "ab-value".getBytes());
runContext.stateStore().putState(state, "b-name", "a-taskrun-value", "ba-value".getBytes());
runContext.stateStore().putState(state, "b-name", "b-taskrun-value", "bb-value".getBytes());
runContext.stateStore().putState(state, null, "a-taskrun-value", "0a-value".getBytes());
runContext.stateStore().putState(state, null, "b-taskrun-value", "0b-value".getBytes());
runContext.stateStore().putState(state, "a-name", null, "a0-value".getBytes());
runContext.stateStore().putState(state, "b-name", null, "b0-value".getBytes());
assertThat(runContext.stateStore().getState(state, "a-name", "a-taskrun-value").readAllBytes()).isEqualTo("aa-value".getBytes());
assertThat(runContext.stateStore().getState(state, "a-name", "b-taskrun-value").readAllBytes()).isEqualTo("ab-value".getBytes());
assertThat(runContext.stateStore().getState(state, "b-name", "a-taskrun-value").readAllBytes()).isEqualTo("ba-value".getBytes());
assertThat(runContext.stateStore().getState(state, "b-name", "b-taskrun-value").readAllBytes()).isEqualTo("bb-value".getBytes());
assertThat(runContext.stateStore().getState(state, null, "a-taskrun-value").readAllBytes()).isEqualTo("0a-value".getBytes());
assertThat(runContext.stateStore().getState(state, null, "b-taskrun-value").readAllBytes()).isEqualTo("0b-value".getBytes());
assertThat(runContext.stateStore().getState(state, "a-name", null).readAllBytes()).isEqualTo("a0-value".getBytes());
assertThat(runContext.stateStore().getState(state, "b-name", null).readAllBytes()).isEqualTo("b0-value".getBytes());
}
private RunContext runContext() {
return TestsUtils.mockRunContext(runContextFactory, Log.builder().id("log").type(Log.class.getName()).message("logging").build(), null);
}
}

View File

@@ -1,66 +0,0 @@
package io.kestra.plugin.core.flow;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Test;
@KestraTest(startRunner = true)
class StateTest {
public static final String FLOW_ID = "state";
public static final String NAMESPACE = "io.kestra.tests";
@Inject
private TestRunnerUtils runnerUtils;
@SuppressWarnings("unchecked")
@Test
@LoadFlows({"flows/valids/state.yaml"})
void set() throws TimeoutException, QueueException {
String stateName = IdUtils.create();
Execution execution = runnerUtils.runOne(MAIN_TENANT, NAMESPACE,
FLOW_ID, null, (f, e) -> ImmutableMap.of(FLOW_ID, stateName));
assertThat(execution.getTaskRunList()).hasSize(5);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(((Map<String, Integer>) execution.findTaskRunsByTaskId("createGet").getFirst().getOutputs().get("data")).get("value")).isEqualTo(1);
execution = runnerUtils.runOne(MAIN_TENANT, NAMESPACE,
FLOW_ID, null, (f, e) -> ImmutableMap.of(FLOW_ID, stateName));
assertThat(execution.getTaskRunList()).hasSize(5);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(((Map<String, Object>) execution.findTaskRunsByTaskId("updateGet").getFirst().getOutputs().get("data")).get("value")).isEqualTo("2");
execution = runnerUtils.runOne(MAIN_TENANT, NAMESPACE,
FLOW_ID, null, (f, e) -> ImmutableMap.of(FLOW_ID, stateName));
assertThat(execution.getTaskRunList()).hasSize(5);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat((Integer) execution.findTaskRunsByTaskId("deleteGet").getFirst().getOutputs().get("count")).isZero();
}
@SuppressWarnings("unchecked")
@Test
@LoadFlows(value = {"flows/valids/state.yaml"}, tenantId = "tenant1")
void each() throws TimeoutException, InternalException, QueueException {
Execution execution = runnerUtils.runOne("tenant1", NAMESPACE,
FLOW_ID, null, (f, e) -> ImmutableMap.of(FLOW_ID, "each"));
assertThat(execution.getTaskRunList()).hasSize(19);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(((Map<String, String>) execution.findTaskRunByTaskIdAndValue("regetEach1", List.of("b")).getOutputs().get("data")).get("value")).isEqualTo("null-b");
assertThat(((Map<String, String>) execution.findTaskRunByTaskIdAndValue("regetEach2", List.of("b")).getOutputs().get("data")).get("value")).isEqualTo("null-a-b");
}
}

View File

@@ -1,59 +0,0 @@
package io.kestra.plugin.core.state;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class StateNamespaceTest {
@Inject
RunContextFactory runContextFactory;
private RunContext runContextFlow1(Task task) {
return TestsUtils.mockRunContext(runContextFactory, task, Map.of());
}
private RunContext runContextFlow2(Task task) {
return TestsUtils.mockRunContext(runContextFactory, task, Map.of());
}
@Test
void run() throws Exception {
Set set = Set.builder()
.id(IdUtils.create())
.type(Set.class.getSimpleName())
.namespace(Property.ofValue(true))
.data(Property.ofValue(Map.of(
"john", "doe"
)))
.build();
Set.Output setOutput = set.run(runContextFlow1(set));
assertThat(setOutput.getCount()).isEqualTo(1);
Get get = Get.builder()
.id(IdUtils.create())
.type(Get.class.getSimpleName())
.namespace(Property.ofValue(true))
.build();
Get.Output getOutput = get.run(runContextFlow2(get));
assertThat(getOutput.getCount()).isEqualTo(1);
assertThat(getOutput.getData().get("john")).isEqualTo("doe");
get = Get.builder()
.id(IdUtils.create())
.type(Get.class.getSimpleName())
.build();
getOutput = get.run(runContextFlow2(get));
assertThat(getOutput.getCount()).isZero();
}
}

View File

@@ -1,125 +0,0 @@
package io.kestra.plugin.core.state;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.io.FileNotFoundException;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@KestraTest
class StateTest {
@Inject
RunContextFactory runContextFactory;
@Test
void run() throws Exception {
Get get = Get.builder()
.id(IdUtils.create())
.type(Get.class.getName())
.build();
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, get, Map.of(
"key", "test",
"inc", 1
));
Get.Output getOutput = get.run(runContext);
assertThat(getOutput.getCount()).isZero();
Set set = Set.builder()
.id(IdUtils.create())
.type(Set.class.toString())
.data(Property.ofValue(Map.of(
"{{ inputs.key }}", "{{ inputs.inc }}"
)))
.build();
Set.Output setOutput = set.run(runContext);
assertThat(setOutput.getCount()).isEqualTo(1);
get = Get.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.build();
getOutput = get.run(runContext);
assertThat(getOutput.getCount()).isEqualTo(1);
assertThat(getOutput.getData().get("test")).isEqualTo("1");
set = Set.builder()
.id(IdUtils.create())
.type(Set.class.toString())
.data(Property.ofValue(Map.of(
"{{ inputs.key }}", "2",
"test2", "3"
)))
.build();
setOutput = set.run(runContext);
assertThat(setOutput.getCount()).isEqualTo(2);
get = Get.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.build();
getOutput = get.run(runContext);
assertThat(getOutput.getCount()).isEqualTo(2);
assertThat(getOutput.getData().get("test")).isEqualTo("2");
assertThat(getOutput.getData().get("test2")).isEqualTo("3");
Delete delete = Delete.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.build();
Delete.Output deleteRun = delete.run(runContext);
assertThat(deleteRun.getDeleted()).isTrue();
get = Get.builder()
.id(IdUtils.create())
.type(Get.class.toString())
.build();
getOutput = get.run(runContext);
assertThat(getOutput.getCount()).isZero();
}
@Test
void deleteThrow() {
Delete task = Delete.builder()
.id(IdUtils.create())
.type(Get.class.getName())
.name(Property.ofValue(IdUtils.create()))
.errorOnMissing(Property.ofValue(true))
.build();
assertThrows(FileNotFoundException.class, () -> {
task.run(TestsUtils.mockRunContext(runContextFactory, task, Map.of()));
});
}
@Test
void getThrow() {
Get task = Get.builder()
.id(IdUtils.create())
.type(Get.class.getName())
.name(Property.ofValue(IdUtils.create()))
.errorOnMissing(Property.ofValue(true))
.build();
assertThrows(FileNotFoundException.class, () -> {
task.run(TestsUtils.mockRunContext(runContextFactory, task, Map.of()));
});
}
}

View File

@@ -3,10 +3,6 @@ namespace: io.kestra.tests
tasks:
- id: bad-task
type: "io.kestra.plugin.core.state.Set"
data:
variables.tf: |
test
variables.tf: |
test
name: "unit"
type: io.kestra.plugin.core.log.Log
message: Hello World
message: Hello World

View File

@@ -0,0 +1,10 @@
id: flow-trigger-mixed-conditions-flow-a
namespace: io.kestra.tests.trigger.mixed.conditions
labels:
some: label
tasks:
- id: only
type: io.kestra.plugin.core.debug.Return
format: "from parents: {{execution.id}}"

View File

@@ -0,0 +1,25 @@
id: flow-trigger-mixed-conditions-flow-listen
namespace: io.kestra.tests.trigger.mixed.conditions
triggers:
- id: on_completion
type: io.kestra.plugin.core.trigger.Flow
states: [ SUCCESS ]
conditions:
- type: io.kestra.plugin.core.condition.ExecutionFlow
namespace: io.kestra.tests.trigger.mixed.conditions
flowId: flow-trigger-mixed-conditions-flow-a
- id: on_failure
type: io.kestra.plugin.core.trigger.Flow
states: [ FAILED ]
preconditions:
id: flowsFailure
flows:
- namespace: io.kestra.tests.trigger.multiple.conditions
flowId: flow-trigger-multiple-conditions-flow-a
states: [FAILED]
tasks:
- id: only
type: io.kestra.plugin.core.debug.Return
format: "It works"

View File

@@ -1,99 +0,0 @@
id: state
namespace: io.kestra.tests
inputs:
- id: state
type: STRING
tasks:
- id: if
type: io.kestra.plugin.core.flow.If
condition: "{{ inputs.state == 'each' }}"
then:
- id: delete1
type: io.kestra.plugin.core.state.Delete
name: "{{ inputs.state }}"
- id: each1
type: io.kestra.plugin.core.flow.ForEach
values: '["a", "b"]'
tasks:
- id: getEach1
type: io.kestra.plugin.core.state.Get
name: "{{ inputs.state }}"
- id: setEach1
type: io.kestra.plugin.core.state.Set
name: "{{ inputs.state }}"
data:
value: "{{ (currentEachOutput(outputs.getEach1).data.value ?? 'null') ~ '-' ~ taskrun.value }}"
- id: regetEach1
type: io.kestra.plugin.core.state.Get
name: "{{ inputs.state }}"
- id: delete2
type: io.kestra.plugin.core.state.Delete
name: "{{ inputs.state }}"
- id: each2
type: io.kestra.plugin.core.flow.ForEach
values: '["a", "b"]'
tasks:
- id: getEach2
type: io.kestra.plugin.core.state.Get
taskrunValue: false
name: "{{ inputs.state }}"
- id: setEach2
type: io.kestra.plugin.core.state.Set
name: "{{ inputs.state }}"
taskrunValue: false
data:
value: "{{ (currentEachOutput(outputs.getEach2).data.value ?? 'null') ~ '-' ~ taskrun.value }}"
- id: regetEach2
type: io.kestra.plugin.core.state.Get
taskrunValue: false
name: "{{ inputs.state }}"
- id: delete3
type: io.kestra.plugin.core.state.Delete
name: "{{ inputs.state }}"
else:
- id: state
type: io.kestra.plugin.core.state.Get
name: "{{ inputs.state }}"
- id: switch
type: io.kestra.plugin.core.flow.Switch
value: "{{ (outputs.state.data.value ?? 0) == 0 ? 'create' : ( outputs.state.data.value == 1 ? 'update' : 'delete') }}"
cases:
"create":
- id: create
type: io.kestra.plugin.core.state.Set
name: "{{ inputs.state }}"
data:
value: 1
- id: createGet
type: io.kestra.plugin.core.state.Get
name: "{{ inputs.state }}"
"update":
- id: update
type: io.kestra.plugin.core.state.Set
name: "{{ inputs.state }}"
data:
value: "{{ outputs.state.data.value + 1 }}"
- id: updateGet
type: io.kestra.plugin.core.state.Get
name: "{{ inputs.state }}"
"delete":
- id: delete
type: io.kestra.plugin.core.state.Delete
name: "{{ inputs.state }}"
- id: deleteGet
type: io.kestra.plugin.core.state.Get
name: "{{ inputs.state }}"

View File

@@ -2,19 +2,13 @@ package io.kestra.executor;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.flows.sla.ExecutionMonitoringSLA;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.flows.sla.SLAMonitor;
import io.kestra.core.models.flows.sla.Violation;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.models.triggers.multipleflows.MultipleCondition;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
@@ -27,23 +21,16 @@ import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.*;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.trace.Tracer;
import io.kestra.core.trace.TracerFactory;
import io.kestra.core.utils.*;
import io.kestra.plugin.core.flow.ForEachItem;
import io.kestra.plugin.core.flow.WorkingDirectory;
import io.kestra.executor.handler.*;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.event.Level;
import java.time.Duration;
import java.time.Instant;
@@ -83,9 +70,6 @@ public class DefaultExecutor implements Executor {
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
private QueueInterface<WorkerTaskResult> workerTaskResultQueue;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
private QueueInterface<LogEntry> logQueue;
@Inject
@Named(QueueFactoryInterface.KILL_NAMED)
private QueueInterface<ExecutionKilled> killQueue;
@Inject
@@ -104,12 +88,8 @@ public class DefaultExecutor implements Executor {
@Inject
private SkipExecutionService skipExecutionService;
@Inject
private PluginDefaultService pluginDefaultService;
@Inject
private ExecutorService executorService;
@Inject
private WorkerGroupService workerGroupService;
@Inject
private ExecutionService executionService;
@Inject
private VariablesService variablesService;
@@ -128,8 +108,6 @@ public class DefaultExecutor implements Executor {
@Inject
private ExecutionQueuedStateStore executionQueuedStateStore;
@Inject
private MultipleConditionStorageInterface multipleConditionStorage;
@Inject
private ExecutionDelayStateStore executionDelayStateStore;
@Inject
private SLAMonitorStateStore slaMonitorStateStore;
@@ -145,6 +123,21 @@ public class DefaultExecutor implements Executor {
@Inject
private FlowListenersInterface flowListeners;
@Inject
private ExecutionMessageHandler executionMessageHandler;
@Inject
private ExecutionEventMessageHandler executionEventMessageHandler;
@Inject
private WorkerTaskResultMessageHandler workerTaskResultMessageHandler;
@Inject
private ExecutionKilledExecutionMessageHandler executionKilledExecutionMessageHandler;
@Inject
private SubflowExecutionResultMessageHandler subflowExecutionResultMessageHandler;
@Inject
private SubflowExecutionEndMessageHandler subflowExecutionEndMessageHandler;
@Inject
private MultipleConditionEventMessageHandler multipleConditionEventMessageHandler;
@Value("${kestra.executor.clean.execution-queue:true}")
private boolean cleanExecutionQueue;
@Value("${kestra.executor.clean.worker-queue:true}")
@@ -162,16 +155,13 @@ public class DefaultExecutor implements Executor {
private List<FlowWithSource> allFlows;
private final Tracer tracer;
private final java.util.concurrent.ExecutorService workerTaskResultExecutorService;
private final java.util.concurrent.ExecutorService executionExecutorService;
private final int numberOfThreads;
@Inject
public DefaultExecutor(TracerFactory tracerFactory, ExecutorsUtils executorsUtils, @Value("${kestra.executor.thread-count:0}") int threadCount) {
this.tracer = tracerFactory.getTracer(DefaultExecutor.class, "EXECUTOR");
public DefaultExecutor(ExecutorsUtils executorsUtils, @Value("${kestra.executor.thread-count:0}") int threadCount) {
// By default, we start available processors count threads with a minimum of 4 by executor service
// for the worker task result queue and the execution queue.
// Other queues would not benefit from more consumers.
@@ -322,22 +312,8 @@ public class DefaultExecutor implements Executor {
return;
}
try {
executionEventQueue.emit(new ExecutionEvent(message, ExecutionEventType.CREATED));
} catch (QueueException e) {
// If we cannot send the execution event, we fail the execution
executionStateStore.lock(message.getId(), execution -> {
try {
Execution failed = execution.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED);
ExecutionEvent event = new ExecutionEvent(failed, ExecutionEventType.TERMINATED);
this.executionEventQueue.emit(event);
return new ExecutorContext(failed);
} catch (QueueException ex) {
log.error("Unable to emit the execution {}", execution.getId(), ex);
}
return null;
});
}
Optional<ExecutorContext> maybeExecutor = executionMessageHandler.handle(message);
maybeExecutor.ifPresent(this::toExecution);
}
private void executionEventQueue(Either<ExecutionEvent, DeserializationException> either) {
@@ -352,190 +328,8 @@ public class DefaultExecutor implements Executor {
return;
}
ExecutorContext result = executionStateStore.lock(message.executionId(), execution -> tracer.inCurrentContext(
execution,
FlowId.uidWithoutRevision(execution),
() -> {
try {
final FlowWithSource flow = findFlow(execution);
ExecutorContext executor = new ExecutorContext(execution, flow);
// schedule it for later if needed
if (execution.getState().getCurrent() == State.Type.CREATED && execution.getScheduleDate() != null && execution.getScheduleDate().isAfter(Instant.now())) {
ExecutionDelay executionDelay = ExecutionDelay.builder()
.executionId(executor.getExecution().getId())
.date(execution.getScheduleDate())
.state(State.Type.RUNNING)
.delayType(ExecutionDelay.DelayType.RESUME_FLOW)
.build();
executionDelayStateStore.save(executionDelay);
return executor;
}
// create an SLA monitor if needed
if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && !ListUtils.isEmpty(flow.getSla())) {
List<SLAMonitor> monitors = flow.getSla().stream()
.filter(ExecutionMonitoringSLA.class::isInstance)
.map(ExecutionMonitoringSLA.class::cast)
.map(sla -> SLAMonitor.builder()
.executionId(execution.getId())
.slaId(((SLA) sla).getId())
.deadline(execution.getState().getStartDate().plus(sla.getDuration()))
.build()
)
.toList();
monitors.forEach(monitor -> slaMonitorStateStore.save(monitor));
}
// handle concurrency limit, we need to use a different queue to be sure that execution running
// are processed sequentially so inside a queue with no parallelism
if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && flow.getConcurrency() != null) {
ExecutionRunning executionRunning = ExecutionRunning.builder()
.tenantId(executor.getFlow().getTenantId())
.namespace(executor.getFlow().getNamespace())
.flowId(executor.getFlow().getId())
.execution(executor.getExecution())
.concurrencyState(ExecutionRunning.ConcurrencyState.CREATED)
.build();
ExecutionRunning processed = concurrencyLimitStateStore.countThenProcess(flow, (txContext, concurrencyLimit) -> {
ExecutionRunning computed = executorService.processExecutionRunning(flow, concurrencyLimit.getRunning(), executionRunning.withExecution(execution)); // be sure that the execution running contains the latest value of the execution
if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.RUNNING && !computed.getExecution().getState().isTerminated()) {
return Pair.of(computed, concurrencyLimit.withRunning(concurrencyLimit.getRunning() + 1));
} else if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
executionQueuedStateStore.save(txContext, ExecutionQueued.fromExecutionRunning(computed));
}
return Pair.of(computed, concurrencyLimit);
});
// if the execution is queued or terminated due to concurrency limit, we stop here
if (processed.getExecution().getState().isTerminated() || processed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
return executor.withExecution(processed.getExecution(), "handleConcurrencyLimit");
}
}
// handle execution changed SLA
executor = executorService.handleExecutionChangedSLA(executor);
// process the execution
if (log.isDebugEnabled()) {
executorService.log(log, true, executor);
}
executor = executorService.process(executor);
if (!executor.getNexts().isEmpty()) {
executor.withExecution(
executorService.onNexts(executor.getExecution(), executor.getNexts()),
"onNexts"
);
}
// worker task
if (!executor.getWorkerTasks().isEmpty()) {
List<WorkerTaskResult> workerTaskResults = new ArrayList<>();
executor
.getWorkerTasks()
.forEach(throwConsumer(workerTask -> {
try {
if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED)));
} else {
if (workerTask.getTask().isSendToWorkerTask()) {
Optional<WorkerGroup> maybeWorkerGroup = workerGroupService.resolveGroupFromJob(flow, workerTask);
String workerGroupKey = maybeWorkerGroup.map(throwFunction(workerGroup -> workerTask.getRunContext().render(workerGroup.getKey())))
.orElse(null);
if (workerTask.getTask() instanceof WorkingDirectory) {
// WorkingDirectory is a flowable so it will be moved to RUNNING a few lines under
workerJobQueue.emit(workerGroupKey, workerTask);
} else {
TaskRun taskRun = workerTask.getTaskRun().withState(State.Type.SUBMITTED);
workerJobQueue.emit(workerGroupKey, workerTask.withTaskRun(taskRun));
workerTaskResults.add(new WorkerTaskResult(taskRun));
}
}
/// flowable attempt state transition to running
if (workerTask.getTask().isFlowable()) {
List<TaskRunAttempt> attempts = Optional.ofNullable(workerTask.getTaskRun().getAttempts())
.map(ArrayList::new)
.orElseGet(ArrayList::new);
attempts.add(
TaskRunAttempt.builder()
.state(new State().withState(State.Type.RUNNING))
.build()
);
TaskRun updatedTaskRun = workerTask.getTaskRun()
.withAttempts(attempts)
.withState(State.Type.RUNNING);
workerTaskResults.add(new WorkerTaskResult(updatedTaskRun));
}
}
} catch (Exception e) {
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.FAILED)));
workerTask.getRunContext().logger().error("Failed to evaluate the runIf condition for task {}. Cause: {}", workerTask.getTask().getId(), e.getMessage(), e);
}
}));
try {
executorService.addWorkerTaskResults(executor, workerTaskResults);
} catch (InternalException e) {
log.error("Unable to add a worker task result to the execution", e);
}
}
// subflow execution results
if (!executor.getSubflowExecutionResults().isEmpty()) {
executor.getSubflowExecutionResults()
.forEach(throwConsumer(subflowExecutionResult -> subflowExecutionResultQueue.emit(subflowExecutionResult)));
}
// schedulerDelay
if (!executor.getExecutionDelays().isEmpty()) {
executor.getExecutionDelays()
.forEach(executionDelay -> executionDelayStateStore.save(executionDelay));
}
// subflow executions
if (!executor.getSubflowExecutions().isEmpty()) {
executor.getSubflowExecutions().forEach(throwConsumer(subflowExecution -> {
Execution subExecution = subflowExecution.getExecution();
String msg = String.format("Created new execution [[link execution=\"%s\" flowId=\"%s\" namespace=\"%s\"]]", subExecution.getId(), subExecution.getFlowId(), subExecution.getNamespace());
log.info(msg);
logQueue.emit(LogEntry.of(subflowExecution.getParentTaskRun(), subflowExecution.getExecution().getKind()).toBuilder()
.level(Level.INFO)
.message(msg)
.timestamp(subflowExecution.getParentTaskRun().getState().getStartDate())
.thread(Thread.currentThread().getName())
.build()
);
executionQueue.emit(subflowExecution.getExecution());
}));
}
return executor;
} catch (QueueException e) {
try {
Execution failedExecution = fail(execution, e);
this.executionQueue.emit(failedExecution);
} catch (QueueException ex) {
log.error("Unable to emit the execution {}", execution.getId(), ex);
}
Span.current().recordException(e).setStatus(StatusCode.ERROR);
return null;
}
}
));
if (result != null) {
this.toExecution(result);
}
Optional<ExecutorContext> maybeExecutor = executionEventMessageHandler.handle(message);
maybeExecutor.ifPresent(this::toExecution);
}
private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) {
@@ -550,30 +344,8 @@ public class DefaultExecutor implements Executor {
return;
}
if (log.isDebugEnabled()) {
executorService.log(log, true, message);
}
ExecutorContext executor = executionStateStore.lock(message.getTaskRun().getExecutionId(), execution -> {
ExecutorContext current = new ExecutorContext(execution);
if (execution.hasTaskRunJoinable(message.getTaskRun())) {
try {
// process worker task result
executorService.addWorkerTaskResult(current, () -> findFlow(execution), message);
// join worker result
return current;
} catch (InternalException e) {
return handleFailedExecutionFromExecutor(current, e);
}
}
return null;
});
if (executor != null) {
this.toExecution(executor);
}
Optional<ExecutorContext> maybeExecutor = workerTaskResultMessageHandler.handle(message);
maybeExecutor.ifPresent(this::toExecution);
}
private void killQueue(Either<ExecutionKilled, DeserializationException> either) {
@@ -599,54 +371,12 @@ public class DefaultExecutor implements Executor {
return;
}
metricRegistry
.counter(MetricRegistry.METRIC_EXECUTOR_KILLED_COUNT, MetricRegistry.METRIC_EXECUTOR_KILLED_COUNT_DESCRIPTION, metricRegistry.tags(killedExecution))
.increment();
Optional<ExecutorContext> maybeExecutor = executionKilledExecutionMessageHandler.handle(killedExecution);
if (log.isDebugEnabled()) {
executorService.log(log, true, killedExecution);
}
// Immediately fire the event in EXECUTED state to notify the Workers to kill
// any remaining tasks for that executing regardless of if the execution exist or not.
// Note, that this event will be a noop if all tasks for that execution are already killed or completed.
try {
killQueue.emit(ExecutionKilledExecution
.builder()
.executionId(killedExecution.getExecutionId())
.isOnKillCascade(false)
.state(ExecutionKilled.State.EXECUTED)
.tenantId(killedExecution.getTenantId())
.build()
);
} catch (QueueException e) {
log.error("Unable to kill the execution {}", killedExecution.getExecutionId(), e);
}
ExecutorContext executor = killingOrAfterKillState(killedExecution.getExecutionId(), Optional.ofNullable(killedExecution.getExecutionState()));
// Check whether kill event should be propagated to downstream executions.
// By default, always propagate the ExecutionKill to sub-flows (for backward compatibility).
Boolean isOnKillCascade = Optional.ofNullable(killedExecution.getIsOnKillCascade()).orElse(true);
if (isOnKillCascade) {
executionService
.killSubflowExecutions(event.getTenantId(), killedExecution.getExecutionId())
.doOnNext(executionKilled -> {
try {
killQueue.emit(executionKilled);
} catch (QueueException e) {
log.error("Unable to kill the execution {}", executionKilled.getExecutionId(), e);
}
})
.blockLast();
}
if (executor != null) {
// Transmit the new execution state. Note that the execution
// will eventually transition to KILLED state before sub-flow executions are actually killed.
// This behavior is acceptable due to the fire-and-forget nature of the killing event.
this.toExecution(executor, true);
}
// Transmit the new execution state. Note that the execution
// will eventually transition to KILLED state before sub-flow executions are actually killed.
// This behavior is acceptable due to the fire-and-forget nature of the killing event.
maybeExecutor.ifPresent(executor -> this.toExecution(executor, true));
}
private void subflowExecutionResultQueue(Either<SubflowExecutionResult, DeserializationException> either) {
@@ -665,80 +395,8 @@ public class DefaultExecutor implements Executor {
return;
}
if (log.isDebugEnabled()) {
executorService.log(log, true, message);
}
ExecutorContext executor = executionStateStore.lock(message.getParentTaskRun().getExecutionId(), execution -> {
ExecutorContext current = new ExecutorContext(execution);
if (execution.hasTaskRunJoinable(message.getParentTaskRun())) { // TODO if we remove this check, we can avoid adding 'iteration' on the 'isSame()' method
try {
FlowWithSource flow = findFlow(execution);
Task task = flow.findTaskByTaskId(message.getParentTaskRun().getTaskId());
TaskRun taskRun;
// iterative tasks
if (task instanceof ForEachItem.ForEachItemExecutable forEachItem) {
// For iterative tasks, we need to get the taskRun from the execution,
// move it to the state of the child flow, and merge the outputs.
// This is important to avoid races such as RUNNING that arrives after the first SUCCESS/FAILED.
RunContext runContext = runContextFactory.of(flow, task, current.getExecution(), message.getParentTaskRun());
taskRun = execution.findTaskRunByTaskRunId(message.getParentTaskRun().getId());
if (taskRun.getState().getCurrent() != message.getState()) {
taskRun = taskRun.withState(message.getState());
}
Map<String, Object> outputs = MapUtils.deepMerge(taskRun.getOutputs(), message.getParentTaskRun().getOutputs());
Variables variables = variablesService.of(StorageContext.forTask(taskRun), outputs);
taskRun = taskRun.withOutputs(variables);
taskRun = ExecutableUtils.manageIterations(
runContext.storage(),
taskRun,
current.getExecution(),
forEachItem.getTransmitFailed(),
forEachItem.isAllowFailure(),
forEachItem.isAllowWarning()
);
} else {
taskRun = message.getParentTaskRun();
}
Execution newExecution = current.getExecution().withTaskRun(taskRun);
// If the worker task result is killed, we must check if it has a parents to also kill them if not already done.
// Running flowable tasks that have child tasks running in the worker will be killed thanks to that.
if (taskRun.getState().getCurrent() == State.Type.KILLED && taskRun.getParentTaskRunId() != null) {
newExecution = executionService.killParentTaskruns(taskRun, newExecution);
}
current = current.withExecution(newExecution, "joinSubflowExecutionResult");
// send metrics on parent taskRun terminated
if (taskRun.getState().isTerminated()) {
metricRegistry
.counter(MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_COUNT, MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_COUNT_DESCRIPTION, metricRegistry.tags(message))
.increment();
metricRegistry
.timer(MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_DURATION, MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_DURATION_DESCRIPTION, metricRegistry.tags(message))
.record(taskRun.getState().getDuration());
log.trace("TaskRun terminated: {}", taskRun);
}
// join worker result
return current;
} catch (InternalException e) {
return handleFailedExecutionFromExecutor(current, e);
}
}
return null;
});
if (executor != null) {
this.toExecution(executor);
}
Optional<ExecutorContext> maybeExecutor = subflowExecutionResultMessageHandler.handle(message);
maybeExecutor.ifPresent(this::toExecution);
}
private void subflowExecutionEndQueue(Either<SubflowExecutionEnd, DeserializationException> either) {
@@ -757,44 +415,7 @@ public class DefaultExecutor implements Executor {
return;
}
if (log.isDebugEnabled()) {
executorService.log(log, true, message);
}
executionStateStore.lock(message.getParentExecutionId(), execution -> {
if (execution == null) {
throw new IllegalStateException("Execution state don't exist for " + message.getParentExecutionId() + ", receive " + message);
}
FlowWithSource flow = findFlow(execution);
try {
ExecutableTask<?> executableTask = (ExecutableTask<?>) flow.findTaskByTaskId(message.getTaskId());
if (!executableTask.waitForExecution()) {
return null;
}
TaskRun taskRun = execution.findTaskRunByTaskRunId(message.getTaskRunId()).withState(message.getState()).withOutputs(message.getOutputs());
FlowInterface childFlow = flowMetaStore.findByExecution(message.getChildExecution()).orElseThrow();
RunContext runContext = runContextFactory.of(
childFlow,
(Task) executableTask,
message.getChildExecution(),
taskRun
);
SubflowExecutionResult subflowExecutionResult = ExecutableUtils.subflowExecutionResultFromChildExecution(runContext, childFlow, message.getChildExecution(), executableTask, taskRun);
if (subflowExecutionResult != null) {
try {
this.subflowExecutionResultQueue.emit(subflowExecutionResult);
} catch (QueueException ex) {
log.error("Unable to emit the subflow execution result", ex);
}
}
} catch (InternalException e) {
log.error("Unable to process the subflow execution end", e);
}
return null;
});
subflowExecutionEndMessageHandler.handle(message);
}
private void multipleConditionEventQueue(Either<MultipleConditionEvent, DeserializationException> either) {
@@ -805,14 +426,7 @@ public class DefaultExecutor implements Executor {
MultipleConditionEvent multipleConditionEvent = either.getLeft();
flowTriggerService.computeExecutionsFromFlowTriggers(multipleConditionEvent.execution(), List.of(multipleConditionEvent.flow()), Optional.of(multipleConditionStorage))
.forEach(exec -> {
try {
executionQueue.emit(exec);
} catch (QueueException e) {
log.error("Unable to emit the execution {}", exec.getId(), e);
}
});
multipleConditionEventMessageHandler.handle(multipleConditionEvent);
}
private void clusterEventQueue(Either<ClusterEvent, DeserializationException> either) {
@@ -842,7 +456,7 @@ public class DefaultExecutor implements Executor {
}
executionDelayStateStore.processExpired(Instant.now(), executionDelay -> {
ExecutorContext result = executionStateStore.lock(executionDelay.getExecutionId(), execution -> {
Optional<ExecutorContext> maybeExecutor = executionStateStore.lock(executionDelay.getExecutionId(), execution -> {
ExecutorContext executor = new ExecutorContext(execution);
metricRegistry
@@ -871,7 +485,7 @@ public class DefaultExecutor implements Executor {
}
// Handle failed task retries
else if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.RESTART_FAILED_TASK)) {
FlowWithSource flow = findFlow(execution);
FlowWithSource flow = flowMetaStore.findByExecutionThenInjectDefaults(execution).orElseThrow();
Execution newAttempt = executionService.retryTask(
execution,
flow,
@@ -890,15 +504,13 @@ public class DefaultExecutor implements Executor {
executor = executor.withExecution(newExecution, "continueLoop");
}
} catch (Exception e) {
executor = handleFailedExecutionFromExecutor(executor, e);
executor = executorService.handleFailedExecutionFromExecutor(executor, e);
}
return executor;
});
if (result != null) {
this.toExecution(result);
}
maybeExecutor.ifPresent(this::toExecution);
});
}
@@ -908,8 +520,8 @@ public class DefaultExecutor implements Executor {
}
slaMonitorStateStore.processExpired(Instant.now(), slaMonitor -> {
ExecutorContext result = executionStateStore.lock(slaMonitor.getExecutionId(), execution -> {
FlowWithSource flow = findFlow(execution);
Optional<ExecutorContext> maybeExecutor = executionStateStore.lock(slaMonitor.getExecutionId(), execution -> {
FlowWithSource flow = flowMetaStore.findByExecutionThenInjectDefaults(execution).orElseThrow();
Optional<SLA> sla = flow.getSla().stream().filter(s -> s.getId().equals(slaMonitor.getSlaId())).findFirst();
if (sla.isEmpty()) {
// this can happen in case the flow has been updated and the SLA removed
@@ -940,15 +552,13 @@ public class DefaultExecutor implements Executor {
.increment();
}
} catch (Exception e) {
executor = handleFailedExecutionFromExecutor(executor, e);
executor = executorService.handleFailedExecutionFromExecutor(executor, e);
}
return executor;
});
if (result != null) {
this.toExecution(result);
}
maybeExecutor.ifPresent(this::toExecution);
});
}
@@ -972,31 +582,6 @@ public class DefaultExecutor implements Executor {
this.setState(ServiceState.RUNNING);
}
private Execution fail(Execution message, Exception e) {
var failedExecution = message.failedExecutionFromExecutor(e);
try {
logQueue.emitAsync(failedExecution.getLogs());
} catch (QueueException ex) {
// fail silently
}
return failedExecution.getExecution().getState().isFailed() ? failedExecution.getExecution() : failedExecution.getExecution().withState(State.Type.FAILED);
}
private ExecutorContext killingOrAfterKillState(final String executionId, Optional<State.Type> afterKillState) {
return executionStateStore.lock(executionId, execution -> {
FlowInterface flow = flowMetaStore.findByExecution(execution).orElseThrow();
// remove it from the queued store if it was queued so it would not be restarted
if (execution.getState().isQueued()) {
executionQueuedStateStore.remove(execution);
}
Execution killing = executionService.kill(execution, flow, afterKillState);
return new ExecutorContext(execution)
.withExecution(killing, "joinKillingExecution");
});
}
private void toExecution(ExecutorContext executor) {
toExecution(executor, false);
}
@@ -1006,7 +591,7 @@ public class DefaultExecutor implements Executor {
boolean shouldSend = false;
if (executor.getException() != null) {
executor = handleFailedExecutionFromExecutor(executor, executor.getException());
executor = executorService.handleFailedExecutionFromExecutor(executor, executor.getException());
shouldSend = true;
} else if (executor.isExecutionUpdated()) {
shouldSend = true;
@@ -1037,7 +622,8 @@ public class DefaultExecutor implements Executor {
// the terminated state can come from the execution queue, in this case we always have a flow in the executor
// or from a worker task in an afterExecution block, in this case we need to load the flow
if (executor.getFlow() == null && executor.getExecution().getState().isTerminated()) {
executor = executor.withFlow(findFlow(executor.getExecution()));
FlowWithSource flow = flowMetaStore.findByExecutionThenInjectDefaults(executor.getExecution()).orElseThrow();
executor = executor.withFlow(flow);
}
boolean isTerminated = executor.getFlow() != null && executionService.isTerminated(executor.getFlow(), executor.getExecution());
@@ -1156,7 +742,7 @@ public class DefaultExecutor implements Executor {
.filter(f -> ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().noneMatch(c -> c instanceof MultipleCondition) && f.getTrigger().getPreconditions() == null)
.map(f -> f.getFlow())
.distinct() // as computeExecutionsFromFlowTriggers is based on flow, we must map FlowWithFlowTrigger to a flow and distinct to avoid multiple execution for the same flow
.flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggers(execution, List.of(f), Optional.empty()).stream())
.flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggerConditions(execution, f).stream())
.forEach(throwConsumer(exec -> executionQueue.emit(exec)));
// send multiple conditions to the multiple condition queue for later processing
@@ -1167,23 +753,6 @@ public class DefaultExecutor implements Executor {
.forEach(throwConsumer(multipleCondition -> multipleConditionEventQueue.emit(multipleCondition)));
}
private FlowWithSource findFlow(Execution execution) {
FlowInterface flow = flowMetaStore.findByExecution(execution).orElseThrow();
return pluginDefaultService.injectDefaults(flow, execution);
}
private ExecutorContext handleFailedExecutionFromExecutor(ExecutorContext executor, Exception e) {
Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);
try {
logQueue.emitAsync(failedExecutionWithLog.getLogs());
} catch (QueueException ex) {
// fail silently
}
return executor.withExecution(failedExecutionWithLog.getExecution(), "exception");
}
@Override
@PreDestroy
public void close() {

View File

@@ -2,6 +2,7 @@ package io.kestra.executor;
import io.kestra.core.models.executions.Execution;
import java.util.Optional;
import java.util.function.Function;
/**
@@ -11,5 +12,5 @@ public interface ExecutionStateStore {
/**
* Lock an execution for processing using the provided function.
*/
ExecutorContext lock(String executionId, Function<Execution, ExecutorContext> function);
Optional<ExecutorContext> lock(String executionId, Function<Execution, ExecutorContext> function);
}

View File

@@ -0,0 +1,19 @@
package io.kestra.executor;
import java.util.Optional;
/**
* Executor queue message handler that may return an {@link ExecutorContext} for updating the current execution.
*
* @see MessageHandler
*
* @param <T> the message type, this message should be tied to execution processing.
*/
public interface ExecutorMessageHandler<T> {
/**
* Handle a message then return an {@link ExecutorContext} if the current execution must be updated.
*
* @implNote implementors usually start by locking the current execution ,then process the message to avoid any concurrency issue.
*/
Optional<ExecutorContext> handle(T message);
}

View File

@@ -58,9 +58,6 @@ public class ExecutorService {
@Inject
private MetricRegistry metricRegistry;
@Inject
private ConditionService conditionService;
@Inject
private LogService logService;
@@ -98,7 +95,7 @@ public class ExecutorService {
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
private QueueInterface<LogEntry> logQueue;
protected FlowMetaStoreInterface flowExecutorInterface() {
private FlowMetaStoreInterface flowExecutorInterface() {
// bean is injected late, so we need to wait
if (this.flowExecutorInterface == null) {
this.flowExecutorInterface = applicationContext.getBean(FlowMetaStoreInterface.class);
@@ -237,6 +234,18 @@ public class ExecutorService {
return newExecution;
}
public ExecutorContext handleFailedExecutionFromExecutor(ExecutorContext executor, Exception e) {
Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);
try {
logQueue.emitAsync(failedExecutionWithLog.getLogs());
} catch (QueueException ex) {
// fail silently
}
return executor.withExecution(failedExecutionWithLog.getExecution(), "exception");
}
private Optional<WorkerTaskResult> childWorkerTaskResult(Flow flow, Execution execution, TaskRun parentTaskRun) throws InternalException {
Task parent = flow.findTaskByTaskId(parentTaskRun.getTaskId());

View File

@@ -50,16 +50,147 @@ public class FlowTriggerService {
.map(io.kestra.plugin.core.trigger.Flow.class::cast);
}
public List<Execution> computeExecutionsFromFlowTriggers(Execution execution, List<? extends Flow> allFlows, Optional<MultipleConditionStorageInterface> multipleConditionStorage) {
List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
/**
* This method computes executions to trigger from flow triggers from a given execution.
* It only computes those depending on standard (non-multiple / non-preconditions) conditions, so it must be used
* in conjunction with {@link #computeExecutionsFromFlowTriggerPreconditions(Execution, Flow, MultipleConditionStorageInterface)}.
*/
public List<Execution> computeExecutionsFromFlowTriggerConditions(Execution execution, Flow flow) {
List<FlowWithFlowTrigger> flowWithFlowTriggers = computeFlowTriggers(execution, flow)
.stream()
// we must filter on no multiple conditions and no preconditions to avoid evaluating two times triggers that have standard conditions and multiple conditions
.filter(it -> it.getTrigger().getPreconditions() == null && ListUtils.emptyOnNull(it.getTrigger().getConditions()).stream().noneMatch(MultipleCondition.class::isInstance))
.toList();
// short-circuit empty triggers to evaluate
if (flowWithFlowTriggers.isEmpty()) {
return Collections.emptyList();
}
// compute all executions to create from flow triggers without taken into account multiple conditions
return flowWithFlowTriggers.stream()
.map(f -> f.getTrigger().evaluate(
Optional.empty(),
runContextFactory.of(f.getFlow(), execution),
f.getFlow(),
execution
))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
}
/**
* This method computes executions to trigger from flow triggers from a given execution.
* It only computes those depending on multiple conditions and preconditions, so it must be used
* in conjunction with {@link #computeExecutionsFromFlowTriggerConditions(Execution, Flow)}.
*/
public List<Execution> computeExecutionsFromFlowTriggerPreconditions(Execution execution, Flow flow, MultipleConditionStorageInterface multipleConditionStorage) {
List<FlowWithFlowTrigger> flowWithFlowTriggers = computeFlowTriggers(execution, flow)
.stream()
// we must filter on multiple conditions or preconditions to avoid evaluating two times triggers that only have standard conditions
.filter(flowWithFlowTrigger -> flowWithFlowTrigger.getTrigger().getPreconditions() != null || ListUtils.emptyOnNull(flowWithFlowTrigger.getTrigger().getConditions()).stream().anyMatch(MultipleCondition.class::isInstance))
.toList();
// short-circuit empty triggers to evaluate
if (flowWithFlowTriggers.isEmpty()) {
return Collections.emptyList();
}
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = flowWithFlowTriggers.stream()
.flatMap(flowWithFlowTrigger -> flowTriggerMultipleConditions(flowWithFlowTrigger)
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
flowWithFlowTrigger.getFlow(),
multipleConditionStorage.getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition, execution.getOutputs()),
flowWithFlowTrigger.getTrigger(),
multipleCondition
)
)
)
// avoid evaluating expired windows (for ex for daily time window or deadline)
.filter(flowWithFlowTriggerAndMultipleCondition -> flowWithFlowTriggerAndMultipleCondition.getMultipleConditionWindow().isValid(ZonedDateTime.now()))
.toList();
// evaluate multiple conditions
Map<FlowWithFlowTriggerAndMultipleCondition, MultipleConditionWindow> multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> {
Map<String, Boolean> results = f.getMultipleCondition()
.getConditions()
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
conditionService.isValid(e.getValue(), f.getFlow(), execution)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return Map.entry(f, f.getMultipleConditionWindow().with(results));
})
.filter(e -> !e.getValue().getResults().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// persist results
multipleConditionStorage.save(new ArrayList<>(multipleConditionWindowsByFlow.values()));
// compute all executions to create from flow triggers now that multiple conditions storage is populated
List<Execution> executions = flowWithFlowTriggers.stream()
// will evaluate conditions
.filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage
)
)
// will evaluate preconditions
.filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger().getPreconditions(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage
)
)
.map(f -> f.getTrigger().evaluate(
Optional.of(multipleConditionStorage),
runContextFactory.of(f.getFlow(), execution),
f.getFlow(),
execution
))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
// purge fulfilled or expired multiple condition windows
Stream.concat(
multipleConditionWindowsByFlow.entrySet().stream()
.map(e -> Map.entry(
e.getKey().getMultipleCondition(),
e.getValue()
))
.filter(e -> !Boolean.FALSE.equals(e.getKey().getResetOnSuccess()) &&
e.getKey().getConditions().size() == Optional.ofNullable(e.getValue().getResults()).map(Map::size).orElse(0)
)
.map(Map.Entry::getValue),
multipleConditionStorage.expired(execution.getTenantId()).stream()
).forEach(multipleConditionStorage::delete);
return executions;
}
private List<FlowWithFlowTrigger> computeFlowTriggers(Execution execution, Flow flow) {
if (
// prevent recursive flow triggers
.filter(flow -> flowService.removeUnwanted(flow, execution))
// filter out Test Executions
.filter(flow -> execution.getKind() == null)
// ensure flow & triggers are enabled
.filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)))
!flowService.removeUnwanted(flow, execution) ||
// filter out Test Executions
execution.getKind() != null ||
// ensure flow & triggers are enabled
flow.isDisabled() || flow instanceof FlowWithException ||
flow.getTriggers() == null || flow.getTriggers().isEmpty()) {
return Collections.emptyList();
}
return flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger))
// filter on the execution state the flow listen to
.filter(flowWithFlowTrigger -> flowWithFlowTrigger.getTrigger().getStates().contains(execution.getState().getCurrent()))
// validate flow triggers conditions excluding multiple conditions
@@ -74,96 +205,14 @@ public class FlowTriggerService {
execution
)
)).toList();
}
// short-circuit empty triggers to evaluate
if (validTriggersBeforeMultipleConditionEval.isEmpty()) {
return Collections.emptyList();
}
Map<FlowWithFlowTriggerAndMultipleCondition, MultipleConditionWindow> multipleConditionWindowsByFlow = null;
if (multipleConditionStorage.isPresent()) {
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream()
.flatMap(flowWithFlowTrigger -> Optional.ofNullable(flowWithFlowTrigger.getTrigger().getPreconditions()).stream()
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
flowWithFlowTrigger.getFlow(),
multipleConditionStorage.get().getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition, execution.getOutputs()),
flowWithFlowTrigger.getTrigger(),
multipleCondition
)
)
)
// avoid evaluating expired windows (for ex for daily time window or deadline)
.filter(flowWithFlowTriggerAndMultipleCondition -> flowWithFlowTriggerAndMultipleCondition.getMultipleConditionWindow().isValid(ZonedDateTime.now()))
.toList();
// evaluate multiple conditions
multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> {
Map<String, Boolean> results = f.getMultipleCondition()
.getConditions()
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
conditionService.isValid(e.getValue(), f.getFlow(), execution)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return Map.entry(f, f.getMultipleConditionWindow().with(results));
})
.filter(e -> !e.getValue().getResults().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// persist results
multipleConditionStorage.get().save(new ArrayList<>(multipleConditionWindowsByFlow.values()));
}
// compute all executions to create from flow triggers now that multiple conditions storage is populated
List<Execution> executions = validTriggersBeforeMultipleConditionEval.stream()
// will evaluate conditions
.filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage.orElse(null)
)
)
// will evaluate preconditions
.filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger().getPreconditions(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage.orElse(null)
)
)
.map(f -> f.getTrigger().evaluate(
multipleConditionStorage,
runContextFactory.of(f.getFlow(), execution),
f.getFlow(),
execution
))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
if (multipleConditionStorage.isPresent()) {
// purge fulfilled or expired multiple condition windows
Stream.concat(
multipleConditionWindowsByFlow.entrySet().stream()
.map(e -> Map.entry(
e.getKey().getMultipleCondition(),
e.getValue()
))
.filter(e -> !Boolean.FALSE.equals(e.getKey().getResetOnSuccess()) &&
e.getKey().getConditions().size() == Optional.ofNullable(e.getValue().getResults()).map(Map::size).orElse(0)
)
.map(Map.Entry::getValue),
multipleConditionStorage.get().expired(execution.getTenantId()).stream()
).forEach(multipleConditionStorage.get()::delete);
}
return executions;
private Stream<MultipleCondition> flowTriggerMultipleConditions(FlowWithFlowTrigger flowWithFlowTrigger) {
Stream<MultipleCondition> legacyMultipleConditions = ListUtils.emptyOnNull(flowWithFlowTrigger.getTrigger().getConditions()).stream()
.filter(MultipleCondition.class::isInstance)
.map(MultipleCondition.class::cast);
Stream<io.kestra.plugin.core.trigger.Flow.Preconditions> preconditions = Optional.ofNullable(flowWithFlowTrigger.getTrigger().getPreconditions()).stream();
return Stream.concat(legacyMultipleConditions, preconditions);
}
@AllArgsConstructor

View File

@@ -0,0 +1,15 @@
package io.kestra.executor;
/**
* Executor queue message handler.
*
* @see ExecutorMessageHandler
*
* @param <T> the message type, this message should not be tied to execution processing.
*/
public interface MessageHandler<T> {
/**
* Handle a message.
*/
void handle(T message);
}

View File

@@ -0,0 +1,278 @@
package io.kestra.executor.handler;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.sla.ExecutionMonitoringSLA;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.flows.sla.SLAMonitor;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.*;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.trace.Tracer;
import io.kestra.core.trace.TracerFactory;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.TruthUtils;
import io.kestra.executor.*;
import io.kestra.plugin.core.flow.WorkingDirectory;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.event.Level;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@Singleton
@Slf4j
public class ExecutionEventMessageHandler implements ExecutorMessageHandler<ExecutionEvent> {
@Inject
private ExecutionStateStore executionStateStore;
@Inject
private ExecutionQueuedStateStore executionQueuedStateStore;
@Inject
private ExecutionDelayStateStore executionDelayStateStore;
@Inject
private SLAMonitorStateStore slaMonitorStateStore;
@Inject
private ConcurrencyLimitStateStore concurrencyLimitStateStore;
@Inject
private ExecutorService executorService;
@Inject
private WorkerGroupService workerGroupService;
@Inject
private FlowMetaStoreInterface flowMetaStore;
@Inject
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
private QueueInterface<WorkerJob> workerJobQueue;
@Inject
@Named(QueueFactoryInterface.SUBFLOWEXECUTIONRESULT_NAMED)
private QueueInterface<SubflowExecutionResult> subflowExecutionResultQueue;
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
private QueueInterface<LogEntry> logQueue;
private final Tracer tracer;
@Inject
public ExecutionEventMessageHandler(TracerFactory tracerFactory) {
this.tracer = tracerFactory.getTracer(DefaultExecutor.class, "EXECUTOR");
}
@Override
public Optional<ExecutorContext> handle(ExecutionEvent message) {
return executionStateStore.lock(message.executionId(), execution -> tracer.inCurrentContext(
execution,
FlowId.uidWithoutRevision(execution),
() -> {
try {
final FlowWithSource flow = flowMetaStore.findByExecutionThenInjectDefaults(execution).orElseThrow();
ExecutorContext executor = new ExecutorContext(execution, flow);
// schedule it for later if needed
if (execution.getState().getCurrent() == State.Type.CREATED && execution.getScheduleDate() != null && execution.getScheduleDate().isAfter(Instant.now())) {
ExecutionDelay executionDelay = ExecutionDelay.builder()
.executionId(executor.getExecution().getId())
.date(execution.getScheduleDate())
.state(State.Type.RUNNING)
.delayType(ExecutionDelay.DelayType.RESUME_FLOW)
.build();
executionDelayStateStore.save(executionDelay);
return executor;
}
// create an SLA monitor if needed
if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && !ListUtils.isEmpty(flow.getSla())) {
List<SLAMonitor> monitors = flow.getSla().stream()
.filter(ExecutionMonitoringSLA.class::isInstance)
.map(ExecutionMonitoringSLA.class::cast)
.map(sla -> SLAMonitor.builder()
.executionId(execution.getId())
.slaId(((SLA) sla).getId())
.deadline(execution.getState().getStartDate().plus(sla.getDuration()))
.build()
)
.toList();
monitors.forEach(monitor -> slaMonitorStateStore.save(monitor));
}
// handle concurrency limit, we need to use a different queue to be sure that execution running
// are processed sequentially so inside a queue with no parallelism
if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && flow.getConcurrency() != null) {
ExecutionRunning executionRunning = ExecutionRunning.builder()
.tenantId(executor.getFlow().getTenantId())
.namespace(executor.getFlow().getNamespace())
.flowId(executor.getFlow().getId())
.execution(executor.getExecution())
.concurrencyState(ExecutionRunning.ConcurrencyState.CREATED)
.build();
ExecutionRunning processed = concurrencyLimitStateStore.countThenProcess(flow, (txContext, concurrencyLimit) -> {
ExecutionRunning computed = executorService.processExecutionRunning(flow, concurrencyLimit.getRunning(), executionRunning.withExecution(execution)); // be sure that the execution running contains the latest value of the execution
if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.RUNNING && !computed.getExecution().getState().isTerminated()) {
return Pair.of(computed, concurrencyLimit.withRunning(concurrencyLimit.getRunning() + 1));
} else if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
executionQueuedStateStore.save(txContext, ExecutionQueued.fromExecutionRunning(computed));
}
return Pair.of(computed, concurrencyLimit);
});
// if the execution is queued or terminated due to concurrency limit, we stop here
if (processed.getExecution().getState().isTerminated() || processed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
return executor.withExecution(processed.getExecution(), "handleConcurrencyLimit");
}
}
// handle execution changed SLA
executor = executorService.handleExecutionChangedSLA(executor);
// process the execution
if (log.isDebugEnabled()) {
executorService.log(log, true, executor);
}
executor = executorService.process(executor);
if (!executor.getNexts().isEmpty()) {
executor.withExecution(
executorService.onNexts(executor.getExecution(), executor.getNexts()),
"onNexts"
);
}
// worker task
if (!executor.getWorkerTasks().isEmpty()) {
List<WorkerTaskResult> workerTaskResults = new ArrayList<>();
executor
.getWorkerTasks()
.forEach(throwConsumer(workerTask -> {
try {
if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED)));
} else {
if (workerTask.getTask().isSendToWorkerTask()) {
Optional<WorkerGroup> maybeWorkerGroup = workerGroupService.resolveGroupFromJob(flow, workerTask);
String workerGroupKey = maybeWorkerGroup.map(throwFunction(workerGroup -> workerTask.getRunContext().render(workerGroup.getKey())))
.orElse(null);
if (workerTask.getTask() instanceof WorkingDirectory) {
// WorkingDirectory is a flowable so it will be moved to RUNNING a few lines under
workerJobQueue.emit(workerGroupKey, workerTask);
} else {
TaskRun taskRun = workerTask.getTaskRun().withState(State.Type.SUBMITTED);
workerJobQueue.emit(workerGroupKey, workerTask.withTaskRun(taskRun));
workerTaskResults.add(new WorkerTaskResult(taskRun));
}
}
/// flowable attempt state transition to running
if (workerTask.getTask().isFlowable()) {
List<TaskRunAttempt> attempts = Optional.ofNullable(workerTask.getTaskRun().getAttempts())
.map(ArrayList::new)
.orElseGet(ArrayList::new);
attempts.add(
TaskRunAttempt.builder()
.state(new State().withState(State.Type.RUNNING))
.build()
);
TaskRun updatedTaskRun = workerTask.getTaskRun()
.withAttempts(attempts)
.withState(State.Type.RUNNING);
workerTaskResults.add(new WorkerTaskResult(updatedTaskRun));
}
}
} catch (Exception e) {
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.FAILED)));
workerTask.getRunContext().logger().error("Failed to evaluate the runIf condition for task {}. Cause: {}", workerTask.getTask().getId(), e.getMessage(), e);
}
}));
try {
executorService.addWorkerTaskResults(executor, workerTaskResults);
} catch (InternalException e) {
log.error("Unable to add a worker task result to the execution", e);
}
}
// subflow execution results
if (!executor.getSubflowExecutionResults().isEmpty()) {
executor.getSubflowExecutionResults()
.forEach(throwConsumer(subflowExecutionResult -> subflowExecutionResultQueue.emit(subflowExecutionResult)));
}
// schedulerDelay
if (!executor.getExecutionDelays().isEmpty()) {
executor.getExecutionDelays()
.forEach(executionDelay -> executionDelayStateStore.save(executionDelay));
}
// subflow executions
if (!executor.getSubflowExecutions().isEmpty()) {
executor.getSubflowExecutions().forEach(throwConsumer(subflowExecution -> {
Execution subExecution = subflowExecution.getExecution();
String msg = String.format("Created new execution [[link execution=\"%s\" flowId=\"%s\" namespace=\"%s\"]]", subExecution.getId(), subExecution.getFlowId(), subExecution.getNamespace());
log.info(msg);
logQueue.emit(LogEntry.of(subflowExecution.getParentTaskRun(), subflowExecution.getExecution().getKind()).toBuilder()
.level(Level.INFO)
.message(msg)
.timestamp(subflowExecution.getParentTaskRun().getState().getStartDate())
.thread(Thread.currentThread().getName())
.build()
);
executionQueue.emit(subflowExecution.getExecution());
}));
}
return executor;
} catch (QueueException e) {
try {
Execution failedExecution = fail(execution, e);
this.executionQueue.emit(failedExecution);
} catch (QueueException ex) {
log.error("Unable to emit the execution {}", execution.getId(), ex);
}
Span.current().recordException(e).setStatus(StatusCode.ERROR);
return null;
}
}
));
}
private Execution fail(Execution message, Exception e) {
var failedExecution = message.failedExecutionFromExecutor(e);
try {
logQueue.emitAsync(failedExecution.getLogs());
} catch (QueueException ex) {
// fail silently
}
return failedExecution.getExecution().getState().isFailed() ? failedExecution.getExecution() : failedExecution.getExecution().withState(State.Type.FAILED);
}
}

View File

@@ -0,0 +1,110 @@
package io.kestra.executor.handler;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledExecution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionQueuedStateStore;
import io.kestra.core.runners.FlowMetaStoreInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.executor.ExecutionStateStore;
import io.kestra.executor.ExecutorContext;
import io.kestra.executor.ExecutorService;
import io.kestra.executor.ExecutorMessageHandler;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
@Singleton
@Slf4j
public class ExecutionKilledExecutionMessageHandler implements ExecutorMessageHandler<ExecutionKilledExecution> {
@Inject
private ExecutorService executorService;
@Inject
private ExecutionService executionService;
@Inject
private ExecutionStateStore executionStateStore;
@Inject
private ExecutionQueuedStateStore executionQueuedStateStore;
@Inject
private MetricRegistry metricRegistry;
@Inject
private FlowMetaStoreInterface flowMetaStore;
@Inject
@Named(QueueFactoryInterface.KILL_NAMED)
private QueueInterface<ExecutionKilled> killQueue;
@Override
public Optional<ExecutorContext> handle(ExecutionKilledExecution message) {
metricRegistry
.counter(MetricRegistry.METRIC_EXECUTOR_KILLED_COUNT, MetricRegistry.METRIC_EXECUTOR_KILLED_COUNT_DESCRIPTION, metricRegistry.tags(message))
.increment();
if (log.isDebugEnabled()) {
executorService.log(log, true, message);
}
// Immediately fire the event in EXECUTED state to notify the Workers to kill
// any remaining tasks for that executing regardless of if the execution exist or not.
// Note, that this event will be a noop if all tasks for that execution are already killed or completed.
try {
killQueue.emit(ExecutionKilledExecution
.builder()
.executionId(message.getExecutionId())
.isOnKillCascade(false)
.state(ExecutionKilled.State.EXECUTED)
.tenantId(message.getTenantId())
.build()
);
} catch (QueueException e) {
log.error("Unable to kill the execution {}", message.getExecutionId(), e);
}
Optional<ExecutorContext> maybeExecutor = killingOrAfterKillState(message.getExecutionId(), Optional.ofNullable(message.getExecutionState()));
// Check whether kill event should be propagated to downstream executions.
// By default, always propagate the ExecutionKill to sub-flows (for backward compatibility).
Boolean isOnKillCascade = Optional.ofNullable(message.getIsOnKillCascade()).orElse(true);
if (isOnKillCascade) {
executionService
.killSubflowExecutions(message.getTenantId(), message.getExecutionId())
.doOnNext(executionKilled -> {
try {
killQueue.emit(executionKilled);
} catch (QueueException e) {
log.error("Unable to kill the execution {}", executionKilled.getExecutionId(), e);
}
})
.blockLast();
}
return maybeExecutor;
}
private Optional<ExecutorContext> killingOrAfterKillState(final String executionId, Optional<State.Type> afterKillState) {
return executionStateStore.lock(executionId, execution -> {
FlowInterface flow = flowMetaStore.findByExecution(execution).orElseThrow();
// remove it from the queued store if it was queued so it would not be restarted
if (execution.getState().isQueued()) {
executionQueuedStateStore.remove(execution);
}
Execution killing = executionService.kill(execution, flow, afterKillState);
return new ExecutorContext(execution)
.withExecution(killing, "joinKillingExecution");
});
}
}

View File

@@ -0,0 +1,50 @@
package io.kestra.executor.handler;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionEvent;
import io.kestra.core.runners.ExecutionEventType;
import io.kestra.executor.ExecutionStateStore;
import io.kestra.executor.ExecutorContext;
import io.kestra.executor.ExecutorMessageHandler;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
@Singleton
@Slf4j
public class ExecutionMessageHandler implements ExecutorMessageHandler<Execution> {
@Inject
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
private QueueInterface<ExecutionEvent> executionEventQueue;
@Inject
private ExecutionStateStore executionStateStore;
@Override
public Optional<ExecutorContext> handle(Execution message) {
try {
executionEventQueue.emit(new ExecutionEvent(message, ExecutionEventType.CREATED));
return Optional.empty();
} catch (QueueException e) {
// If we cannot send the execution event, we fail the execution
return executionStateStore.lock(message.getId(), execution -> {
try {
Execution failed = execution.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED);
ExecutionEvent event = new ExecutionEvent(failed, ExecutionEventType.TERMINATED);
this.executionEventQueue.emit(event);
return new ExecutorContext(failed);
} catch (QueueException ex) {
log.error("Unable to emit the execution {}", execution.getId(), ex);
}
return null;
});
}
}
}

View File

@@ -0,0 +1,40 @@
package io.kestra.executor.handler;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.MultipleConditionEvent;
import io.kestra.executor.FlowTriggerService;
import io.kestra.executor.MessageHandler;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
@Singleton
@Slf4j
public class MultipleConditionEventMessageHandler implements MessageHandler<MultipleConditionEvent> {
@Inject
private FlowTriggerService flowTriggerService;
@Inject
private MultipleConditionStorageInterface multipleConditionStorage;
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;
@Override
public void handle(MultipleConditionEvent message) {
flowTriggerService.computeExecutionsFromFlowTriggerPreconditions(message.execution(), message.flow(), multipleConditionStorage)
.forEach(exec -> {
try {
executionQueue.emit(exec);
} catch (QueueException e) {
log.error("Unable to emit the execution {}", exec.getId(), e);
}
});
}
}

View File

@@ -0,0 +1,79 @@
package io.kestra.executor.handler;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.*;
import io.kestra.executor.*;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
@Singleton
@Slf4j
public class SubflowExecutionEndMessageHandler implements MessageHandler<SubflowExecutionEnd> {
@Inject
private ExecutorService executorService;
@Inject
private ExecutionStateStore executionStateStore;
@Inject
private FlowMetaStoreInterface flowMetaStore;
@Inject
private RunContextFactory runContextFactory;
@Inject
@Named(QueueFactoryInterface.SUBFLOWEXECUTIONRESULT_NAMED)
private QueueInterface<SubflowExecutionResult> subflowExecutionResultQueue;
@Override
public void handle(SubflowExecutionEnd message) {
if (log.isDebugEnabled()) {
executorService.log(log, true, message);
}
executionStateStore.lock(message.getParentExecutionId(), execution -> {
if (execution == null) {
throw new IllegalStateException("Execution state don't exist for " + message.getParentExecutionId() + ", receive " + message);
}
FlowWithSource flow = flowMetaStore.findByExecutionThenInjectDefaults(execution).orElseThrow();
try {
ExecutableTask<?> executableTask = (ExecutableTask<?>) flow.findTaskByTaskId(message.getTaskId());
if (!executableTask.waitForExecution()) {
return null;
}
TaskRun taskRun = execution.findTaskRunByTaskRunId(message.getTaskRunId()).withState(message.getState()).withOutputs(message.getOutputs());
FlowInterface childFlow = flowMetaStore.findByExecution(message.getChildExecution()).orElseThrow();
RunContext runContext = runContextFactory.of(
childFlow,
(Task) executableTask,
message.getChildExecution(),
taskRun
);
SubflowExecutionResult subflowExecutionResult = ExecutableUtils.subflowExecutionResultFromChildExecution(runContext, childFlow, message.getChildExecution(), executableTask, taskRun);
if (subflowExecutionResult != null) {
try {
this.subflowExecutionResultQueue.emit(subflowExecutionResult);
} catch (QueueException ex) {
log.error("Unable to emit the subflow execution result", ex);
}
}
} catch (InternalException e) {
log.error("Unable to process the subflow execution end", e);
}
return null;
});
}
}

View File

@@ -0,0 +1,121 @@
package io.kestra.executor.handler;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.Variables;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.*;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.VariablesService;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.utils.MapUtils;
import io.kestra.executor.ExecutionStateStore;
import io.kestra.executor.ExecutorContext;
import io.kestra.executor.ExecutorService;
import io.kestra.executor.ExecutorMessageHandler;
import io.kestra.plugin.core.flow.ForEachItem;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.Optional;
@Singleton
@Slf4j
public class SubflowExecutionResultMessageHandler implements ExecutorMessageHandler<SubflowExecutionResult> {
@Inject
private ExecutorService executorService;
@Inject
private RunContextFactory runContextFactory;
@Inject
private MetricRegistry metricRegistry;
@Inject
private VariablesService variablesService;
@Inject
private ExecutionService executionService;
@Inject
private FlowMetaStoreInterface flowMetaStore;
@Inject
private ExecutionStateStore executionStateStore;
@Override
public Optional<ExecutorContext> handle(SubflowExecutionResult message) {
if (log.isDebugEnabled()) {
executorService.log(log, true, message);
}
return executionStateStore.lock(message.getParentTaskRun().getExecutionId(), execution -> {
ExecutorContext current = new ExecutorContext(execution);
if (execution.hasTaskRunJoinable(message.getParentTaskRun())) { // TODO if we remove this check, we can avoid adding 'iteration' on the 'isSame()' method
try {
FlowWithSource flow = flowMetaStore.findByExecutionThenInjectDefaults(execution).orElseThrow();
Task task = flow.findTaskByTaskId(message.getParentTaskRun().getTaskId());
TaskRun taskRun;
// iterative tasks
if (task instanceof ForEachItem.ForEachItemExecutable forEachItem) {
// For iterative tasks, we need to get the taskRun from the execution,
// move it to the state of the child flow, and merge the outputs.
// This is important to avoid races such as RUNNING that arrives after the first SUCCESS/FAILED.
RunContext runContext = runContextFactory.of(flow, task, current.getExecution(), message.getParentTaskRun());
taskRun = execution.findTaskRunByTaskRunId(message.getParentTaskRun().getId());
if (taskRun.getState().getCurrent() != message.getState()) {
taskRun = taskRun.withState(message.getState());
}
Map<String, Object> outputs = MapUtils.deepMerge(taskRun.getOutputs(), message.getParentTaskRun().getOutputs());
Variables variables = variablesService.of(StorageContext.forTask(taskRun), outputs);
taskRun = taskRun.withOutputs(variables);
taskRun = ExecutableUtils.manageIterations(
runContext.storage(),
taskRun,
current.getExecution(),
forEachItem.getTransmitFailed(),
forEachItem.isAllowFailure(),
forEachItem.isAllowWarning()
);
} else {
taskRun = message.getParentTaskRun();
}
Execution newExecution = current.getExecution().withTaskRun(taskRun);
// If the worker task result is killed, we must check if it has a parents to also kill them if not already done.
// Running flowable tasks that have child tasks running in the worker will be killed thanks to that.
if (taskRun.getState().getCurrent() == State.Type.KILLED && taskRun.getParentTaskRunId() != null) {
newExecution = executionService.killParentTaskruns(taskRun, newExecution);
}
current = current.withExecution(newExecution, "joinSubflowExecutionResult");
// send metrics on parent taskRun terminated
if (taskRun.getState().isTerminated()) {
metricRegistry
.counter(MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_COUNT, MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_COUNT_DESCRIPTION, metricRegistry.tags(message))
.increment();
metricRegistry
.timer(MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_DURATION, MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_DURATION_DESCRIPTION, metricRegistry.tags(message))
.record(taskRun.getState().getDuration());
log.trace("TaskRun terminated: {}", taskRun);
}
// join worker result
return current;
} catch (InternalException e) {
return executorService.handleFailedExecutionFromExecutor(current, e);
}
}
return null;
});
}
}

View File

@@ -0,0 +1,51 @@
package io.kestra.executor.handler;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.runners.FlowMetaStoreInterface;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.executor.ExecutionStateStore;
import io.kestra.executor.ExecutorContext;
import io.kestra.executor.ExecutorService;
import io.kestra.executor.ExecutorMessageHandler;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
@Singleton
@Slf4j
public class WorkerTaskResultMessageHandler implements ExecutorMessageHandler<WorkerTaskResult> {
@Inject
private ExecutionStateStore executionStateStore;
@Inject
private ExecutorService executorService;
@Inject
private FlowMetaStoreInterface flowMetaStore;
@Override
public Optional<ExecutorContext> handle(WorkerTaskResult message) {
if (log.isDebugEnabled()) {
executorService.log(log, true, message);
}
return executionStateStore.lock(message.getTaskRun().getExecutionId(), execution -> {
ExecutorContext current = new ExecutorContext(execution);
if (execution.hasTaskRunJoinable(message.getTaskRun())) {
try {
// process worker task result
executorService.addWorkerTaskResult(current, () -> flowMetaStore.findByExecutionThenInjectDefaults(execution).orElseThrow(), message);
// join worker result
return current;
} catch (InternalException e) {
return executorService.handleFailedExecutionFromExecutor(current, e);
}
}
return null;
});
}
}

View File

@@ -25,8 +25,7 @@ import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class FlowTriggerServiceTest {
public static final List<Label> EMPTY_LABELS = List.of();
public static final Optional<MultipleConditionStorageInterface> EMPTY_MULTIPLE_CONDITION_STORAGE = Optional.empty();
private static final List<Label> EMPTY_LABELS = List.of();
@Inject
private TestRunContextFactory runContextFactory;
@@ -56,14 +55,27 @@ class FlowTriggerServiceTest {
var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.SUCCESS);
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
simpleFlowExecution,
List.of(simpleFlow, flowWithFlowTrigger),
EMPTY_MULTIPLE_CONDITION_STORAGE
flowWithFlowTrigger
);
assertThat(resultingExecutionsToRun).size().isEqualTo(1);
assertThat(resultingExecutionsToRun.get(0).getFlowId()).isEqualTo(flowWithFlowTrigger.getId());
assertThat(resultingExecutionsToRun.getFirst().getFlowId()).isEqualTo(flowWithFlowTrigger.getId());
}
@Test
void computeExecutionsFromFlowTriggers_none() {
var simpleFlow = aSimpleFlow();
var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.SUCCESS);
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
simpleFlowExecution,
simpleFlow
);
assertThat(resultingExecutionsToRun).isEmpty();
}
@Test
@@ -81,10 +93,9 @@ class FlowTriggerServiceTest {
var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.CREATED);
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
simpleFlowExecution,
List.of(simpleFlow, flowWithFlowTrigger),
EMPTY_MULTIPLE_CONDITION_STORAGE
flowWithFlowTrigger
);
assertThat(resultingExecutionsToRun).size().isEqualTo(0);
@@ -109,10 +120,9 @@ class FlowTriggerServiceTest {
.kind(ExecutionKind.TEST)
.build();
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
simpleFlowExecutionComingFromATest,
List.of(simpleFlow, flowWithFlowTrigger),
EMPTY_MULTIPLE_CONDITION_STORAGE
flowWithFlowTrigger
);
assertThat(resultingExecutionsToRun).size().isEqualTo(0);

View File

@@ -7,8 +7,6 @@ dependencies {
implementation project(":jdbc")
implementation project(":executor")
implementation project(":scheduler")
implementation project(":queue")
implementation project(":queue-jdbc")
implementation("io.micronaut.sql:micronaut-jooq")
runtimeOnly("com.h2database:h2")
@@ -18,5 +16,4 @@ dependencies {
testImplementation project(':executor').sourceSets.test.output
testImplementation project(':storage-local')
testImplementation project(':tests')
testImplementation project(':queue').sourceSets.test.output
}

View File

@@ -1,13 +0,0 @@
CREATE TABLE IF NOT EXISTS queue (
"offset" BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
"type" INT NOT NULL,
"routing_key" VARCHAR(50),
"key" VARCHAR(250) NOT NULL,
"value" TEXT NOT NULL,
"created" TIMESTAMP NOT NULL,
UNIQUE("type", "routing_key", "offset")
);
CREATE INDEX IF NOT EXISTS queue_type__offset ON queue ("type", "offset");
CREATE INDEX IF NOT EXISTS queue_created ON queue ("created");

View File

@@ -1,19 +0,0 @@
package io.kestra.queue.h2;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractBroadcastQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
@KestraTest(environments = {"test", "queue"})
class H2BroadcastQueueTest extends AbstractBroadcastQueueTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,23 +0,0 @@
package io.kestra.queue.h2;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.queues.QueueException;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractDispatchQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@KestraTest(environments = {"test", "queue"})
class H2DispatchQueueTest extends AbstractDispatchQueueTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,19 +0,0 @@
package io.kestra.queue.h2;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractKeyedDispatchQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
@KestraTest(environments = {"test", "queue"})
class H2KeyedDispatchQueueTest extends AbstractKeyedDispatchQueueTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,4 +0,0 @@
kestra:
server-type: WEBSERVER
queue:
type: jdbc

View File

@@ -7,8 +7,6 @@ dependencies {
implementation project(":jdbc")
implementation project(":executor")
implementation project(":scheduler")
implementation project(":queue")
implementation project(":queue-jdbc")
implementation("io.micronaut.sql:micronaut-jooq")
runtimeOnly("com.mysql:mysql-connector-j")
@@ -20,6 +18,5 @@ dependencies {
testImplementation project(':scheduler').sourceSets.test.output
testImplementation project(':storage-local')
testImplementation project(':tests')
testImplementation project(':queue').sourceSets.test.output
testImplementation("io.micronaut.validation:micronaut-validation") // MysqlServiceLivenessCoordinatorTest fail to init without that
}

View File

@@ -1,11 +0,0 @@
CREATE TABLE IF NOT EXISTS queue (
`offset` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
`type` INT NOT NULL,
`routing_key` CHAR(50),
`key` VARCHAR(250) NOT NULL,
`value` JSON NOT NULL,
`created` TIMESTAMP NOT NULL,
UNIQUE(`type`, `routing_key`, `offset`),
INDEX `ix_type__offset` (`type`, `offset`),
INDEX `ix_created` (`created`)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

View File

@@ -1,19 +0,0 @@
package io.kestra.queue.mysql;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractBroadcastQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
@KestraTest(environments = {"test", "queue"})
class MysqlBroadcastQueueTest extends AbstractBroadcastQueueTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,19 +0,0 @@
package io.kestra.queue.mysql;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractDispatchQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
@KestraTest(environments = {"test", "queue"})
class MysqlDispatchQueueTest extends AbstractDispatchQueueTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,19 +0,0 @@
package io.kestra.queue.mysql;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractKeyedDispatchQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
@KestraTest(environments = {"test", "queue"})
class MysqlKeyedDispatchQueueTest extends AbstractKeyedDispatchQueueTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,4 +0,0 @@
kestra:
server-type: WEBSERVER
queue:
type: jdbc

View File

@@ -7,8 +7,6 @@ dependencies {
implementation project(":jdbc")
implementation project(":executor")
implementation project(":scheduler")
implementation project(":queue")
implementation project(":queue-jdbc")
implementation("io.micronaut.sql:micronaut-jooq")
runtimeOnly("org.postgresql:postgresql")
@@ -19,6 +17,5 @@ dependencies {
testImplementation project(':executor').sourceSets.test.output
testImplementation project(':storage-local')
testImplementation project(':tests')
testImplementation project(':queue').sourceSets.test.output
testImplementation("io.micronaut.validation:micronaut-validation") // PostgresServiceLivenessCoordinatorTest fail to init without that
}

View File

@@ -1,13 +0,0 @@
CREATE TABLE IF NOT EXISTS queue (
"offset" SERIAL PRIMARY KEY,
type INT NOT NULL,
routing_key VARCHAR(250),
key VARCHAR(250) NOT NULL,
value JSONB NOT NULL,
created TIMESTAMPTZ NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS queues_type__key__offset ON queues (type, routing_key, "offset");
CREATE INDEX IF NOT EXISTS queues_type__offset ON queues (type, "offset");
CREATE INDEX IF NOT EXISTS queues_created ON queues ("created");

View File

@@ -1,19 +0,0 @@
package io.kestra.queue.postgres;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractBroadcastQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
@KestraTest(environments = {"test", "queue"})
class PostgresBroadcastQueueTest extends AbstractBroadcastQueueTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,19 +0,0 @@
package io.kestra.queue.postgres;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractDispatchQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
@KestraTest(environments = {"test", "queue"})
class PostgresDispatchQueueTest extends AbstractDispatchQueueTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,19 +0,0 @@
package io.kestra.queue.postgres;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractKeyedDispatchQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
@KestraTest(environments = {"test", "queue"})
class PostgresKeyedDispatchQueueTest extends AbstractKeyedDispatchQueueTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,4 +0,0 @@
kestra:
server-type: WEBSERVER
queue:
type: jdbc

View File

@@ -1,17 +0,0 @@
package io.kestra.jdbc;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.time.Instant;
@NoArgsConstructor
@Getter
public class JdbcQueueItem {
Long offset;
Integer type;
String routingKey;
String key;
String value;
Instant created;
}

View File

@@ -30,12 +30,6 @@ public class JdbcTableConfigsFactory {
return new InstantiableJdbcTableConfig("queues", null, "queues");
}
@Bean
@Named("queue")
public InstantiableJdbcTableConfig queue() {
return new InstantiableJdbcTableConfig("queue", JdbcQueueItem.class, "queue");
}
@Bean
@Named("flows")
public InstantiableJdbcTableConfig flows() {

View File

@@ -983,7 +983,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
}
@Override
public ExecutorContext lock(String executionId, Function<Execution, ExecutorContext> function) {
public Optional<ExecutorContext> lock(String executionId, Function<Execution, ExecutorContext> function) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
@@ -1000,17 +1000,17 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
// not ready for now, skip and wait for a first state
if (execution.isEmpty()) {
return null;
return Optional.empty();
}
ExecutorContext executor = function.apply(execution.get());
if (executor != null) {
this.jdbcRepository.persist(executor.getExecution(), context, null);
return executor;
return Optional.of(executor);
}
return null;
return Optional.empty();
});
}

View File

@@ -1,7 +0,0 @@
dependencies {
implementation project(':core')
implementation project(':queue')
implementation project(':jdbc')
implementation("io.micronaut.sql:micronaut-jooq")
}

View File

@@ -1,50 +0,0 @@
package io.kestra.queue.jdbc;
import io.kestra.core.queues.QueueException;
import io.kestra.queue.AbstractQueue;
import io.kestra.queue.GenericEvent;
import io.kestra.queue.QueueUtils;
import io.kestra.queue.jdbc.client.JdbcQueueClient;
import jakarta.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static io.kestra.core.utils.Rethrow.throwFunction;
@Slf4j
public abstract class AbstractJdbcQueue<T extends GenericEvent> extends AbstractQueue<T> {
protected final JdbcQueueClient jdbcQueueClient;
public AbstractJdbcQueue(Class<T> cls, QueueUtils queueUtils, JdbcQueueClient jdbcQueueClient) {
super(cls, queueUtils);
this.jdbcQueueClient = jdbcQueueClient;
}
public void internalEmit(@Nullable String key, T message) throws QueueException {
String serialize = this.queueUtils.serialize(this.cls, message);
jdbcQueueClient.publish(this.queueName(), key, message.key(), serialize);
}
public void internalEmit(@Nullable String key, List<T> messages) throws QueueException {
jdbcQueueClient.publish(
this.queueName(),
key,
messages
.stream()
.map(throwFunction(e -> {
String serialize = this.queueUtils.serialize(this.cls, e);
return new AbstractMap.SimpleEntry<>(
e.key(),
serialize
);
}))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
);
}
}

View File

@@ -1,39 +0,0 @@
package io.kestra.queue.jdbc;
import io.kestra.core.queues.QueueException;
import io.kestra.queue.BroadcastEvent;
import io.kestra.queue.BroadcastQueueInterface;
import io.kestra.queue.QueueSubscriber;
import io.kestra.queue.QueueUtils;
import io.kestra.queue.jdbc.client.JdbcBroadcastSubscriber;
import io.kestra.queue.jdbc.client.JdbcQueueClient;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
@Slf4j
public class JdbcBroadcastQueue<T extends BroadcastEvent> extends AbstractJdbcQueue<T> implements BroadcastQueueInterface<T> {
public JdbcBroadcastQueue(Class<T> cls, QueueUtils queueUtils, JdbcQueueClient jdbcQueueClient) {
super(cls, queueUtils, jdbcQueueClient);
}
@Override
public void emit(T message) throws QueueException {
this.internalEmit(null, message);
}
@Override
public void emit(List<T> messages) throws QueueException {
this.internalEmit(null, messages);
}
@Override
public QueueSubscriber<T> subscriber() {
return new JdbcBroadcastSubscriber<>(
cls,
queueUtils,
jdbcQueueClient,
queueName()
);
}
}

View File

@@ -1,40 +0,0 @@
package io.kestra.queue.jdbc;
import io.kestra.core.queues.QueueException;
import io.kestra.queue.DispatchEvent;
import io.kestra.queue.DispatchQueueInterface;
import io.kestra.queue.QueueSubscriber;
import io.kestra.queue.QueueUtils;
import io.kestra.queue.jdbc.client.JdbcDispatchSubscriber;
import io.kestra.queue.jdbc.client.JdbcQueueClient;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
@Slf4j
public class JdbcDispatchQueue<T extends DispatchEvent> extends AbstractJdbcQueue<T> implements DispatchQueueInterface<T> {
public JdbcDispatchQueue(Class<T> cls, QueueUtils queueUtils, JdbcQueueClient jdbcQueueClient) {
super(cls, queueUtils, jdbcQueueClient);
}
@Override
public void emit(T message) throws QueueException {
this.internalEmit(null, message);
}
@Override
public void emit(List<T> messages) throws QueueException {
this.internalEmit(null, messages);
}
@Override
public QueueSubscriber<T> subscriber() {
return new JdbcDispatchSubscriber<>(
cls,
queueUtils,
jdbcQueueClient,
queueName(),
null
);
}
}

View File

@@ -1,40 +0,0 @@
package io.kestra.queue.jdbc;
import io.kestra.core.queues.QueueException;
import io.kestra.queue.KeyedDispatchEvent;
import io.kestra.queue.KeyedDispatchQueueInterface;
import io.kestra.queue.QueueSubscriber;
import io.kestra.queue.QueueUtils;
import io.kestra.queue.jdbc.client.JdbcDispatchSubscriber;
import io.kestra.queue.jdbc.client.JdbcQueueClient;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
@Slf4j
public class JdbcKeyedDispatchQueue<T extends KeyedDispatchEvent> extends AbstractJdbcQueue<T> implements KeyedDispatchQueueInterface<T> {
public JdbcKeyedDispatchQueue(Class<T> cls, QueueUtils queueUtils, JdbcQueueClient JdbcQueueClient) {
super(cls, queueUtils, JdbcQueueClient);
}
@Override
public void emit(String key, T message) throws QueueException {
this.internalEmit(key, message);
}
@Override
public void emit(String key, List<T> messages) throws QueueException {
this.internalEmit(key, messages);
}
@Override
public QueueSubscriber<T> subscriber(String key) throws QueueException {
return new JdbcDispatchSubscriber<>(
cls,
queueUtils,
jdbcQueueClient,
queueName(),
key
);
}
}

View File

@@ -1,12 +0,0 @@
package io.kestra.queue.jdbc;
import io.micronaut.context.annotation.Requires;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE})
@Requires(property = "kestra.queue.type", value = "jdbc")
public @interface JdbcQueueEnabled {
}

View File

@@ -1,54 +0,0 @@
package io.kestra.queue.jdbc;
import io.kestra.queue.*;
import io.kestra.queue.jdbc.client.JdbcQueueClient;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Context;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Context
@JdbcQueueEnabled
public class JdbcQueueFactory {
@Inject
private QueueUtils queueUtils;
@Inject
private ApplicationContext applicationContext;
@Inject
private JdbcQueueClient jdbcQueueClient;
@PostConstruct
void init() {
QueueFactory
.listAllEvent(this.getClass().getClassLoader(), DispatchEvent.class)
.forEach(event -> applicationContext.registerSingleton(
DispatchQueueInterface.class,
new JdbcDispatchQueue<>(event, queueUtils, jdbcQueueClient),
Qualifiers.byTypeArguments(event),
true
));
QueueFactory
.listAllEvent(this.getClass().getClassLoader(), KeyedDispatchEvent.class)
.forEach(event -> applicationContext.registerSingleton(
KeyedDispatchQueueInterface.class,
new JdbcKeyedDispatchQueue<>(event, queueUtils, jdbcQueueClient),
Qualifiers.byTypeArguments(event),
true
));
QueueFactory
.listAllEvent(this.getClass().getClassLoader(), BroadcastEvent.class)
.forEach(event -> applicationContext.registerSingleton(
BroadcastQueueInterface.class,
new JdbcBroadcastQueue<>(event, queueUtils, jdbcQueueClient),
Qualifiers.byTypeArguments(event),
true
));
}
}

View File

@@ -1,33 +0,0 @@
package io.kestra.queue.jdbc.client;
import io.kestra.queue.GenericEvent;
import io.kestra.queue.QueueUtils;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class JdbcBroadcastSubscriber<T extends GenericEvent> extends JdbcSubscriber<T> {
public AtomicReference<Long> maxOffset = null;
public JdbcBroadcastSubscriber(
Class<T> cls,
QueueUtils queueUtils,
JdbcQueueClient jdbcQueueClient,
String queueName
) {
super(cls, queueUtils, jdbcQueueClient, queueName);
}
@Override
protected Integer pool(JdbcQueueClient.MessageConsumer<String, Exception> messageConsumer) {
return this.jdbcQueueClient.subscribeBroadcast(this.queueName, maxOffset, messageConsumer);
}
@Override
protected void init() {
maxOffset = this.jdbcQueueClient.subscribeBroadcastMaxOffset(this.queueName);
this.markReady();
}
}

View File

@@ -1,32 +0,0 @@
package io.kestra.queue.jdbc.client;
import io.kestra.queue.GenericEvent;
import io.kestra.queue.QueueUtils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class JdbcDispatchSubscriber<T extends GenericEvent> extends JdbcSubscriber<T> {
private final String routingKey;
public JdbcDispatchSubscriber(
Class<T> cls,
QueueUtils queueUtils,
JdbcQueueClient jdbcQueueClient,
String queueName,
String routingKey
) {
super(cls, queueUtils, jdbcQueueClient, queueName);
this.routingKey = routingKey;
}
@Override
protected Integer pool(JdbcQueueClient.MessageConsumer<String, Exception> messageConsumer) {
return this.jdbcQueueClient.subscribeDispatch(this.queueName, this.routingKey, messageConsumer);
}
@Override
protected void init() {
this.markReady();
}
}

View File

@@ -1,198 +0,0 @@
package io.kestra.queue.jdbc.client;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.UnsupportedMessageException;
import io.kestra.jdbc.AbstractJdbcRepository;
import io.kestra.jdbc.JdbcQueueItem;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.runner.JdbcQueueConfiguration;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.Getter;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.exception.DataException;
import org.jooq.impl.DSL;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.CRC32;
@Singleton
public class JdbcQueueClient {
private static final Map<String, Integer> QUEUE_NAME_CRC32 = new ConcurrentHashMap<>();
private static final List<Field<Object>> COLUMNS = List.of(
io.kestra.jdbc.repository.AbstractJdbcRepository.field("type"),
io.kestra.jdbc.repository.AbstractJdbcRepository.field("routing_key"),
io.kestra.jdbc.repository.AbstractJdbcRepository.field("key"),
io.kestra.jdbc.repository.AbstractJdbcRepository.field("value"),
io.kestra.jdbc.repository.AbstractJdbcRepository.field("created")
);
@Inject
@Named("queue")
private AbstractJdbcRepository<JdbcQueueItem> jdbcRepository;
@Inject
private JooqDSLContextWrapper dslContextWrapper;
@Inject
@Getter
private JdbcQueueConfiguration configuration;
public static Integer queueNameToType(String value) {
return QUEUE_NAME_CRC32.computeIfAbsent(value, s -> {
CRC32 crc32 = new CRC32();
crc32.update(value.getBytes());
return (int) crc32.getValue();
});
}
public void publish(String queue, @Nullable String routingKey, String key, String value) throws QueueException {
this.publish(queue, routingKey, Map.of(key, value));
}
public void publish(String queue, @Nullable String routingKey, Map<String, String> values) throws QueueException {
try {
dslContextWrapper.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
InsertValuesStepN<Record> insert = context
.insertInto(jdbcRepository.getTable())
.columns(COLUMNS);
for (Map.Entry<String, String> entry : values.entrySet()) {
insert = insert.values(
queueNameToType(queue),
routingKey,
entry.getKey(),
JSONB.valueOf(entry.getValue()),
Instant.now()
);
}
insert.execute();
});
} catch (DataException e) { // The exception is from the data itself, not the database/network/driver so instead of fail fast, we throw a recoverable QueueException
// Postgres refuses to store JSONB with the '\0000' codepoint as it has no textual representation.
// We try to detect that and fail with a specific exception so the Worker can recover from it.
if (e.getMessage() != null && e.getMessage().contains("ERROR: unsupported Unicode escape sequence")) {
throw new UnsupportedMessageException(e.getMessage(), e);
}
throw new QueueException("Unable to emit a message to the queue", e);
}
}
public Integer subscribeDispatch(String queue, @Nullable String routingKey, MessageConsumer<String, Exception> consumer) {
return dslContextWrapper.transactionResult(conf -> {
DSLContext context = DSL.using(conf);
SelectConditionStep<Record> select = context.select(DSL.asterisk())
.from(this.jdbcRepository.getTable())
.where(io.kestra.jdbc.repository.AbstractJdbcRepository.field("type").eq(queueNameToType(queue)));
if (routingKey != null) {
select = select.and(io.kestra.jdbc.repository.AbstractJdbcRepository.field("routing_key").eq(routingKey));
}
List<JdbcQueueItem> queueItems = select
.orderBy(io.kestra.jdbc.repository.AbstractJdbcRepository.field("offset").asc())
.limit(configuration.pollSize())
.forUpdate()
.skipLocked()
.fetchInto(JdbcQueueItem.class);
if (!queueItems.isEmpty()) {
List<Long> processedItems = queueItems
.stream()
.map(queueItem -> {
Exception exception = consumer.apply(queueItem.getValue());
return exception == null ? queueItem.getOffset() : null;
})
.filter(Objects::nonNull)
.toList();
if (!processedItems.isEmpty()) {
DeleteConditionStep<Record> delete = context.delete(this.jdbcRepository.getTable())
.where(io.kestra.jdbc.repository.AbstractJdbcRepository.field("type").eq(queueNameToType(queue)))
.and(io.kestra.jdbc.repository.AbstractJdbcRepository.field("offset", Long.class).in(processedItems));
if (routingKey != null) {
delete = delete.and(io.kestra.jdbc.repository.AbstractJdbcRepository.field("routing_key").eq(routingKey));
}
delete.execute();
}
}
return queueItems.size();
});
}
public AtomicReference<Long> subscribeBroadcastMaxOffset(String queue) {
AtomicReference<Long> maxOffset = new AtomicReference<>();
Long initialOffset = dslContextWrapper.transactionResult(conf -> {
DSLContext context = DSL.using(conf);
return context.select(DSL.max(io.kestra.jdbc.repository.AbstractJdbcRepository.field("offset")))
.from(this.jdbcRepository.getTable())
.where(io.kestra.jdbc.repository.AbstractJdbcRepository.field("type").eq(queueNameToType(queue)))
.fetchAny("max", Long.class);
});
if (initialOffset != null) {
maxOffset.set(initialOffset);
}
return maxOffset;
}
public Integer subscribeBroadcast(String queue, AtomicReference<Long> maxOffset, MessageConsumer<String, Exception> consumer) {
return dslContextWrapper.transactionResult(conf -> {
DSLContext context = DSL.using(conf);
SelectConditionStep<Record> select = context.select(DSL.asterisk())
.from(this.jdbcRepository.getTable())
.where(io.kestra.jdbc.repository.AbstractJdbcRepository.field("type").eq(queueNameToType(queue)));
if (maxOffset.get() != null) {
select = select.and(io.kestra.jdbc.repository.AbstractJdbcRepository.field("offset").gt(maxOffset.get()));
}
List<JdbcQueueItem> queueItems = select
.orderBy(io.kestra.jdbc.repository.AbstractJdbcRepository.field("offset").asc())
.limit(configuration.pollSize())
.forUpdate()
.skipLocked()
.fetchInto(JdbcQueueItem.class);
if (!queueItems.isEmpty()) {
queueItems
.forEach(queueItem -> {
consumer.apply(queueItem.getValue());
});
queueItems
.stream()
.map(JdbcQueueItem::getOffset)
.max(Long::compareTo)
.ifPresent(maxOffset::set);
}
return queueItems.size();
});
}
@FunctionalInterface
public interface MessageConsumer <T, E extends Exception> {
@Nullable E apply(T t);
}
}

View File

@@ -1,111 +0,0 @@
package io.kestra.queue.jdbc.client;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueException;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.Rethrow;
import io.kestra.jdbc.runner.JdbcQueueConfiguration;
import io.kestra.queue.AbstractSubscriber;
import io.kestra.queue.GenericEvent;
import io.kestra.queue.QueueSubscriber;
import io.kestra.queue.QueueUtils;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwRunnable;
@Slf4j
public abstract class JdbcSubscriber<T extends GenericEvent> extends AbstractSubscriber<T> {
protected final JdbcQueueClient jdbcQueueClient;
protected final String queueName;
public JdbcSubscriber(
Class<T> cls,
QueueUtils queueUtils,
JdbcQueueClient jdbcQueueClient,
String queueName
) {
super(cls, queueUtils);
this.jdbcQueueClient = jdbcQueueClient;
this.queueName = queueName;
}
protected abstract Integer pool(JdbcQueueClient.MessageConsumer<String, Exception> messageConsumer);
protected abstract void init();
public QueueSubscriber<T> subscribe(Rethrow.ConsumerChecked<Either<T, DeserializationException>, Exception> consumer) throws QueueException {
this.queueUtils.execute(throwRunnable(() -> {
List<JdbcQueueConfiguration.Step> steps = this.jdbcQueueClient.getConfiguration().computeSteps();
ZonedDateTime lastPoll = ZonedDateTime.now();
Duration sleepDuration;
this.init();
while (this.isRunning() || this.isPaused()) {
this.waitIfPaused();
Integer count = this.pool(message -> {
try {
Either<T, DeserializationException> event = this.queueUtils.deserialize(this.cls, message);
consumer.accept(event);
return null;
} catch (Exception e) {
log.warn(
"[{}] message {} failed and was resubmitted to active queue, error: {}",
cls.getSimpleName(),
message,
e.getMessage()
);
return e;
}
});
// define sleep time before next poll, could be immediate if we have messages to process
if (count > 0) {
lastPoll = ZonedDateTime.now();
sleepDuration = this.jdbcQueueClient.getConfiguration().minPollInterval();
if (this.jdbcQueueClient.getConfiguration().immediateRepoll()) {
sleepDuration = Duration.ofSeconds(0);
} else if (count.equals(jdbcQueueClient.getConfiguration().pollSize())) {
// Note: this provides better latency on high throughput: when Kestra is a top capacity,
// it will not do a sleep and immediately poll again.
// We can even have better latency at even higher latency by continuing for positive count,
// but at higher database cost.
// Current impl balance database cost with latency.
sleepDuration = Duration.ofSeconds(0);
}
} else {
ZonedDateTime finalLastPoll = lastPoll;
// get all poll steps which duration is less than the duration between last poll and now
List<JdbcQueueConfiguration.Step> selectedSteps = steps.stream()
.takeWhile(step -> finalLastPoll.plus(step.switchInterval()).compareTo(ZonedDateTime.now()) < 0)
.toList();
// then select the last one (longest) or minPoll if all are beyond while means we are under the first interval
sleepDuration = selectedSteps.isEmpty() ? jdbcQueueClient.getConfiguration().minPollInterval() : selectedSteps.getLast().pollInterval();
}
// keep thread running
try {
Thread.sleep(sleepDuration);
} catch (Exception e) {
throw new QueueException("[" + this.cls.getSimpleName() + "] failed sleep", e);
}
}
this.markEnd();
}));
Await.until(this::isRunning);
return this;
}
}

View File

@@ -1,7 +0,0 @@
configurations {
implementation.extendsFrom(micronaut)
}
dependencies {
implementation project(':core')
}

View File

@@ -1,32 +0,0 @@
package io.kestra.queue;
import com.google.common.base.CaseFormat;
import jakarta.annotation.Nullable;
public abstract class AbstractQueue<T extends GenericEvent> {
protected final Class<T> cls;
protected final QueueUtils queueUtils;
public AbstractQueue(Class<T> cls, QueueUtils queueUtils) {
this.cls = cls;
this.queueUtils = queueUtils;
}
protected String queueNameSeparator() {
return "__";
}
protected String queueName() {
return CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, this.cls.getSimpleName());
}
protected String queueName(@Nullable String key) {
if (key == null) {
return this.queueName();
}
return this.queueName() +
this.queueNameSeparator() +
CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_UNDERSCORE, key);
}
}

View File

@@ -1,146 +0,0 @@
package io.kestra.queue;
import io.kestra.core.queues.QueueException;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public abstract class AbstractSubscriber<T extends GenericEvent> extends AbstractQueue<T> implements QueueSubscriber<T> {
private final CountDownLatch stopped = new CountDownLatch(1);
private final ReentrantLock pauseLock = new ReentrantLock();
private final Condition unpaused = pauseLock.newCondition();
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
public AbstractSubscriber(Class<T> cls, QueueUtils queueUtils) {
super(cls, queueUtils);
}
protected void waitIfPaused() throws QueueException {
// return immediately if not paused.
if (!this.state.get().equals(State.PAUSED)) {
return;
}
// lock and wait until resumed
pauseLock.lock();
try {
while (this.state.get().equals(State.PAUSED)) {
if (log.isDebugEnabled()) {
log.debug("[{}] paused, waiting to resume", this.cls.getSimpleName());
}
try {
unpaused.await(); // Wait until resume() signals
} catch (InterruptedException e) {
throw new QueueException("[" + this.cls.getSimpleName() + "] interrupted while paused", e);
}
if (log.isDebugEnabled()) {
log.debug("[{}] resumed", this.cls.getSimpleName());
}
}
} finally {
pauseLock.unlock();
}
}
protected boolean isRunning() {
return this.state.get() == State.RUNNING;
}
protected boolean isPaused() {
return this.state.get() == State.PAUSED;
}
private boolean changeState(State expected, State newState) {
if (log.isDebugEnabled()) {
log.debug("[{}] change state requested {} to {}", this.cls.getSimpleName(), expected, newState);
}
if (this.state.compareAndSet(expected, newState)) {
return true;
}
throw new IllegalStateException("[" + this.cls.getSimpleName() + "] illegal state change to " + newState + " from " + newState + ", current state is " + this.state.get());
}
protected void markReady() {
if (log.isDebugEnabled()) {
log.debug("[{}] Mark ready received", this.cls.getSimpleName());
}
this.changeState(State.STOPPED, State.RUNNING);
}
public void pause() {
if (log.isDebugEnabled()) {
log.debug("[{}] pause received", this.cls.getSimpleName());
}
this.changeState(State.RUNNING, State.PAUSED);
}
public void resume() {
if (log.isDebugEnabled()) {
log.debug("[{}] resume received", this.cls.getSimpleName());
}
pauseLock.lock();
try {
if (this.changeState(State.PAUSED, State.RUNNING)) {
unpaused.signalAll();
}
} finally {
pauseLock.unlock();
}
}
protected void markEnd() {
if (log.isDebugEnabled()) {
log.debug("[{}] mark end received", this.cls.getSimpleName());
}
if (this.isRunning()) {
this.changeState(State.RUNNING, State.STOPPED);
}
this.stopped.countDown();
}
public void close() {
if (log.isDebugEnabled()) {
log.debug("[{}] close received", this.cls.getSimpleName());
}
// in case it's paused and blocked
if (this.isPaused()) {
resume();
}
// already stopped
try {
this.changeState(State.RUNNING, State.STOPPED);
} catch (IllegalStateException ignored) {
return;
}
// wait for the queue to be stooped
try {
stopped.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("[{}}] interrupted while waiting to be stopped.", this.cls.getSimpleName());
}
}
public enum State {
RUNNING,
PAUSED,
STOPPED
}
}

View File

@@ -1,4 +0,0 @@
package io.kestra.queue;
public interface BroadcastEvent extends GenericEvent {
}

View File

@@ -1,13 +0,0 @@
package io.kestra.queue;
import io.kestra.core.queues.QueueException;
import java.util.List;
public interface BroadcastQueueInterface <T extends BroadcastEvent> extends GenericQueueInterface<T> {
void emit(T message) throws QueueException;
void emit(List<T> messages) throws QueueException;
QueueSubscriber<T> subscriber() throws QueueException;
}

View File

@@ -1,5 +0,0 @@
package io.kestra.queue;
public interface DispatchEvent extends GenericEvent {
}

View File

@@ -1,13 +0,0 @@
package io.kestra.queue;
import io.kestra.core.queues.QueueException;
import java.util.List;
public interface DispatchQueueInterface <T extends DispatchEvent> extends GenericQueueInterface<T> {
void emit(T message) throws QueueException;
void emit(List<T> messages) throws QueueException;
QueueSubscriber<T> subscriber() throws QueueException;
}

View File

@@ -1,5 +0,0 @@
package io.kestra.queue;
public interface GenericEvent {
String key();
}

View File

@@ -1,5 +0,0 @@
package io.kestra.queue;
public interface GenericQueueInterface<T extends GenericEvent> {
}

View File

@@ -1,4 +0,0 @@
package io.kestra.queue;
public interface KeyedDispatchEvent extends DispatchEvent {
}

View File

@@ -1,13 +0,0 @@
package io.kestra.queue;
import io.kestra.core.queues.QueueException;
import java.util.List;
public interface KeyedDispatchQueueInterface<T extends KeyedDispatchEvent> extends GenericQueueInterface<T> {
void emit(String key, T message) throws QueueException;
void emit(String key, List<T> messages) throws QueueException;
QueueSubscriber<T> subscriber(String key) throws QueueException;
}

View File

@@ -1,24 +0,0 @@
package io.kestra.queue;
import io.micronaut.context.annotation.ConfigurationProperties;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
@Getter
@ConfigurationProperties(value = "kestra.queue")
public class QueueConfiguration{
@NotNull
String type;
@Nullable
MessageProtection messageProtection;
@Getter
@ConfigurationProperties("message-protection")
public static class MessageProtection{
Boolean enabled = false;
Integer limit;
}
}

View File

@@ -1,55 +0,0 @@
package io.kestra.queue;
import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.context.annotation.Factory;
import io.micronaut.core.beans.BeanIntrospectionReference;
import io.micronaut.core.io.service.ServiceDefinition;
import io.micronaut.core.io.service.SoftServiceLoader;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@Factory
public class QueueFactory {
public final static String QUEUE_EXECUTOR = "queueExecutor";
@SuppressWarnings({"rawtypes", "unchecked"})
public static <T extends GenericEvent> List<Class<T>> listAllEvent(ClassLoader classLoader, Class<T> eventClass) {
final SoftServiceLoader<BeanIntrospectionReference> definitions = SoftServiceLoader.load(
BeanIntrospectionReference.class,
classLoader
);
List<Class<T>> list = new ArrayList<>();
for (ServiceDefinition<BeanIntrospectionReference> definition : definitions) {
if (definition.isPresent()) {
final BeanIntrospectionReference ref = definition.load();
Class beanType = ref.getBeanType();
if (Modifier.isAbstract(beanType.getModifiers())) {
continue;
}
if (eventClass.isAssignableFrom(beanType)) {
list.add((Class<T>) beanType);
}
}
}
return list;
}
@Named(QUEUE_EXECUTOR)
@Singleton
@Inject
public ExecutorService executorsUtils(ExecutorsUtils executorsUtils, QueueConfiguration queueConfiguration) {
return executorsUtils.cachedThreadPool("queue-" + queueConfiguration.getType());
}
}

View File

@@ -1,32 +0,0 @@
package io.kestra.queue;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueException;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.Rethrow;
public interface QueueSubscriber<T extends GenericEvent> {
/**
* Start a subscription.
*
* @param consumer the consumer that will process messages
* @return self
* @throws QueueException if subscription fails
*/
QueueSubscriber<T> subscribe(Rethrow.ConsumerChecked<Either<T, DeserializationException>, Exception> consumer) throws QueueException;
/**
* Pauses this subscriber.
*/
void pause();
/**
* Resumes this subscriber if currently paused.
*/
void resume();
/**
* close this subscriber.
*/
void close();
}

View File

@@ -1,107 +0,0 @@
package io.kestra.queue;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.MessageTooBigException;
import io.kestra.core.queues.QueueException;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.Either;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
@Slf4j
@Singleton
public class QueueUtils {
private static final ObjectMapper MAPPER = JacksonMapper.ofJson(false).copy();
@Inject
@Named(QueueFactory.QUEUE_EXECUTOR)
@Getter
ExecutorService executorService;
@Inject
protected QueueConfiguration queueConfiguration;
@Inject
private MetricRegistry metricRegistry;
public void execute(Runnable runnable) {
this.executorService.execute(runnable);
}
public <T extends GenericEvent> String serialize(Class<T> cls, T message) throws QueueException {
try {
String serialize = MAPPER.writeValueAsString(message);
if (log.isTraceEnabled()) {
log.trace("[{}] produced message: {}", cls.getSimpleName(), serialize);
}
int byteLength = serialize.getBytes(StandardCharsets.UTF_8).length;
if (queueConfiguration.getMessageProtection() != null && queueConfiguration.getMessageProtection().getEnabled() && byteLength >= queueConfiguration.getMessageProtection().getLimit()) {
metricRegistry
.counter(MetricRegistry.METRIC_QUEUE_BIG_MESSAGE_COUNT, MetricRegistry.METRIC_QUEUE_BIG_MESSAGE_COUNT_DESCRIPTION, MetricRegistry.TAG_CLASS_NAME, cls.getSimpleName()).increment();
// we let terminated execution messages to go through anyway
if (!(message instanceof Execution execution) || !execution.getState().isTerminated()) {
throw new MessageTooBigException("Message of size " + byteLength + " has exceeded the configured limit of " + queueConfiguration.getMessageProtection().getLimit());
}
}
return serialize;
} catch (JsonProcessingException e) {
throw new QueueException("Failed to produce '" + message.getClass() + "'", e);
} finally {
String[] tags = {MetricRegistry.TAG_QUEUE_TYPE, cls.getSimpleName()};
metricRegistry
.counter(MetricRegistry.METRIC_QUEUE_PRODUCE_COUNT, MetricRegistry.METRIC_QUEUE_PRODUCE_COUNT_DESCRIPTION, tags)
.increment();
}
}
public <T extends GenericEvent> Either<T, DeserializationException> deserialize(Class<T> cls, byte[] record) {
if (log.isTraceEnabled()) {
log.trace("[{}] received message: {}", cls.getSimpleName(), new String(record));
}
try {
return Either.left(MAPPER.readValue(record, cls));
} catch (IOException e) {
return Either.right(new DeserializationException(e, Arrays.toString(record)));
} finally {
String[] tags = {MetricRegistry.TAG_QUEUE_TYPE, cls.getSimpleName()};
metricRegistry
.counter(MetricRegistry.METRIC_QUEUE_RECEIVE_COUNT, MetricRegistry.METRIC_QUEUE_RECEIVE_COUNT_DESCRIPTION, tags)
.increment();
}
}
public <T extends GenericEvent> Either<T, DeserializationException> deserialize(Class<T> cls, String record) {
if (log.isTraceEnabled()) {
log.trace("[{}] received message: {}", cls.getSimpleName(), record);
}
try {
return Either.left(MAPPER.readValue(record, cls));
} catch (IOException e) {
return Either.right(new DeserializationException(e, record));
} finally {
String[] tags = {MetricRegistry.TAG_QUEUE_TYPE, cls.getSimpleName()};
metricRegistry
.counter(MetricRegistry.METRIC_QUEUE_RECEIVE_COUNT, MetricRegistry.METRIC_QUEUE_RECEIVE_COUNT_DESCRIPTION, tags)
.increment();
}
}
}

View File

@@ -1,148 +0,0 @@
package io.kestra.queue;
import io.kestra.core.queues.QueueException;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static org.assertj.core.api.Assertions.assertThat;
public abstract class AbstractBroadcastQueueTest extends AbstractQueueTest {
private static final int DEFAULT_TIMEOUT_SECONDS = 10;
@Inject
private BroadcastQueueInterface<TestBroadcast> broadcastQueue;
@Test
void singleConsumer() throws QueueException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
Collection<Integer> list = Collections.synchronizedCollection(new ArrayList<>());
QueueSubscriber<TestBroadcast> subscriber = broadcastQueue
.subscriber()
.subscribe(e -> {
list.add(e.getLeft().id);
countDownLatch.countDown();
});
String prefix = this.keyPrefix();
for (int i = 1; i <= 3; i++) {
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), i));
}
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.close();
assertThat(await).isEqualTo(true);
assertThat(countDownLatch.getCount()).isEqualTo(0L);
assertThat(list).contains(1, 2, 3);
}
@Test
void closingConsumer() throws QueueException, InterruptedException {
singleConsumer();
singleConsumer();
}
@Test
void multipleConsumer() throws QueueException, InterruptedException {
int rand = ThreadLocalRandom.current().nextInt(10, 50);;
CountDownLatch countDownLatch = new CountDownLatch(3 * rand);
Collection<String> list = Collections.synchronizedCollection(new ArrayList<>());
List<QueueSubscriber<TestBroadcast>> subscribers = new ArrayList<>();
IntStream.range(0, rand)
.boxed()
.forEach(throwConsumer(i -> subscribers.add(broadcastQueue
.subscriber()
.subscribe(e -> {
list.add("c" + String.format("%03d", i) + "-i" + String.format("%03d", e.getLeft().id));
countDownLatch.countDown();
})
)));
String prefix = this.keyPrefix();
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 1));
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 2));
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 3));
// rebalancing can take some time, we multiply timeout by 5
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS * 5, TimeUnit.SECONDS);
subscribers.parallelStream().forEach(QueueSubscriber::close);
assertThat(await).isEqualTo(true);
assertThat(countDownLatch.getCount()).isEqualTo(0L);
assertThat(list).hasSize(3 * rand);
assertThat(list).contains("c000-i001", "c000-i002", "c000-i003");
assertThat(list).contains("c" + String.format("%03d", (rand - 1)) +"-i001", "c" + String.format("%03d",(rand - 1)) +"-i002", "c" + String.format("%03d",(rand - 1)) +"-i003");
}
@Test
void pause() throws QueueException, InterruptedException {
CountDownLatch countDownLatchFirst = new CountDownLatch(1);
CountDownLatch countDownLatchSecond = new CountDownLatch(2);
CountDownLatch countDownLatchOthers = new CountDownLatch(2);
Collection<Pair<Instant, Integer>> list = Collections.synchronizedCollection(new ArrayList<>());
QueueSubscriber<TestBroadcast> subscriber = broadcastQueue
.subscriber()
.subscribe(e -> {
list.add(Pair.of(Instant.now(), e.getLeft().id));
if (e.getLeft().id == 1) {
countDownLatchFirst.countDown();
} else if (e.getLeft().id <= 3) {
countDownLatchSecond.countDown();
} else {
countDownLatchOthers.countDown();
}
});
String prefix = this.keyPrefix();
// first round
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 1));
boolean await1 = countDownLatchFirst.await(DEFAULT_TIMEOUT_SECONDS + 10, TimeUnit.SECONDS);
subscriber.pause();
assertThat(await1).isTrue();
// second round
Instant resumeTime = Instant.now();
subscriber.resume();
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 2));
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 3));
boolean await2 = countDownLatchSecond.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.pause();
assertThat(await2).isTrue();
// last round
Instant resumeTime2 = Instant.now();
subscriber.resume();
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 4));
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 5));
boolean await3 = countDownLatchOthers.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.close();
assertThat(await3).isEqualTo(true);
assertThat(list).hasSize(5);
assertThat(list.stream().filter(i -> i.getLeft().isBefore(resumeTime)).count()).isEqualTo(1);
assertThat(list.stream().filter(i -> i.getLeft().isAfter(resumeTime)).count()).isEqualTo(4);
assertThat(list.stream().filter(i -> i.getLeft().isAfter(resumeTime2)).count()).isEqualTo(2);
}
@Introspected
public record TestBroadcast(String key, Integer id) implements BroadcastEvent {}
}

View File

@@ -1,190 +0,0 @@
package io.kestra.queue;
import io.kestra.core.queues.QueueException;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static org.assertj.core.api.Assertions.assertThat;
public abstract class AbstractDispatchQueueTest extends AbstractQueueTest {
private static final int DEFAULT_TIMEOUT_SECONDS = 5;
@Inject
private DispatchQueueInterface<TestDispatch> dispatchQueue;
@Test
void singleConsumer() throws QueueException, InterruptedException, IOException {
CountDownLatch countDownLatch = new CountDownLatch(2);
Collection<Integer> list = Collections.synchronizedCollection(new ArrayList<>());
QueueSubscriber<TestDispatch> subscriber = dispatchQueue
.subscriber()
.subscribe(e -> {
list.add(e.getLeft().id);
countDownLatch.countDown();
});
String prefix = this.keyPrefix();
dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 1));
dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 2));
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.close();
assertThat(await).isEqualTo(true);
assertThat(countDownLatch.getCount()).isEqualTo(0L);
assertThat(list).containsExactlyInAnyOrder(1, 2);
}
@Test
void closingConsumer() throws QueueException, InterruptedException, IOException {
singleConsumer();
singleConsumer();
}
@Test
void multipleConsumer() throws QueueException, InterruptedException {
int rand = ThreadLocalRandom.current().nextInt(10, 50);;
CountDownLatch countDownLatch = new CountDownLatch(rand);
Collection<String> list = Collections.synchronizedCollection(new ArrayList<>());
List<QueueSubscriber<TestDispatch>> subscribers = new ArrayList<>();
IntStream.range(0, 3)
.boxed()
.forEach(throwConsumer(i -> subscribers.add(
dispatchQueue
.subscriber()
.subscribe(e -> {
list.add("c" + String.format("%03d", i) + "-i" + String.format("%03d", e.getLeft().id));
countDownLatch.countDown();
})
)));
String prefix = this.keyPrefix();
for (int i = 0; i < rand; i++) {
dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), i));
}
// rebalancing can take some time, we multiply timeout by 5
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS * 5, TimeUnit.SECONDS);
subscribers.parallelStream().forEach(QueueSubscriber::close);
assertThat(await).isEqualTo(true);
assertThat(countDownLatch.getCount()).isEqualTo(0L);
assertThat(list).hasSize(rand);
assertThat(list.stream().map(s -> s.substring(0, s.indexOf("-"))).toList()).contains("c000", "c001", "c002");
assertThat(list.stream().map(s -> s.substring(s.indexOf("-") + 1)).toList()).contains("i001", String.format("i%03d", rand - 1));
}
@Test
void errorProcessing() throws QueueException, InterruptedException {
// @TODO: failed on rabbitmq, the published message seems to be not durable
String prefix = this.keyPrefix();
dispatchQueue.emit(IntStream.range(1, 15)
.boxed()
.map(i -> new TestDispatch(prefix + "_" + IdUtils.create(), i))
.toList()
);
CountDownLatch countDownLatch = new CountDownLatch(15);
Collection<Integer> list = Collections.synchronizedCollection(new ArrayList<>());
var crashed = new AtomicBoolean(false);
QueueSubscriber<TestDispatch> subscriber = dispatchQueue
.subscriber()
.subscribe(e -> {
countDownLatch.countDown();
if (e.getLeft().id == 2 && crashed.compareAndSet(false, true)) {
throw new Exception("Boom");
}
list.add(e.getLeft().id);
});
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.close();
assertThat(await).isEqualTo(true);
assertThat(countDownLatch.getCount()).isEqualTo(0L);
assertThat(list).containsExactlyInAnyOrder(IntStream.range(1, 15).boxed().toArray(Integer[]::new));
}
@Test
void pause() throws QueueException, InterruptedException {
CountDownLatch countDownLatchFirst = new CountDownLatch(1);
CountDownLatch countDownLatchSecond = new CountDownLatch(2);
CountDownLatch countDownLatchOthers = new CountDownLatch(2);
Collection<Pair<Instant, Integer>> list = Collections.synchronizedCollection(new ArrayList<>());
QueueSubscriber<TestDispatch> subscriber = dispatchQueue
.subscriber()
.subscribe(e -> {
list.add(Pair.of(Instant.now(), e.getLeft().id));
if (e.getLeft().id == 1) {
countDownLatchFirst.countDown();
} else if (e.getLeft().id <= 3) {
countDownLatchSecond.countDown();
} else {
countDownLatchOthers.countDown();
}
});
// first round
String prefix = this.keyPrefix();
dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 1));
boolean await1 = countDownLatchFirst.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.pause();
assertThat(await1).isTrue();
// second round
Instant resumeTime = Instant.now();
subscriber.resume();
dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 2));
dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 3));
boolean await2 = countDownLatchSecond.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.pause();
assertThat(await2).isTrue();
// last round
Instant resumeTime2 = Instant.now();
subscriber.resume();
dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 4));
dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 5));
boolean await3 = countDownLatchOthers.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.close();
assertThat(await3).isTrue();
assertThat(list).hasSize(5);
assertThat(list.stream().filter(i -> i.getLeft().isBefore(resumeTime)).count()).isEqualTo(1);
assertThat(list.stream().filter(i -> i.getLeft().isAfter(resumeTime)).count()).isEqualTo(4);
assertThat(list.stream().filter(i -> i.getLeft().isAfter(resumeTime2)).count()).isEqualTo(2);
}
@Introspected
public record TestDispatch(String key, Integer id) implements DispatchEvent {
}
}

View File

@@ -1,123 +0,0 @@
package io.kestra.queue;
import io.kestra.core.queues.QueueException;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static org.assertj.core.api.Assertions.assertThat;
public abstract class AbstractKeyedDispatchQueueTest extends AbstractQueueTest {
private static final int DEFAULT_TIMEOUT_SECONDS = 10;
@Inject
private KeyedDispatchQueueInterface<TestKeyedDispatch> keyDispatchQueue;
@Test
void singleConsumer() throws QueueException, InterruptedException {
String groupKey = IdUtils.create();
CountDownLatch countDownLatch = new CountDownLatch(2);
Collection<Integer> list = Collections.synchronizedCollection(new ArrayList<>());
QueueSubscriber<TestKeyedDispatch> subscriber = keyDispatchQueue
.subscriber(groupKey)
.subscribe(e -> {
list.add(e.getLeft().id);
countDownLatch.countDown();
});
String prefix = this.keyPrefix();
keyDispatchQueue.emit(groupKey, new TestKeyedDispatch(prefix + "_" + IdUtils.create(), 1));
keyDispatchQueue.emit(groupKey, new TestKeyedDispatch(prefix + "_" + IdUtils.create(), 2));
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.close();
assertThat(await).isEqualTo(true);
assertThat(countDownLatch.getCount()).isEqualTo(0L);
assertThat(list).containsExactlyInAnyOrder(1, 2);
}
@Test
void multipleConsumer() throws QueueException, InterruptedException {
String groupKey = IdUtils.create();
int rand = ThreadLocalRandom.current().nextInt(10, 50);;
CountDownLatch countDownLatch = new CountDownLatch(rand);
Collection<String> list = Collections.synchronizedCollection(new ArrayList<>());
List<QueueSubscriber<TestKeyedDispatch>> subscribers = new ArrayList<>();
IntStream.range(0, 3)
.boxed()
.forEach(throwConsumer(i -> subscribers.add(keyDispatchQueue
.subscriber(groupKey)
.subscribe(e -> {
list.add("c" + String.format("%03d", i) + "-i" + String.format("%03d", e.getLeft().id));
countDownLatch.countDown();
})
)));
String prefix = this.keyPrefix();
for (int i = 0; i < rand; i++) {
keyDispatchQueue.emit(groupKey, new TestKeyedDispatch(prefix + "_" + IdUtils.create(), i));
}
// rebalancing can take some time, we multiply timeout by 5
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS * 5, TimeUnit.SECONDS);
subscribers.parallelStream().forEach(QueueSubscriber::close);
assertThat(await).isEqualTo(true);
assertThat(countDownLatch.getCount()).isEqualTo(0L);
assertThat(list).hasSize(rand);
assertThat(list.stream().map(s -> s.substring(0, s.indexOf("-"))).toList()).contains("c000", "c001", "c002");
assertThat(list.stream().map(s -> s.substring(s.indexOf("-") + 1)).toList()).contains("i001", String.format("i%03d", rand - 1));
}
@Test
void multipleGroup() throws InterruptedException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(6);
List<QueueSubscriber<TestKeyedDispatch>> subscribers = new ArrayList<>();
Map<Integer, Collection<Integer>> map = new HashMap<>();
IntStream.range(0, 3)
.boxed()
.forEach(throwConsumer(i -> {
map.put(i, Collections.synchronizedCollection(new ArrayList<>()));
subscribers.add(keyDispatchQueue
.subscriber("group-" + i)
.subscribe(e -> {
map.get(i).add(e.getLeft().id);
countDownLatch.countDown();
}));
}));
String prefix = this.keyPrefix();
for (int i = 0; i < 3; i++) {
keyDispatchQueue.emit("group-" + i, new TestKeyedDispatch(prefix + "_" + IdUtils.create(), 1));
keyDispatchQueue.emit("group-" + i, new TestKeyedDispatch(prefix + "_" + IdUtils.create(), 2));
}
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscribers.forEach(QueueSubscriber::close);
assertThat(await).isEqualTo(true);
assertThat(countDownLatch.getCount()).isEqualTo(0L);
assertThat(map.entrySet().stream().flatMap(s -> s.getValue().stream()).toList()).hasSize(6);
for (int i = 0; i < 3; i++) {
assertThat(map.get(i)).containsExactlyInAnyOrder(1, 2);
}
}
@Introspected
public record TestKeyedDispatch(String key, Integer id) implements KeyedDispatchEvent {}
}

View File

@@ -1,9 +0,0 @@
package io.kestra.queue;
public class AbstractQueueTest {
protected String keyPrefix() {
StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
StackTraceElement e = stacktrace[2];
return e.getMethodName();
}
}

View File

@@ -1 +0,0 @@
allure.results.directory=build/allure-results

Some files were not shown because too many files have changed in this diff Show More