mirror of https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00

Compare commits: develop...executor_v (1 commit)

Commit b243716820
@@ -4,6 +4,7 @@ import io.kestra.core.runners.*;
import io.kestra.core.server.Service;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.executor.DefaultExecutor;
import io.kestra.worker.DefaultWorker;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;

@@ -49,7 +50,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
        running.set(true);

        poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");

-       poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
+       poolExecutor.execute(applicationContext.getBean(DefaultExecutor.class));

        if (workerEnabled) {
            // FIXME: For backward compatibility with Kestra 0.15.x and earlier we still use a UUID for the Worker ID instead of IdUtils
@@ -11,6 +11,7 @@ import io.kestra.core.runners.*;

public interface QueueFactoryInterface {
    String EXECUTION_NAMED = "executionQueue";
    String EXECUTION_EVENT_NAMED = "executionEventQueue";
    String EXECUTOR_NAMED = "executorQueue";
    String WORKERJOB_NAMED = "workerJobQueue";
    String WORKERTASKRESULT_NAMED = "workerTaskResultQueue";

@@ -30,6 +31,8 @@ public interface QueueFactoryInterface {

    QueueInterface<Execution> execution();

    QueueInterface<ExecutionEvent> executionEvent();

    QueueInterface<Executor> executor();

    WorkerJobQueueInterface workerJob();
@@ -35,6 +35,24 @@ public interface QueueInterface<T> extends Closeable, Pauseable {

    void delete(String consumerGroup, T message) throws QueueException;

    /**
     * Delete all messages of the queue for this key.
     * This is used to purge a queue for a specific key.
     * A queue implementation may choose not to implement it and purge records differently.
     */
    default void deleteByKey(String key) throws QueueException {
        // by default do nothing
    }

    /**
     * Delete all messages of the queue for a set of keys.
     * This is used to purge a queue for specific keys.
     * A queue implementation may choose not to implement it and purge records differently.
     */
    default void deleteByKeys(List<String> keys) throws QueueException {
        // by default do nothing
    }

    default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
        return receive(null, consumer, false);
    }

@@ -54,4 +72,20 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
    }

    Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);

    default Runnable receiveBatch(Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
        return receiveBatch(null, queueType, consumer);
    }

    default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
        return receiveBatch(consumerGroup, queueType, consumer, true);
    }

    /**
     * Consume a batch of messages.
     * By default, it consumes messages one at a time; a queue implementation may override it to support real batch consumption.
     */
    default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer, boolean forUpdate) {
        return receive(consumerGroup, either -> consumer.accept(List.of(either)), forUpdate);
    }
}
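To make the new contract concrete, here is a minimal usage sketch. It is not part of the commit: the class name and the printing consumer are hypothetical, and only the call shapes of receiveBatch(...) and deleteByKeys(...) come from the interface above.

    import io.kestra.core.models.executions.Execution;
    import io.kestra.core.queues.QueueException;
    import io.kestra.core.queues.QueueInterface;
    import io.kestra.core.runners.Executor;

    import java.util.List;

    class QueuePurgeSketch {
        // `queue` would be injected in real code; this only demonstrates the call shapes.
        static Runnable consumeThenPurge(QueueInterface<Execution> queue, List<String> executionIds) throws QueueException {
            // batch consumption: the default implementation delivers one-element batches
            Runnable cancel = queue.receiveBatch(Executor.class, batch ->
                batch.stream()
                    .filter(either -> !either.isRight())
                    .forEach(either -> System.out.println("received " + either.getLeft().getId())));

            // purge all messages stored under these keys; a no-op unless the
            // implementation overrides deleteByKeys (e.g. the JDBC queue further down)
            queue.deleteByKeys(executionIds);
            return cancel; // run() to stop consuming
        }
    }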
@@ -19,8 +19,6 @@ public class QueueService {
            return ((SubflowExecution<?>) object).getExecution().getId();
        } else if (object.getClass() == SubflowExecutionResult.class) {
            return ((SubflowExecutionResult) object).getExecutionId();
-       } else if (object.getClass() == ExecutorState.class) {
-           return ((ExecutorState) object).getExecutionId();
        } else if (object.getClass() == Setting.class) {
            return ((Setting) object).getKey();
        } else if (object.getClass() == Executor.class) {
@@ -2,12 +2,12 @@ package io.kestra.core.repositories;

import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.FlowScope;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.Executor;
import io.kestra.core.utils.DateUtils;
import io.kestra.plugin.core.dashboard.data.Executions;
import io.micronaut.data.model.Pageable;

@@ -156,4 +156,6 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
        String tenantId,
        @Nullable List<FlowFilter> flows
    );

    Executor lock(String executionId, Function<Execution, Executor> function);
}
@@ -0,0 +1,17 @@
package io.kestra.core.runners;

import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;

import java.time.Instant;

public record ExecutionEvent(String tenantId, String executionId, Instant eventDate, ExecutionEventType eventType) implements HasUID {
    public ExecutionEvent(Execution execution, ExecutionEventType eventType) {
        this(execution.getTenantId(), execution.getId(), Instant.now(), eventType);
    }

    @Override
    public String uid() {
        return executionId;
    }
}
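For illustration only (not in the diff): the convenience constructor stamps the event date itself, so a producer that already holds an Execution can emit a lifecycle event in one line. The executionEventQueue variable stands for the EXECUTION_EVENT_NAMED queue wired by the queue factories further down.

    // hypothetical emit site, assuming an injected QueueInterface<ExecutionEvent>
    executionEventQueue.emit(new ExecutionEvent(execution, ExecutionEventType.CREATED));
    // the message is keyed by uid(), i.e. by the execution id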
@@ -0,0 +1,7 @@
package io.kestra.core.runners;

public enum ExecutionEventType {
    CREATED,
    UPDATED,
    TERMINATED,
}
@@ -1,21 +0,0 @@
package io.kestra.core.runners;

import io.kestra.core.models.flows.State;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Data
@NoArgsConstructor
public class ExecutorState {
    private String executionId;
    private Map<String, State.Type> workerTaskDeduplication = new ConcurrentHashMap<>();
    private Map<String, String> childDeduplication = new ConcurrentHashMap<>();
    private Map<String, State.Type> subflowExecutionDeduplication = new ConcurrentHashMap<>();

    public ExecutorState(String executionId) {
        this.executionId = executionId;
    }
}
executor/src/main/java/io/kestra/executor/DefaultExecutor.java (new file, 398 lines)
@@ -0,0 +1,398 @@
package io.kestra.executor;

import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.*;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.trace.Tracer;
import io.kestra.core.trace.TracerFactory;
import io.kestra.core.utils.*;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.event.ApplicationEventPublisher;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;

@Singleton
@Slf4j
public class DefaultExecutor implements ExecutorInterface {
    @Inject
    private ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;

    @Inject
    private ExecutionRepositoryInterface executionRepository;
    @Inject
    @Named(QueueFactoryInterface.EXECUTION_NAMED)
    private QueueInterface<Execution> executionQueue;
    @Inject
    @Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
    private QueueInterface<ExecutionEvent> executionEventQueue;
    @Inject
    @Named(QueueFactoryInterface.WORKERJOB_NAMED)
    private QueueInterface<WorkerJob> workerJobQueue;
    @Inject
    @Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
    private QueueInterface<WorkerTaskResult> workerTaskResultQueue;
    @Inject
    @Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
    private QueueInterface<LogEntry> logQueue;

    @Inject
    private SkipExecutionService skipExecutionService;
    @Inject
    private PluginDefaultService pluginDefaultService;
    @Inject
    private ExecutorService executorService;
    @Inject
    private WorkerGroupService workerGroupService;
    @Inject
    private ExecutionService executionService;

    @Inject
    private FlowMetaStoreInterface flowMetaStore;

    // FIXME change config names
    @Value("${kestra.jdbc.executor.clean.execution-queue:true}")
    private boolean cleanExecutionQueue;
    @Value("${kestra.jdbc.executor.clean.worker-queue:true}")
    private boolean cleanWorkerJobQueue;

    private final AtomicReference<ServiceState> state = new AtomicReference<>();
    private final String id = IdUtils.create();
    private final List<Runnable> receiveCancellations = new ArrayList<>();

    private final Tracer tracer;
    private final java.util.concurrent.ExecutorService workerTaskResultExecutorService;
    private final java.util.concurrent.ExecutorService executionExecutorService;

    @Inject
    public DefaultExecutor(TracerFactory tracerFactory, ExecutorsUtils executorsUtils, @Value("${kestra.jdbc.executor.thread-count:0}") int threadCount) {
        this.tracer = tracerFactory.getTracer(DefaultExecutor.class, "EXECUTOR");

        // By default, we start as many threads as there are available processors, with a minimum of 4, per executor service
        // for the worker task result queue and the execution queue.
        // Other queues would not benefit from more consumers.
        int numberOfThreads = threadCount != 0 ? threadCount : Math.max(4, Runtime.getRuntime().availableProcessors());
        this.workerTaskResultExecutorService = executorsUtils.maxCachedThreadPool(numberOfThreads, "jdbc-worker-task-result-executor");
        this.executionExecutorService = executorsUtils.maxCachedThreadPool(numberOfThreads, "jdbc-execution-executor");
    }

    @Override
    public void run() {
        setState(ServiceState.CREATED);

        // listen to executor related queues
        this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, execution -> executionQueue(execution)));
        this.receiveCancellations.addFirst(this.executionEventQueue.receiveBatch(
            Executor.class,
            executionEvents -> {
                List<CompletableFuture<Void>> futures = executionEvents.stream()
                    .map(executionEvent -> CompletableFuture.runAsync(() -> executionEventQueue(executionEvent), executionExecutorService))
                    .toList();
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            }
        ));
        this.receiveCancellations.addFirst(this.workerTaskResultQueue.receiveBatch(
            Executor.class,
            workerTaskResults -> {
                List<CompletableFuture<Void>> futures = workerTaskResults.stream()
                    .map(workerTaskResult -> CompletableFuture.runAsync(() -> workerTaskResultQueue(workerTaskResult), workerTaskResultExecutorService))
                    .toList();
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            }
        ));

        setState(ServiceState.RUNNING);
        log.info("Executor started");
    }

    // This serves as a temporary bridge between the old execution queue and the new execution event queue, to avoid updating all code that uses the old queue
    private void executionQueue(Either<Execution, DeserializationException> either) {
        if (either.isRight()) {
            log.error("Unable to deserialize an execution: {}", either.getRight().getMessage());
            return;
        }

        Execution message = either.getLeft();
        if (skipExecutionService.skipExecution(message)) {
            log.warn("Skipping execution {}", message.getId());
            return;
        }

        try {
            executionEventQueue.emit(new ExecutionEvent(message, ExecutionEventType.CREATED));
        } catch (QueueException e) {
            // If we cannot send the execution event, we fail the execution
            executionRepository.lock(message.getId(), execution -> {
                try {
                    Execution failed = execution.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED);
                    ExecutionEvent event = new ExecutionEvent(failed, ExecutionEventType.UPDATED); // TODO terminated
                    // TODO transaction between repo and queue
                    this.executionRepository.update(failed);
                    this.executionEventQueue.emit(event);
                } catch (QueueException ex) {
                    log.error("Unable to emit the execution {}", execution.getId(), ex);
                }
                return null;
            });
        }
    }

    private void executionEventQueue(Either<ExecutionEvent, DeserializationException> either) {
        if (either.isRight()) {
            log.error("Unable to deserialize an execution event: {}", either.getRight().getMessage());
            return;
        }

        ExecutionEvent message = either.getLeft();
        if (skipExecutionService.skipExecution(message.executionId())) { // TODO we may add tenant/namespace/flow to skip them as well
            log.warn("Skipping execution {}", message.executionId());
            return;
        }

        Executor result = executionRepository.lock(message.executionId(), execution -> {
            return tracer.inCurrentContext(
                execution,
                FlowId.uidWithoutRevision(execution),
                () -> {
                    final FlowWithSource flow = findFlow(execution);
                    Executor executor = new Executor(execution, null).withFlow(flow);

                    // process the execution
                    if (log.isDebugEnabled()) {
                        executorService.log(log, true, executor);
                    }
                    executor = executorService.process(executor);

                    if (!executor.getNexts().isEmpty()) {
                        executor.withExecution(
                            executorService.onNexts(executor.getExecution(), executor.getNexts()),
                            "onNexts"
                        );
                    }

                    // worker task
                    if (!executor.getWorkerTasks().isEmpty()) {
                        List<WorkerTaskResult> workerTaskResults = new ArrayList<>();
                        executor
                            .getWorkerTasks()
                            .forEach(throwConsumer(workerTask -> {
                                try {
                                    if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
                                        workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED)));
                                    } else {
                                        if (workerTask.getTask().isSendToWorkerTask()) {
                                            Optional<WorkerGroup> maybeWorkerGroup = workerGroupService.resolveGroupFromJob(flow, workerTask);
                                            String workerGroupKey = maybeWorkerGroup.map(throwFunction(workerGroup -> workerTask.getRunContext().render(workerGroup.getKey())))
                                                .orElse(null);
                                            workerJobQueue.emit(workerGroupKey, workerTask);
                                        }
                                        if (workerTask.getTask().isFlowable()) {
                                            workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.RUNNING)));
                                        }
                                    }
                                } catch (Exception e) {
                                    workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.FAILED)));
                                    workerTask.getRunContext().logger().error("Failed to evaluate the runIf condition for task {}. Cause: {}", workerTask.getTask().getId(), e.getMessage(), e);
                                }
                            }));

                        try {
                            executorService.addWorkerTaskResults(executor, workerTaskResults);
                        } catch (InternalException e) {
                            log.error("Unable to add a worker task result to the execution", e);
                        }
                    }

                    return executor;
                }
            );
        });

        if (result != null) {
            this.toExecution(result);
        }
    }

    private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) {
        if (either == null) {
            // FIXME it happens in Kafka but should not? or maybe it should...
            return;
        }

        if (either.isRight()) {
            log.error("Unable to deserialize a worker task result: {}", either.getRight().getMessage(), either.getRight());
            return;
        }

        WorkerTaskResult message = either.getLeft();
        if (skipExecutionService.skipExecution(message.getTaskRun())) {
            log.warn("Skipping execution {}", message.getTaskRun().getExecutionId());
            return;
        }

        if (log.isDebugEnabled()) {
            executorService.log(log, true, message);
        }

        Executor executor = executionRepository.lock(message.getTaskRun().getExecutionId(), execution -> {
            Executor current = new Executor(execution, null);

            if (execution == null) {
                throw new IllegalStateException("Execution state doesn't exist for " + message.getTaskRun().getExecutionId() + ", received " + message);
            }

            if (execution.hasTaskRunJoinable(message.getTaskRun())) {
                try {
                    // process worker task result
                    executorService.addWorkerTaskResult(current, () -> findFlow(execution), message);
                    // join worker result
                    return current;
                } catch (InternalException e) {
                    return handleFailedExecutionFromExecutor(current, e);
                }
            }

            return null;
        });

        if (executor != null) {
            this.toExecution(executor);
        }
    }

    private void toExecution(Executor executor) {
        try {
            boolean shouldSend = false;

            if (executor.getException() != null) {
                executor = handleFailedExecutionFromExecutor(executor, executor.getException());
                shouldSend = true;
            } else if (executor.isExecutionUpdated()) {
                shouldSend = true;
            }

            if (!shouldSend) {
                // delete the execution from the state storage if ended
                // IMPORTANT: it must be done here, when the execution arrives 'again' with a terminated state,
                // so we are sure at this point that no new executions will be created; otherwise the state storage would be re-created by the execution queue.
                if (executorService.canBePurged(executor)) {
                    // TODO executorStateStorage.delete(executor.getExecution());
                }

                return;
            }

            if (log.isDebugEnabled()) {
                executorService.log(log, false, executor);
            }

            // the terminated state can come from the execution queue, in which case we always have a flow in the executor,
            // or from a worker task in an afterExecution block, in which case we need to load the flow
            if (executor.getFlow() == null && executor.getExecution().getState().isTerminated()) {
                executor = executor.withFlow(findFlow(executor.getExecution()));
            }
            boolean isTerminated = executor.getFlow() != null && executionService.isTerminated(executor.getFlow(), executor.getExecution());

            // IMPORTANT: this must be done before emitting the last execution message so that all consumers are notified that the execution ends.
            // NOTE: we may also purge ExecutionKilled events, but as there may not be a lot of them, it may not be worth it.
            if (isTerminated) {
                if (cleanExecutionQueue) {
                    executionEventQueue.deleteByKey(executor.getExecution().getId());
                    executionQueue.deleteByKey(executor.getExecution().getId());
                }

                // Purge the workerTaskResultQueue and the workerJobQueue.
                // IMPORTANT: this is safe as only the executor is listening to WorkerTaskResult,
                // and we are sure at this stage that all WorkerJobs have been consumed and processed by the Worker.
                // If any of these assumptions changed, this code would not be safe anymore.
                if (cleanWorkerJobQueue && !ListUtils.isEmpty(executor.getExecution().getTaskRunList())) {
                    List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
                        .map(taskRun -> taskRun.getId())
                        .toList();
                    workerTaskResultQueue.deleteByKeys(taskRunKeys);
                    workerJobQueue.deleteByKeys(taskRunKeys);
                }

                ExecutionEvent event = new ExecutionEvent(executor.getExecution(), ExecutionEventType.TERMINATED);
                this.executionEventQueue.emit(event);
            } else {
                ExecutionEvent event = new ExecutionEvent(executor.getExecution(), ExecutionEventType.UPDATED);
                this.executionEventQueue.emit(event);
            }
        } catch (QueueException e) {
            // If we cannot add the new worker task result to the execution, we fail it
            executionRepository.lock(executor.getExecution().getId(), execution -> {
                try {
                    Execution failed = execution.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED);
                    ExecutionEvent event = new ExecutionEvent(failed, ExecutionEventType.TERMINATED);
                    this.executionEventQueue.emit(event);
                } catch (QueueException ex) {
                    log.error("Unable to emit the execution {}", execution.getId(), ex);
                }
                return null;
            });
        }
    }

    private FlowWithSource findFlow(Execution execution) {
        FlowInterface flow = flowMetaStore.findByExecution(execution).orElseThrow();
        return pluginDefaultService.injectDefaults(flow, execution);
    }

    private Executor handleFailedExecutionFromExecutor(Executor executor, Exception e) {
        Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);

        try {
            logQueue.emitAsync(failedExecutionWithLog.getLogs());
        } catch (QueueException ex) {
            // fail silently
        }

        return executor.withExecution(failedExecutionWithLog.getExecution(), "exception");
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public ServiceType getType() {
        return ServiceType.EXECUTOR;
    }

    @Override
    public ServiceState getState() {
        return state.get();
    }

    private void setState(final ServiceState state) {
        this.state.set(state);
        eventPublisher.publishEvent(new ServiceStateChangeEvent(this));
    }
}
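As a reading aid, here is a sketch (not part of the commit) of how another component could observe the lifecycle that DefaultExecutor drives; the receive(Class, Consumer) overload is the one used in run() above, and the handler body is hypothetical.

    // hypothetical observer of the executor's event stream
    Runnable cancel = executionEventQueue.receive(DefaultExecutor.class, either -> {
        if (either.isRight()) {
            return; // deserialization error, nothing to observe
        }
        ExecutionEvent event = either.getLeft();
        if (event.eventType() == ExecutionEventType.TERMINATED) {
            System.out.println("execution " + event.executionId() + " finished at " + event.eventDate());
        }
    });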
@@ -3,7 +3,6 @@ package io.kestra.repository.h2;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
-import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.services.JdbcFilterService;
import io.micronaut.context.ApplicationContext;
import io.kestra.core.utils.DateUtils;
@@ -23,9 +22,8 @@ public class H2ExecutionRepository extends AbstractJdbcExecutionRepository {
    @Inject
    public H2ExecutionRepository(@Named("executions") H2Repository<Execution> repository,
                                 ApplicationContext applicationContext,
-                                AbstractJdbcExecutorStateStorage executorStateStorage,
                                 JdbcFilterService filterService) {
-       super(repository, applicationContext, executorStateStorage, filterService);
+       super(repository, applicationContext, filterService);
    }

    @Override
@@ -1,15 +0,0 @@
package io.kestra.runner.h2;

import io.kestra.core.runners.ExecutorState;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.repository.h2.H2Repository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

@Singleton
@H2QueueEnabled
public class H2ExecutorStateStorage extends AbstractJdbcExecutorStateStorage {
    public H2ExecutorStateStorage(@Named("executorstate") H2Repository<ExecutorState> repository) {
        super(repository);
    }
}
@@ -33,6 +33,14 @@ public class H2QueueFactory implements QueueFactoryInterface {
        return new H2Queue<>(Execution.class, applicationContext);
    }

    @Override
    @Singleton
    @Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
    @Bean(preDestroy = "close")
    public QueueInterface<ExecutionEvent> executionEvent() {
        return new H2Queue<>(ExecutionEvent.class, applicationContext);
    }

    @Override
    @Singleton
    @Named(QueueFactoryInterface.EXECUTOR_NAMED)
@@ -0,0 +1,22 @@
ALTER TABLE queues ALTER COLUMN "type" ENUM(
    'io.kestra.core.models.executions.Execution',
    'io.kestra.core.models.templates.Template',
    'io.kestra.core.models.executions.ExecutionKilled',
    'io.kestra.core.runners.WorkerJob',
    'io.kestra.core.runners.WorkerTaskResult',
    'io.kestra.core.runners.WorkerInstance',
    'io.kestra.core.runners.WorkerTaskRunning',
    'io.kestra.core.models.executions.LogEntry',
    'io.kestra.core.models.triggers.Trigger',
    'io.kestra.ee.models.audits.AuditLog',
    'io.kestra.core.models.executions.MetricEntry',
    'io.kestra.core.runners.WorkerTriggerResult',
    'io.kestra.core.runners.SubflowExecutionResult',
    'io.kestra.core.server.ClusterEvent',
    'io.kestra.core.runners.SubflowExecutionEnd',
    'io.kestra.core.models.flows.FlowInterface',
    'io.kestra.core.runners.MultipleConditionEvent',
    'io.kestra.core.runners.ExecutionEvent'
) NOT NULL

DROP TABLE IF EXISTS executorstate;
@@ -4,7 +4,6 @@ import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
-import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.services.JdbcFilterService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -24,9 +23,8 @@ public class MysqlExecutionRepository extends AbstractJdbcExecutionRepository {
    @Inject
    public MysqlExecutionRepository(@Named("executions") MysqlRepository<Execution> repository,
                                    ApplicationContext applicationContext,
-                                   AbstractJdbcExecutorStateStorage executorStateStorage,
                                    JdbcFilterService filterService) {
-       super(repository, applicationContext, executorStateStorage, filterService);
+       super(repository, applicationContext, filterService);
    }

    @Override
@@ -1,16 +0,0 @@
package io.kestra.runner.mysql;

import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.core.runners.ExecutorState;
import io.kestra.repository.mysql.MysqlRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

@Singleton
@MysqlQueueEnabled
public class MysqlExecutorStateStorage extends AbstractJdbcExecutorStateStorage {
    public MysqlExecutorStateStorage(@Named("executorstate") MysqlRepository<ExecutorState> repository) {
        super(repository);
    }
}
@@ -33,6 +33,14 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
        return new MysqlQueue<>(Execution.class, applicationContext);
    }

    @Override
    @Singleton
    @Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
    @Bean(preDestroy = "close")
    public QueueInterface<ExecutionEvent> executionEvent() {
        return new MysqlQueue<>(ExecutionEvent.class, applicationContext);
    }

    @Override
    @Singleton
    @Named(QueueFactoryInterface.EXECUTOR_NAMED)
@@ -0,0 +1,22 @@
ALTER TABLE queues ALTER COLUMN "type" ENUM(
    'io.kestra.core.models.executions.Execution',
    'io.kestra.core.models.templates.Template',
    'io.kestra.core.models.executions.ExecutionKilled',
    'io.kestra.core.runners.WorkerJob',
    'io.kestra.core.runners.WorkerTaskResult',
    'io.kestra.core.runners.WorkerInstance',
    'io.kestra.core.runners.WorkerTaskRunning',
    'io.kestra.core.models.executions.LogEntry',
    'io.kestra.core.models.triggers.Trigger',
    'io.kestra.ee.models.audits.AuditLog',
    'io.kestra.core.models.executions.MetricEntry',
    'io.kestra.core.runners.WorkerTriggerResult',
    'io.kestra.core.runners.SubflowExecutionResult',
    'io.kestra.core.server.ClusterEvent',
    'io.kestra.core.runners.SubflowExecutionEnd',
    'io.kestra.core.models.flows.FlowInterface',
    'io.kestra.core.runners.MultipleConditionEvent',
    'io.kestra.core.runners.ExecutionEvent'
) NOT NULL

DROP TABLE IF EXISTS executorstate;
@@ -5,7 +5,6 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
-import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.services.JdbcFilterService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -26,9 +25,8 @@ public class PostgresExecutionRepository extends AbstractJdbcExecutionRepository
    @Inject
    public PostgresExecutionRepository(@Named("executions") PostgresRepository<Execution> repository,
                                       ApplicationContext applicationContext,
-                                      AbstractJdbcExecutorStateStorage executorStateStorage,
                                       JdbcFilterService filterService) {
-       super(repository, applicationContext, executorStateStorage, filterService);
+       super(repository, applicationContext, filterService);
    }

    @Override
@@ -1,15 +0,0 @@
package io.kestra.runner.postgres;

import io.kestra.core.runners.ExecutorState;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.repository.postgres.PostgresRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

@Singleton
@PostgresQueueEnabled
public class PostgresExecutorStateStorage extends AbstractJdbcExecutorStateStorage {
    public PostgresExecutorStateStorage(@Named("executorstate") PostgresRepository<ExecutorState> repository) {
        super(repository);
    }
}
@@ -33,6 +33,14 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
        return new PostgresQueue<>(Execution.class, applicationContext);
    }

    @Override
    @Singleton
    @Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
    @Bean(preDestroy = "close")
    public QueueInterface<ExecutionEvent> executionEvent() {
        return new PostgresQueue<>(ExecutionEvent.class, applicationContext);
    }

    @Override
    @Singleton
    @Named(QueueFactoryInterface.EXECUTOR_NAMED)
@@ -0,0 +1,3 @@
ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.core.runners.ExecutionEvent';

DROP TABLE IF EXISTS executorstate;
@@ -71,12 +71,6 @@ public class JdbcTableConfigsFactory {
        return new InstantiableJdbcTableConfig("multipleconditions", MultipleConditionWindow.class, "multipleconditions");
    }

-   @Bean
-   @Named("executorstate")
-   public InstantiableJdbcTableConfig executorState() {
-       return new InstantiableJdbcTableConfig("executorstate", ExecutorState.class, "executorstate");
-   }
-
    @Bean
    @Named("executordelayed")
    public InstantiableJdbcTableConfig executorDelayed() {
@@ -20,11 +20,9 @@ import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.Executor;
-import io.kestra.core.runners.ExecutorState;
import io.kestra.core.utils.DateUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.NamespaceUtils;
-import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.runner.JdbcQueueIndexerInterface;
import io.kestra.jdbc.services.JdbcFilterService;
import io.kestra.plugin.core.dashboard.data.Executions;
@@ -35,7 +33,6 @@ import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.annotation.Nullable;
import lombok.Getter;
import lombok.SneakyThrows;
-import org.apache.commons.lang3.tuple.Pair;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
@@ -66,7 +63,6 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
    protected final io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository;
    private final ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
    private final ApplicationContext applicationContext;
-   protected final AbstractJdbcExecutorStateStorage executorStateStorage;

    private QueueInterface<Execution> executionQueue;
    private final NamespaceUtils namespaceUtils;
@@ -100,11 +96,9 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
    public AbstractJdbcExecutionRepository(
        io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository,
        ApplicationContext applicationContext,
-       AbstractJdbcExecutorStateStorage executorStateStorage,
        JdbcFilterService filterService
    ) {
        this.jdbcRepository = jdbcRepository;
-       this.executorStateStorage = executorStateStorage;
        this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
        this.namespaceUtils = applicationContext.getBean(NamespaceUtils.class);

@@ -980,7 +974,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
        });
    }

-   public Executor lock(String executionId, Function<Pair<Execution, ExecutorState>, Pair<Executor, ExecutorState>> function) {
+   @Override
+   public Executor lock(String executionId, Function<Execution, Executor> function) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
@@ -1000,14 +995,11 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
                    return null;
                }

-               ExecutorState executorState = executorStateStorage.get(context, execution.get());
-               Pair<Executor, ExecutorState> pair = function.apply(Pair.of(execution.get(), executorState));
+               Executor executor = function.apply(execution.get());

-               if (pair != null) {
-                   this.jdbcRepository.persist(pair.getKey().getExecution(), context, null);
-                   this.executorStateStorage.save(context, pair.getRight());
-
-                   return pair.getKey();
+               if (executor != null) {
+                   this.jdbcRepository.persist(executor.getExecution(), context, null);
+                   return executor;
                }

                return null;
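The new signature removes the ExecutorState plumbing from every call site. A hypothetical before/after sketch (not from the diff) of what callers now write:

    // before: state was threaded through the lock as a Pair (sketch of the old shape)
    // repository.lock(executionId, pair -> Pair.of(new Executor(pair.getLeft(), null), pair.getRight()));

    // after: callers receive the locked Execution and return the Executor to persist
    Executor result = repository.lock(executionId, execution -> {
        Executor executor = new Executor(execution, null);
        // ... process the execution while the row lock is held ...
        return executor; // persisted inside the transaction; return null to persist nothing
    });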
@@ -1,40 +0,0 @@
package io.kestra.jdbc.runner;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.runners.ExecutorState;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Record1;
import org.jooq.SelectConditionStep;

import java.util.Map;

public abstract class AbstractJdbcExecutorStateStorage {
    protected io.kestra.jdbc.AbstractJdbcRepository<ExecutorState> jdbcRepository;

    public AbstractJdbcExecutorStateStorage(io.kestra.jdbc.AbstractJdbcRepository<ExecutorState> jdbcRepository) {
        this.jdbcRepository = jdbcRepository;
    }

    public ExecutorState get(DSLContext dslContext, Execution execution) {
        SelectConditionStep<Record1<Object>> select = dslContext
            .select(AbstractJdbcRepository.field("value"))
            .from(this.jdbcRepository.getTable())
            .where(
                AbstractJdbcRepository.field("key").eq(execution.getId())
            );

        return this.jdbcRepository.fetchOne(select)
            .orElse(new ExecutorState(execution.getId()));
    }

    public void save(DSLContext dslContext, ExecutorState executorState) {
        Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(executorState);
        this.jdbcRepository.persist(executorState, dslContext, fields);
    }

    public void delete(Execution execution) {
        this.jdbcRepository.delete(new ExecutorState(execution.getId()));
    }
}
(File diff suppressed because it is too large)
@@ -185,10 +185,7 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
        // and the queue has its own cleaner, which we better not mess with, as the 'queues' table is selected with a lock.
    }

-   /**
-    * Delete all messages of the queue for this key.
-    * This is used to purge a queue for a specific key.
-    */
+   @Override
    public void deleteByKey(String key) throws QueueException {
        dslContextWrapper.transaction(configuration -> {
            int deleted = DSL
@@ -336,14 +333,7 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
        );
    }

-   public Runnable receiveBatch(Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
-       return receiveBatch(null, queueType, consumer);
-   }
-
-   public Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
-       return receiveBatch(consumerGroup, queueType, consumer, true);
-   }
-
+   @Override
    public Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer, boolean forUpdate) {
        return this.receiveImpl(
            consumerGroup,
@@ -8,6 +8,7 @@ import io.kestra.core.server.ServiceInstance;
import io.kestra.core.server.ServiceRegistry;
import io.kestra.core.server.ServiceType;
import io.kestra.core.server.WorkerTaskRestartStrategy;
import io.kestra.executor.DefaultExecutor;
import io.kestra.jdbc.repository.AbstractJdbcServiceInstanceRepository;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
@@ -37,7 +38,7 @@ public final class JdbcServiceLivenessCoordinator extends AbstractServiceLivenes

    private final static Logger log = LoggerFactory.getLogger(JdbcServiceLivenessCoordinator.class);

-   private final AtomicReference<JdbcExecutor> executor = new AtomicReference<>();
+   private final AtomicReference<DefaultExecutor> executor = new AtomicReference<>();
    private final AbstractJdbcServiceInstanceRepository serviceInstanceRepository;
    private final Duration purgeRetention;

@@ -89,7 +90,8 @@ public final class JdbcServiceLivenessCoordinator extends AbstractServiceLivenes
                .toList();
            if (!ids.isEmpty()) {
                log.info("Trigger task restart for non-responding workers after termination grace period: {}.", ids);
-               executor.get().reEmitWorkerJobsForWorkers(configuration, ids);
+               // FIXME
+               //executor.get().reEmitWorkerJobsForWorkers(configuration, ids);
            }
        }

@@ -143,7 +145,8 @@ public final class JdbcServiceLivenessCoordinator extends AbstractServiceLivenes

        if (!workerIdsHavingTasksToRestart.isEmpty()) {
            log.info("Trigger task restart for non-responding workers after timeout: {}.", workerIdsHavingTasksToRestart);
-           executor.get().reEmitWorkerJobsForWorkers(configuration, workerIdsHavingTasksToRestart);
+           // FIXME
+           //executor.get().reEmitWorkerJobsForWorkers(configuration, workerIdsHavingTasksToRestart);
        }
        });
    }
@@ -155,7 +158,7 @@ public final class JdbcServiceLivenessCoordinator extends AbstractServiceLivenes
    }

-   synchronized void setExecutor(final JdbcExecutor executor) {
+   synchronized void setExecutor(final DefaultExecutor executor) {
        this.executor.set(executor);
    }

@@ -595,6 +595,8 @@ public class ExecutionController {
            propagator.get().inject(Context.current(), result, ExecutionTextMapSetter.INSTANCE);
        }

        // TODO this is temporary until we create a queue indexer for it like in JDBC
        executionRepository.save(result);
        executionQueue.emit(result);
        eventPublisher.publishEvent(new CrudEvent<>(result, CrudEventType.CREATE));

@@ -709,6 +711,9 @@ public class ExecutionController {
            .map(ContextPropagators::getTextMapPropagator)
            .ifPresent(propagator -> propagator.inject(Context.current(), executionWithInputs, ExecutionTextMapSetter.INSTANCE));

        // FIXME we should have the same as for JDBC, a "queue indexer" that would index sync executions;
        // for now we store it via the executionRepository instead
        executionRepository.save(executionWithInputs);
        executionQueue.emit(executionWithInputs);
        eventPublisher.publishEvent(new CrudEvent<>(executionWithInputs, CrudEventType.CREATE));

@@ -4,6 +4,8 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.ExecutionEvent;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
@@ -18,6 +20,7 @@ import org.apache.commons.lang3.tuple.Pair;
import reactor.core.publisher.FluxSink;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
@@ -34,18 +37,21 @@ public class ExecutionStreamingService {
    private final Map<String, Map<String, Pair<FluxSink<Event<Execution>>, Flow>>> subscribers = new ConcurrentHashMap<>();
    private final Object subscriberLock = new Object();

-   private final QueueInterface<Execution> executionQueue;
+   private final QueueInterface<ExecutionEvent> executionQueue;
    private final ExecutionService executionService;
    private final ExecutionRepositoryInterface executionRepository;

    private Runnable queueConsumer;

    @Inject
    public ExecutionStreamingService(
-       @Named(QueueFactoryInterface.EXECUTION_NAMED) QueueInterface<Execution> executionQueue,
-       ExecutionService executionService
+       @Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED) QueueInterface<ExecutionEvent> executionQueue,
+       ExecutionService executionService,
+       ExecutionRepositoryInterface executionRepository
    ) {
        this.executionQueue = executionQueue;
        this.executionService = executionService;
        this.executionRepository = executionRepository;
    }

    @PostConstruct
@@ -57,22 +63,27 @@ public class ExecutionStreamingService {
            return;
        }

-       Execution execution = either.getLeft();
-       String executionId = execution.getId();
+       ExecutionEvent event = either.getLeft();

        // Get all subscribers for this execution
-       Map<String, Pair<FluxSink<Event<Execution>>, Flow>> executionSubscribers = subscribers.get(executionId);
+       Map<String, Pair<FluxSink<Event<Execution>>, Flow>> executionSubscribers = subscribers.get(event.executionId());

        if (!MapUtils.isEmpty(executionSubscribers)) {
            Optional<Execution> execution = executionRepository.findById(event.tenantId(), event.executionId());
            if (execution.isEmpty()) {
                log.error("Unable to find the execution id {}", event.executionId());
                return;
            }

            executionSubscribers.values().forEach(pair -> {
                var sink = pair.getLeft();
                var flow = pair.getRight();
                try {
-                   if (isStopFollow(flow, execution)) {
-                       sink.next(Event.of(execution).id("end"));
+                   if (isStopFollow(flow, execution.get())) {
+                       sink.next(Event.of(execution.get()).id("end"));
                        sink.complete();
                    } else {
-                       sink.next(Event.of(execution).id("progress"));
+                       sink.next(Event.of(execution.get()).id("progress"));
                    }
                } catch (Exception e) {
                    log.error("Error sending execution update", e);