Mirror of https://github.com/kestra-io/kestra.git (synced 2025-12-25 20:00:14 -05:00)

Compare commits: feat/execu...feat/new-q (13 commits)

Commits (SHA1):

- 297c4aec4b
- ac591956d6
- f40d5f23cb
- d34c704457
- 489a5c843d
- 2e18e2d7e6
- a54b239e14
- 17f8495009
- fb042a0e3f
- c52f446d99
- 1beef1d0a9
- 6cc970d991
- 94cdd9fafe
@@ -31,6 +31,8 @@ dependencies {
    implementation project(":jdbc-mysql")
    implementation project(":jdbc-postgres")

    implementation project(":queue")

    implementation project(":storage-local")

    // Kestra server components
@@ -1,6 +1,7 @@
package io.kestra.cli.commands.sys;

import io.kestra.cli.commands.sys.database.DatabaseCommand;
+import io.kestra.cli.commands.sys.statestore.StateStoreCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;

@@ -15,6 +16,7 @@ import picocli.CommandLine;
        ReindexCommand.class,
        DatabaseCommand.class,
        SubmitQueuedCommand.class,
+       StateStoreCommand.class
    }
)
@Slf4j
StateStoreCommand.java (new file, 27 lines):

@@ -0,0 +1,27 @@
package io.kestra.cli.commands.sys.statestore;

import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import picocli.CommandLine;

@CommandLine.Command(
    name = "state-store",
    description = "Manage Kestra State Store",
    mixinStandardHelpOptions = true,
    subcommands = {
        StateStoreMigrateCommand.class,
    }
)
public class StateStoreCommand extends AbstractCommand {
    @SneakyThrows
    @Override
    public Integer call() throws Exception {
        super.call();

        PicocliRunner.call(App.class, "sys", "state-store", "--help");

        return 0;
    }
}
StateStoreMigrateCommand.java (new file, 81 lines):

@@ -0,0 +1,81 @@
package io.kestra.cli.commands.sys.statestore;

import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Slugify;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

@CommandLine.Command(
    name = "migrate",
    description = "Migrate old state store files to use the new KV Store implementation.",
    mixinStandardHelpOptions = true
)
@Slf4j
public class StateStoreMigrateCommand extends AbstractCommand {
    @Inject
    private ApplicationContext applicationContext;

    @Override
    public Integer call() throws Exception {
        super.call();

        FlowRepositoryInterface flowRepository = this.applicationContext.getBean(FlowRepositoryInterface.class);
        StorageInterface storageInterface = this.applicationContext.getBean(StorageInterface.class);
        RunContextFactory runContextFactory = this.applicationContext.getBean(RunContextFactory.class);

        flowRepository.findAllForAllTenants().stream().map(flow -> Map.entry(flow, List.of(
            URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of(flow.getId()) + "/states"),
            URI.create("/" + flow.getNamespace().replace(".", "/") + "/states")
        ))).map(potentialStateStoreUrisForAFlow -> Map.entry(potentialStateStoreUrisForAFlow.getKey(), potentialStateStoreUrisForAFlow.getValue().stream().flatMap(uri -> {
            try {
                return storageInterface.allByPrefix(potentialStateStoreUrisForAFlow.getKey().getTenantId(), potentialStateStoreUrisForAFlow.getKey().getNamespace(), uri, false).stream();
            } catch (IOException e) {
                return Stream.empty();
            }
        }).toList())).forEach(stateStoreFileUrisForAFlow -> stateStoreFileUrisForAFlow.getValue().forEach(stateStoreFileUri -> {
            Flow flow = stateStoreFileUrisForAFlow.getKey();
            String[] flowQualifierWithStateQualifiers = stateStoreFileUri.getPath().split("/states/");
            String[] statesUriPart = flowQualifierWithStateQualifiers[1].split("/");

            String stateName = statesUriPart[0];
            String taskRunValue = statesUriPart.length > 2 ? statesUriPart[1] : null;
            String stateSubName = statesUriPart[statesUriPart.length - 1];
            boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId());
            StateStore stateStore = new StateStore(runContext(runContextFactory, flow), false);

            try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) {
                stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes());
                storageInterface.delete(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }));

        stdOut("Successfully ran the state-store migration.");
        return 0;
    }

    private RunContext runContext(RunContextFactory runContextFactory, Flow flow) {
        Map<String, String> flowVariables = new HashMap<>();
        flowVariables.put("tenantId", flow.getTenantId());
        flowVariables.put("id", flow.getId());
        flowVariables.put("namespace", flow.getNamespace());
        return runContextFactory.of(flow, Map.of("flow", flowVariables));
    }
}
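As a reading aid, here is a minimal, self-contained sketch of how the pipeline above decodes one old state file URI. The sample path and hash segment are hypothetical; the real logic is the split/scoping code in StateStoreMigrateCommand.call() above.

// Hypothetical example values; mirrors the URI parsing in StateStoreMigrateCommand.
class OldStateUriParsingExample {
    public static void main(String[] args) {
        String path = "/io/kestra/tests/a-flow/states/my-state/5d41402abc4b2a76/sub-name";

        String[] parts = path.split("/states/");                 // [flow qualifier, state qualifier]
        String[] stateParts = parts[1].split("/");

        String stateName = stateParts[0];                        // "my-state"
        String taskRunValue = stateParts.length > 2 ? stateParts[1] : null; // hashed taskrun value
        String stateSubName = stateParts[stateParts.length - 1]; // "sub-name"
        boolean flowScoped = parts[0].endsWith("/a-flow");       // true: URI carries the flow id

        System.out.println(stateName + ", " + taskRunValue + ", " + stateSubName + ", flowScoped=" + flowScoped);
    }
}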
StateStoreCommandTest.java (new file, 27 lines):

@@ -0,0 +1,27 @@
package io.kestra.cli.commands.sys.statestore;

import io.kestra.cli.commands.sys.database.DatabaseCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

import static org.assertj.core.api.Assertions.assertThat;

class StateStoreCommandTest {
    @Test
    void runWithNoParam() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        System.setOut(new PrintStream(out));

        try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
            String[] args = {};
            Integer call = PicocliRunner.call(StateStoreCommand.class, ctx, args);

            assertThat(call).isZero();
            assertThat(out.toString()).contains("Usage: kestra sys state-store");
        }
    }
}
StateStoreMigrateCommandTest.java (new file, 75 lines):

@@ -0,0 +1,75 @@
package io.kestra.cli.commands.sys.statestore;

import io.kestra.core.exceptions.MigrationRequiredException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.Slugify;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

class StateStoreMigrateCommandTest {
    @Test
    void runMigration() throws IOException, ResourceExpiredException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        System.setOut(new PrintStream(out));

        try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).environments("test").start()) {
            FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);

            Flow flow = Flow.builder()
                .tenantId("my-tenant")
                .id("a-flow")
                .namespace("some.valid.namespace." + ((int) (Math.random() * 1000000)))
                .tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
                .build();
            flowRepository.create(GenericFlow.of(flow));

            StorageInterface storage = ctx.getBean(StorageInterface.class);
            String tenantId = flow.getTenantId();
            URI oldStateStoreUri = URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of("a-flow") + "/states/my-state/" + Hashing.hashToString("my-taskrun-value") + "/sub-name");
            storage.put(
                tenantId,
                flow.getNamespace(),
                oldStateStoreUri,
                new ByteArrayInputStream("my-value".getBytes())
            );
            assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();

            RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
                "tenantId", tenantId,
                "id", flow.getId(),
                "namespace", flow.getNamespace()
            )));
            StateStore stateStore = new StateStore(runContext, true);
            Assertions.assertThrows(MigrationRequiredException.class, () -> stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value"));

            String[] args = {};
            Integer call = PicocliRunner.call(StateStoreMigrateCommand.class, ctx, args);

            assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes())).isEqualTo("my-value");
            assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isFalse();

            assertThat(call).isZero();
        }
    }
}
@@ -127,6 +127,8 @@ public class MetricRegistry {
    public static final String METRIC_QUEUE_BIG_MESSAGE_COUNT_DESCRIPTION = "Total number of big messages";
    public static final String METRIC_QUEUE_PRODUCE_COUNT = "queue.produce.count";
    public static final String METRIC_QUEUE_PRODUCE_COUNT_DESCRIPTION = "Total number of produced messages";
    public static final String METRIC_QUEUE_RECEIVE_COUNT = "queue.receive.count";
    public static final String METRIC_QUEUE_RECEIVE_COUNT_DESCRIPTION = "Total number of received messages";
    public static final String METRIC_QUEUE_RECEIVE_DURATION = "queue.receive.duration";
    public static final String METRIC_QUEUE_RECEIVE_DURATION_DESCRIPTION = "Queue duration to receive and consume a batch of messages";
    public static final String METRIC_QUEUE_POLL_SIZE = "queue.poll.size";
@@ -4,6 +4,8 @@ import java.util.Set;
import io.kestra.core.models.flows.Input;
import io.kestra.core.validations.FileInputValidation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

@@ -17,14 +19,17 @@ import java.util.List;
@FileInputValidation
public class FileInput extends Input<URI> {

-    public static final String DEFAULT_EXTENSION = ".upl";
+    private static final String DEFAULT_EXTENSION = ".upl";

    @Deprecated(since = "0.24", forRemoval = true)
    public String extension;

    /**
     * List of allowed file extensions (e.g., [".csv", ".txt", ".pdf"]).
     * Each extension must start with a dot.
     */
    private List<String> allowedFileExtensions;


    /**
     * Gets the file extension from the URI's path
     */

@@ -48,4 +53,15 @@ public class FileInput extends Input<URI> {
        );
    }

+    public static String findFileInputExtension(@NotNull final List<Input<?>> inputs, @NotNull final String fileName) {
+        String res = inputs.stream()
+            .filter(in -> in instanceof FileInput)
+            .filter(in -> in.getId().equals(fileName))
+            .filter(flowInput -> ((FileInput) flowInput).getExtension() != null)
+            .map(flowInput -> ((FileInput) flowInput).getExtension())
+            .findFirst()
+            .orElse(FileInput.DEFAULT_EXTENSION);
+        return res.startsWith(".") ? res : "." + res;
+    }
}
@@ -54,7 +54,12 @@ public class Property<T> {
    private String expression;
    private T value;

-    private Property(String expression) {
+    /**
+     * @deprecated use {@link #ofExpression(String)} instead.
+     */
+    @Deprecated
+    // Note: when not used, this constructor would not be deleted but made private so it can only be used by ofExpression(String) and the deserializer
+    public Property(String expression) {
        this.expression = expression;
    }

@@ -118,6 +123,14 @@ public class Property<T> {
        return p;
    }

+    /**
+     * @deprecated use {@link #ofValue(Object)} instead.
+     */
+    @Deprecated
+    public static <V> Property<V> of(V value) {
+        return ofValue(value);
+    }

    /**
     * Build a new Property object with a Pebble expression.<br>
     * <p>
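A minimal sketch of the intended call-site migration, assuming kestra-core on the classpath; ofValue and ofExpression are the replacements named in the deprecation notes above, and the example values are taken from elsewhere in this diff.

import io.kestra.core.models.property.Property;
import java.util.List;

class PropertyFactoryMigrationExample {
    @SuppressWarnings("deprecation")
    void example() {
        // Deprecated factory: still works, delegates to ofValue(value).
        Property<List<Integer>> legacy = Property.of(List.of(404));

        // Preferred: be explicit about whether the property is a value or an expression.
        Property<List<Integer>> value = Property.ofValue(List.of(404));
        Property<String> expression = Property.ofExpression("{{ inputs.key }}");
    }
}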
@@ -1,11 +1,9 @@
package io.kestra.core.runners;

-import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowListenersInterface;
-import io.kestra.core.services.PluginDefaultService;
import jakarta.inject.Singleton;
import java.util.Objects;
import lombok.Setter;

@@ -17,14 +15,12 @@ import java.util.Optional;
@Singleton
public class DefaultFlowMetaStore implements FlowMetaStoreInterface {
    private final FlowRepositoryInterface flowRepository;
-    private final PluginDefaultService pluginDefaultService;

    @Setter
    private List<FlowWithSource> allFlows;

-    public DefaultFlowMetaStore(FlowListenersInterface flowListeners, FlowRepositoryInterface flowRepository, PluginDefaultService pluginDefaultService) {
+    public DefaultFlowMetaStore(FlowListenersInterface flowListeners, FlowRepositoryInterface flowRepository) {
        this.flowRepository = flowRepository;
-        this.pluginDefaultService = pluginDefaultService;
        flowListeners.listen(flows -> allFlows = flows);
    }

@@ -57,9 +53,4 @@ public class DefaultFlowMetaStore implements FlowMetaStoreInterface {
    public Boolean isReady() {
        return true;
    }
-
-    @Override
-    public Optional<FlowWithSource> findByExecutionThenInjectDefaults(Execution execution) {
-        return findByExecution(execution).map(it -> pluginDefaultService.injectDefaults(it, execution));
-    }
}
@@ -142,7 +142,7 @@ public class FlowInputOutput {
        runContext.logger().warn("Using a deprecated way to upload a FILE input. You must set the input 'id' as part name and set the name of the file using the regular 'filename' part attribute.");
    }
    String inputId = oldStyleInput ? fileUpload.getFilename() : fileUpload.getName();
-   String fileName = oldStyleInput ? FileInput.DEFAULT_EXTENSION : fileUpload.getFilename();
+   String fileName = oldStyleInput ? FileInput.findFileInputExtension(inputs, fileUpload.getFilename()) : fileUpload.getFilename();

    if (!uploadFiles) {
        URI from = URI.create("kestra://" + StorageContext

@@ -153,7 +153,7 @@ public class FlowInputOutput {
        sink.next(Map.entry(inputId, from.toString()));
    } else {
        try {
-           final String fileExtension = FileInput.DEFAULT_EXTENSION;
+           final String fileExtension = FileInput.findFileInputExtension(inputs, fileName);

            String prefix = StringUtils.leftPad(fileName + "_", 3, "_");
            File tempFile = File.createTempFile(prefix, fileExtension);
@@ -49,6 +49,4 @@ public interface FlowMetaStoreInterface {
            Optional.of(execution.getFlowRevision())
        );
    }
-
-   Optional<FlowWithSource> findByExecutionThenInjectDefaults(Execution execution);
}
@@ -7,6 +7,7 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
+import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.Storage;
import io.kestra.core.storages.kv.KVStore;
import org.slf4j.Logger;

@@ -169,6 +170,14 @@ public abstract class RunContext implements PropertyContext {
     */
    public abstract KVStore namespaceKv(String namespace);

+   /**
+    * @deprecated use #namespaceKv(String) instead
+    */
+   @Deprecated(since = "1.1.0", forRemoval = true)
+   public StateStore stateStore() {
+       return new StateStore(this, true);
+   }

    /**
     * Get access to local paths of the host machine.
     */
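A short sketch of the replacement path this deprecation points to, assuming kestra-core on the classpath; namespaceKv and flowInfo are used the same way inside the StateStore record below.

import io.kestra.core.runners.RunContext;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.kv.KVStore;

class StateStoreAccessExample {
    @SuppressWarnings("deprecation")
    void example(RunContext runContext) {
        // Legacy accessor, scheduled for removal after 1.1.0:
        StateStore legacy = runContext.stateStore();

        // Preferred: talk to the namespace KV store directly.
        KVStore kv = runContext.namespaceKv(runContext.flowInfo().namespace());
    }
}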
core/src/main/java/io/kestra/core/storages/StateStore.java (new file, 114 lines):

@@ -0,0 +1,114 @@
package io.kestra.core.storages;

import io.kestra.core.exceptions.MigrationRequiredException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.storages.kv.KVValue;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.Slugify;
import jakarta.annotation.Nullable;

import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Objects;
import java.util.Optional;

/**
 * @deprecated use KVStore instead
 */
@Deprecated(since = "1.1.0", forRemoval = true)
public record StateStore(RunContext runContext, boolean hashTaskRunValue) {

    public InputStream getState(String stateName, @Nullable String stateSubName, String taskRunValue) throws IOException, ResourceExpiredException {
        return this.getState(true, stateName, stateSubName, taskRunValue);
    }

    /**
     * Gets the state for the given state name, sub-name and task run value.
     *
     * @param flowScoped if true, will scope it to flow, otherwise to namespace
     * @param stateName state name
     * @param stateSubName state sub-name (optional)
     * @param taskRunValue task run value
     * @return an InputStream of the state data
     */
    public InputStream getState(boolean flowScoped, String stateName, @Nullable String stateSubName, String taskRunValue) throws IOException, ResourceExpiredException {
        RunContext.FlowInfo flowInfo = runContext.flowInfo();
        // We check if a file containing the state exists in the old state store
        URI oldStateStoreUri = this.oldStateStoreUri(flowInfo.namespace(), flowScoped, flowInfo.id(), stateName, taskRunValue, stateSubName);
        if (runContext.storage().isFileExist(oldStateStoreUri)) {
            throw new MigrationRequiredException("State Store", "sys state-store migrate");
        }

        String key = this.statePrefix("_", flowScoped, flowInfo.id(), stateName + nameSuffix(stateSubName), taskRunValue);
        Optional<KVValue> kvStateValue = runContext.namespaceKv(flowInfo.namespace()).getValue(key);
        if (kvStateValue.isEmpty()) {
            throw new FileNotFoundException("State " + key + " not found");
        }
        Object value = kvStateValue.get().value();
        if (value instanceof String string) {
            return new ByteArrayInputStream(string.getBytes());
        } else {
            return new ByteArrayInputStream(((byte[]) Objects.requireNonNull(value)));
        }
    }

    public String putState(String stateName, String stateSubName, String taskRunValue, byte[] value) throws IOException {
        return this.putState(true, stateName, stateSubName, taskRunValue, value);
    }

    /**
     * Sets the state for the given state name, sub-name and task run value.
     *
     * @param flowScoped if true, will scope it to flow, otherwise to namespace
     * @param stateName state name
     * @param stateSubName state sub-name (optional)
     * @param taskRunValue task run value
     * @param value the state value to store
     * @return the KV Store key at which the state is stored
     */
    public String putState(boolean flowScoped, String stateName, String stateSubName, String taskRunValue, byte[] value) throws IOException {
        RunContext.FlowInfo flowInfo = runContext.flowInfo();
        String key = this.statePrefix("_", flowScoped, flowInfo.id(), stateName + nameSuffix(stateSubName), taskRunValue);
        runContext.namespaceKv(flowInfo.namespace()).put(key, new KVValueAndMetadata(null, value));

        return key;
    }

    public boolean deleteState(String stateName, String stateSubName, String taskRunValue) throws IOException {
        return this.deleteState(true, stateName, stateSubName, taskRunValue);
    }

    /**
     * Deletes the stateName for the given name, sub-name and task run value.
     * @param flowScoped if true, will scope it to flow, otherwise to namespace
     * @param stateName state name
     * @param stateSubName state sub-name (optional)
     * @param taskRunValue task run value
     * @return true if the state exists and was deleted, false otherwise
     */
    public boolean deleteState(boolean flowScoped, String stateName, String stateSubName, String taskRunValue) throws IOException {
        RunContext.FlowInfo flowInfo = runContext.flowInfo();

        return runContext.namespaceKv(flowInfo.namespace()).delete(
            this.statePrefix("_", flowScoped, flowInfo.id(), stateName + nameSuffix(stateSubName), taskRunValue)
        );
    }

    private URI oldStateStoreUri(String namespace, boolean flowScoped, String flowId, String stateName, @Nullable String taskRunValue, String name) {
        return URI.create("kestra:/" + namespace.replace(".", "/") + "/" + this.statePrefix("/", flowScoped, flowId, stateName, taskRunValue) + (name == null ? "" : ("/" + name)));
    }

    private String statePrefix(String separator, boolean flowScoped, String flowId, String stateName, @Nullable String taskRunValue) {
        String flowIdPrefix = (!flowScoped || flowId == null) ? "" : (Slugify.of(flowId) + separator);
        return flowIdPrefix + "states" + separator + stateName + (taskRunValue == null ? "" : (separator + (hashTaskRunValue ? Hashing.hashToString(taskRunValue) : taskRunValue)));
    }

    private static String nameSuffix(String name) {
        return Optional.ofNullable(name).map(n -> "_" + n).orElse("");
    }
}
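For orientation, a sketch of the KV key that statePrefix("_", ...) builds for a flow-scoped state with hashTaskRunValue enabled. This is illustrative only (assuming kestra-core on the classpath) and mirrors the key asserted in StateStoreTest further down.

import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.Slugify;

class StateKeyLayoutExample {
    // e.g. key("a-flow", "my-state", "sub-name", "my-taskrun-value")
    //   -> "a-flow_states_my-state_sub-name_" + Hashing.hashToString("my-taskrun-value")
    static String key(String flowId, String stateName, String stateSubName, String taskRunValue) {
        String name = stateName + (stateSubName == null ? "" : "_" + stateSubName);
        return Slugify.of(flowId) + "_states_" + name
            + (taskRunValue == null ? "" : "_" + Hashing.hashToString(taskRunValue));
    }
}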
AbstractState.java (new file, 98 lines):

@@ -0,0 +1,98 @@
package io.kestra.plugin.core.state;

import com.fasterxml.jackson.core.type.TypeReference;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
import jakarta.validation.constraints.NotNull;
import org.apache.commons.lang3.tuple.Pair;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Deprecated(since = "1.1.0", forRemoval = true)
public abstract class AbstractState extends Task {
    private static final TypeReference<Map<String, Object>> TYPE_REFERENCE = new TypeReference<>() {};
    public static final String TASKS_STATES = "tasks-states";

    @Schema(
        title = "The name of the state file"
    )
    @NotNull
    @Builder.Default
    protected Property<String> name = Property.ofValue("default");

    @Schema(
        title = "Share state for the current namespace.",
        description = "By default, the state is isolated by namespace **and** flow, setting to `true` will share the state between the **same** namespace"
    )
    @Builder.Default
    private final Property<Boolean> namespace = Property.ofValue(false);

    @Schema(
        title = "Isolate the state with `taskrun.value`.",
        description = "By default, the state will be isolated with `taskrun.value` (during iteration with each). Setting to `false` will use the same state for every run of the iteration."
    )
    @Builder.Default
    private final Property<Boolean> taskrunValue = Property.ofValue(true);


    protected Map<String, Object> get(RunContext runContext) throws IllegalVariableEvaluationException, IOException, ResourceExpiredException {
        return JacksonMapper.ofJson(false).readValue(runContext.stateStore().getState(
            !runContext.render(this.namespace).as(Boolean.class).orElseThrow(),
            TASKS_STATES,
            runContext.render(this.name).as(String.class).orElse(null),
            taskRunValue(runContext)
        ), TYPE_REFERENCE);
    }

    protected Pair<String, Map<String, Object>> merge(RunContext runContext, Map<String, Object> map) throws IllegalVariableEvaluationException, IOException, ResourceExpiredException {
        Map<String, Object> current;

        try {
            current = this.get(runContext);
        } catch (FileNotFoundException e) {
            current = Map.of();
        }

        Map<String, Object> merge = MapUtils.deepMerge(current, runContext.render(map));

        String key = runContext.stateStore().putState(
            !runContext.render(this.namespace).as(Boolean.class).orElseThrow(),
            TASKS_STATES,
            runContext.render(this.name).as(String.class).orElse(null),
            taskRunValue(runContext),
            JacksonMapper.ofJson(false).writeValueAsBytes(merge)
        );

        return Pair.of(key, merge);
    }

    protected boolean delete(RunContext runContext) throws IllegalVariableEvaluationException, IOException {
        return runContext.stateStore().deleteState(
            !runContext.render(this.namespace).as(Boolean.class).orElseThrow(),
            TASKS_STATES,
            runContext.render(this.name).as(String.class).orElse(null),
            taskRunValue(runContext)
        );
    }

    private String taskRunValue(RunContext runContext) throws IllegalVariableEvaluationException {
        return Boolean.TRUE.equals(runContext.render(this.taskrunValue).as(Boolean.class).orElseThrow()) ?
            runContext.storage().getTaskStorageContext().map(StorageContext.Task::getTaskRunValue).orElse(null) : null;
    }
}
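AbstractState.merge() deep-merges new data into the stored state. A small illustration of the intended overwrite-and-add semantics, assuming kestra-core on the classpath; the expected values mirror the assertions in the state StateTest at the end of this diff.

import io.kestra.core.utils.MapUtils;
import java.util.Map;

class StateMergeExample {
    public static void main(String[] args) {
        Map<String, Object> current = Map.of("test", "1");
        Map<String, Object> update = Map.of("test", "2", "test2", "3");

        // Existing key "test" is overwritten, new key "test2" is added.
        Map<String, Object> merged = MapUtils.deepMerge(current, update);
        System.out.println(merged); // {test=2, test2=3} (iteration order may vary)
    }
}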
core/src/main/java/io/kestra/plugin/core/state/Delete.java (new file, 74 lines):

@@ -0,0 +1,74 @@
package io.kestra.plugin.core.state;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.io.FileNotFoundException;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
    title = "Delete a state from the state store (Deprecated, use KV store instead)."
)
@Plugin(
    examples = {
        @Example(
            title = "Delete the default state for the current flow.",
            code = {
                "id: delete_state",
                "type: io.kestra.plugin.core.state.Delete",
            },
            full = true
        ),
        @Example(
            title = "Delete the `myState` state for the current flow.",
            code = {
                "id: delete_state",
                "type: io.kestra.plugin.core.state.Delete",
                "name: myState",
            },
            full = true
        )
    },
    aliases = "io.kestra.core.tasks.states.Delete"
)
@Deprecated(since = "1.1.0", forRemoval = true)
public class Delete extends AbstractState implements RunnableTask<Delete.Output> {
    @Schema(
        title = "Raise an error if the state is not found."
    )
    @Builder.Default
    private final Property<Boolean> errorOnMissing = Property.ofValue(false);

    @Override
    public Output run(RunContext runContext) throws Exception {

        boolean delete = this.delete(runContext);

        if (Boolean.TRUE.equals(runContext.render(errorOnMissing).as(Boolean.class).orElseThrow()) && !delete) {
            throw new FileNotFoundException("Unable to find the state file '" + runContext.render(this.name).as(String.class).orElseThrow() + "'");
        }

        return Output.builder()
            .deleted(delete)
            .build();
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(
            title = "Flag specifying whether the state file was deleted"
        )
        private final Boolean deleted;
    }
}
core/src/main/java/io/kestra/plugin/core/state/Get.java (new file, 86 lines):

@@ -0,0 +1,86 @@
package io.kestra.plugin.core.state;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.io.FileNotFoundException;
import java.util.Map;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
    title = "Get a state from the state store (Deprecated, use KV store instead)."
)
@Plugin(
    examples = {
        @Example(
            title = "Get the default state file for the current flow.",
            code = {
                "id: get_state",
                "type: io.kestra.plugin.core.state.Get",
            },
            full = true
        ),
        @Example(
            title = "Get the `myState` state for the current flow.",
            code = {
                "id: get_state",
                "type: io.kestra.plugin.core.state.Get",
                "name: myState",
            },
            full = true
        )
    },
    aliases = "io.kestra.core.tasks.states.Get"
)
@Deprecated(since = "1.1.0", forRemoval = true)
public class Get extends AbstractState implements RunnableTask<Get.Output> {
    @Schema(
        title = "Raise an error if the state file is not found."
    )
    @Builder.Default
    private final Property<Boolean> errorOnMissing = Property.ofValue(false);

    @Override
    public Output run(RunContext runContext) throws Exception {
        Map<String, Object> data;

        try {
            data = this.get(runContext);
        } catch (FileNotFoundException e) {
            if (Boolean.TRUE.equals(runContext.render(this.errorOnMissing).as(Boolean.class).orElseThrow())) {
                throw e;
            }

            data = Map.of();
        }

        return Output.builder()
            .count(data.size())
            .data(data)
            .build();
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(
            title = "The count of properties found in the state"
        )
        private final int count;

        @Schema(
            title = "The data extracted from the state"
        )
        private final Map<String, Object> data;
    }
}
core/src/main/java/io/kestra/plugin/core/state/Set.java (new file, 89 lines):

@@ -0,0 +1,89 @@
package io.kestra.plugin.core.state;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.tuple.Pair;

import java.net.URI;
import java.util.Map;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
    title = "Set a state in the state store (Deprecated, use KV store instead).",
    description = "Values will be merged: \n" +
        "* If you provide a new key, the new key will be added.\n" +
        "* If you provide an existing key, the previous key will be overwrite.\n" +
        "\n" +
        "::alert{type=\"warning\"}\n" +
        "This method is not concurrency safe. If many executions for the same flow are concurrent, there is no guarantee on isolation on the value.\n" +
        "The value can be overwritten by other executions.\n" +
        "::\n"
)
@Plugin(
    examples = {
        @Example(
            title = "Set the default state for the current flow.",
            code = {
                "id: set_state",
                "type: io.kestra.plugin.core.state.Set",
                "data:",
                "  '{{ inputs.store }}': '{{ outputs.download.md5 }}'",
            },
            full = true
        ),
        @Example(
            title = "Set the `myState` state for the current flow.",
            code = {
                "id: set_state",
                "type: io.kestra.plugin.core.state.Set",
                "name: myState",
                "data:",
                "  '{{ inputs.store }}': '{{ outputs.download.md5 }}'",
            },
            full = true
        )
    },
    aliases = "io.kestra.core.tasks.states.Set"
)
@Deprecated(since = "1.1.0", forRemoval = true)
public class Set extends AbstractState implements RunnableTask<Set.Output> {
    @Schema(
        title = "The data to be stored in the state store"
    )
    private Property<Map<String, Object>> data;

    @Override
    public Output run(RunContext runContext) throws Exception {
        Pair<String, Map<String, Object>> dataRendered = this.merge(runContext, runContext.render(this.data).asMap(String.class, Object.class));

        return Output.builder()
            .count(dataRendered.getRight().size())
            .key(dataRendered.getLeft())
            .build();
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(
            title = "The count of properties found in the state"
        )
        private final int count;

        @Schema(
            title = "The key of the current state"
        )
        private final String key;
    }
}
package-info.java (new file, 4 lines):

@@ -0,0 +1,4 @@
@PluginSubGroup(categories = { PluginSubGroup.PluginCategory.STORAGE, PluginSubGroup.PluginCategory.CORE})
package io.kestra.plugin.core.state;

import io.kestra.core.models.annotations.PluginSubGroup;
@@ -1,10 +1,6 @@
package io.kestra.core.docs;

-import io.kestra.core.models.annotations.PluginProperty;
-import io.kestra.core.models.tasks.RunnableTask;
-import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.plugins.PluginClassAndMetadata;
-import io.kestra.core.runners.RunContext;
import io.kestra.plugin.core.runner.Process;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.plugins.PluginScanner;

@@ -12,14 +8,9 @@ import io.kestra.core.plugins.RegisteredPlugin;
import io.kestra.plugin.core.debug.Return;
import io.kestra.plugin.core.flow.Dag;
import io.kestra.plugin.core.flow.Subflow;
+import io.kestra.plugin.core.state.Set;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
-import jakarta.validation.constraints.NotBlank;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.ToString;
-import lombok.experimental.SuperBuilder;
import org.junit.jupiter.api.Test;

import java.io.IOException;

@@ -135,17 +126,17 @@ class DocumentationGeneratorTest {

    @SuppressWarnings("unchecked")
    @Test
-   void deprecated() throws IOException {
+   void state() throws IOException {
        PluginScanner pluginScanner = new PluginScanner(ClassPluginDocumentationTest.class.getClassLoader());
        RegisteredPlugin scan = pluginScanner.scan();
-       Class<DeprecatedTask> set = scan.findClass(DeprecatedTask.class.getName()).orElseThrow();
+       Class<Set> set = scan.findClass(Set.class.getName()).orElseThrow();

        PluginClassAndMetadata<Task> metadata = PluginClassAndMetadata.create(scan, set, Task.class, null);
        ClassPluginDocumentation<? extends Task> doc = ClassPluginDocumentation.of(jsonSchemaGenerator, metadata, scan.version(), false);

        String render = DocumentationGenerator.render(doc);

-       assertThat(render).contains("DeprecatedTask");
+       assertThat(render).contains("Set");
        assertThat(render).contains("::alert{type=\"warning\"}\n");
    }

@@ -187,26 +178,4 @@ class DocumentationGeneratorTest {
        assertThat(render).contains("title: Process");
        assertThat(render).contains("Task runner that executes a task as a subprocess on the Kestra host.");
    }
-
-   @SuperBuilder
-   @ToString
-   @EqualsAndHashCode
-   @Getter
-   @NoArgsConstructor
-   @Deprecated
-   public static class DeprecatedTask extends Task implements RunnableTask<VoidOutput> {
-       @PluginProperty(dynamic = true)
-       @Deprecated
-       private String someProperty;
-
-       @NotBlank
-       @PluginProperty(dynamic = true)
-       @Deprecated
-       private String additionalProperty;
-
-       @Override
-       public VoidOutput run(RunContext runContext) {
-           return null;
-       }
-   }
}
@@ -429,7 +429,7 @@ class HttpClientTest {

    @Test
    void shouldReturnResponseForAllowedResponseCode() throws IOException, IllegalVariableEvaluationException, HttpClientException {
-       try (HttpClient client = client(b -> b.configuration(HttpConfiguration.builder().allowedResponseCodes(Property.ofValue(List.of(404))).build()))) {
+       try (HttpClient client = client(b -> b.configuration(HttpConfiguration.builder().allowedResponseCodes(Property.of(List.of(404))).build()))) {
            HttpResponse<Map<String, String>> response = client.request(HttpRequest.of(URI.create(embeddedServerUri + "/http/error?status=404")));

            assertThat(response.getStatus().getCode()).isEqualTo(404);

@@ -438,7 +438,7 @@ class HttpClientTest {

    @Test
    void shouldThrowExceptionForNotAllowedResponseCode() throws IOException, IllegalVariableEvaluationException {
-       try (HttpClient client = client(b -> b.configuration(HttpConfiguration.builder().allowedResponseCodes(Property.ofValue(List.of(404))).build()))) {
+       try (HttpClient client = client(b -> b.configuration(HttpConfiguration.builder().allowedResponseCodes(Property.of(List.of(404))).build()))) {
            URI uri = URI.create(embeddedServerUri + "/http/error?status=405");

            HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> {
@@ -1,10 +1,13 @@
package io.kestra.core.models.flows.input;

import io.kestra.core.models.flows.Input;
import jakarta.validation.ConstraintViolationException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.net.URI;
import java.util.List;
import java.util.Arrays;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;

@@ -12,6 +15,28 @@ import static org.junit.jupiter.api.Assertions.*;

class FileInputTest {

    @Test
    void shouldGetExtensionWhenFindingFileExtensionForExistingFile() {
        List<Input<?>> inputs = List.of(
            FileInput.builder().id("test-file1").extension(".zip").build(),
            FileInput.builder().id("test-file2").extension(".gz").build()
        );

        String result = FileInput.findFileInputExtension(inputs, "test-file1");
        Assertions.assertEquals(".zip", result);
    }

    @Test
    void shouldReturnDefaultExtensionWhenFindingExtensionForUnknownFile() {
        List<Input<?>> inputs = List.of(
            FileInput.builder().id("test-file1").extension(".zip").build(),
            FileInput.builder().id("test-file2").extension(".gz").build()
        );

        String result = FileInput.findFileInputExtension(inputs, "???");
        Assertions.assertEquals(".upl", result);
    }

    @Test
    void validateValidFileTypes() {
        final FileInput csvInput = FileInput.builder()
@@ -74,7 +74,7 @@ public abstract class AbstractLockRepositoryTest {

    // In Elasticsearch, as we search then delete we need to wait for the refresh period
    // This would not be an issue in real-case scenario as when we delete by owner the service would be down for a certain amount of time
-   Thread.sleep(1000);
+   Thread.sleep(500);

    List<Lock> deleted = lockRepository.deleteByOwner("me");
    assertThat(deleted).hasSize(1);
@@ -258,12 +258,6 @@ public abstract class AbstractRunnerTest {
        multipleConditionTriggerCaseTest.flowTriggerMultipleConditions();
    }

-   @Test
-   @LoadFlows({"flows/valids/flow-trigger-mixed-conditions-flow-a.yaml", "flows/valids/flow-trigger-mixed-conditions-flow-listen.yaml"})
-   void flowTriggerMixedConditions() throws Exception {
-       multipleConditionTriggerCaseTest.flowTriggerMixedConditions();
-   }
-
    @Test
    @LoadFlows({"flows/valids/each-null.yaml"})
    void eachWithNull() throws Exception {
@@ -170,24 +170,4 @@ public class MultipleConditionTriggerCaseTest {
            e -> e.getState().getCurrent().equals(Type.SUCCESS),
            MAIN_TENANT, "io.kestra.tests.trigger.multiple.conditions", "flow-trigger-multiple-conditions-flow-listen", Duration.ofSeconds(1)));
    }
-
-   public void flowTriggerMixedConditions() throws TimeoutException, QueueException {
-       Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests.trigger.mixed.conditions",
-           "flow-trigger-mixed-conditions-flow-a");
-       assertThat(execution.getTaskRunList().size()).isEqualTo(1);
-       assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
-
-       // trigger is done
-       Execution triggerExecution = runnerUtils.awaitFlowExecution(
-           e -> e.getState().getCurrent().equals(Type.SUCCESS),
-           MAIN_TENANT, "io.kestra.tests.trigger.mixed.conditions", "flow-trigger-mixed-conditions-flow-listen");
-       executionRepository.delete(triggerExecution);
-       assertThat(triggerExecution.getTaskRunList().size()).isEqualTo(1);
-       assertThat(triggerExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
-
-       // we assert that we didn't have any other flow triggered
-       assertThrows(RuntimeException.class, () -> runnerUtils.awaitFlowExecution(
-           e -> e.getState().getCurrent().equals(Type.SUCCESS),
-           MAIN_TENANT, "io.kestra.tests.trigger.mixed.conditions", "flow-trigger-mixed-conditions-flow-listen", Duration.ofSeconds(1)));
-   }
}
@@ -220,7 +220,7 @@ class YamlParserTest {
        );

        assertThat(exception.getConstraintViolations().size()).isEqualTo(1);
-       assertThat(new ArrayList<>(exception.getConstraintViolations()).getFirst().getMessage()).contains("Duplicate field 'message'");
+       assertThat(new ArrayList<>(exception.getConstraintViolations()).getFirst().getMessage()).contains("Duplicate field 'variables.tf'");
    }

    private Flow parse(String path) {
StateStoreTest.java (new file, 94 lines):

@@ -0,0 +1,94 @@
package io.kestra.core.storages;

import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.exceptions.MigrationRequiredException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.log.Log;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;

import static org.assertj.core.api.Assertions.assertThat;

@MicronautTest
public class StateStoreTest {
    @Inject
    private TestRunContextFactory runContextFactory;

    @Test
    void all() throws IOException, ResourceExpiredException {
        RunContext runContext = runContext();

        String state = IdUtils.create();
        runContext.stateStore().putState(state, "some-name", "my-taskrun-value", "my-value".getBytes());

        assertThat(runContext.stateStore().getState(state, "some-name", "my-taskrun-value").readAllBytes()).isEqualTo("my-value".getBytes());

        RunContext.FlowInfo flowInfo = runContext.flowInfo();
        String key = flowInfo.id() + "_states_" + state + "_some-name_" + Hashing.hashToString("my-taskrun-value");
        assertThat(runContext.namespaceKv(flowInfo.namespace()).getValue(key).get().value()).isEqualTo("my-value".getBytes());

        runContext.stateStore().deleteState(state, "some-name", "my-taskrun-value");

        FileNotFoundException fileNotFoundException = Assertions.assertThrows(FileNotFoundException.class, () -> runContext.stateStore().getState(state, "some-name", "my-taskrun-value"));
        assertThat(fileNotFoundException.getMessage()).isEqualTo("State " + key + " not found");
    }

    @Test
    void getState_WithOldStateStore_ShouldThrowMigrationException() throws IOException, ResourceExpiredException {
        RunContext runContext = runContext();
        String state = IdUtils.create();

        RunContext.FlowInfo flowInfo = runContext.flowInfo();
        URI oldStateStoreFileUri = URI.create("kestra:/" + flowInfo.namespace().replace(".", "/") + "/" + flowInfo.id() + "/states/" + state + "/" + Hashing.hashToString("my-taskrun-value") + "/some-name");
        byte[] expectedContent = "from-old-state".getBytes();
        runContext.storage().putFile(new ByteArrayInputStream(expectedContent), oldStateStoreFileUri);

        String key = flowInfo.id() + "_states_" + state + "_some-name_" + Hashing.hashToString("my-taskrun-value");
        assertThat(runContext.storage().getFile(oldStateStoreFileUri).readAllBytes()).isEqualTo(expectedContent);

        MigrationRequiredException migrationRequiredException = Assertions.assertThrows(MigrationRequiredException.class, () -> runContext.stateStore().getState(state, "some-name", "my-taskrun-value"));
        assertThat(migrationRequiredException.getMessage()).isEqualTo("It looks like the State Store migration hasn't been run, please run the `/app/kestra sys state-store migrate` command before.");

        assertThat(runContext.namespaceKv(flowInfo.namespace()).getValue(key).isEmpty()).isTrue();
    }

    @Test
    void subNameAndTaskrunValueOptional() throws IOException, ResourceExpiredException {
        RunContext runContext = runContext();

        String state = IdUtils.create();
        runContext.stateStore().putState(state, "a-name", "a-taskrun-value", "aa-value".getBytes());
        runContext.stateStore().putState(state, "a-name", "b-taskrun-value", "ab-value".getBytes());
        runContext.stateStore().putState(state, "b-name", "a-taskrun-value", "ba-value".getBytes());
        runContext.stateStore().putState(state, "b-name", "b-taskrun-value", "bb-value".getBytes());
        runContext.stateStore().putState(state, null, "a-taskrun-value", "0a-value".getBytes());
        runContext.stateStore().putState(state, null, "b-taskrun-value", "0b-value".getBytes());
        runContext.stateStore().putState(state, "a-name", null, "a0-value".getBytes());
        runContext.stateStore().putState(state, "b-name", null, "b0-value".getBytes());

        assertThat(runContext.stateStore().getState(state, "a-name", "a-taskrun-value").readAllBytes()).isEqualTo("aa-value".getBytes());
        assertThat(runContext.stateStore().getState(state, "a-name", "b-taskrun-value").readAllBytes()).isEqualTo("ab-value".getBytes());
        assertThat(runContext.stateStore().getState(state, "b-name", "a-taskrun-value").readAllBytes()).isEqualTo("ba-value".getBytes());
        assertThat(runContext.stateStore().getState(state, "b-name", "b-taskrun-value").readAllBytes()).isEqualTo("bb-value".getBytes());
        assertThat(runContext.stateStore().getState(state, null, "a-taskrun-value").readAllBytes()).isEqualTo("0a-value".getBytes());
        assertThat(runContext.stateStore().getState(state, null, "b-taskrun-value").readAllBytes()).isEqualTo("0b-value".getBytes());
        assertThat(runContext.stateStore().getState(state, "a-name", null).readAllBytes()).isEqualTo("a0-value".getBytes());
        assertThat(runContext.stateStore().getState(state, "b-name", null).readAllBytes()).isEqualTo("b0-value".getBytes());
    }

    private RunContext runContext() {
        return TestsUtils.mockRunContext(runContextFactory, Log.builder().id("log").type(Log.class.getName()).message("logging").build(), null);
    }
}
core/src/test/java/io/kestra/plugin/core/flow/StateTest.java (new file, 66 lines):

@@ -0,0 +1,66 @@
package io.kestra.plugin.core.flow;

import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Test;

@KestraTest(startRunner = true)
class StateTest {

    public static final String FLOW_ID = "state";
    public static final String NAMESPACE = "io.kestra.tests";
    @Inject
    private TestRunnerUtils runnerUtils;

    @SuppressWarnings("unchecked")
    @Test
    @LoadFlows({"flows/valids/state.yaml"})
    void set() throws TimeoutException, QueueException {
        String stateName = IdUtils.create();

        Execution execution = runnerUtils.runOne(MAIN_TENANT, NAMESPACE,
            FLOW_ID, null, (f, e) -> ImmutableMap.of(FLOW_ID, stateName));
        assertThat(execution.getTaskRunList()).hasSize(5);
        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        assertThat(((Map<String, Integer>) execution.findTaskRunsByTaskId("createGet").getFirst().getOutputs().get("data")).get("value")).isEqualTo(1);

        execution = runnerUtils.runOne(MAIN_TENANT, NAMESPACE,
            FLOW_ID, null, (f, e) -> ImmutableMap.of(FLOW_ID, stateName));
        assertThat(execution.getTaskRunList()).hasSize(5);
        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        assertThat(((Map<String, Object>) execution.findTaskRunsByTaskId("updateGet").getFirst().getOutputs().get("data")).get("value")).isEqualTo("2");

        execution = runnerUtils.runOne(MAIN_TENANT, NAMESPACE,
            FLOW_ID, null, (f, e) -> ImmutableMap.of(FLOW_ID, stateName));
        assertThat(execution.getTaskRunList()).hasSize(5);
        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        assertThat((Integer) execution.findTaskRunsByTaskId("deleteGet").getFirst().getOutputs().get("count")).isZero();
    }

    @SuppressWarnings("unchecked")
    @Test
    @LoadFlows(value = {"flows/valids/state.yaml"}, tenantId = "tenant1")
    void each() throws TimeoutException, InternalException, QueueException {

        Execution execution = runnerUtils.runOne("tenant1", NAMESPACE,
            FLOW_ID, null, (f, e) -> ImmutableMap.of(FLOW_ID, "each"));
        assertThat(execution.getTaskRunList()).hasSize(19);
        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        assertThat(((Map<String, String>) execution.findTaskRunByTaskIdAndValue("regetEach1", List.of("b")).getOutputs().get("data")).get("value")).isEqualTo("null-b");
        assertThat(((Map<String, String>) execution.findTaskRunByTaskIdAndValue("regetEach2", List.of("b")).getOutputs().get("data")).get("value")).isEqualTo("null-a-b");
    }
}
StateNamespaceTest.java (new file, 59 lines):

@@ -0,0 +1,59 @@
package io.kestra.plugin.core.state;

import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

@KestraTest
class StateNamespaceTest {
    @Inject
    RunContextFactory runContextFactory;

    private RunContext runContextFlow1(Task task) {
        return TestsUtils.mockRunContext(runContextFactory, task, Map.of());
    }

    private RunContext runContextFlow2(Task task) {
        return TestsUtils.mockRunContext(runContextFactory, task, Map.of());
    }

    @Test
    void run() throws Exception {
        Set set = Set.builder()
            .id(IdUtils.create())
            .type(Set.class.getSimpleName())
            .namespace(Property.ofValue(true))
            .data(Property.ofValue(Map.of(
                "john", "doe"
            )))
            .build();
        Set.Output setOutput = set.run(runContextFlow1(set));
        assertThat(setOutput.getCount()).isEqualTo(1);

        Get get = Get.builder()
            .id(IdUtils.create())
            .type(Get.class.getSimpleName())
            .namespace(Property.ofValue(true))
            .build();
        Get.Output getOutput = get.run(runContextFlow2(get));
        assertThat(getOutput.getCount()).isEqualTo(1);
        assertThat(getOutput.getData().get("john")).isEqualTo("doe");

        get = Get.builder()
            .id(IdUtils.create())
            .type(Get.class.getSimpleName())
            .build();
        getOutput = get.run(runContextFlow2(get));
        assertThat(getOutput.getCount()).isZero();
    }
}
core/src/test/java/io/kestra/plugin/core/state/StateTest.java (Normal file, 125 lines)
@@ -0,0 +1,125 @@
package io.kestra.plugin.core.state;

import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.io.FileNotFoundException;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

@KestraTest
class StateTest {
    @Inject
    RunContextFactory runContextFactory;

    @Test
    void run() throws Exception {
        Get get = Get.builder()
            .id(IdUtils.create())
            .type(Get.class.getName())
            .build();

        RunContext runContext = TestsUtils.mockRunContext(runContextFactory, get, Map.of(
            "key", "test",
            "inc", 1
        ));

        Get.Output getOutput = get.run(runContext);
        assertThat(getOutput.getCount()).isZero();

        Set set = Set.builder()
            .id(IdUtils.create())
            .type(Set.class.toString())
            .data(Property.ofValue(Map.of(
                "{{ inputs.key }}", "{{ inputs.inc }}"
            )))
            .build();
        Set.Output setOutput = set.run(runContext);
        assertThat(setOutput.getCount()).isEqualTo(1);

        get = Get.builder()
            .id(IdUtils.create())
            .type(Get.class.toString())
            .build();
        getOutput = get.run(runContext);
        assertThat(getOutput.getCount()).isEqualTo(1);
        assertThat(getOutput.getData().get("test")).isEqualTo("1");

        set = Set.builder()
            .id(IdUtils.create())
            .type(Set.class.toString())
            .data(Property.ofValue(Map.of(
                "{{ inputs.key }}", "2",
                "test2", "3"
            )))
            .build();

        setOutput = set.run(runContext);
        assertThat(setOutput.getCount()).isEqualTo(2);

        get = Get.builder()
            .id(IdUtils.create())
            .type(Get.class.toString())
            .build();

        getOutput = get.run(runContext);

        assertThat(getOutput.getCount()).isEqualTo(2);
        assertThat(getOutput.getData().get("test")).isEqualTo("2");
        assertThat(getOutput.getData().get("test2")).isEqualTo("3");

        Delete delete = Delete.builder()
            .id(IdUtils.create())
            .type(Get.class.toString())
            .build();

        Delete.Output deleteRun = delete.run(runContext);
        assertThat(deleteRun.getDeleted()).isTrue();

        get = Get.builder()
            .id(IdUtils.create())
            .type(Get.class.toString())
            .build();

        getOutput = get.run(runContext);

        assertThat(getOutput.getCount()).isZero();
    }

    @Test
    void deleteThrow() {
        Delete task = Delete.builder()
            .id(IdUtils.create())
            .type(Get.class.getName())
            .name(Property.ofValue(IdUtils.create()))
            .errorOnMissing(Property.ofValue(true))
            .build();

        assertThrows(FileNotFoundException.class, () -> {
            task.run(TestsUtils.mockRunContext(runContextFactory, task, Map.of()));
        });
    }

    @Test
    void getThrow() {
        Get task = Get.builder()
            .id(IdUtils.create())
            .type(Get.class.getName())
            .name(Property.ofValue(IdUtils.create()))
            .errorOnMissing(Property.ofValue(true))
            .build();

        assertThrows(FileNotFoundException.class, () -> {
            task.run(TestsUtils.mockRunContext(runContextFactory, task, Map.of()));
        });
    }
}
@@ -3,6 +3,10 @@ namespace: io.kestra.tests

tasks:
  - id: bad-task
    type: io.kestra.plugin.core.log.Log
    message: Hello World
    type: "io.kestra.plugin.core.state.Set"
    data:
      variables.tf: |
        test
    name: "unit"
@@ -1,10 +0,0 @@
id: flow-trigger-mixed-conditions-flow-a
namespace: io.kestra.tests.trigger.mixed.conditions

labels:
  some: label

tasks:
  - id: only
    type: io.kestra.plugin.core.debug.Return
    format: "from parents: {{execution.id}}"
@@ -1,25 +0,0 @@
id: flow-trigger-mixed-conditions-flow-listen
namespace: io.kestra.tests.trigger.mixed.conditions

triggers:
  - id: on_completion
    type: io.kestra.plugin.core.trigger.Flow
    states: [ SUCCESS ]
    conditions:
      - type: io.kestra.plugin.core.condition.ExecutionFlow
        namespace: io.kestra.tests.trigger.mixed.conditions
        flowId: flow-trigger-mixed-conditions-flow-a
  - id: on_failure
    type: io.kestra.plugin.core.trigger.Flow
    states: [ FAILED ]
    preconditions:
      id: flowsFailure
      flows:
        - namespace: io.kestra.tests.trigger.multiple.conditions
          flowId: flow-trigger-multiple-conditions-flow-a
          states: [FAILED]

tasks:
  - id: only
    type: io.kestra.plugin.core.debug.Return
    format: "It works"
core/src/test/resources/flows/valids/state.yaml (Normal file, 99 lines)
@@ -0,0 +1,99 @@
id: state
namespace: io.kestra.tests

inputs:
  - id: state
    type: STRING

tasks:
  - id: if
    type: io.kestra.plugin.core.flow.If
    condition: "{{ inputs.state == 'each' }}"
    then:
      - id: delete1
        type: io.kestra.plugin.core.state.Delete
        name: "{{ inputs.state }}"

      - id: each1
        type: io.kestra.plugin.core.flow.ForEach
        values: '["a", "b"]'
        tasks:
          - id: getEach1
            type: io.kestra.plugin.core.state.Get
            name: "{{ inputs.state }}"

          - id: setEach1
            type: io.kestra.plugin.core.state.Set
            name: "{{ inputs.state }}"
            data:
              value: "{{ (currentEachOutput(outputs.getEach1).data.value ?? 'null') ~ '-' ~ taskrun.value }}"
          - id: regetEach1
            type: io.kestra.plugin.core.state.Get
            name: "{{ inputs.state }}"

      - id: delete2
        type: io.kestra.plugin.core.state.Delete
        name: "{{ inputs.state }}"

      - id: each2
        type: io.kestra.plugin.core.flow.ForEach
        values: '["a", "b"]'
        tasks:
          - id: getEach2
            type: io.kestra.plugin.core.state.Get
            taskrunValue: false
            name: "{{ inputs.state }}"
          - id: setEach2
            type: io.kestra.plugin.core.state.Set
            name: "{{ inputs.state }}"
            taskrunValue: false
            data:
              value: "{{ (currentEachOutput(outputs.getEach2).data.value ?? 'null') ~ '-' ~ taskrun.value }}"
          - id: regetEach2
            type: io.kestra.plugin.core.state.Get
            taskrunValue: false
            name: "{{ inputs.state }}"
      - id: delete3
        type: io.kestra.plugin.core.state.Delete
        name: "{{ inputs.state }}"

    else:
      - id: state
        type: io.kestra.plugin.core.state.Get
        name: "{{ inputs.state }}"

      - id: switch
        type: io.kestra.plugin.core.flow.Switch
        value: "{{ (outputs.state.data.value ?? 0) == 0 ? 'create' : ( outputs.state.data.value == 1 ? 'update' : 'delete') }}"
        cases:
          "create":
            - id: create
              type: io.kestra.plugin.core.state.Set
              name: "{{ inputs.state }}"
              data:
                value: 1
            - id: createGet
              type: io.kestra.plugin.core.state.Get
              name: "{{ inputs.state }}"

          "update":
            - id: update
              type: io.kestra.plugin.core.state.Set
              name: "{{ inputs.state }}"
              data:
                value: "{{ outputs.state.data.value + 1 }}"
            - id: updateGet
              type: io.kestra.plugin.core.state.Get
              name: "{{ inputs.state }}"

          "delete":
            - id: delete
              type: io.kestra.plugin.core.state.Delete
              name: "{{ inputs.state }}"
            - id: deleteGet
              type: io.kestra.plugin.core.state.Get
              name: "{{ inputs.state }}"
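The flow drives a single named state value through create, update, and delete across successive runs, which is what the set() test above asserts. As a plain illustration of that lifecycle outside Kestra, here is a minimal in-memory sketch; the class and method names are hypothetical and stand in for the real state store.

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the state store: one named counter that is
// created on the first run, incremented on the second, deleted on the third.
public class StateLifecycleSketch {
    private final Map<String, Integer> store = new HashMap<>();

    public String step(String name) {
        Integer value = store.get(name);
        if (value == null) {              // first run: "create" branch
            store.put(name, 1);
            return "create -> 1";
        } else if (value == 1) {          // second run: "update" branch
            store.put(name, value + 1);
            return "update -> " + store.get(name);
        } else {                          // third run: "delete" branch
            store.remove(name);
            return "delete";
        }
    }

    public static void main(String[] args) {
        StateLifecycleSketch sketch = new StateLifecycleSketch();
        System.out.println(sketch.step("unit")); // create -> 1
        System.out.println(sketch.step("unit")); // update -> 2
        System.out.println(sketch.step("unit")); // delete
    }
}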
@@ -2,13 +2,19 @@ package io.kestra.executor;

import io.kestra.core.contexts.KestraContext;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.flows.sla.ExecutionMonitoringSLA;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.flows.sla.SLAMonitor;
import io.kestra.core.models.flows.sla.Violation;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.models.triggers.multipleflows.MultipleCondition;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
@@ -21,16 +27,23 @@ import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.*;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.trace.Tracer;
import io.kestra.core.trace.TracerFactory;
import io.kestra.core.utils.*;
import io.kestra.executor.handler.*;
import io.kestra.plugin.core.flow.ForEachItem;
import io.kestra.plugin.core.flow.WorkingDirectory;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.event.Level;

import java.time.Duration;
import java.time.Instant;
@@ -70,6 +83,9 @@ public class DefaultExecutor implements Executor {
    @Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
    private QueueInterface<WorkerTaskResult> workerTaskResultQueue;
    @Inject
    @Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
    private QueueInterface<LogEntry> logQueue;
    @Inject
    @Named(QueueFactoryInterface.KILL_NAMED)
    private QueueInterface<ExecutionKilled> killQueue;
    @Inject
@@ -88,8 +104,12 @@
    @Inject
    private SkipExecutionService skipExecutionService;
    @Inject
    private PluginDefaultService pluginDefaultService;
    @Inject
    private ExecutorService executorService;
    @Inject
    private WorkerGroupService workerGroupService;
    @Inject
    private ExecutionService executionService;
    @Inject
    private VariablesService variablesService;
@@ -108,6 +128,8 @@
    @Inject
    private ExecutionQueuedStateStore executionQueuedStateStore;
    @Inject
    private MultipleConditionStorageInterface multipleConditionStorage;
    @Inject
    private ExecutionDelayStateStore executionDelayStateStore;
    @Inject
    private SLAMonitorStateStore slaMonitorStateStore;
@@ -123,21 +145,6 @@
    @Inject
    private FlowListenersInterface flowListeners;

    @Inject
    private ExecutionMessageHandler executionMessageHandler;
    @Inject
    private ExecutionEventMessageHandler executionEventMessageHandler;
    @Inject
    private WorkerTaskResultMessageHandler workerTaskResultMessageHandler;
    @Inject
    private ExecutionKilledExecutionMessageHandler executionKilledExecutionMessageHandler;
    @Inject
    private SubflowExecutionResultMessageHandler subflowExecutionResultMessageHandler;
    @Inject
    private SubflowExecutionEndMessageHandler subflowExecutionEndMessageHandler;
    @Inject
    private MultipleConditionEventMessageHandler multipleConditionEventMessageHandler;

    @Value("${kestra.executor.clean.execution-queue:true}")
    private boolean cleanExecutionQueue;
    @Value("${kestra.executor.clean.worker-queue:true}")
@@ -155,13 +162,16 @@

    private List<FlowWithSource> allFlows;

    private final Tracer tracer;
    private final java.util.concurrent.ExecutorService workerTaskResultExecutorService;
    private final java.util.concurrent.ExecutorService executionExecutorService;
    private final int numberOfThreads;

    @Inject
    public DefaultExecutor(ExecutorsUtils executorsUtils, @Value("${kestra.executor.thread-count:0}") int threadCount) {
    public DefaultExecutor(TracerFactory tracerFactory, ExecutorsUtils executorsUtils, @Value("${kestra.executor.thread-count:0}") int threadCount) {
        this.tracer = tracerFactory.getTracer(DefaultExecutor.class, "EXECUTOR");

        // By default, we start as many threads as there are available processors, with a minimum of 4 per executor service,
        // for the worker task result queue and the execution queue.
        // Other queues would not benefit from more consumers.
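The constructor comment above states the sizing rule for the two busiest consumers. As a quick illustration, here is a one-method sketch of that rule, assuming a configured value of 0 means auto-sizing; the class name is hypothetical.

// Hypothetical helper mirroring the sizing rule described above:
// an explicit value wins; otherwise use available processors, floored at 4.
public final class ExecutorThreadSizing {
    private ExecutorThreadSizing() {}

    public static int resolve(int configuredThreadCount) {
        if (configuredThreadCount > 0) {
            return configuredThreadCount;
        }
        return Math.max(4, Runtime.getRuntime().availableProcessors());
    }

    public static void main(String[] args) {
        System.out.println(ExecutorThreadSizing.resolve(0));  // auto-sized
        System.out.println(ExecutorThreadSizing.resolve(16)); // explicit
    }
}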
@@ -312,8 +322,22 @@
            return;
        }

        Optional<ExecutorContext> maybeExecutor = executionMessageHandler.handle(message);
        maybeExecutor.ifPresent(this::toExecution);
        try {
            executionEventQueue.emit(new ExecutionEvent(message, ExecutionEventType.CREATED));
        } catch (QueueException e) {
            // If we cannot send the execution event, we fail the execution
            executionStateStore.lock(message.getId(), execution -> {
                try {
                    Execution failed = execution.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED);
                    ExecutionEvent event = new ExecutionEvent(failed, ExecutionEventType.TERMINATED);
                    this.executionEventQueue.emit(event);
                    return new ExecutorContext(failed);
                } catch (QueueException ex) {
                    log.error("Unable to emit the execution {}", execution.getId(), ex);
                }
                return null;
            });
        }
    }

    private void executionEventQueue(Either<ExecutionEvent, DeserializationException> either) {
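The reworked consumer above falls back to failing the execution when the event queue rejects the emit. Below is a standalone sketch of that emit-or-fail fallback, using hypothetical Queue and Store types rather than Kestra's actual interfaces; only the control flow mirrors the hunk.

import java.util.function.UnaryOperator;

public class EmitOrFailSketch {
    // Hypothetical queue that may reject an emit with a checked exception.
    interface Queue { void emit(String event) throws Exception; }
    // Hypothetical store that runs an update while holding a per-execution lock.
    interface Store { String lockAndUpdate(String id, UnaryOperator<String> update); }

    private final Queue queue;
    private final Store store;

    EmitOrFailSketch(Queue queue, Store store) {
        this.queue = queue;
        this.store = store;
    }

    void handle(String executionId) {
        try {
            queue.emit("CREATED:" + executionId);
        } catch (Exception e) {
            // Could not notify downstream: fail the execution under lock and
            // emit a terminal event so consumers still observe an end state.
            store.lockAndUpdate(executionId, id -> {
                try {
                    queue.emit("TERMINATED:" + id);
                } catch (Exception ex) {
                    System.err.println("Unable to emit terminal event for " + id);
                }
                return "FAILED";
            });
        }
    }

    public static void main(String[] args) {
        Queue failing = event -> { throw new Exception("queue down"); };
        Store store = EmitOrFailSketch::applyUpdate;
        new EmitOrFailSketch(failing, store).handle("exec-1");
    }

    private static String applyUpdate(String id, UnaryOperator<String> update) {
        return update.apply(id); // trivial in-memory "lock"
    }
}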
@@ -328,8 +352,190 @@
            return;
        }

        Optional<ExecutorContext> maybeExecutor = executionEventMessageHandler.handle(message);
        maybeExecutor.ifPresent(this::toExecution);
        ExecutorContext result = executionStateStore.lock(message.executionId(), execution -> tracer.inCurrentContext(
            execution,
            FlowId.uidWithoutRevision(execution),
            () -> {
                try {
                    final FlowWithSource flow = findFlow(execution);
                    ExecutorContext executor = new ExecutorContext(execution, flow);

                    // schedule it for later if needed
                    if (execution.getState().getCurrent() == State.Type.CREATED && execution.getScheduleDate() != null && execution.getScheduleDate().isAfter(Instant.now())) {
                        ExecutionDelay executionDelay = ExecutionDelay.builder()
                            .executionId(executor.getExecution().getId())
                            .date(execution.getScheduleDate())
                            .state(State.Type.RUNNING)
                            .delayType(ExecutionDelay.DelayType.RESUME_FLOW)
                            .build();
                        executionDelayStateStore.save(executionDelay);
                        return executor;
                    }

                    // create an SLA monitor if needed
                    if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && !ListUtils.isEmpty(flow.getSla())) {
                        List<SLAMonitor> monitors = flow.getSla().stream()
                            .filter(ExecutionMonitoringSLA.class::isInstance)
                            .map(ExecutionMonitoringSLA.class::cast)
                            .map(sla -> SLAMonitor.builder()
                                .executionId(execution.getId())
                                .slaId(((SLA) sla).getId())
                                .deadline(execution.getState().getStartDate().plus(sla.getDuration()))
                                .build()
                            )
                            .toList();
                        monitors.forEach(monitor -> slaMonitorStateStore.save(monitor));
                    }

                    // handle the concurrency limit; we need to use a different queue to be sure that running executions
                    // are processed sequentially, so inside a queue with no parallelism
                    if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && flow.getConcurrency() != null) {
                        ExecutionRunning executionRunning = ExecutionRunning.builder()
                            .tenantId(executor.getFlow().getTenantId())
                            .namespace(executor.getFlow().getNamespace())
                            .flowId(executor.getFlow().getId())
                            .execution(executor.getExecution())
                            .concurrencyState(ExecutionRunning.ConcurrencyState.CREATED)
                            .build();

                        ExecutionRunning processed = concurrencyLimitStateStore.countThenProcess(flow, (txContext, concurrencyLimit) -> {
                            ExecutionRunning computed = executorService.processExecutionRunning(flow, concurrencyLimit.getRunning(), executionRunning.withExecution(execution)); // be sure that the execution running contains the latest value of the execution
                            if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.RUNNING && !computed.getExecution().getState().isTerminated()) {
                                return Pair.of(computed, concurrencyLimit.withRunning(concurrencyLimit.getRunning() + 1));
                            } else if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
                                executionQueuedStateStore.save(txContext, ExecutionQueued.fromExecutionRunning(computed));
                            }
                            return Pair.of(computed, concurrencyLimit);
                        });

                        // if the execution is queued or terminated due to the concurrency limit, we stop here
                        if (processed.getExecution().getState().isTerminated() || processed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
                            return executor.withExecution(processed.getExecution(), "handleConcurrencyLimit");
                        }
                    }

                    // handle execution changed SLA
                    executor = executorService.handleExecutionChangedSLA(executor);

                    // process the execution
                    if (log.isDebugEnabled()) {
                        executorService.log(log, true, executor);
                    }
                    executor = executorService.process(executor);

                    if (!executor.getNexts().isEmpty()) {
                        executor.withExecution(
                            executorService.onNexts(executor.getExecution(), executor.getNexts()),
                            "onNexts"
                        );
                    }

                    // worker task
                    if (!executor.getWorkerTasks().isEmpty()) {
                        List<WorkerTaskResult> workerTaskResults = new ArrayList<>();
                        executor
                            .getWorkerTasks()
                            .forEach(throwConsumer(workerTask -> {
                                try {
                                    if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
                                        workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED)));
                                    } else {
                                        if (workerTask.getTask().isSendToWorkerTask()) {
                                            Optional<WorkerGroup> maybeWorkerGroup = workerGroupService.resolveGroupFromJob(flow, workerTask);
                                            String workerGroupKey = maybeWorkerGroup.map(throwFunction(workerGroup -> workerTask.getRunContext().render(workerGroup.getKey())))
                                                .orElse(null);
                                            if (workerTask.getTask() instanceof WorkingDirectory) {
                                                // WorkingDirectory is a flowable so it will be moved to RUNNING a few lines below
                                                workerJobQueue.emit(workerGroupKey, workerTask);
                                            } else {
                                                TaskRun taskRun = workerTask.getTaskRun().withState(State.Type.SUBMITTED);
                                                workerJobQueue.emit(workerGroupKey, workerTask.withTaskRun(taskRun));
                                                workerTaskResults.add(new WorkerTaskResult(taskRun));
                                            }
                                        }
                                        // flowable attempt state transition to running
                                        if (workerTask.getTask().isFlowable()) {
                                            List<TaskRunAttempt> attempts = Optional.ofNullable(workerTask.getTaskRun().getAttempts())
                                                .map(ArrayList::new)
                                                .orElseGet(ArrayList::new);

                                            attempts.add(
                                                TaskRunAttempt.builder()
                                                    .state(new State().withState(State.Type.RUNNING))
                                                    .build()
                                            );

                                            TaskRun updatedTaskRun = workerTask.getTaskRun()
                                                .withAttempts(attempts)
                                                .withState(State.Type.RUNNING);

                                            workerTaskResults.add(new WorkerTaskResult(updatedTaskRun));
                                        }
                                    }
                                } catch (Exception e) {
                                    workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.FAILED)));
                                    workerTask.getRunContext().logger().error("Failed to evaluate the runIf condition for task {}. Cause: {}", workerTask.getTask().getId(), e.getMessage(), e);
                                }
                            }));

                        try {
                            executorService.addWorkerTaskResults(executor, workerTaskResults);
                        } catch (InternalException e) {
                            log.error("Unable to add a worker task result to the execution", e);
                        }
                    }

                    // subflow execution results
                    if (!executor.getSubflowExecutionResults().isEmpty()) {
                        executor.getSubflowExecutionResults()
                            .forEach(throwConsumer(subflowExecutionResult -> subflowExecutionResultQueue.emit(subflowExecutionResult)));
                    }

                    // schedulerDelay
                    if (!executor.getExecutionDelays().isEmpty()) {
                        executor.getExecutionDelays()
                            .forEach(executionDelay -> executionDelayStateStore.save(executionDelay));
                    }

                    // subflow executions
                    if (!executor.getSubflowExecutions().isEmpty()) {
                        executor.getSubflowExecutions().forEach(throwConsumer(subflowExecution -> {
                            Execution subExecution = subflowExecution.getExecution();
                            String msg = String.format("Created new execution [[link execution=\"%s\" flowId=\"%s\" namespace=\"%s\"]]", subExecution.getId(), subExecution.getFlowId(), subExecution.getNamespace());

                            log.info(msg);

                            logQueue.emit(LogEntry.of(subflowExecution.getParentTaskRun(), subflowExecution.getExecution().getKind()).toBuilder()
                                .level(Level.INFO)
                                .message(msg)
                                .timestamp(subflowExecution.getParentTaskRun().getState().getStartDate())
                                .thread(Thread.currentThread().getName())
                                .build()
                            );

                            executionQueue.emit(subflowExecution.getExecution());
                        }));
                    }

                    return executor;
                } catch (QueueException e) {
                    try {
                        Execution failedExecution = fail(execution, e);
                        this.executionQueue.emit(failedExecution);
                    } catch (QueueException ex) {
                        log.error("Unable to emit the execution {}", execution.getId(), ex);
                    }
                    Span.current().recordException(e).setStatus(StatusCode.ERROR);

                    return null;
                }
            }
        ));

        if (result != null) {
            this.toExecution(result);
        }
    }

    private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) {
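countThenProcess above atomically compares the running count against the flow's concurrency limit, then either admits the execution or parks it in the queued store. Here is a minimal in-memory sketch of that admit-or-queue decision; all names are hypothetical and the real implementation is transactional.

import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical in-memory concurrency limiter: admit up to `limit` running
// executions, queue the rest; terminating one promotes the next queued entry.
public class ConcurrencyLimitSketch {
    private final int limit;
    private int running = 0;
    private final Queue<String> queued = new ArrayDeque<>();

    public ConcurrencyLimitSketch(int limit) { this.limit = limit; }

    // Mirrors the admit-or-queue decision in the hunk above.
    public synchronized String submit(String executionId) {
        if (running < limit) {
            running++;
            return "RUNNING";
        }
        queued.add(executionId);
        return "QUEUED";
    }

    // On termination, free a slot and promote the next queued execution, if any.
    public synchronized String terminate() {
        running--;
        String next = queued.poll();
        if (next != null) {
            running++;
        }
        return next;
    }

    public static void main(String[] args) {
        ConcurrencyLimitSketch sketch = new ConcurrencyLimitSketch(1);
        System.out.println(sketch.submit("a"));  // RUNNING
        System.out.println(sketch.submit("b"));  // QUEUED
        System.out.println(sketch.terminate()); // b promoted
    }
}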
@@ -344,8 +550,30 @@
            return;
        }

        Optional<ExecutorContext> maybeExecutor = workerTaskResultMessageHandler.handle(message);
        maybeExecutor.ifPresent(this::toExecution);
        if (log.isDebugEnabled()) {
            executorService.log(log, true, message);
        }

        ExecutorContext executor = executionStateStore.lock(message.getTaskRun().getExecutionId(), execution -> {
            ExecutorContext current = new ExecutorContext(execution);

            if (execution.hasTaskRunJoinable(message.getTaskRun())) {
                try {
                    // process worker task result
                    executorService.addWorkerTaskResult(current, () -> findFlow(execution), message);
                    // join worker result
                    return current;
                } catch (InternalException e) {
                    return handleFailedExecutionFromExecutor(current, e);
                }
            }

            return null;
        });

        if (executor != null) {
            this.toExecution(executor);
        }
    }

    private void killQueue(Either<ExecutionKilled, DeserializationException> either) {
@@ -371,12 +599,54 @@
            return;
        }

        Optional<ExecutorContext> maybeExecutor = executionKilledExecutionMessageHandler.handle(killedExecution);
        metricRegistry
            .counter(MetricRegistry.METRIC_EXECUTOR_KILLED_COUNT, MetricRegistry.METRIC_EXECUTOR_KILLED_COUNT_DESCRIPTION, metricRegistry.tags(killedExecution))
            .increment();

        // Transmit the new execution state. Note that the execution
        // will eventually transition to KILLED state before sub-flow executions are actually killed.
        // This behavior is acceptable due to the fire-and-forget nature of the killing event.
        maybeExecutor.ifPresent(executor -> this.toExecution(executor, true));
        if (log.isDebugEnabled()) {
            executorService.log(log, true, killedExecution);
        }

        // Immediately fire the event in EXECUTED state to notify the Workers to kill
        // any remaining tasks for that execution, regardless of whether the execution exists or not.
        // Note that this event will be a noop if all tasks for that execution are already killed or completed.
        try {
            killQueue.emit(ExecutionKilledExecution
                .builder()
                .executionId(killedExecution.getExecutionId())
                .isOnKillCascade(false)
                .state(ExecutionKilled.State.EXECUTED)
                .tenantId(killedExecution.getTenantId())
                .build()
            );
        } catch (QueueException e) {
            log.error("Unable to kill the execution {}", killedExecution.getExecutionId(), e);
        }

        ExecutorContext executor = killingOrAfterKillState(killedExecution.getExecutionId(), Optional.ofNullable(killedExecution.getExecutionState()));

        // Check whether the kill event should be propagated to downstream executions.
        // By default, always propagate the ExecutionKill to sub-flows (for backward compatibility).
        Boolean isOnKillCascade = Optional.ofNullable(killedExecution.getIsOnKillCascade()).orElse(true);
        if (isOnKillCascade) {
            executionService
                .killSubflowExecutions(event.getTenantId(), killedExecution.getExecutionId())
                .doOnNext(executionKilled -> {
                    try {
                        killQueue.emit(executionKilled);
                    } catch (QueueException e) {
                        log.error("Unable to kill the execution {}", executionKilled.getExecutionId(), e);
                    }
                })
                .blockLast();
        }

        if (executor != null) {
            // Transmit the new execution state. Note that the execution
            // will eventually transition to KILLED state before sub-flow executions are actually killed.
            // This behavior is acceptable due to the fire-and-forget nature of the killing event.
            this.toExecution(executor, true);
        }
    }

    private void subflowExecutionResultQueue(Either<SubflowExecutionResult, DeserializationException> either) {
@@ -395,8 +665,80 @@
            return;
        }

        Optional<ExecutorContext> maybeExecutor = subflowExecutionResultMessageHandler.handle(message);
        maybeExecutor.ifPresent(this::toExecution);
        if (log.isDebugEnabled()) {
            executorService.log(log, true, message);
        }

        ExecutorContext executor = executionStateStore.lock(message.getParentTaskRun().getExecutionId(), execution -> {
            ExecutorContext current = new ExecutorContext(execution);

            if (execution.hasTaskRunJoinable(message.getParentTaskRun())) { // TODO if we remove this check, we can avoid adding 'iteration' on the 'isSame()' method
                try {
                    FlowWithSource flow = findFlow(execution);
                    Task task = flow.findTaskByTaskId(message.getParentTaskRun().getTaskId());
                    TaskRun taskRun;

                    // iterative tasks
                    if (task instanceof ForEachItem.ForEachItemExecutable forEachItem) {
                        // For iterative tasks, we need to get the taskRun from the execution,
                        // move it to the state of the child flow, and merge the outputs.
                        // This is important to avoid races such as a RUNNING that arrives after the first SUCCESS/FAILED.
                        RunContext runContext = runContextFactory.of(flow, task, current.getExecution(), message.getParentTaskRun());
                        taskRun = execution.findTaskRunByTaskRunId(message.getParentTaskRun().getId());
                        if (taskRun.getState().getCurrent() != message.getState()) {
                            taskRun = taskRun.withState(message.getState());
                        }
                        Map<String, Object> outputs = MapUtils.deepMerge(taskRun.getOutputs(), message.getParentTaskRun().getOutputs());
                        Variables variables = variablesService.of(StorageContext.forTask(taskRun), outputs);
                        taskRun = taskRun.withOutputs(variables);
                        taskRun = ExecutableUtils.manageIterations(
                            runContext.storage(),
                            taskRun,
                            current.getExecution(),
                            forEachItem.getTransmitFailed(),
                            forEachItem.isAllowFailure(),
                            forEachItem.isAllowWarning()
                        );
                    } else {
                        taskRun = message.getParentTaskRun();
                    }

                    Execution newExecution = current.getExecution().withTaskRun(taskRun);

                    // If the worker task result is killed, we must check if it has parents so we can also kill them if not already done.
                    // Running flowable tasks that have child tasks running in the worker will be killed thanks to that.
                    if (taskRun.getState().getCurrent() == State.Type.KILLED && taskRun.getParentTaskRunId() != null) {
                        newExecution = executionService.killParentTaskruns(taskRun, newExecution);
                    }

                    current = current.withExecution(newExecution, "joinSubflowExecutionResult");

                    // send metrics on parent taskRun terminated
                    if (taskRun.getState().isTerminated()) {
                        metricRegistry
                            .counter(MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_COUNT, MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_COUNT_DESCRIPTION, metricRegistry.tags(message))
                            .increment();

                        metricRegistry
                            .timer(MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_DURATION, MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_DURATION_DESCRIPTION, metricRegistry.tags(message))
                            .record(taskRun.getState().getDuration());

                        log.trace("TaskRun terminated: {}", taskRun);
                    }

                    // join worker result
                    return current;
                } catch (InternalException e) {
                    return handleFailedExecutionFromExecutor(current, e);
                }
            }

            return null;
        });

        if (executor != null) {
            this.toExecution(executor);
        }
    }

    private void subflowExecutionEndQueue(Either<SubflowExecutionEnd, DeserializationException> either) {
@@ -415,7 +757,44 @@
            return;
        }

        subflowExecutionEndMessageHandler.handle(message);
        if (log.isDebugEnabled()) {
            executorService.log(log, true, message);
        }

        executionStateStore.lock(message.getParentExecutionId(), execution -> {
            if (execution == null) {
                throw new IllegalStateException("Execution state doesn't exist for " + message.getParentExecutionId() + ", received " + message);
            }

            FlowWithSource flow = findFlow(execution);
            try {
                ExecutableTask<?> executableTask = (ExecutableTask<?>) flow.findTaskByTaskId(message.getTaskId());
                if (!executableTask.waitForExecution()) {
                    return null;
                }

                TaskRun taskRun = execution.findTaskRunByTaskRunId(message.getTaskRunId()).withState(message.getState()).withOutputs(message.getOutputs());
                FlowInterface childFlow = flowMetaStore.findByExecution(message.getChildExecution()).orElseThrow();
                RunContext runContext = runContextFactory.of(
                    childFlow,
                    (Task) executableTask,
                    message.getChildExecution(),
                    taskRun
                );

                SubflowExecutionResult subflowExecutionResult = ExecutableUtils.subflowExecutionResultFromChildExecution(runContext, childFlow, message.getChildExecution(), executableTask, taskRun);
                if (subflowExecutionResult != null) {
                    try {
                        this.subflowExecutionResultQueue.emit(subflowExecutionResult);
                    } catch (QueueException ex) {
                        log.error("Unable to emit the subflow execution result", ex);
                    }
                }
            } catch (InternalException e) {
                log.error("Unable to process the subflow execution end", e);
            }
            return null;
        });
    }

    private void multipleConditionEventQueue(Either<MultipleConditionEvent, DeserializationException> either) {
@@ -426,7 +805,14 @@

        MultipleConditionEvent multipleConditionEvent = either.getLeft();

        multipleConditionEventMessageHandler.handle(multipleConditionEvent);
        flowTriggerService.computeExecutionsFromFlowTriggers(multipleConditionEvent.execution(), List.of(multipleConditionEvent.flow()), Optional.of(multipleConditionStorage))
            .forEach(exec -> {
                try {
                    executionQueue.emit(exec);
                } catch (QueueException e) {
                    log.error("Unable to emit the execution {}", exec.getId(), e);
                }
            });
    }

    private void clusterEventQueue(Either<ClusterEvent, DeserializationException> either) {
@@ -456,7 +842,7 @@
        }

        executionDelayStateStore.processExpired(Instant.now(), executionDelay -> {
            Optional<ExecutorContext> maybeExecutor = executionStateStore.lock(executionDelay.getExecutionId(), execution -> {
            ExecutorContext result = executionStateStore.lock(executionDelay.getExecutionId(), execution -> {
                ExecutorContext executor = new ExecutorContext(execution);

                metricRegistry
@@ -485,7 +871,7 @@
                }
                // Handle failed task retries
                else if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.RESTART_FAILED_TASK)) {
                    FlowWithSource flow = flowMetaStore.findByExecutionThenInjectDefaults(execution).orElseThrow();
                    FlowWithSource flow = findFlow(execution);
                    Execution newAttempt = executionService.retryTask(
                        execution,
                        flow,
@@ -504,13 +890,15 @@
                        executor = executor.withExecution(newExecution, "continueLoop");
                    }
                } catch (Exception e) {
                    executor = executorService.handleFailedExecutionFromExecutor(executor, e);
                    executor = handleFailedExecutionFromExecutor(executor, e);
                }

                return executor;
            });

            maybeExecutor.ifPresent(this::toExecution);
            if (result != null) {
                this.toExecution(result);
            }
        });
    }

@@ -520,8 +908,8 @@
        }

        slaMonitorStateStore.processExpired(Instant.now(), slaMonitor -> {
            Optional<ExecutorContext> maybeExecutor = executionStateStore.lock(slaMonitor.getExecutionId(), execution -> {
                FlowWithSource flow = flowMetaStore.findByExecutionThenInjectDefaults(execution).orElseThrow();
            ExecutorContext result = executionStateStore.lock(slaMonitor.getExecutionId(), execution -> {
                FlowWithSource flow = findFlow(execution);
                Optional<SLA> sla = flow.getSla().stream().filter(s -> s.getId().equals(slaMonitor.getSlaId())).findFirst();
                if (sla.isEmpty()) {
                    // this can happen in case the flow has been updated and the SLA removed
@@ -552,13 +940,15 @@
                        .increment();
                    }
                } catch (Exception e) {
                    executor = executorService.handleFailedExecutionFromExecutor(executor, e);
                    executor = handleFailedExecutionFromExecutor(executor, e);
                }

                return executor;
            });

            maybeExecutor.ifPresent(this::toExecution);
            if (result != null) {
                this.toExecution(result);
            }
        });
    }
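processExpired above drains every delay whose date has passed and resumes the matching execution under lock. Here is a small in-memory sketch of that store shape; the types are hypothetical and stand in for the persistent state store.

import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical delay store: save (executionId, date) pairs and hand every
// expired entry to a callback, removing it from the store, which is the
// shape of processExpired(Instant.now(), ...) in the hunks above.
public class DelayStoreSketch {
    record Delay(String executionId, Instant date) {}

    private final List<Delay> delays = new ArrayList<>();

    public void save(String executionId, Instant date) {
        delays.add(new Delay(executionId, date));
    }

    public void processExpired(Instant now, Consumer<Delay> consumer) {
        Iterator<Delay> it = delays.iterator();
        while (it.hasNext()) {
            Delay delay = it.next();
            if (!delay.date().isAfter(now)) {
                it.remove();            // consume each expired delay exactly once
                consumer.accept(delay);
            }
        }
    }

    public static void main(String[] args) {
        DelayStoreSketch store = new DelayStoreSketch();
        store.save("exec-1", Instant.now().minusSeconds(5));
        store.processExpired(Instant.now(), d -> System.out.println("resume " + d.executionId()));
    }
}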
@@ -582,6 +972,31 @@
        this.setState(ServiceState.RUNNING);
    }

    private Execution fail(Execution message, Exception e) {
        var failedExecution = message.failedExecutionFromExecutor(e);
        try {
            logQueue.emitAsync(failedExecution.getLogs());
        } catch (QueueException ex) {
            // fail silently
        }
        return failedExecution.getExecution().getState().isFailed() ? failedExecution.getExecution() : failedExecution.getExecution().withState(State.Type.FAILED);
    }

    private ExecutorContext killingOrAfterKillState(final String executionId, Optional<State.Type> afterKillState) {
        return executionStateStore.lock(executionId, execution -> {
            FlowInterface flow = flowMetaStore.findByExecution(execution).orElseThrow();

            // remove it from the queued store if it was queued so it would not be restarted
            if (execution.getState().isQueued()) {
                executionQueuedStateStore.remove(execution);
            }

            Execution killing = executionService.kill(execution, flow, afterKillState);
            return new ExecutorContext(execution)
                .withExecution(killing, "joinKillingExecution");
        });
    }

    private void toExecution(ExecutorContext executor) {
        toExecution(executor, false);
    }
@@ -591,7 +1006,7 @@
        boolean shouldSend = false;

        if (executor.getException() != null) {
            executor = executorService.handleFailedExecutionFromExecutor(executor, executor.getException());
            executor = handleFailedExecutionFromExecutor(executor, executor.getException());
            shouldSend = true;
        } else if (executor.isExecutionUpdated()) {
            shouldSend = true;
@@ -622,8 +1037,7 @@
        // the terminated state can come from the execution queue, in this case we always have a flow in the executor
        // or from a worker task in an afterExecution block, in this case we need to load the flow
        if (executor.getFlow() == null && executor.getExecution().getState().isTerminated()) {
            FlowWithSource flow = flowMetaStore.findByExecutionThenInjectDefaults(executor.getExecution()).orElseThrow();
            executor = executor.withFlow(flow);
            executor = executor.withFlow(findFlow(executor.getExecution()));
        }
        boolean isTerminated = executor.getFlow() != null && executionService.isTerminated(executor.getFlow(), executor.getExecution());

@@ -742,7 +1156,7 @@
            .filter(f -> ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().noneMatch(c -> c instanceof MultipleCondition) && f.getTrigger().getPreconditions() == null)
            .map(f -> f.getFlow())
            .distinct() // as computeExecutionsFromFlowTriggers is based on flow, we must map FlowWithFlowTrigger to a flow and distinct to avoid multiple executions for the same flow
            .flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggerConditions(execution, f).stream())
            .flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggers(execution, List.of(f), Optional.empty()).stream())
            .forEach(throwConsumer(exec -> executionQueue.emit(exec)));

        // send multiple conditions to the multiple condition queue for later processing
@@ -753,6 +1167,23 @@
            .forEach(throwConsumer(multipleCondition -> multipleConditionEventQueue.emit(multipleCondition)));
    }

    private FlowWithSource findFlow(Execution execution) {
        FlowInterface flow = flowMetaStore.findByExecution(execution).orElseThrow();
        return pluginDefaultService.injectDefaults(flow, execution);
    }

    private ExecutorContext handleFailedExecutionFromExecutor(ExecutorContext executor, Exception e) {
        Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);

        try {
            logQueue.emitAsync(failedExecutionWithLog.getLogs());
        } catch (QueueException ex) {
            // fail silently
        }

        return executor.withExecution(failedExecutionWithLog.getExecution(), "exception");
    }

    @Override
    @PreDestroy
    public void close() {
@@ -2,7 +2,6 @@ package io.kestra.executor;

import io.kestra.core.models.executions.Execution;

import java.util.Optional;
import java.util.function.Function;

/**
@@ -12,5 +11,5 @@ public interface ExecutionStateStore {
    /**
     * Lock an execution for processing using the provided function.
     */
    Optional<ExecutorContext> lock(String executionId, Function<Execution, ExecutorContext> function);
    ExecutorContext lock(String executionId, Function<Execution, ExecutorContext> function);
}
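The revised contract returns the ExecutorContext directly (possibly null) instead of an Optional. Below is a minimal in-memory sketch of one way to honor the per-execution locking the javadoc describes; everything here besides the method shape is hypothetical and not Kestra's actual implementation.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical in-memory take on the revised contract: serialize all
// processing of one execution behind a per-execution monitor object and
// return whatever the callback produced (possibly null).
public class InMemoryExecutionStateStoreSketch {
    private final Map<String, Object> locks = new ConcurrentHashMap<>();
    private final Map<String, String> executions = new ConcurrentHashMap<>();

    public <R> R lock(String executionId, Function<String, R> function) {
        Object monitor = locks.computeIfAbsent(executionId, id -> new Object());
        synchronized (monitor) {
            // The callback sees the current execution state and may return null
            // to signal that no further processing is needed.
            return function.apply(executions.get(executionId));
        }
    }
}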
@@ -1,19 +0,0 @@
package io.kestra.executor;

import java.util.Optional;

/**
 * Executor queue message handler that may return an {@link ExecutorContext} for updating the current execution.
 *
 * @see MessageHandler
 *
 * @param <T> the message type; this message should be tied to execution processing.
 */
public interface ExecutorMessageHandler<T> {
    /**
     * Handle a message then return an {@link ExecutorContext} if the current execution must be updated.
     *
     * @implNote Implementors usually start by locking the current execution, then process the message to avoid any concurrency issue.
     */
    Optional<ExecutorContext> handle(T message);
}
@@ -58,6 +58,9 @@ public class ExecutorService {
    @Inject
    private MetricRegistry metricRegistry;

    @Inject
    private ConditionService conditionService;

    @Inject
    private LogService logService;

@@ -95,7 +98,7 @@
    @Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
    private QueueInterface<LogEntry> logQueue;

    private FlowMetaStoreInterface flowExecutorInterface() {
    protected FlowMetaStoreInterface flowExecutorInterface() {
        // bean is injected late, so we need to wait
        if (this.flowExecutorInterface == null) {
            this.flowExecutorInterface = applicationContext.getBean(FlowMetaStoreInterface.class);
@@ -234,18 +237,6 @@
        return newExecution;
    }

    public ExecutorContext handleFailedExecutionFromExecutor(ExecutorContext executor, Exception e) {
        Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);

        try {
            logQueue.emitAsync(failedExecutionWithLog.getLogs());
        } catch (QueueException ex) {
            // fail silently
        }

        return executor.withExecution(failedExecutionWithLog.getExecution(), "exception");
    }

    private Optional<WorkerTaskResult> childWorkerTaskResult(Flow flow, Execution execution, TaskRun parentTaskRun) throws InternalException {
        Task parent = flow.findTaskByTaskId(parentTaskRun.getTaskId());
@@ -50,147 +50,16 @@ public class FlowTriggerService {
            .map(io.kestra.plugin.core.trigger.Flow.class::cast);
    }

    /**
     * This method computes executions to trigger from flow triggers from a given execution.
     * It only computes those depending on standard (non-multiple / non-preconditions) conditions, so it must be used
     * in conjunction with {@link #computeExecutionsFromFlowTriggerPreconditions(Execution, Flow, MultipleConditionStorageInterface)}.
     */
    public List<Execution> computeExecutionsFromFlowTriggerConditions(Execution execution, Flow flow) {
        List<FlowWithFlowTrigger> flowWithFlowTriggers = computeFlowTriggers(execution, flow)
            .stream()
            // we must filter on no multiple conditions and no preconditions to avoid evaluating twice the triggers that have both standard conditions and multiple conditions
            .filter(it -> it.getTrigger().getPreconditions() == null && ListUtils.emptyOnNull(it.getTrigger().getConditions()).stream().noneMatch(MultipleCondition.class::isInstance))
            .toList();

        // short-circuit empty triggers to evaluate
        if (flowWithFlowTriggers.isEmpty()) {
            return Collections.emptyList();
        }

        // compute all executions to create from flow triggers without taking into account multiple conditions
        return flowWithFlowTriggers.stream()
            .map(f -> f.getTrigger().evaluate(
                Optional.empty(),
                runContextFactory.of(f.getFlow(), execution),
                f.getFlow(),
                execution
            ))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .toList();
    }

    /**
     * This method computes executions to trigger from flow triggers from a given execution.
     * It only computes those depending on multiple conditions and preconditions, so it must be used
     * in conjunction with {@link #computeExecutionsFromFlowTriggerConditions(Execution, Flow)}.
     */
    public List<Execution> computeExecutionsFromFlowTriggerPreconditions(Execution execution, Flow flow, MultipleConditionStorageInterface multipleConditionStorage) {
        List<FlowWithFlowTrigger> flowWithFlowTriggers = computeFlowTriggers(execution, flow)
            .stream()
            // we must filter on multiple conditions or preconditions to avoid evaluating twice the triggers that only have standard conditions
            .filter(flowWithFlowTrigger -> flowWithFlowTrigger.getTrigger().getPreconditions() != null || ListUtils.emptyOnNull(flowWithFlowTrigger.getTrigger().getConditions()).stream().anyMatch(MultipleCondition.class::isInstance))
            .toList();

        // short-circuit empty triggers to evaluate
        if (flowWithFlowTriggers.isEmpty()) {
            return Collections.emptyList();
        }

        List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = flowWithFlowTriggers.stream()
            .flatMap(flowWithFlowTrigger -> flowTriggerMultipleConditions(flowWithFlowTrigger)
                .map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
                    flowWithFlowTrigger.getFlow(),
                    multipleConditionStorage.getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition, execution.getOutputs()),
                    flowWithFlowTrigger.getTrigger(),
                    multipleCondition
                )
                )
            )
            // avoid evaluating expired windows (for example, a daily time window or deadline)
            .filter(flowWithFlowTriggerAndMultipleCondition -> flowWithFlowTriggerAndMultipleCondition.getMultipleConditionWindow().isValid(ZonedDateTime.now()))
            .toList();

        // evaluate multiple conditions
        Map<FlowWithFlowTriggerAndMultipleCondition, MultipleConditionWindow> multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> {
            Map<String, Boolean> results = f.getMultipleCondition()
                .getConditions()
                .entrySet()
                .stream()
                .map(e -> new AbstractMap.SimpleEntry<>(
                    e.getKey(),
                    conditionService.isValid(e.getValue(), f.getFlow(), execution)
                ))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

            return Map.entry(f, f.getMultipleConditionWindow().with(results));
        })
            .filter(e -> !e.getValue().getResults().isEmpty())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        // persist results
        multipleConditionStorage.save(new ArrayList<>(multipleConditionWindowsByFlow.values()));

        // compute all executions to create from flow triggers now that the multiple conditions storage is populated
        List<Execution> executions = flowWithFlowTriggers.stream()
            // will evaluate conditions
            .filter(flowWithFlowTrigger ->
                conditionService.isValid(
                    flowWithFlowTrigger.getTrigger(),
                    flowWithFlowTrigger.getFlow(),
                    execution,
                    multipleConditionStorage
                )
            )
            // will evaluate preconditions
            .filter(flowWithFlowTrigger ->
                conditionService.isValid(
                    flowWithFlowTrigger.getTrigger().getPreconditions(),
                    flowWithFlowTrigger.getFlow(),
                    execution,
                    multipleConditionStorage
                )
            )
            .map(f -> f.getTrigger().evaluate(
                Optional.of(multipleConditionStorage),
                runContextFactory.of(f.getFlow(), execution),
                f.getFlow(),
                execution
            ))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .toList();

        // purge fulfilled or expired multiple condition windows
        Stream.concat(
            multipleConditionWindowsByFlow.entrySet().stream()
                .map(e -> Map.entry(
                    e.getKey().getMultipleCondition(),
                    e.getValue()
                ))
                .filter(e -> !Boolean.FALSE.equals(e.getKey().getResetOnSuccess()) &&
                    e.getKey().getConditions().size() == Optional.ofNullable(e.getValue().getResults()).map(Map::size).orElse(0)
                )
                .map(Map.Entry::getValue),
            multipleConditionStorage.expired(execution.getTenantId()).stream()
        ).forEach(multipleConditionStorage::delete);

        return executions;
    }

    private List<FlowWithFlowTrigger> computeFlowTriggers(Execution execution, Flow flow) {
        if (
    public List<Execution> computeExecutionsFromFlowTriggers(Execution execution, List<? extends Flow> allFlows, Optional<MultipleConditionStorageInterface> multipleConditionStorage) {
        List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
            // prevent recursive flow triggers
            !flowService.removeUnwanted(flow, execution) ||
            // filter out Test Executions
            execution.getKind() != null ||
            // ensure flow & triggers are enabled
            flow.isDisabled() || flow instanceof FlowWithException ||
            flow.getTriggers() == null || flow.getTriggers().isEmpty()) {
            return Collections.emptyList();
        }

        return flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger))
            .filter(flow -> flowService.removeUnwanted(flow, execution))
            // filter out Test Executions
            .filter(flow -> execution.getKind() == null)
            // ensure flow & triggers are enabled
            .filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
            .filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
            .flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)))
            // filter on the execution state the flow listens to
            .filter(flowWithFlowTrigger -> flowWithFlowTrigger.getTrigger().getStates().contains(execution.getState().getCurrent()))
            // validate flow trigger conditions, excluding multiple conditions
@@ -205,14 +74,96 @@ public class FlowTriggerService {
|
||||
execution
|
||||
)
|
||||
)).toList();
|
||||
}
|
||||
|
||||
private Stream<MultipleCondition> flowTriggerMultipleConditions(FlowWithFlowTrigger flowWithFlowTrigger) {
|
||||
Stream<MultipleCondition> legacyMultipleConditions = ListUtils.emptyOnNull(flowWithFlowTrigger.getTrigger().getConditions()).stream()
|
||||
.filter(MultipleCondition.class::isInstance)
|
||||
.map(MultipleCondition.class::cast);
|
||||
Stream<io.kestra.plugin.core.trigger.Flow.Preconditions> preconditions = Optional.ofNullable(flowWithFlowTrigger.getTrigger().getPreconditions()).stream();
|
||||
return Stream.concat(legacyMultipleConditions, preconditions);
|
||||
// short-circuit empty triggers to evaluate
|
||||
if (validTriggersBeforeMultipleConditionEval.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
Map<FlowWithFlowTriggerAndMultipleCondition, MultipleConditionWindow> multipleConditionWindowsByFlow = null;
|
||||
if (multipleConditionStorage.isPresent()) {
|
||||
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream()
|
||||
.flatMap(flowWithFlowTrigger -> Optional.ofNullable(flowWithFlowTrigger.getTrigger().getPreconditions()).stream()
|
||||
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
|
||||
flowWithFlowTrigger.getFlow(),
|
||||
multipleConditionStorage.get().getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition, execution.getOutputs()),
|
||||
flowWithFlowTrigger.getTrigger(),
|
||||
multipleCondition
|
||||
)
|
||||
)
|
||||
)
|
||||
// avoid evaluating expired windows (for ex for daily time window or deadline)
|
||||
.filter(flowWithFlowTriggerAndMultipleCondition -> flowWithFlowTriggerAndMultipleCondition.getMultipleConditionWindow().isValid(ZonedDateTime.now()))
|
||||
.toList();
|
||||
|
||||
// evaluate multiple conditions
|
||||
multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> {
|
||||
Map<String, Boolean> results = f.getMultipleCondition()
|
||||
.getConditions()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.map(e -> new AbstractMap.SimpleEntry<>(
|
||||
e.getKey(),
|
||||
conditionService.isValid(e.getValue(), f.getFlow(), execution)
|
||||
))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
|
||||
return Map.entry(f, f.getMultipleConditionWindow().with(results));
|
||||
})
|
||||
.filter(e -> !e.getValue().getResults().isEmpty())
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
|
||||
// persist results
|
||||
multipleConditionStorage.get().save(new ArrayList<>(multipleConditionWindowsByFlow.values()));
|
||||
}
|
||||
|
||||
        // compute all executions to create from flow triggers, now that the multiple condition storage is populated
        List<Execution> executions = validTriggersBeforeMultipleConditionEval.stream()
            // evaluate trigger conditions
            .filter(flowWithFlowTrigger ->
                conditionService.isValid(
                    flowWithFlowTrigger.getTrigger(),
                    flowWithFlowTrigger.getFlow(),
                    execution,
                    multipleConditionStorage.orElse(null)
                )
            )
            // evaluate preconditions
            .filter(flowWithFlowTrigger ->
                conditionService.isValid(
                    flowWithFlowTrigger.getTrigger().getPreconditions(),
                    flowWithFlowTrigger.getFlow(),
                    execution,
                    multipleConditionStorage.orElse(null)
                )
            )
            .map(f -> f.getTrigger().evaluate(
                multipleConditionStorage,
                runContextFactory.of(f.getFlow(), execution),
                f.getFlow(),
                execution
            ))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .toList();
        if (multipleConditionStorage.isPresent()) {
            // purge fulfilled or expired multiple condition windows
            Stream.concat(
                multipleConditionWindowsByFlow.entrySet().stream()
                    .map(e -> Map.entry(
                        e.getKey().getMultipleCondition(),
                        e.getValue()
                    ))
                    .filter(e -> !Boolean.FALSE.equals(e.getKey().getResetOnSuccess()) &&
                        e.getKey().getConditions().size() == Optional.ofNullable(e.getValue().getResults()).map(Map::size).orElse(0)
                    )
                    .map(Map.Entry::getValue),
                multipleConditionStorage.get().expired(execution.getTenantId()).stream()
            ).forEach(multipleConditionStorage.get()::delete);
        }

        return executions;
    }
    @AllArgsConstructor
@@ -1,15 +0,0 @@
package io.kestra.executor;

/**
 * Executor queue message handler.
 *
 * @see ExecutorMessageHandler
 *
 * @param <T> the message type; this message should not be tied to execution processing.
 */
public interface MessageHandler<T> {
    /**
     * Handle a message.
     */
    void handle(T message);
}
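For context, the deleted interface above is a single-method contract. A minimal sketch of an implementing handler follows; the AuditEvent record and the logging body are hypothetical illustrations, not part of this changeset:

package io.kestra.executor;

import lombok.extern.slf4j.Slf4j;

// Hypothetical event type, used only to illustrate the MessageHandler contract.
record AuditEvent(String executionId, String action) {}

@Slf4j
class AuditEventMessageHandler implements MessageHandler<AuditEvent> {
    @Override
    public void handle(AuditEvent message) {
        // A handler receives one message at a time; heavier processing belongs elsewhere.
        log.info("Audit: execution {} performed {}", message.executionId(), message.action());
    }
}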
@@ -1,278 +0,0 @@
package io.kestra.executor.handler;

import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.sla.ExecutionMonitoringSLA;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.flows.sla.SLAMonitor;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.*;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.trace.Tracer;
import io.kestra.core.trace.TracerFactory;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.TruthUtils;
import io.kestra.executor.*;
import io.kestra.plugin.core.flow.WorkingDirectory;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.event.Level;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@Singleton
@Slf4j
public class ExecutionEventMessageHandler implements ExecutorMessageHandler<ExecutionEvent> {
    @Inject
    private ExecutionStateStore executionStateStore;
    @Inject
    private ExecutionQueuedStateStore executionQueuedStateStore;
    @Inject
    private ExecutionDelayStateStore executionDelayStateStore;
    @Inject
    private SLAMonitorStateStore slaMonitorStateStore;
    @Inject
    private ConcurrencyLimitStateStore concurrencyLimitStateStore;

    @Inject
    private ExecutorService executorService;
    @Inject
    private WorkerGroupService workerGroupService;

    @Inject
    private FlowMetaStoreInterface flowMetaStore;

    @Inject
    @Named(QueueFactoryInterface.WORKERJOB_NAMED)
    private QueueInterface<WorkerJob> workerJobQueue;
    @Inject
    @Named(QueueFactoryInterface.SUBFLOWEXECUTIONRESULT_NAMED)
    private QueueInterface<SubflowExecutionResult> subflowExecutionResultQueue;
    @Inject
    @Named(QueueFactoryInterface.EXECUTION_NAMED)
    private QueueInterface<Execution> executionQueue;
    @Inject
    @Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
    private QueueInterface<LogEntry> logQueue;

    private final Tracer tracer;

    @Inject
    public ExecutionEventMessageHandler(TracerFactory tracerFactory) {
        this.tracer = tracerFactory.getTracer(DefaultExecutor.class, "EXECUTOR");
    }
    @Override
    public Optional<ExecutorContext> handle(ExecutionEvent message) {
        return executionStateStore.lock(message.executionId(), execution -> tracer.inCurrentContext(
            execution,
            FlowId.uidWithoutRevision(execution),
            () -> {
                try {
                    final FlowWithSource flow = flowMetaStore.findByExecutionThenInjectDefaults(execution).orElseThrow();
                    ExecutorContext executor = new ExecutorContext(execution, flow);

                    // schedule it for later if needed
                    if (execution.getState().getCurrent() == State.Type.CREATED && execution.getScheduleDate() != null && execution.getScheduleDate().isAfter(Instant.now())) {
                        ExecutionDelay executionDelay = ExecutionDelay.builder()
                            .executionId(executor.getExecution().getId())
                            .date(execution.getScheduleDate())
                            .state(State.Type.RUNNING)
                            .delayType(ExecutionDelay.DelayType.RESUME_FLOW)
                            .build();
                        executionDelayStateStore.save(executionDelay);
                        return executor;
                    }
                    // create an SLA monitor if needed
                    if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && !ListUtils.isEmpty(flow.getSla())) {
                        List<SLAMonitor> monitors = flow.getSla().stream()
                            .filter(ExecutionMonitoringSLA.class::isInstance)
                            .map(ExecutionMonitoringSLA.class::cast)
                            .map(sla -> SLAMonitor.builder()
                                .executionId(execution.getId())
                                .slaId(((SLA) sla).getId())
                                .deadline(execution.getState().getStartDate().plus(sla.getDuration()))
                                .build()
                            )
                            .toList();
                        monitors.forEach(monitor -> slaMonitorStateStore.save(monitor));
                    }
                    // handle the concurrency limit; we need to use a different queue to be sure that running executions
                    // are processed sequentially, i.e. inside a queue with no parallelism
                    if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && flow.getConcurrency() != null) {
                        ExecutionRunning executionRunning = ExecutionRunning.builder()
                            .tenantId(executor.getFlow().getTenantId())
                            .namespace(executor.getFlow().getNamespace())
                            .flowId(executor.getFlow().getId())
                            .execution(executor.getExecution())
                            .concurrencyState(ExecutionRunning.ConcurrencyState.CREATED)
                            .build();

                        ExecutionRunning processed = concurrencyLimitStateStore.countThenProcess(flow, (txContext, concurrencyLimit) -> {
                            ExecutionRunning computed = executorService.processExecutionRunning(flow, concurrencyLimit.getRunning(), executionRunning.withExecution(execution)); // make sure the ExecutionRunning contains the latest value of the execution
                            if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.RUNNING && !computed.getExecution().getState().isTerminated()) {
                                return Pair.of(computed, concurrencyLimit.withRunning(concurrencyLimit.getRunning() + 1));
                            } else if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
                                executionQueuedStateStore.save(txContext, ExecutionQueued.fromExecutionRunning(computed));
                            }
                            return Pair.of(computed, concurrencyLimit);
                        });

                        // if the execution is queued or terminated due to the concurrency limit, we stop here
                        if (processed.getExecution().getState().isTerminated() || processed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
                            return executor.withExecution(processed.getExecution(), "handleConcurrencyLimit");
                        }
                    }
                    // handle execution changed SLA
                    executor = executorService.handleExecutionChangedSLA(executor);

                    // process the execution
                    if (log.isDebugEnabled()) {
                        executorService.log(log, true, executor);
                    }
                    executor = executorService.process(executor);

                    if (!executor.getNexts().isEmpty()) {
                        executor.withExecution(
                            executorService.onNexts(executor.getExecution(), executor.getNexts()),
                            "onNexts"
                        );
                    }
                    // worker tasks
                    if (!executor.getWorkerTasks().isEmpty()) {
                        List<WorkerTaskResult> workerTaskResults = new ArrayList<>();
                        executor
                            .getWorkerTasks()
                            .forEach(throwConsumer(workerTask -> {
                                try {
                                    if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
                                        workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED)));
                                    } else {
                                        if (workerTask.getTask().isSendToWorkerTask()) {
                                            Optional<WorkerGroup> maybeWorkerGroup = workerGroupService.resolveGroupFromJob(flow, workerTask);
                                            String workerGroupKey = maybeWorkerGroup.map(throwFunction(workerGroup -> workerTask.getRunContext().render(workerGroup.getKey())))
                                                .orElse(null);
                                            if (workerTask.getTask() instanceof WorkingDirectory) {
                                                // WorkingDirectory is a flowable, so it will be moved to RUNNING a few lines below
                                                workerJobQueue.emit(workerGroupKey, workerTask);
                                            } else {
                                                TaskRun taskRun = workerTask.getTaskRun().withState(State.Type.SUBMITTED);
                                                workerJobQueue.emit(workerGroupKey, workerTask.withTaskRun(taskRun));
                                                workerTaskResults.add(new WorkerTaskResult(taskRun));
                                            }
                                        }
                                        // flowable attempt state transition to RUNNING
                                        if (workerTask.getTask().isFlowable()) {
                                            List<TaskRunAttempt> attempts = Optional.ofNullable(workerTask.getTaskRun().getAttempts())
                                                .map(ArrayList::new)
                                                .orElseGet(ArrayList::new);

                                            attempts.add(
                                                TaskRunAttempt.builder()
                                                    .state(new State().withState(State.Type.RUNNING))
                                                    .build()
                                            );

                                            TaskRun updatedTaskRun = workerTask.getTaskRun()
                                                .withAttempts(attempts)
                                                .withState(State.Type.RUNNING);

                                            workerTaskResults.add(new WorkerTaskResult(updatedTaskRun));
                                        }
                                    }
                                } catch (Exception e) {
                                    workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.FAILED)));
                                    workerTask.getRunContext().logger().error("Failed to evaluate the runIf condition for task {}. Cause: {}", workerTask.getTask().getId(), e.getMessage(), e);
                                }
                            }));

                        try {
                            executorService.addWorkerTaskResults(executor, workerTaskResults);
                        } catch (InternalException e) {
                            log.error("Unable to add a worker task result to the execution", e);
                        }
                    }
                    // subflow execution results
                    if (!executor.getSubflowExecutionResults().isEmpty()) {
                        executor.getSubflowExecutionResults()
                            .forEach(throwConsumer(subflowExecutionResult -> subflowExecutionResultQueue.emit(subflowExecutionResult)));
                    }

                    // scheduler delays
                    if (!executor.getExecutionDelays().isEmpty()) {
                        executor.getExecutionDelays()
                            .forEach(executionDelay -> executionDelayStateStore.save(executionDelay));
                    }
                    // subflow executions
                    if (!executor.getSubflowExecutions().isEmpty()) {
                        executor.getSubflowExecutions().forEach(throwConsumer(subflowExecution -> {
                            Execution subExecution = subflowExecution.getExecution();
                            String msg = String.format("Created new execution [[link execution=\"%s\" flowId=\"%s\" namespace=\"%s\"]]", subExecution.getId(), subExecution.getFlowId(), subExecution.getNamespace());

                            log.info(msg);

                            logQueue.emit(LogEntry.of(subflowExecution.getParentTaskRun(), subflowExecution.getExecution().getKind()).toBuilder()
                                .level(Level.INFO)
                                .message(msg)
                                .timestamp(subflowExecution.getParentTaskRun().getState().getStartDate())
                                .thread(Thread.currentThread().getName())
                                .build()
                            );

                            executionQueue.emit(subflowExecution.getExecution());
                        }));
                    }
                    return executor;
                } catch (QueueException e) {
                    try {
                        Execution failedExecution = fail(execution, e);
                        this.executionQueue.emit(failedExecution);
                    } catch (QueueException ex) {
                        log.error("Unable to emit the execution {}", execution.getId(), ex);
                    }
                    Span.current().recordException(e).setStatus(StatusCode.ERROR);

                    return null;
                }
            }
        ));
    }

    private Execution fail(Execution message, Exception e) {
        var failedExecution = message.failedExecutionFromExecutor(e);
        try {
            logQueue.emitAsync(failedExecution.getLogs());
        } catch (QueueException ex) {
            // fail silently
        }
        return failedExecution.getExecution().getState().isFailed() ? failedExecution.getExecution() : failedExecution.getExecution().withState(State.Type.FAILED);
    }
}
@@ -1,110 +0,0 @@
package io.kestra.executor.handler;

import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledExecution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionQueuedStateStore;
import io.kestra.core.runners.FlowMetaStoreInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.executor.ExecutionStateStore;
import io.kestra.executor.ExecutorContext;
import io.kestra.executor.ExecutorService;
import io.kestra.executor.ExecutorMessageHandler;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;
@Singleton
@Slf4j
public class ExecutionKilledExecutionMessageHandler implements ExecutorMessageHandler<ExecutionKilledExecution> {
    @Inject
    private ExecutorService executorService;
    @Inject
    private ExecutionService executionService;

    @Inject
    private ExecutionStateStore executionStateStore;
    @Inject
    private ExecutionQueuedStateStore executionQueuedStateStore;

    @Inject
    private MetricRegistry metricRegistry;

    @Inject
    private FlowMetaStoreInterface flowMetaStore;

    @Inject
    @Named(QueueFactoryInterface.KILL_NAMED)
    private QueueInterface<ExecutionKilled> killQueue;
    @Override
    public Optional<ExecutorContext> handle(ExecutionKilledExecution message) {
        metricRegistry
            .counter(MetricRegistry.METRIC_EXECUTOR_KILLED_COUNT, MetricRegistry.METRIC_EXECUTOR_KILLED_COUNT_DESCRIPTION, metricRegistry.tags(message))
            .increment();

        if (log.isDebugEnabled()) {
            executorService.log(log, true, message);
        }

        // Immediately fire the event in the EXECUTED state to notify the Workers to kill
        // any remaining tasks for that execution, regardless of whether the execution exists or not.
        // Note that this event will be a noop if all tasks for that execution are already killed or completed.
        try {
            killQueue.emit(ExecutionKilledExecution
                .builder()
                .executionId(message.getExecutionId())
                .isOnKillCascade(false)
                .state(ExecutionKilled.State.EXECUTED)
                .tenantId(message.getTenantId())
                .build()
            );
        } catch (QueueException e) {
            log.error("Unable to kill the execution {}", message.getExecutionId(), e);
        }
        Optional<ExecutorContext> maybeExecutor = killingOrAfterKillState(message.getExecutionId(), Optional.ofNullable(message.getExecutionState()));

        // Check whether the kill event should be propagated to downstream executions.
        // By default, always propagate the ExecutionKilled to sub-flows (for backward compatibility).
        Boolean isOnKillCascade = Optional.ofNullable(message.getIsOnKillCascade()).orElse(true);
        if (isOnKillCascade) {
            executionService
                .killSubflowExecutions(message.getTenantId(), message.getExecutionId())
                .doOnNext(executionKilled -> {
                    try {
                        killQueue.emit(executionKilled);
                    } catch (QueueException e) {
                        log.error("Unable to kill the execution {}", executionKilled.getExecutionId(), e);
                    }
                })
                .blockLast();
        }

        return maybeExecutor;
    }
    private Optional<ExecutorContext> killingOrAfterKillState(final String executionId, Optional<State.Type> afterKillState) {
        return executionStateStore.lock(executionId, execution -> {
            FlowInterface flow = flowMetaStore.findByExecution(execution).orElseThrow();

            // remove it from the queued store if it was queued, so that it will not be restarted
            if (execution.getState().isQueued()) {
                executionQueuedStateStore.remove(execution);
            }

            Execution killing = executionService.kill(execution, flow, afterKillState);
            return new ExecutorContext(execution)
                .withExecution(killing, "joinKillingExecution");
        });
    }
}
@@ -1,50 +0,0 @@
package io.kestra.executor.handler;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionEvent;
import io.kestra.core.runners.ExecutionEventType;
import io.kestra.executor.ExecutionStateStore;
import io.kestra.executor.ExecutorContext;
import io.kestra.executor.ExecutorMessageHandler;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;
@Singleton
@Slf4j
public class ExecutionMessageHandler implements ExecutorMessageHandler<Execution> {
    @Inject
    @Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
    private QueueInterface<ExecutionEvent> executionEventQueue;

    @Inject
    private ExecutionStateStore executionStateStore;

    @Override
    public Optional<ExecutorContext> handle(Execution message) {
        try {
            executionEventQueue.emit(new ExecutionEvent(message, ExecutionEventType.CREATED));
            return Optional.empty();
        } catch (QueueException e) {
            // If we cannot send the execution event, we fail the execution
            return executionStateStore.lock(message.getId(), execution -> {
                try {
                    Execution failed = execution.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED);
                    ExecutionEvent event = new ExecutionEvent(failed, ExecutionEventType.TERMINATED);
                    this.executionEventQueue.emit(event);
                    return new ExecutorContext(failed);
                } catch (QueueException ex) {
                    log.error("Unable to emit the execution {}", execution.getId(), ex);
                }
                return null;
            });
        }
    }
}
@@ -1,40 +0,0 @@
package io.kestra.executor.handler;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.MultipleConditionEvent;
import io.kestra.executor.FlowTriggerService;
import io.kestra.executor.MessageHandler;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

@Singleton
@Slf4j
public class MultipleConditionEventMessageHandler implements MessageHandler<MultipleConditionEvent> {
    @Inject
    private FlowTriggerService flowTriggerService;

    @Inject
    private MultipleConditionStorageInterface multipleConditionStorage;

    @Inject
    @Named(QueueFactoryInterface.EXECUTION_NAMED)
    private QueueInterface<Execution> executionQueue;

    @Override
    public void handle(MultipleConditionEvent message) {
        flowTriggerService.computeExecutionsFromFlowTriggerPreconditions(message.execution(), message.flow(), multipleConditionStorage)
            .forEach(exec -> {
                try {
                    executionQueue.emit(exec);
                } catch (QueueException e) {
                    log.error("Unable to emit the execution {}", exec.getId(), e);
                }
            });
    }
}
@@ -1,79 +0,0 @@
package io.kestra.executor.handler;

import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.*;
import io.kestra.executor.*;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
@Singleton
@Slf4j
public class SubflowExecutionEndMessageHandler implements MessageHandler<SubflowExecutionEnd> {
    @Inject
    private ExecutorService executorService;

    @Inject
    private ExecutionStateStore executionStateStore;

    @Inject
    private FlowMetaStoreInterface flowMetaStore;

    @Inject
    private RunContextFactory runContextFactory;

    @Inject
    @Named(QueueFactoryInterface.SUBFLOWEXECUTIONRESULT_NAMED)
    private QueueInterface<SubflowExecutionResult> subflowExecutionResultQueue;

    @Override
    public void handle(SubflowExecutionEnd message) {
        if (log.isDebugEnabled()) {
            executorService.log(log, true, message);
        }

        executionStateStore.lock(message.getParentExecutionId(), execution -> {
            if (execution == null) {
                throw new IllegalStateException("Execution state doesn't exist for " + message.getParentExecutionId() + ", received " + message);
            }

            FlowWithSource flow = flowMetaStore.findByExecutionThenInjectDefaults(execution).orElseThrow();
            try {
                ExecutableTask<?> executableTask = (ExecutableTask<?>) flow.findTaskByTaskId(message.getTaskId());
                if (!executableTask.waitForExecution()) {
                    return null;
                }

                TaskRun taskRun = execution.findTaskRunByTaskRunId(message.getTaskRunId()).withState(message.getState()).withOutputs(message.getOutputs());
                FlowInterface childFlow = flowMetaStore.findByExecution(message.getChildExecution()).orElseThrow();
                RunContext runContext = runContextFactory.of(
                    childFlow,
                    (Task) executableTask,
                    message.getChildExecution(),
                    taskRun
                );

                SubflowExecutionResult subflowExecutionResult = ExecutableUtils.subflowExecutionResultFromChildExecution(runContext, childFlow, message.getChildExecution(), executableTask, taskRun);
                if (subflowExecutionResult != null) {
                    try {
                        this.subflowExecutionResultQueue.emit(subflowExecutionResult);
                    } catch (QueueException ex) {
                        log.error("Unable to emit the subflow execution result", ex);
                    }
                }
            } catch (InternalException e) {
                log.error("Unable to process the subflow execution end", e);
            }
            return null;
        });
    }
}
@@ -1,121 +0,0 @@
package io.kestra.executor.handler;

import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.Variables;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.*;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.VariablesService;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.utils.MapUtils;
import io.kestra.executor.ExecutionStateStore;
import io.kestra.executor.ExecutorContext;
import io.kestra.executor.ExecutorService;
import io.kestra.executor.ExecutorMessageHandler;
import io.kestra.plugin.core.flow.ForEachItem;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.Optional;
@Singleton
@Slf4j
public class SubflowExecutionResultMessageHandler implements ExecutorMessageHandler<SubflowExecutionResult> {
    @Inject
    private ExecutorService executorService;
    @Inject
    private RunContextFactory runContextFactory;
    @Inject
    private MetricRegistry metricRegistry;
    @Inject
    private VariablesService variablesService;
    @Inject
    private ExecutionService executionService;

    @Inject
    private FlowMetaStoreInterface flowMetaStore;

    @Inject
    private ExecutionStateStore executionStateStore;

    @Override
    public Optional<ExecutorContext> handle(SubflowExecutionResult message) {
        if (log.isDebugEnabled()) {
            executorService.log(log, true, message);
        }
        return executionStateStore.lock(message.getParentTaskRun().getExecutionId(), execution -> {
            ExecutorContext current = new ExecutorContext(execution);

            if (execution.hasTaskRunJoinable(message.getParentTaskRun())) { // TODO if we remove this check, we can avoid adding 'iteration' on the 'isSame()' method
                try {
                    FlowWithSource flow = flowMetaStore.findByExecutionThenInjectDefaults(execution).orElseThrow();
                    Task task = flow.findTaskByTaskId(message.getParentTaskRun().getTaskId());
                    TaskRun taskRun;

                    // iterative tasks
                    if (task instanceof ForEachItem.ForEachItemExecutable forEachItem) {
                        // For iterative tasks, we need to get the taskRun from the execution,
                        // move it to the state of the child flow, and merge the outputs.
                        // This is important to avoid races such as a RUNNING that arrives after the first SUCCESS/FAILED.
                        RunContext runContext = runContextFactory.of(flow, task, current.getExecution(), message.getParentTaskRun());
                        taskRun = execution.findTaskRunByTaskRunId(message.getParentTaskRun().getId());
                        if (taskRun.getState().getCurrent() != message.getState()) {
                            taskRun = taskRun.withState(message.getState());
                        }
                        Map<String, Object> outputs = MapUtils.deepMerge(taskRun.getOutputs(), message.getParentTaskRun().getOutputs());
                        Variables variables = variablesService.of(StorageContext.forTask(taskRun), outputs);
                        taskRun = taskRun.withOutputs(variables);
                        taskRun = ExecutableUtils.manageIterations(
                            runContext.storage(),
                            taskRun,
                            current.getExecution(),
                            forEachItem.getTransmitFailed(),
                            forEachItem.isAllowFailure(),
                            forEachItem.isAllowWarning()
                        );
                    } else {
                        taskRun = message.getParentTaskRun();
                    }

                    Execution newExecution = current.getExecution().withTaskRun(taskRun);

                    // If the worker task result is killed, we must check whether it has parents and, if so, also kill them if not already done.
                    // Running flowable tasks that have child tasks running in the worker will be killed thanks to that.
                    if (taskRun.getState().getCurrent() == State.Type.KILLED && taskRun.getParentTaskRunId() != null) {
                        newExecution = executionService.killParentTaskruns(taskRun, newExecution);
                    }

                    current = current.withExecution(newExecution, "joinSubflowExecutionResult");

                    // send metrics on parent taskRun terminated
                    if (taskRun.getState().isTerminated()) {
                        metricRegistry
                            .counter(MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_COUNT, MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_COUNT_DESCRIPTION, metricRegistry.tags(message))
                            .increment();

                        metricRegistry
                            .timer(MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_DURATION, MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_DURATION_DESCRIPTION, metricRegistry.tags(message))
                            .record(taskRun.getState().getDuration());

                        log.trace("TaskRun terminated: {}", taskRun);
                    }

                    // join the worker result
                    return current;
                } catch (InternalException e) {
                    return executorService.handleFailedExecutionFromExecutor(current, e);
                }
            }

            return null;
        });
    }
}
@@ -1,51 +0,0 @@
package io.kestra.executor.handler;

import io.kestra.core.exceptions.InternalException;
import io.kestra.core.runners.FlowMetaStoreInterface;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.executor.ExecutionStateStore;
import io.kestra.executor.ExecutorContext;
import io.kestra.executor.ExecutorService;
import io.kestra.executor.ExecutorMessageHandler;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;
@Singleton
@Slf4j
public class WorkerTaskResultMessageHandler implements ExecutorMessageHandler<WorkerTaskResult> {
    @Inject
    private ExecutionStateStore executionStateStore;

    @Inject
    private ExecutorService executorService;

    @Inject
    private FlowMetaStoreInterface flowMetaStore;

    @Override
    public Optional<ExecutorContext> handle(WorkerTaskResult message) {
        if (log.isDebugEnabled()) {
            executorService.log(log, true, message);
        }

        return executionStateStore.lock(message.getTaskRun().getExecutionId(), execution -> {
            ExecutorContext current = new ExecutorContext(execution);

            if (execution.hasTaskRunJoinable(message.getTaskRun())) {
                try {
                    // process the worker task result
                    executorService.addWorkerTaskResult(current, () -> flowMetaStore.findByExecutionThenInjectDefaults(execution).orElseThrow(), message);
                    // join the worker result
                    return current;
                } catch (InternalException e) {
                    return executorService.handleFailedExecutionFromExecutor(current, e);
                }
            }

            return null;
        });
    }
}
@@ -25,7 +25,8 @@ import static org.assertj.core.api.Assertions.assertThat;

@KestraTest
class FlowTriggerServiceTest {
    private static final List<Label> EMPTY_LABELS = List.of();
    public static final List<Label> EMPTY_LABELS = List.of();
    public static final Optional<MultipleConditionStorageInterface> EMPTY_MULTIPLE_CONDITION_STORAGE = Optional.empty();

    @Inject
    private TestRunContextFactory runContextFactory;
@@ -55,27 +56,14 @@ class FlowTriggerServiceTest {

        var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.SUCCESS);

        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
            simpleFlowExecution,
            flowWithFlowTrigger
            List.of(simpleFlow, flowWithFlowTrigger),
            EMPTY_MULTIPLE_CONDITION_STORAGE
        );

        assertThat(resultingExecutionsToRun).size().isEqualTo(1);
        assertThat(resultingExecutionsToRun.getFirst().getFlowId()).isEqualTo(flowWithFlowTrigger.getId());
    }
    @Test
    void computeExecutionsFromFlowTriggers_none() {
        var simpleFlow = aSimpleFlow();

        var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.SUCCESS);

        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
            simpleFlowExecution,
            simpleFlow
        );

        assertThat(resultingExecutionsToRun).isEmpty();
        assertThat(resultingExecutionsToRun.get(0).getFlowId()).isEqualTo(flowWithFlowTrigger.getId());
    }
    @Test
@@ -93,9 +81,10 @@ class FlowTriggerServiceTest {

        var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.CREATED);

        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
            simpleFlowExecution,
            flowWithFlowTrigger
            List.of(simpleFlow, flowWithFlowTrigger),
            EMPTY_MULTIPLE_CONDITION_STORAGE
        );

        assertThat(resultingExecutionsToRun).size().isEqualTo(0);
@@ -120,9 +109,10 @@ class FlowTriggerServiceTest {
            .kind(ExecutionKind.TEST)
            .build();

        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggerConditions(
        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
            simpleFlowExecutionComingFromATest,
            flowWithFlowTrigger
            List.of(simpleFlow, flowWithFlowTrigger),
            EMPTY_MULTIPLE_CONDITION_STORAGE
        );

        assertThat(resultingExecutionsToRun).size().isEqualTo(0);
@@ -7,6 +7,8 @@ dependencies {
    implementation project(":jdbc")
    implementation project(":executor")
    implementation project(":scheduler")
    implementation project(":queue")
    implementation project(":queue-jdbc")

    implementation("io.micronaut.sql:micronaut-jooq")
    runtimeOnly("com.h2database:h2")
@@ -16,4 +18,5 @@ dependencies {
    testImplementation project(':executor').sourceSets.test.output
    testImplementation project(':storage-local')
    testImplementation project(':tests')
    testImplementation project(':queue').sourceSets.test.output
}
13  jdbc-h2/src/main/resources/migrations/h2/V3_4__queue_v2.sql  Normal file
@@ -0,0 +1,13 @@
CREATE TABLE IF NOT EXISTS queue (
    "offset" BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
    "type" INT NOT NULL,
    "routing_key" VARCHAR(50),
    "key" VARCHAR(250) NOT NULL,
    "value" TEXT NOT NULL,
    "created" TIMESTAMP NOT NULL,
    UNIQUE("type", "routing_key", "offset")
);

CREATE INDEX IF NOT EXISTS queue_type__offset ON queue ("type", "offset");
CREATE INDEX IF NOT EXISTS queue_created ON queue ("created");
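The ("type", "offset") index above is what makes sequential consumption cheap. As an illustration only (not part of this migration), a consumer could page through one queue with a query along these lines, where the two placeholders stand for the last seen offset and the batch size:

-- Hypothetical poll query, shown only to illustrate how the indexes are meant to be used.
SELECT "offset", "key", "value"
FROM queue
WHERE "type" = ? AND "offset" > ?
ORDER BY "offset"
LIMIT 100;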
@@ -0,0 +1,19 @@
package io.kestra.queue.h2;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractBroadcastQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;

@KestraTest(environments = {"test", "queue"})
class H2BroadcastQueueTest extends AbstractBroadcastQueueTest {
    @Inject
    JdbcTestUtils jdbcTestUtils;

    @BeforeEach
    protected void init() {
        jdbcTestUtils.drop();
        jdbcTestUtils.migrate();
    }
}
@@ -0,0 +1,23 @@
package io.kestra.queue.h2;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.queues.QueueException;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractDispatchQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;

@KestraTest(environments = {"test", "queue"})
class H2DispatchQueueTest extends AbstractDispatchQueueTest {
    @Inject
    JdbcTestUtils jdbcTestUtils;

    @BeforeEach
    protected void init() {
        jdbcTestUtils.drop();
        jdbcTestUtils.migrate();
    }
}
@@ -0,0 +1,19 @@
package io.kestra.queue.h2;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractKeyedDispatchQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;

@KestraTest(environments = {"test", "queue"})
class H2KeyedDispatchQueueTest extends AbstractKeyedDispatchQueueTest {
    @Inject
    JdbcTestUtils jdbcTestUtils;

    @BeforeEach
    protected void init() {
        jdbcTestUtils.drop();
        jdbcTestUtils.migrate();
    }
}
4  jdbc-h2/src/test/resources/application-queue.yml  Normal file
@@ -0,0 +1,4 @@
kestra:
  server-type: WEBSERVER
  queue:
    type: jdbc
@@ -7,6 +7,8 @@ dependencies {
    implementation project(":jdbc")
    implementation project(":executor")
    implementation project(":scheduler")
    implementation project(":queue")
    implementation project(":queue-jdbc")

    implementation("io.micronaut.sql:micronaut-jooq")
    runtimeOnly("com.mysql:mysql-connector-j")
@@ -18,5 +20,6 @@ dependencies {
    testImplementation project(':scheduler').sourceSets.test.output
    testImplementation project(':storage-local')
    testImplementation project(':tests')
    testImplementation project(':queue').sourceSets.test.output
    testImplementation("io.micronaut.validation:micronaut-validation") // MysqlServiceLivenessCoordinatorTest fails to init without this
}
@@ -0,0 +1,11 @@
CREATE TABLE IF NOT EXISTS queue (
    `offset` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
    `type` INT NOT NULL,
    `routing_key` CHAR(50),
    `key` VARCHAR(250) NOT NULL,
    `value` JSON NOT NULL,
    `created` TIMESTAMP NOT NULL,
    UNIQUE(`type`, `routing_key`, `offset`),
    INDEX `ix_type__offset` (`type`, `offset`),
    INDEX `ix_created` (`created`)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
@@ -0,0 +1,19 @@
package io.kestra.queue.mysql;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractBroadcastQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;

@KestraTest(environments = {"test", "queue"})
class MysqlBroadcastQueueTest extends AbstractBroadcastQueueTest {
    @Inject
    JdbcTestUtils jdbcTestUtils;

    @BeforeEach
    protected void init() {
        jdbcTestUtils.drop();
        jdbcTestUtils.migrate();
    }
}
@@ -0,0 +1,19 @@
package io.kestra.queue.mysql;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractDispatchQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;

@KestraTest(environments = {"test", "queue"})
class MysqlDispatchQueueTest extends AbstractDispatchQueueTest {
    @Inject
    JdbcTestUtils jdbcTestUtils;

    @BeforeEach
    protected void init() {
        jdbcTestUtils.drop();
        jdbcTestUtils.migrate();
    }
}
@@ -0,0 +1,19 @@
package io.kestra.queue.mysql;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractKeyedDispatchQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;

@KestraTest(environments = {"test", "queue"})
class MysqlKeyedDispatchQueueTest extends AbstractKeyedDispatchQueueTest {
    @Inject
    JdbcTestUtils jdbcTestUtils;

    @BeforeEach
    protected void init() {
        jdbcTestUtils.drop();
        jdbcTestUtils.migrate();
    }
}
4  jdbc-mysql/src/test/resources/application-queue.yml  Normal file
@@ -0,0 +1,4 @@
kestra:
  server-type: WEBSERVER
  queue:
    type: jdbc
@@ -7,6 +7,8 @@ dependencies {
    implementation project(":jdbc")
    implementation project(":executor")
    implementation project(":scheduler")
    implementation project(":queue")
    implementation project(":queue-jdbc")

    implementation("io.micronaut.sql:micronaut-jooq")
    runtimeOnly("org.postgresql:postgresql")
@@ -17,5 +19,6 @@ dependencies {
    testImplementation project(':executor').sourceSets.test.output
    testImplementation project(':storage-local')
    testImplementation project(':tests')
    testImplementation project(':queue').sourceSets.test.output
    testImplementation("io.micronaut.validation:micronaut-validation") // PostgresServiceLivenessCoordinatorTest fails to init without this
}
@@ -0,0 +1,13 @@
CREATE TABLE IF NOT EXISTS queue (
    "offset" SERIAL PRIMARY KEY,
    type INT NOT NULL,
    routing_key VARCHAR(250),
    key VARCHAR(250) NOT NULL,
    value JSONB NOT NULL,
    created TIMESTAMPTZ NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS queue_type__key__offset ON queue (type, routing_key, "offset");
CREATE INDEX IF NOT EXISTS queue_type__offset ON queue (type, "offset");
CREATE INDEX IF NOT EXISTS queue_created ON queue (created);
@@ -0,0 +1,19 @@
package io.kestra.queue.postgres;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractBroadcastQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;

@KestraTest(environments = {"test", "queue"})
class PostgresBroadcastQueueTest extends AbstractBroadcastQueueTest {
    @Inject
    JdbcTestUtils jdbcTestUtils;

    @BeforeEach
    protected void init() {
        jdbcTestUtils.drop();
        jdbcTestUtils.migrate();
    }
}
@@ -0,0 +1,19 @@
package io.kestra.queue.postgres;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractDispatchQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;

@KestraTest(environments = {"test", "queue"})
class PostgresDispatchQueueTest extends AbstractDispatchQueueTest {
    @Inject
    JdbcTestUtils jdbcTestUtils;

    @BeforeEach
    protected void init() {
        jdbcTestUtils.drop();
        jdbcTestUtils.migrate();
    }
}
@@ -0,0 +1,19 @@
package io.kestra.queue.postgres;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.queue.AbstractKeyedDispatchQueueTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;

@KestraTest(environments = {"test", "queue"})
class PostgresKeyedDispatchQueueTest extends AbstractKeyedDispatchQueueTest {
    @Inject
    JdbcTestUtils jdbcTestUtils;

    @BeforeEach
    protected void init() {
        jdbcTestUtils.drop();
        jdbcTestUtils.migrate();
    }
}
4  jdbc-postgres/src/test/resources/application-queue.yml  Normal file
@@ -0,0 +1,4 @@
kestra:
  server-type: WEBSERVER
  queue:
    type: jdbc
17  jdbc/src/main/java/io/kestra/jdbc/JdbcQueueItem.java  Normal file
@@ -0,0 +1,17 @@
package io.kestra.jdbc;

import lombok.Getter;
import lombok.NoArgsConstructor;

import java.time.Instant;

@NoArgsConstructor
@Getter
public class JdbcQueueItem {
    Long offset;
    Integer type;
    String routingKey;
    String key;
    String value;
    Instant created;
}
@@ -30,6 +30,12 @@ public class JdbcTableConfigsFactory {
        return new InstantiableJdbcTableConfig("queues", null, "queues");
    }

    @Bean
    @Named("queue")
    public InstantiableJdbcTableConfig queue() {
        return new InstantiableJdbcTableConfig("queue", JdbcQueueItem.class, "queue");
    }

    @Bean
    @Named("flows")
    public InstantiableJdbcTableConfig flows() {
@@ -983,7 +983,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
    }

    @Override
    public Optional<ExecutorContext> lock(String executionId, Function<Execution, ExecutorContext> function) {
    public ExecutorContext lock(String executionId, Function<Execution, ExecutorContext> function) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
@@ -1000,17 +1000,17 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi

                // not ready for now, skip and wait for a first state
                if (execution.isEmpty()) {
                    return Optional.empty();
                    return null;
                }

                ExecutorContext executor = function.apply(execution.get());

                if (executor != null) {
                    this.jdbcRepository.persist(executor.getExecution(), context, null);
                    return Optional.of(executor);
                    return executor;
                }

                return Optional.empty();
                return null;
            });
    }
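After this change the repository returns the ExecutorContext directly (or null) instead of an Optional. A minimal sketch of the resulting caller-side contract; the callback body is purely illustrative:

// Illustration only: returning null from the callback skips persistence, and a null
// overall result means the execution was not ready or there was nothing to do.
ExecutorContext result = repository.lock(executionId, execution -> {
    if (execution.getState().isTerminated()) {
        return null; // nothing to do; nothing is persisted
    }
    return new ExecutorContext(execution);
});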
7  queue-jdbc/build.gradle  Normal file
@@ -0,0 +1,7 @@
dependencies {
    implementation project(':core')
    implementation project(':queue')

    implementation project(':jdbc')
    implementation("io.micronaut.sql:micronaut-jooq")
}
@@ -0,0 +1,50 @@
package io.kestra.queue.jdbc;

import io.kestra.core.queues.QueueException;
import io.kestra.queue.AbstractQueue;
import io.kestra.queue.GenericEvent;
import io.kestra.queue.QueueUtils;
import io.kestra.queue.jdbc.client.JdbcQueueClient;
import jakarta.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static io.kestra.core.utils.Rethrow.throwFunction;

@Slf4j
public abstract class AbstractJdbcQueue<T extends GenericEvent> extends AbstractQueue<T> {
    protected final JdbcQueueClient jdbcQueueClient;

    public AbstractJdbcQueue(Class<T> cls, QueueUtils queueUtils, JdbcQueueClient jdbcQueueClient) {
        super(cls, queueUtils);
        this.jdbcQueueClient = jdbcQueueClient;
    }

    public void internalEmit(@Nullable String key, T message) throws QueueException {
        String serialize = this.queueUtils.serialize(this.cls, message);

        jdbcQueueClient.publish(this.queueName(), key, message.key(), serialize);
    }

    public void internalEmit(@Nullable String key, List<T> messages) throws QueueException {
        jdbcQueueClient.publish(
            this.queueName(),
            key,
            messages
                .stream()
                .map(throwFunction(e -> {
                    String serialize = this.queueUtils.serialize(this.cls, e);

                    return new AbstractMap.SimpleEntry<>(
                        e.key(),
                        serialize
                    );
                }))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
        );
    }
}
@@ -0,0 +1,39 @@
package io.kestra.queue.jdbc;

import io.kestra.core.queues.QueueException;
import io.kestra.queue.BroadcastEvent;
import io.kestra.queue.BroadcastQueueInterface;
import io.kestra.queue.QueueSubscriber;
import io.kestra.queue.QueueUtils;
import io.kestra.queue.jdbc.client.JdbcBroadcastSubscriber;
import io.kestra.queue.jdbc.client.JdbcQueueClient;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

@Slf4j
public class JdbcBroadcastQueue<T extends BroadcastEvent> extends AbstractJdbcQueue<T> implements BroadcastQueueInterface<T> {
    public JdbcBroadcastQueue(Class<T> cls, QueueUtils queueUtils, JdbcQueueClient jdbcQueueClient) {
        super(cls, queueUtils, jdbcQueueClient);
    }

    @Override
    public void emit(T message) throws QueueException {
        this.internalEmit(null, message);
    }

    @Override
    public void emit(List<T> messages) throws QueueException {
        this.internalEmit(null, messages);
    }

    @Override
    public QueueSubscriber<T> subscriber() {
        return new JdbcBroadcastSubscriber<>(
            cls,
            queueUtils,
            jdbcQueueClient,
            queueName()
        );
    }
}
@@ -0,0 +1,40 @@
package io.kestra.queue.jdbc;

import io.kestra.core.queues.QueueException;
import io.kestra.queue.DispatchEvent;
import io.kestra.queue.DispatchQueueInterface;
import io.kestra.queue.QueueSubscriber;
import io.kestra.queue.QueueUtils;
import io.kestra.queue.jdbc.client.JdbcDispatchSubscriber;
import io.kestra.queue.jdbc.client.JdbcQueueClient;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

@Slf4j
public class JdbcDispatchQueue<T extends DispatchEvent> extends AbstractJdbcQueue<T> implements DispatchQueueInterface<T> {
    public JdbcDispatchQueue(Class<T> cls, QueueUtils queueUtils, JdbcQueueClient jdbcQueueClient) {
        super(cls, queueUtils, jdbcQueueClient);
    }

    @Override
    public void emit(T message) throws QueueException {
        this.internalEmit(null, message);
    }

    @Override
    public void emit(List<T> messages) throws QueueException {
        this.internalEmit(null, messages);
    }

    @Override
    public QueueSubscriber<T> subscriber() {
        return new JdbcDispatchSubscriber<>(
            cls,
            queueUtils,
            jdbcQueueClient,
            queueName(),
            null
        );
    }
}
@@ -0,0 +1,40 @@
package io.kestra.queue.jdbc;

import io.kestra.core.queues.QueueException;
import io.kestra.queue.KeyedDispatchEvent;
import io.kestra.queue.KeyedDispatchQueueInterface;
import io.kestra.queue.QueueSubscriber;
import io.kestra.queue.QueueUtils;
import io.kestra.queue.jdbc.client.JdbcDispatchSubscriber;
import io.kestra.queue.jdbc.client.JdbcQueueClient;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

@Slf4j
public class JdbcKeyedDispatchQueue<T extends KeyedDispatchEvent> extends AbstractJdbcQueue<T> implements KeyedDispatchQueueInterface<T> {
    public JdbcKeyedDispatchQueue(Class<T> cls, QueueUtils queueUtils, JdbcQueueClient jdbcQueueClient) {
        super(cls, queueUtils, jdbcQueueClient);
    }

    @Override
    public void emit(String key, T message) throws QueueException {
        this.internalEmit(key, message);
    }

    @Override
    public void emit(String key, List<T> messages) throws QueueException {
        this.internalEmit(key, messages);
    }

    @Override
    public QueueSubscriber<T> subscriber(String key) throws QueueException {
        return new JdbcDispatchSubscriber<>(
            cls,
            queueUtils,
            jdbcQueueClient,
            queueName(),
            key
        );
    }
}
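To illustrate the keyed dispatch contract, a hedged sketch of emitting through such a queue; the OrderEvent type is hypothetical, queueUtils and jdbcQueueClient stand in for the injected beans the factory below wires up, and we assume (as AbstractJdbcQueue's use of message.key() suggests) that KeyedDispatchEvent is an interface exposing key():

// Hypothetical event type, for illustration only.
record OrderEvent(String orderId) implements KeyedDispatchEvent {
    @Override
    public String key() {
        return orderId; // per-message key stored in the queue's "key" column
    }
}

// Messages emitted with the same routing key are consumed by the subscriber created for that key.
JdbcKeyedDispatchQueue<OrderEvent> queue = new JdbcKeyedDispatchQueue<>(OrderEvent.class, queueUtils, jdbcQueueClient);
queue.emit("worker-group-1", new OrderEvent("order-42"));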
@@ -0,0 +1,12 @@
package io.kestra.queue.jdbc;

import io.micronaut.context.annotation.Requires;

import java.lang.annotation.*;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE})
@Requires(property = "kestra.queue.type", value = "jdbc")
public @interface JdbcQueueEnabled {
}
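This annotation is a meta-annotation over Micronaut's @Requires, so any bean marked with it is only loaded when the configuration contains kestra.queue.type: jdbc (as the application-queue.yml files above set). A minimal sketch of a hypothetical bean guarded this way:

package io.kestra.queue.jdbc;

import jakarta.inject.Singleton;

// Hypothetical bean, for illustration only: Micronaut instantiates it
// exclusively when the JDBC queue backend is selected.
@Singleton
@JdbcQueueEnabled
class JdbcQueueStartupLogger {
    // queue-backend-specific logic would live here
}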
@@ -0,0 +1,54 @@
package io.kestra.queue.jdbc;

import io.kestra.queue.*;
import io.kestra.queue.jdbc.client.JdbcQueueClient;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Context;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Context
@JdbcQueueEnabled
public class JdbcQueueFactory {
    @Inject
    private QueueUtils queueUtils;

    @Inject
    private ApplicationContext applicationContext;

    @Inject
    private JdbcQueueClient jdbcQueueClient;

    @PostConstruct
    void init() {
        QueueFactory
            .listAllEvent(this.getClass().getClassLoader(), DispatchEvent.class)
            .forEach(event -> applicationContext.registerSingleton(
                DispatchQueueInterface.class,
                new JdbcDispatchQueue<>(event, queueUtils, jdbcQueueClient),
                Qualifiers.byTypeArguments(event),
                true
            ));

        QueueFactory
            .listAllEvent(this.getClass().getClassLoader(), KeyedDispatchEvent.class)
            .forEach(event -> applicationContext.registerSingleton(
                KeyedDispatchQueueInterface.class,
                new JdbcKeyedDispatchQueue<>(event, queueUtils, jdbcQueueClient),
                Qualifiers.byTypeArguments(event),
                true
            ));

        QueueFactory
            .listAllEvent(this.getClass().getClassLoader(), BroadcastEvent.class)
            .forEach(event -> applicationContext.registerSingleton(
                BroadcastQueueInterface.class,
                new JdbcBroadcastQueue<>(event, queueUtils, jdbcQueueClient),
                Qualifiers.byTypeArguments(event),
                true
            ));
    }
}
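Because the factory registers each queue with Qualifiers.byTypeArguments, consumers can look a queue up by its event type. A sketch of that lookup, where MyEvent is a hypothetical DispatchEvent implementation and applicationContext is the injected Micronaut context:

// Illustration only: resolve the queue that the factory registered for MyEvent.
@SuppressWarnings("unchecked")
DispatchQueueInterface<MyEvent> lookup(ApplicationContext applicationContext) {
    return applicationContext.getBean(
        DispatchQueueInterface.class,
        Qualifiers.byTypeArguments(MyEvent.class)
    );
}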
@@ -0,0 +1,33 @@
|
||||
package io.kestra.queue.jdbc.client;
|
||||
|
||||
import io.kestra.queue.GenericEvent;
|
||||
import io.kestra.queue.QueueUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@Slf4j
|
||||
public class JdbcBroadcastSubscriber<T extends GenericEvent> extends JdbcSubscriber<T> {
|
||||
public AtomicReference<Long> maxOffset = null;
|
||||
|
||||
public JdbcBroadcastSubscriber(
|
||||
Class<T> cls,
|
||||
QueueUtils queueUtils,
|
||||
JdbcQueueClient jdbcQueueClient,
|
||||
String queueName
|
||||
) {
|
||||
super(cls, queueUtils, jdbcQueueClient, queueName);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Integer pool(JdbcQueueClient.MessageConsumer<String, Exception> messageConsumer) {
|
||||
return this.jdbcQueueClient.subscribeBroadcast(this.queueName, maxOffset, messageConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init() {
|
||||
maxOffset = this.jdbcQueueClient.subscribeBroadcastMaxOffset(this.queueName);
|
||||
|
||||
this.markReady();
|
||||
}
|
||||
}
@@ -0,0 +1,32 @@
package io.kestra.queue.jdbc.client;

import io.kestra.queue.GenericEvent;
import io.kestra.queue.QueueUtils;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JdbcDispatchSubscriber<T extends GenericEvent> extends JdbcSubscriber<T> {
    private final String routingKey;

    public JdbcDispatchSubscriber(
        Class<T> cls,
        QueueUtils queueUtils,
        JdbcQueueClient jdbcQueueClient,
        String queueName,
        String routingKey
    ) {
        super(cls, queueUtils, jdbcQueueClient, queueName);

        this.routingKey = routingKey;
    }

    @Override
    protected Integer poll(JdbcQueueClient.MessageConsumer<String, Exception> messageConsumer) {
        return this.jdbcQueueClient.subscribeDispatch(this.queueName, this.routingKey, messageConsumer);
    }

    @Override
    protected void init() {
        this.markReady();
    }
}
@@ -0,0 +1,198 @@
package io.kestra.queue.jdbc.client;

import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.UnsupportedMessageException;
import io.kestra.jdbc.AbstractJdbcRepository;
import io.kestra.jdbc.JdbcQueueItem;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.runner.JdbcQueueConfiguration;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.Getter;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.exception.DataException;
import org.jooq.impl.DSL;

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.CRC32;

@Singleton
public class JdbcQueueClient {
    private static final Map<String, Integer> QUEUE_NAME_CRC32 = new ConcurrentHashMap<>();
    private static final List<Field<Object>> COLUMNS = List.of(
        io.kestra.jdbc.repository.AbstractJdbcRepository.field("type"),
        io.kestra.jdbc.repository.AbstractJdbcRepository.field("routing_key"),
        io.kestra.jdbc.repository.AbstractJdbcRepository.field("key"),
        io.kestra.jdbc.repository.AbstractJdbcRepository.field("value"),
        io.kestra.jdbc.repository.AbstractJdbcRepository.field("created")
    );

    @Inject
    @Named("queue")
    private AbstractJdbcRepository<JdbcQueueItem> jdbcRepository;

    @Inject
    private JooqDSLContextWrapper dslContextWrapper;

    @Inject
    @Getter
    private JdbcQueueConfiguration configuration;

    public static Integer queueNameToType(String value) {
        return QUEUE_NAME_CRC32.computeIfAbsent(value, s -> {
            CRC32 crc32 = new CRC32();
            crc32.update(value.getBytes());

            return (int) crc32.getValue();
        });
    }

    public void publish(String queue, @Nullable String routingKey, String key, String value) throws QueueException {
        this.publish(queue, routingKey, Map.of(key, value));
    }

    public void publish(String queue, @Nullable String routingKey, Map<String, String> values) throws QueueException {
        try {
            dslContextWrapper.transaction(configuration -> {
                DSLContext context = DSL.using(configuration);

                InsertValuesStepN<Record> insert = context
                    .insertInto(jdbcRepository.getTable())
                    .columns(COLUMNS);

                for (Map.Entry<String, String> entry : values.entrySet()) {
                    insert = insert.values(
                        queueNameToType(queue),
                        routingKey,
                        entry.getKey(),
                        JSONB.valueOf(entry.getValue()),
                        Instant.now()
                    );
                }

                insert.execute();
            });
        } catch (DataException e) { // The exception comes from the data itself, not from the database, network, or driver, so instead of failing fast we throw a recoverable QueueException.
            // Postgres refuses to store JSONB containing the '\u0000' codepoint as it has no textual representation.
            // We try to detect that and fail with a specific exception so the Worker can recover from it.
            if (e.getMessage() != null && e.getMessage().contains("ERROR: unsupported Unicode escape sequence")) {
                throw new UnsupportedMessageException(e.getMessage(), e);
            }
            throw new QueueException("Unable to emit a message to the queue", e);
        }
    }

    public Integer subscribeDispatch(String queue, @Nullable String routingKey, MessageConsumer<String, Exception> consumer) {
        return dslContextWrapper.transactionResult(conf -> {
            DSLContext context = DSL.using(conf);

            SelectConditionStep<Record> select = context.select(DSL.asterisk())
                .from(this.jdbcRepository.getTable())
                .where(io.kestra.jdbc.repository.AbstractJdbcRepository.field("type").eq(queueNameToType(queue)));

            if (routingKey != null) {
                select = select.and(io.kestra.jdbc.repository.AbstractJdbcRepository.field("routing_key").eq(routingKey));
            }

            List<JdbcQueueItem> queueItems = select
                .orderBy(io.kestra.jdbc.repository.AbstractJdbcRepository.field("offset").asc())
                .limit(configuration.pollSize())
                .forUpdate()
                .skipLocked()
                .fetchInto(JdbcQueueItem.class);

            if (!queueItems.isEmpty()) {
                List<Long> processedItems = queueItems
                    .stream()
                    .map(queueItem -> {
                        Exception exception = consumer.apply(queueItem.getValue());
                        return exception == null ? queueItem.getOffset() : null;
                    })
                    .filter(Objects::nonNull)
                    .toList();

                if (!processedItems.isEmpty()) {
                    DeleteConditionStep<Record> delete = context.delete(this.jdbcRepository.getTable())
                        .where(io.kestra.jdbc.repository.AbstractJdbcRepository.field("type").eq(queueNameToType(queue)))
                        .and(io.kestra.jdbc.repository.AbstractJdbcRepository.field("offset", Long.class).in(processedItems));

                    if (routingKey != null) {
                        delete = delete.and(io.kestra.jdbc.repository.AbstractJdbcRepository.field("routing_key").eq(routingKey));
                    }

                    delete.execute();
                }
            }

            return queueItems.size();
        });
    }

    public AtomicReference<Long> subscribeBroadcastMaxOffset(String queue) {
        AtomicReference<Long> maxOffset = new AtomicReference<>();

        Long initialOffset = dslContextWrapper.transactionResult(conf -> {
            DSLContext context = DSL.using(conf);

            return context.select(DSL.max(io.kestra.jdbc.repository.AbstractJdbcRepository.field("offset")))
                .from(this.jdbcRepository.getTable())
                .where(io.kestra.jdbc.repository.AbstractJdbcRepository.field("type").eq(queueNameToType(queue)))
                .fetchAny("max", Long.class);
        });

        if (initialOffset != null) {
            maxOffset.set(initialOffset);
        }

        return maxOffset;
    }

    public Integer subscribeBroadcast(String queue, AtomicReference<Long> maxOffset, MessageConsumer<String, Exception> consumer) {
        return dslContextWrapper.transactionResult(conf -> {
            DSLContext context = DSL.using(conf);

            SelectConditionStep<Record> select = context.select(DSL.asterisk())
                .from(this.jdbcRepository.getTable())
                .where(io.kestra.jdbc.repository.AbstractJdbcRepository.field("type").eq(queueNameToType(queue)));

            if (maxOffset.get() != null) {
                select = select.and(io.kestra.jdbc.repository.AbstractJdbcRepository.field("offset").gt(maxOffset.get()));
            }

            List<JdbcQueueItem> queueItems = select
                .orderBy(io.kestra.jdbc.repository.AbstractJdbcRepository.field("offset").asc())
                .limit(configuration.pollSize())
                .forUpdate()
                .skipLocked()
                .fetchInto(JdbcQueueItem.class);

            if (!queueItems.isEmpty()) {
                queueItems.forEach(queueItem -> consumer.apply(queueItem.getValue()));

                queueItems
                    .stream()
                    .map(JdbcQueueItem::getOffset)
                    .max(Long::compareTo)
                    .ifPresent(maxOffset::set);
            }

            return queueItems.size();
        });
    }

    @FunctionalInterface
    public interface MessageConsumer<T, E extends Exception> {
        @Nullable E apply(T t);
    }
}
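The queueNameToType mapping above deserves a closer look: queue names never reach the database as strings; they are CRC32-hashed, truncated to an int, and stored in the type column. A minimal standalone sketch of that mapping (class and method names here are illustrative):

import java.util.zip.CRC32;

public class QueueTypeSketch {
    // Same scheme as JdbcQueueClient.queueNameToType: CRC32 of the queue name,
    // truncated to an int. The mapping is deterministic, so producers and
    // consumers agree on the type value without any coordination table.
    static int queueNameToType(String name) {
        CRC32 crc32 = new CRC32();
        crc32.update(name.getBytes());
        return (int) crc32.getValue();
    }

    public static void main(String[] args) {
        int first = queueNameToType("execution_killed");
        int second = queueNameToType("execution_killed");
        System.out.println(first == second); // true: stable per queue name
    }
}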
@@ -0,0 +1,111 @@
package io.kestra.queue.jdbc.client;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueException;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.Rethrow;
import io.kestra.jdbc.runner.JdbcQueueConfiguration;
import io.kestra.queue.AbstractSubscriber;
import io.kestra.queue.GenericEvent;
import io.kestra.queue.QueueSubscriber;
import io.kestra.queue.QueueUtils;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;

import static io.kestra.core.utils.Rethrow.throwRunnable;

@Slf4j
public abstract class JdbcSubscriber<T extends GenericEvent> extends AbstractSubscriber<T> {
    protected final JdbcQueueClient jdbcQueueClient;
    protected final String queueName;

    public JdbcSubscriber(
        Class<T> cls,
        QueueUtils queueUtils,
        JdbcQueueClient jdbcQueueClient,
        String queueName
    ) {
        super(cls, queueUtils);

        this.jdbcQueueClient = jdbcQueueClient;
        this.queueName = queueName;
    }

    protected abstract Integer poll(JdbcQueueClient.MessageConsumer<String, Exception> messageConsumer);

    protected abstract void init();

    public QueueSubscriber<T> subscribe(Rethrow.ConsumerChecked<Either<T, DeserializationException>, Exception> consumer) throws QueueException {
        this.queueUtils.execute(throwRunnable(() -> {
            List<JdbcQueueConfiguration.Step> steps = this.jdbcQueueClient.getConfiguration().computeSteps();
            ZonedDateTime lastPoll = ZonedDateTime.now();
            Duration sleepDuration;

            this.init();

            while (this.isRunning() || this.isPaused()) {
                this.waitIfPaused();

                Integer count = this.poll(message -> {
                    try {
                        Either<T, DeserializationException> event = this.queueUtils.deserialize(this.cls, message);
                        consumer.accept(event);

                        return null;
                    } catch (Exception e) {
                        log.warn(
                            "[{}] message {} failed and was resubmitted to the active queue, error: {}",
                            cls.getSimpleName(),
                            message,
                            e.getMessage()
                        );

                        return e;
                    }
                });

                // define the sleep time before the next poll; it can be immediate if there are messages to process
                if (count > 0) {
                    lastPoll = ZonedDateTime.now();
                    sleepDuration = this.jdbcQueueClient.getConfiguration().minPollInterval();

                    if (this.jdbcQueueClient.getConfiguration().immediateRepoll()) {
                        sleepDuration = Duration.ofSeconds(0);
                    } else if (count.equals(jdbcQueueClient.getConfiguration().pollSize())) {
                        // Note: this provides better latency on high throughput: when Kestra is at top capacity,
                        // it will not sleep and will immediately poll again.
                        // We could get even better latency by also repolling immediately on any positive count,
                        // but at a higher database cost.
                        // The current implementation balances database cost against latency.
                        sleepDuration = Duration.ofSeconds(0);
                    }
                } else {
                    ZonedDateTime finalLastPoll = lastPoll;
                    // select all the poll steps whose switch interval has already elapsed since the last poll
                    List<JdbcQueueConfiguration.Step> selectedSteps = steps.stream()
                        .takeWhile(step -> finalLastPoll.plus(step.switchInterval()).compareTo(ZonedDateTime.now()) < 0)
                        .toList();
                    // then take the last (longest) one, or fall back to minPollInterval if none matched, which means we are still under the first switch interval
                    sleepDuration = selectedSteps.isEmpty() ? jdbcQueueClient.getConfiguration().minPollInterval() : selectedSteps.getLast().pollInterval();
                }

                // keep the thread running
                try {
                    Thread.sleep(sleepDuration);
                } catch (Exception e) {
                    throw new QueueException("[" + this.cls.getSimpleName() + "] failed sleep", e);
                }
            }

            this.markEnd();
        }));

        Await.until(this::isRunning);

        return this;
    }
}
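The step-based back-off at the end of subscribe() is the subtle part: after an empty poll, the sleep grows through the configured steps as idle time accumulates. A self-contained sketch of just that selection logic (Step here is a hypothetical stand-in for JdbcQueueConfiguration.Step, with switchInterval meaning "idle time before this step applies" and pollInterval the sleep once it does):

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;

public class BackoffSketch {
    record Step(Duration switchInterval, Duration pollInterval) {}

    static Duration nextSleep(List<Step> steps, ZonedDateTime lastPoll, Duration minPollInterval) {
        // keep every step whose switch interval has already elapsed since the last successful poll
        List<Step> selected = steps.stream()
            .takeWhile(step -> lastPoll.plus(step.switchInterval()).isBefore(ZonedDateTime.now()))
            .toList();
        // none elapsed: we are still under the first interval, so poll at the fastest rate
        return selected.isEmpty() ? minPollInterval : selected.getLast().pollInterval();
    }

    public static void main(String[] args) {
        List<Step> steps = List.of(
            new Step(Duration.ofSeconds(10), Duration.ofMillis(500)),
            new Step(Duration.ofMinutes(1), Duration.ofSeconds(2))
        );
        // idle for 30s: past the 10s step but not the 1min step -> sleep 500ms
        System.out.println(nextSleep(steps, ZonedDateTime.now().minusSeconds(30), Duration.ofMillis(100)));
    }
}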
7
queue/build.gradle
Normal file
@@ -0,0 +1,7 @@
configurations {
    implementation.extendsFrom(micronaut)
}

dependencies {
    implementation project(':core')
}
32
queue/src/main/java/io/kestra/queue/AbstractQueue.java
Normal file
@@ -0,0 +1,32 @@
package io.kestra.queue;

import com.google.common.base.CaseFormat;
import jakarta.annotation.Nullable;

public abstract class AbstractQueue<T extends GenericEvent> {
    protected final Class<T> cls;
    protected final QueueUtils queueUtils;

    public AbstractQueue(Class<T> cls, QueueUtils queueUtils) {
        this.cls = cls;
        this.queueUtils = queueUtils;
    }

    protected String queueNameSeparator() {
        return "__";
    }

    protected String queueName() {
        return CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, this.cls.getSimpleName());
    }

    protected String queueName(@Nullable String key) {
        if (key == null) {
            return this.queueName();
        }

        return this.queueName() +
            this.queueNameSeparator() +
            CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_UNDERSCORE, key);
    }
}
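To make the naming scheme concrete, here is a minimal sketch of what queueName() and queueName(key) produce (assuming Guava on the classpath, as in the class above; the event name is illustrative):

import com.google.common.base.CaseFormat;

public class QueueNameSketch {
    public static void main(String[] args) {
        // queueName(): the event class's simple name, UpperCamel -> lower_underscore
        String base = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, "ExecutionKilled");
        System.out.println(base); // execution_killed

        // queueName(key): same, with the key appended after the "__" separator
        String keyed = base + "__" + CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_UNDERSCORE, "my-key");
        System.out.println(keyed); // execution_killed__my_key
    }
}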
146
queue/src/main/java/io/kestra/queue/AbstractSubscriber.java
Normal file
@@ -0,0 +1,146 @@
package io.kestra.queue;

import io.kestra.core.queues.QueueException;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public abstract class AbstractSubscriber<T extends GenericEvent> extends AbstractQueue<T> implements QueueSubscriber<T> {
    private final CountDownLatch stopped = new CountDownLatch(1);
    private final ReentrantLock pauseLock = new ReentrantLock();
    private final Condition unpaused = pauseLock.newCondition();

    private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);

    public AbstractSubscriber(Class<T> cls, QueueUtils queueUtils) {
        super(cls, queueUtils);
    }

    protected void waitIfPaused() throws QueueException {
        // return immediately if not paused.
        if (!this.state.get().equals(State.PAUSED)) {
            return;
        }

        // lock and wait until resumed
        pauseLock.lock();
        try {
            while (this.state.get().equals(State.PAUSED)) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] paused, waiting to resume", this.cls.getSimpleName());
                }

                try {
                    unpaused.await(); // Wait until resume() signals
                } catch (InterruptedException e) {
                    throw new QueueException("[" + this.cls.getSimpleName() + "] interrupted while paused", e);
                }

                if (log.isDebugEnabled()) {
                    log.debug("[{}] resumed", this.cls.getSimpleName());
                }
            }
        } finally {
            pauseLock.unlock();
        }
    }

    protected boolean isRunning() {
        return this.state.get() == State.RUNNING;
    }

    protected boolean isPaused() {
        return this.state.get() == State.PAUSED;
    }

    private boolean changeState(State expected, State newState) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] change state requested {} to {}", this.cls.getSimpleName(), expected, newState);
        }

        if (this.state.compareAndSet(expected, newState)) {
            return true;
        }

        throw new IllegalStateException("[" + this.cls.getSimpleName() + "] illegal state change to " + newState + " from " + expected + ", current state is " + this.state.get());
    }

    protected void markReady() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] mark ready received", this.cls.getSimpleName());
        }

        this.changeState(State.STOPPED, State.RUNNING);
    }

    public void pause() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] pause received", this.cls.getSimpleName());
        }

        this.changeState(State.RUNNING, State.PAUSED);
    }

    public void resume() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] resume received", this.cls.getSimpleName());
        }

        pauseLock.lock();
        try {
            if (this.changeState(State.PAUSED, State.RUNNING)) {
                unpaused.signalAll();
            }
        } finally {
            pauseLock.unlock();
        }
    }

    protected void markEnd() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] mark end received", this.cls.getSimpleName());
        }

        if (this.isRunning()) {
            this.changeState(State.RUNNING, State.STOPPED);
        }

        this.stopped.countDown();
    }

    public void close() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] close received", this.cls.getSimpleName());
        }

        // in case it's paused and blocked
        if (this.isPaused()) {
            resume();
        }

        // if already stopped, the state change throws and we can return immediately
        try {
            this.changeState(State.RUNNING, State.STOPPED);
        } catch (IllegalStateException ignored) {
            return;
        }

        // wait for the queue to be stopped
        try {
            stopped.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("[{}] interrupted while waiting to be stopped.", this.cls.getSimpleName());
        }
    }

    public enum State {
        RUNNING,
        PAUSED,
        STOPPED
    }
}
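The subscriber lifecycle above reduces to a small compare-and-set state machine: STOPPED -> RUNNING (markReady), RUNNING -> PAUSED (pause), PAUSED -> RUNNING (resume), RUNNING -> STOPPED (markEnd/close), with any other transition throwing IllegalStateException. A minimal standalone sketch of that invariant:

import java.util.concurrent.atomic.AtomicReference;

public class StateSketch {
    enum State { RUNNING, PAUSED, STOPPED }

    public static void main(String[] args) {
        AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
        // legal transitions succeed...
        System.out.println(state.compareAndSet(State.STOPPED, State.RUNNING)); // true  (markReady)
        System.out.println(state.compareAndSet(State.RUNNING, State.PAUSED));  // true  (pause)
        // ...a stale expectation fails atomically, which AbstractSubscriber turns into an IllegalStateException
        System.out.println(state.compareAndSet(State.STOPPED, State.RUNNING)); // false
        System.out.println(state.compareAndSet(State.PAUSED, State.RUNNING));  // true  (resume)
    }
}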
4
queue/src/main/java/io/kestra/queue/BroadcastEvent.java
Normal file
@@ -0,0 +1,4 @@
package io.kestra.queue;

public interface BroadcastEvent extends GenericEvent {
}
@@ -0,0 +1,13 @@
package io.kestra.queue;

import io.kestra.core.queues.QueueException;

import java.util.List;

public interface BroadcastQueueInterface<T extends BroadcastEvent> extends GenericQueueInterface<T> {
    void emit(T message) throws QueueException;

    void emit(List<T> messages) throws QueueException;

    QueueSubscriber<T> subscriber() throws QueueException;
}
5
queue/src/main/java/io/kestra/queue/DispatchEvent.java
Normal file
@@ -0,0 +1,5 @@
package io.kestra.queue;

public interface DispatchEvent extends GenericEvent {

}
@@ -0,0 +1,13 @@
package io.kestra.queue;

import io.kestra.core.queues.QueueException;

import java.util.List;

public interface DispatchQueueInterface<T extends DispatchEvent> extends GenericQueueInterface<T> {
    void emit(T message) throws QueueException;

    void emit(List<T> messages) throws QueueException;

    QueueSubscriber<T> subscriber() throws QueueException;
}
5
queue/src/main/java/io/kestra/queue/GenericEvent.java
Normal file
@@ -0,0 +1,5 @@
package io.kestra.queue;

public interface GenericEvent {
    String key();
}
@@ -0,0 +1,5 @@
package io.kestra.queue;

public interface GenericQueueInterface<T extends GenericEvent> {

}
@@ -0,0 +1,4 @@
package io.kestra.queue;

public interface KeyedDispatchEvent extends DispatchEvent {
}
@@ -0,0 +1,13 @@
package io.kestra.queue;

import io.kestra.core.queues.QueueException;

import java.util.List;

public interface KeyedDispatchQueueInterface<T extends KeyedDispatchEvent> extends GenericQueueInterface<T> {
    void emit(String key, T message) throws QueueException;

    void emit(String key, List<T> messages) throws QueueException;

    QueueSubscriber<T> subscriber(String key) throws QueueException;
}
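A minimal usage sketch of this interface, assuming a hypothetical OrderEvent record that implements KeyedDispatchEvent: the key passed to emit() and subscriber() selects a routing group, so a subscriber only receives messages emitted under the same key (the multipleGroup test further below exercises exactly this):

import io.kestra.core.queues.QueueException;

public class OrderConsumer {
    // hypothetical event type for this sketch
    public record OrderEvent(String key, Integer id) implements KeyedDispatchEvent {}

    private final KeyedDispatchQueueInterface<OrderEvent> queue;

    public OrderConsumer(KeyedDispatchQueueInterface<OrderEvent> queue) {
        this.queue = queue;
    }

    void run() throws QueueException {
        QueueSubscriber<OrderEvent> subscriber = queue
            .subscriber("group-a")
            .subscribe(e -> {
                OrderEvent event = e.getLeft();
                // handle events emitted under "group-a" only...
            });

        queue.emit("group-a", new OrderEvent("order-1", 1));

        subscriber.close();
    }
}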
24
queue/src/main/java/io/kestra/queue/QueueConfiguration.java
Normal file
@@ -0,0 +1,24 @@
package io.kestra.queue;

import io.micronaut.context.annotation.ConfigurationProperties;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;

@Getter
@ConfigurationProperties(value = "kestra.queue")
public class QueueConfiguration {

    @NotNull
    String type;

    @Nullable
    MessageProtection messageProtection;

    @Getter
    @ConfigurationProperties("message-protection")
    public static class MessageProtection {
        Boolean enabled = false;
        Integer limit;
    }
}
55
queue/src/main/java/io/kestra/queue/QueueFactory.java
Normal file
@@ -0,0 +1,55 @@
package io.kestra.queue;

import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.context.annotation.Factory;
import io.micronaut.core.beans.BeanIntrospectionReference;
import io.micronaut.core.io.service.ServiceDefinition;
import io.micronaut.core.io.service.SoftServiceLoader;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;

@Factory
public class QueueFactory {
    public static final String QUEUE_EXECUTOR = "queueExecutor";

    @SuppressWarnings({"rawtypes", "unchecked"})
    public static <T extends GenericEvent> List<Class<T>> listAllEvent(ClassLoader classLoader, Class<T> eventClass) {
        final SoftServiceLoader<BeanIntrospectionReference> definitions = SoftServiceLoader.load(
            BeanIntrospectionReference.class,
            classLoader
        );

        List<Class<T>> list = new ArrayList<>();

        for (ServiceDefinition<BeanIntrospectionReference> definition : definitions) {
            if (definition.isPresent()) {
                final BeanIntrospectionReference ref = definition.load();
                Class beanType = ref.getBeanType();

                if (Modifier.isAbstract(beanType.getModifiers())) {
                    continue;
                }

                if (eventClass.isAssignableFrom(beanType)) {
                    list.add((Class<T>) beanType);
                }
            }
        }

        return list;
    }

    @Named(QUEUE_EXECUTOR)
    @Singleton
    @Inject
    public ExecutorService executorsUtils(ExecutorsUtils executorsUtils, QueueConfiguration queueConfiguration) {
        return executorsUtils.cachedThreadPool("queue-" + queueConfiguration.getType());
    }
}
32
queue/src/main/java/io/kestra/queue/QueueSubscriber.java
Normal file
@@ -0,0 +1,32 @@
package io.kestra.queue;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueException;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.Rethrow;

public interface QueueSubscriber<T extends GenericEvent> {
    /**
     * Starts a subscription.
     *
     * @param consumer the consumer that will process messages
     * @return self
     * @throws QueueException if the subscription fails
     */
    QueueSubscriber<T> subscribe(Rethrow.ConsumerChecked<Either<T, DeserializationException>, Exception> consumer) throws QueueException;

    /**
     * Pauses this subscriber.
     */
    void pause();

    /**
     * Resumes this subscriber if currently paused.
     */
    void resume();

    /**
     * Closes this subscriber.
     */
    void close();
}
107
queue/src/main/java/io/kestra/queue/QueueUtils.java
Normal file
@@ -0,0 +1,107 @@
package io.kestra.queue;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.MessageTooBigException;
import io.kestra.core.queues.QueueException;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.Either;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;

@Slf4j
@Singleton
public class QueueUtils {
    private static final ObjectMapper MAPPER = JacksonMapper.ofJson(false).copy();

    @Inject
    @Named(QueueFactory.QUEUE_EXECUTOR)
    @Getter
    ExecutorService executorService;

    @Inject
    protected QueueConfiguration queueConfiguration;

    @Inject
    private MetricRegistry metricRegistry;

    public void execute(Runnable runnable) {
        this.executorService.execute(runnable);
    }

    public <T extends GenericEvent> String serialize(Class<T> cls, T message) throws QueueException {
        try {
            String serialize = MAPPER.writeValueAsString(message);

            if (log.isTraceEnabled()) {
                log.trace("[{}] produced message: {}", cls.getSimpleName(), serialize);
            }

            int byteLength = serialize.getBytes(StandardCharsets.UTF_8).length;

            if (queueConfiguration.getMessageProtection() != null && queueConfiguration.getMessageProtection().getEnabled() && byteLength >= queueConfiguration.getMessageProtection().getLimit()) {
                metricRegistry
                    .counter(MetricRegistry.METRIC_QUEUE_BIG_MESSAGE_COUNT, MetricRegistry.METRIC_QUEUE_BIG_MESSAGE_COUNT_DESCRIPTION, MetricRegistry.TAG_CLASS_NAME, cls.getSimpleName()).increment();

                // we let terminated execution messages go through anyway
                if (!(message instanceof Execution execution) || !execution.getState().isTerminated()) {
                    throw new MessageTooBigException("Message of size " + byteLength + " has exceeded the configured limit of " + queueConfiguration.getMessageProtection().getLimit());
                }
            }

            return serialize;
        } catch (JsonProcessingException e) {
            throw new QueueException("Failed to produce '" + message.getClass() + "'", e);
        } finally {
            String[] tags = {MetricRegistry.TAG_QUEUE_TYPE, cls.getSimpleName()};
            metricRegistry
                .counter(MetricRegistry.METRIC_QUEUE_PRODUCE_COUNT, MetricRegistry.METRIC_QUEUE_PRODUCE_COUNT_DESCRIPTION, tags)
                .increment();
        }
    }

    public <T extends GenericEvent> Either<T, DeserializationException> deserialize(Class<T> cls, byte[] record) {
        if (log.isTraceEnabled()) {
            log.trace("[{}] received message: {}", cls.getSimpleName(), new String(record));
        }

        try {
            return Either.left(MAPPER.readValue(record, cls));
        } catch (IOException e) {
            return Either.right(new DeserializationException(e, Arrays.toString(record)));
        } finally {
            String[] tags = {MetricRegistry.TAG_QUEUE_TYPE, cls.getSimpleName()};
            metricRegistry
                .counter(MetricRegistry.METRIC_QUEUE_RECEIVE_COUNT, MetricRegistry.METRIC_QUEUE_RECEIVE_COUNT_DESCRIPTION, tags)
                .increment();
        }
    }

    public <T extends GenericEvent> Either<T, DeserializationException> deserialize(Class<T> cls, String record) {
        if (log.isTraceEnabled()) {
            log.trace("[{}] received message: {}", cls.getSimpleName(), record);
        }

        try {
            return Either.left(MAPPER.readValue(record, cls));
        } catch (IOException e) {
            return Either.right(new DeserializationException(e, record));
        } finally {
            String[] tags = {MetricRegistry.TAG_QUEUE_TYPE, cls.getSimpleName()};
            metricRegistry
                .counter(MetricRegistry.METRIC_QUEUE_RECEIVE_COUNT, MetricRegistry.METRIC_QUEUE_RECEIVE_COUNT_DESCRIPTION, tags)
                .increment();
        }
    }
}
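The serialize/deserialize pair above is plain Jackson under the hood, and the size check applies to the UTF-8 byte length of the JSON string. A minimal round-trip sketch using a plain ObjectMapper as a stand-in for JacksonMapper.ofJson (assumes Jackson 2.12+ for record support):

import com.fasterxml.jackson.databind.ObjectMapper;

import java.nio.charset.StandardCharsets;

public class SerdeSketch {
    record TestDispatch(String key, Integer id) {}

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // emit path: event -> JSON string, with the message-protection check on its byte length
        String json = mapper.writeValueAsString(new TestDispatch("k1", 42));
        System.out.println(json);                                         // {"key":"k1","id":42}
        System.out.println(json.getBytes(StandardCharsets.UTF_8).length); // the length the limit applies to

        // receive path: JSON string -> event, wrapped in Either.left (or Either.right on failure)
        TestDispatch back = mapper.readValue(json, TestDispatch.class);
        System.out.println(back.id()); // 42
    }
}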
@@ -0,0 +1,148 @@
package io.kestra.queue;

import io.kestra.core.queues.QueueException;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Test;

import java.time.Instant;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static io.kestra.core.utils.Rethrow.throwConsumer;
import static org.assertj.core.api.Assertions.assertThat;

public abstract class AbstractBroadcastQueueTest extends AbstractQueueTest {
    private static final int DEFAULT_TIMEOUT_SECONDS = 10;

    @Inject
    private BroadcastQueueInterface<TestBroadcast> broadcastQueue;

    @Test
    void singleConsumer() throws QueueException, InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(3);
        Collection<Integer> list = Collections.synchronizedCollection(new ArrayList<>());

        QueueSubscriber<TestBroadcast> subscriber = broadcastQueue
            .subscriber()
            .subscribe(e -> {
                list.add(e.getLeft().id());
                countDownLatch.countDown();
            });

        String prefix = this.keyPrefix();
        for (int i = 1; i <= 3; i++) {
            broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), i));
        }

        boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        subscriber.close();

        assertThat(await).isTrue();
        assertThat(countDownLatch.getCount()).isEqualTo(0L);
        assertThat(list).contains(1, 2, 3);
    }

    @Test
    void closingConsumer() throws QueueException, InterruptedException {
        singleConsumer();
        singleConsumer();
    }

    @Test
    void multipleConsumer() throws QueueException, InterruptedException {
        int rand = ThreadLocalRandom.current().nextInt(10, 50);
        CountDownLatch countDownLatch = new CountDownLatch(3 * rand);
        Collection<String> list = Collections.synchronizedCollection(new ArrayList<>());
        List<QueueSubscriber<TestBroadcast>> subscribers = new ArrayList<>();

        IntStream.range(0, rand)
            .boxed()
            .forEach(throwConsumer(i -> subscribers.add(broadcastQueue
                .subscriber()
                .subscribe(e -> {
                    list.add("c" + String.format("%03d", i) + "-i" + String.format("%03d", e.getLeft().id()));
                    countDownLatch.countDown();
                })
            )));

        String prefix = this.keyPrefix();
        broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 1));
        broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 2));
        broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 3));

        // rebalancing can take some time, so we multiply the timeout by 5
        boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS * 5, TimeUnit.SECONDS);
        subscribers.parallelStream().forEach(QueueSubscriber::close);

        assertThat(await).isTrue();
        assertThat(countDownLatch.getCount()).isEqualTo(0L);
        assertThat(list).hasSize(3 * rand);
        assertThat(list).contains("c000-i001", "c000-i002", "c000-i003");
        assertThat(list).contains("c" + String.format("%03d", (rand - 1)) + "-i001", "c" + String.format("%03d", (rand - 1)) + "-i002", "c" + String.format("%03d", (rand - 1)) + "-i003");
    }

    @Test
    void pause() throws QueueException, InterruptedException {
        CountDownLatch countDownLatchFirst = new CountDownLatch(1);
        CountDownLatch countDownLatchSecond = new CountDownLatch(2);
        CountDownLatch countDownLatchOthers = new CountDownLatch(2);
        Collection<Pair<Instant, Integer>> list = Collections.synchronizedCollection(new ArrayList<>());

        QueueSubscriber<TestBroadcast> subscriber = broadcastQueue
            .subscriber()
            .subscribe(e -> {
                list.add(Pair.of(Instant.now(), e.getLeft().id()));
                if (e.getLeft().id() == 1) {
                    countDownLatchFirst.countDown();
                } else if (e.getLeft().id() <= 3) {
                    countDownLatchSecond.countDown();
                } else {
                    countDownLatchOthers.countDown();
                }
            });

        String prefix = this.keyPrefix();
        // first round
        broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 1));

        boolean await1 = countDownLatchFirst.await(DEFAULT_TIMEOUT_SECONDS + 10, TimeUnit.SECONDS);
        subscriber.pause();
        assertThat(await1).isTrue();

        // second round
        Instant resumeTime = Instant.now();
        subscriber.resume();

        broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 2));
        broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 3));

        boolean await2 = countDownLatchSecond.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        subscriber.pause();
        assertThat(await2).isTrue();

        // last round
        Instant resumeTime2 = Instant.now();
        subscriber.resume();

        broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 4));
        broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 5));

        boolean await3 = countDownLatchOthers.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        subscriber.close();

        assertThat(await3).isTrue();
        assertThat(list).hasSize(5);
        assertThat(list.stream().filter(i -> i.getLeft().isBefore(resumeTime)).count()).isEqualTo(1);
        assertThat(list.stream().filter(i -> i.getLeft().isAfter(resumeTime)).count()).isEqualTo(4);
        assertThat(list.stream().filter(i -> i.getLeft().isAfter(resumeTime2)).count()).isEqualTo(2);
    }

    @Introspected
    public record TestBroadcast(String key, Integer id) implements BroadcastEvent {}
}
@@ -0,0 +1,190 @@
package io.kestra.queue;

import io.kestra.core.queues.QueueException;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

import static io.kestra.core.utils.Rethrow.throwConsumer;
import static org.assertj.core.api.Assertions.assertThat;

public abstract class AbstractDispatchQueueTest extends AbstractQueueTest {
    private static final int DEFAULT_TIMEOUT_SECONDS = 5;

    @Inject
    private DispatchQueueInterface<TestDispatch> dispatchQueue;

    @Test
    void singleConsumer() throws QueueException, InterruptedException, IOException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Collection<Integer> list = Collections.synchronizedCollection(new ArrayList<>());

        QueueSubscriber<TestDispatch> subscriber = dispatchQueue
            .subscriber()
            .subscribe(e -> {
                list.add(e.getLeft().id());
                countDownLatch.countDown();
            });

        String prefix = this.keyPrefix();
        dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 1));
        dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 2));

        boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        subscriber.close();

        assertThat(await).isTrue();
        assertThat(countDownLatch.getCount()).isEqualTo(0L);
        assertThat(list).containsExactlyInAnyOrder(1, 2);
    }

    @Test
    void closingConsumer() throws QueueException, InterruptedException, IOException {
        singleConsumer();
        singleConsumer();
    }

    @Test
    void multipleConsumer() throws QueueException, InterruptedException {
        int rand = ThreadLocalRandom.current().nextInt(10, 50);
        CountDownLatch countDownLatch = new CountDownLatch(rand);
        Collection<String> list = Collections.synchronizedCollection(new ArrayList<>());
        List<QueueSubscriber<TestDispatch>> subscribers = new ArrayList<>();

        IntStream.range(0, 3)
            .boxed()
            .forEach(throwConsumer(i -> subscribers.add(
                dispatchQueue
                    .subscriber()
                    .subscribe(e -> {
                        list.add("c" + String.format("%03d", i) + "-i" + String.format("%03d", e.getLeft().id()));
                        countDownLatch.countDown();
                    })
            )));

        String prefix = this.keyPrefix();
        for (int i = 0; i < rand; i++) {
            dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), i));
        }

        // rebalancing can take some time, so we multiply the timeout by 5
        boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS * 5, TimeUnit.SECONDS);
        subscribers.parallelStream().forEach(QueueSubscriber::close);

        assertThat(await).isTrue();
        assertThat(countDownLatch.getCount()).isEqualTo(0L);
        assertThat(list).hasSize(rand);
        assertThat(list.stream().map(s -> s.substring(0, s.indexOf("-"))).toList()).contains("c000", "c001", "c002");
        assertThat(list.stream().map(s -> s.substring(s.indexOf("-") + 1)).toList()).contains("i001", String.format("i%03d", rand - 1));
    }

    @Test
    void errorProcessing() throws QueueException, InterruptedException {
        // @TODO: fails on RabbitMQ, the published message does not seem to be durable
        String prefix = this.keyPrefix();

        dispatchQueue.emit(IntStream.range(1, 15)
            .boxed()
            .map(i -> new TestDispatch(prefix + "_" + IdUtils.create(), i))
            .toList()
        );

        CountDownLatch countDownLatch = new CountDownLatch(15);
        Collection<Integer> list = Collections.synchronizedCollection(new ArrayList<>());

        var crashed = new AtomicBoolean(false);

        QueueSubscriber<TestDispatch> subscriber = dispatchQueue
            .subscriber()
            .subscribe(e -> {
                countDownLatch.countDown();

                if (e.getLeft().id() == 2 && crashed.compareAndSet(false, true)) {
                    throw new Exception("Boom");
                }

                list.add(e.getLeft().id());
            });

        boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        subscriber.close();

        assertThat(await).isTrue();
        assertThat(countDownLatch.getCount()).isEqualTo(0L);
        assertThat(list).containsExactlyInAnyOrder(IntStream.range(1, 15).boxed().toArray(Integer[]::new));
    }

    @Test
    void pause() throws QueueException, InterruptedException {
        CountDownLatch countDownLatchFirst = new CountDownLatch(1);
        CountDownLatch countDownLatchSecond = new CountDownLatch(2);
        CountDownLatch countDownLatchOthers = new CountDownLatch(2);
        Collection<Pair<Instant, Integer>> list = Collections.synchronizedCollection(new ArrayList<>());

        QueueSubscriber<TestDispatch> subscriber = dispatchQueue
            .subscriber()
            .subscribe(e -> {
                list.add(Pair.of(Instant.now(), e.getLeft().id()));
                if (e.getLeft().id() == 1) {
                    countDownLatchFirst.countDown();
                } else if (e.getLeft().id() <= 3) {
                    countDownLatchSecond.countDown();
                } else {
                    countDownLatchOthers.countDown();
                }
            });

        // first round
        String prefix = this.keyPrefix();
        dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 1));

        boolean await1 = countDownLatchFirst.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        subscriber.pause();
        assertThat(await1).isTrue();

        // second round
        Instant resumeTime = Instant.now();
        subscriber.resume();

        dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 2));
        dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 3));

        boolean await2 = countDownLatchSecond.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        subscriber.pause();
        assertThat(await2).isTrue();

        // last round
        Instant resumeTime2 = Instant.now();
        subscriber.resume();

        dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 4));
        dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 5));

        boolean await3 = countDownLatchOthers.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        subscriber.close();

        assertThat(await3).isTrue();
        assertThat(list).hasSize(5);
        assertThat(list.stream().filter(i -> i.getLeft().isBefore(resumeTime)).count()).isEqualTo(1);
        assertThat(list.stream().filter(i -> i.getLeft().isAfter(resumeTime)).count()).isEqualTo(4);
        assertThat(list.stream().filter(i -> i.getLeft().isAfter(resumeTime2)).count()).isEqualTo(2);
    }

    @Introspected
    public record TestDispatch(String key, Integer id) implements DispatchEvent {
    }
}
@@ -0,0 +1,123 @@
package io.kestra.queue;

import io.kestra.core.queues.QueueException;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static io.kestra.core.utils.Rethrow.throwConsumer;
import static org.assertj.core.api.Assertions.assertThat;

public abstract class AbstractKeyedDispatchQueueTest extends AbstractQueueTest {
    private static final int DEFAULT_TIMEOUT_SECONDS = 10;

    @Inject
    private KeyedDispatchQueueInterface<TestKeyedDispatch> keyDispatchQueue;

    @Test
    void singleConsumer() throws QueueException, InterruptedException {
        String groupKey = IdUtils.create();

        CountDownLatch countDownLatch = new CountDownLatch(2);
        Collection<Integer> list = Collections.synchronizedCollection(new ArrayList<>());

        QueueSubscriber<TestKeyedDispatch> subscriber = keyDispatchQueue
            .subscriber(groupKey)
            .subscribe(e -> {
                list.add(e.getLeft().id());
                countDownLatch.countDown();
            });

        String prefix = this.keyPrefix();
        keyDispatchQueue.emit(groupKey, new TestKeyedDispatch(prefix + "_" + IdUtils.create(), 1));
        keyDispatchQueue.emit(groupKey, new TestKeyedDispatch(prefix + "_" + IdUtils.create(), 2));

        boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        subscriber.close();

        assertThat(await).isTrue();
        assertThat(countDownLatch.getCount()).isEqualTo(0L);
        assertThat(list).containsExactlyInAnyOrder(1, 2);
    }

    @Test
    void multipleConsumer() throws QueueException, InterruptedException {
        String groupKey = IdUtils.create();

        int rand = ThreadLocalRandom.current().nextInt(10, 50);
        CountDownLatch countDownLatch = new CountDownLatch(rand);
        Collection<String> list = Collections.synchronizedCollection(new ArrayList<>());
        List<QueueSubscriber<TestKeyedDispatch>> subscribers = new ArrayList<>();

        IntStream.range(0, 3)
            .boxed()
            .forEach(throwConsumer(i -> subscribers.add(keyDispatchQueue
                .subscriber(groupKey)
                .subscribe(e -> {
                    list.add("c" + String.format("%03d", i) + "-i" + String.format("%03d", e.getLeft().id()));
                    countDownLatch.countDown();
                })
            )));

        String prefix = this.keyPrefix();
        for (int i = 0; i < rand; i++) {
            keyDispatchQueue.emit(groupKey, new TestKeyedDispatch(prefix + "_" + IdUtils.create(), i));
        }

        // rebalancing can take some time, so we multiply the timeout by 5
        boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS * 5, TimeUnit.SECONDS);
        subscribers.parallelStream().forEach(QueueSubscriber::close);

        assertThat(await).isTrue();
        assertThat(countDownLatch.getCount()).isEqualTo(0L);
        assertThat(list).hasSize(rand);
        assertThat(list.stream().map(s -> s.substring(0, s.indexOf("-"))).toList()).contains("c000", "c001", "c002");
        assertThat(list.stream().map(s -> s.substring(s.indexOf("-") + 1)).toList()).contains("i001", String.format("i%03d", rand - 1));
    }

    @Test
    void multipleGroup() throws InterruptedException, QueueException {
        CountDownLatch countDownLatch = new CountDownLatch(6);
        List<QueueSubscriber<TestKeyedDispatch>> subscribers = new ArrayList<>();
        Map<Integer, Collection<Integer>> map = new HashMap<>();

        IntStream.range(0, 3)
            .boxed()
            .forEach(throwConsumer(i -> {
                map.put(i, Collections.synchronizedCollection(new ArrayList<>()));

                subscribers.add(keyDispatchQueue
                    .subscriber("group-" + i)
                    .subscribe(e -> {
                        map.get(i).add(e.getLeft().id());
                        countDownLatch.countDown();
                    }));
            }));

        String prefix = this.keyPrefix();
        for (int i = 0; i < 3; i++) {
            keyDispatchQueue.emit("group-" + i, new TestKeyedDispatch(prefix + "_" + IdUtils.create(), 1));
            keyDispatchQueue.emit("group-" + i, new TestKeyedDispatch(prefix + "_" + IdUtils.create(), 2));
        }

        boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        subscribers.forEach(QueueSubscriber::close);

        assertThat(await).isTrue();
        assertThat(countDownLatch.getCount()).isEqualTo(0L);
        assertThat(map.entrySet().stream().flatMap(s -> s.getValue().stream()).toList()).hasSize(6);
        for (int i = 0; i < 3; i++) {
            assertThat(map.get(i)).containsExactlyInAnyOrder(1, 2);
        }
    }

    @Introspected
    public record TestKeyedDispatch(String key, Integer id) implements KeyedDispatchEvent {}
}
@@ -0,0 +1,9 @@
package io.kestra.queue;

public class AbstractQueueTest {
    // returns the name of the calling test method, used to prefix keys so concurrent tests don't collide
    protected String keyPrefix() {
        StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
        StackTraceElement e = stacktrace[2];
        return e.getMethodName();
    }
}
1
queue/src/test/resources/allure.properties
Normal file
@@ -0,0 +1 @@
allure.results.directory=build/allure-results
Some files were not shown because too many files have changed in this diff.