Compare commits

...

3 Commits

Author SHA1 Message Date
Loïc Mathieu
09a16d16c2 WIP 2025-08-27 13:52:15 +02:00
Loïc Mathieu
f500276894 feat(execution): allow to listen the RUNNING state after being queued 2025-08-27 13:03:06 +02:00
Loïc Mathieu
868e88679f feat(system): execution state change queue
When an execution has been processed inside the execution queue and its state has changed, send an ExecutionStateChange message.
Inside the Executor, process this new message and perform all the actions that were previously done on the execution queue on termination and on state change.

The idea is to reduce the work done synchronously when processing an execution message by deferring it to a new, asynchronous queue consumer.
2025-08-27 13:03:06 +02:00
9 changed files with 203 additions and 74 deletions

View File

@@ -11,6 +11,7 @@ import io.kestra.core.runners.*;
public interface QueueFactoryInterface {
String EXECUTION_NAMED = "executionQueue";
String EXECUTION_STATE_CHANGE_NAMED = "executionStateChangeQueue";
String EXECUTOR_NAMED = "executorQueue";
String WORKERJOB_NAMED = "workerJobQueue";
String WORKERTASKRESULT_NAMED = "workerTaskResultQueue";
@@ -30,6 +31,8 @@ public interface QueueFactoryInterface {
QueueInterface<Execution> execution();
QueueInterface<ExecutionStateChange> executionStateChange();
QueueInterface<Executor> executor();
WorkerJobQueueInterface workerJob();

View File

@@ -0,0 +1,28 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;
/**
 * Queue message describing a state transition of an {@link Execution}.
 * <p>
 * It carries the execution together with the state type it had before and after being processed.
 * NOTE(review): producers may emit this message with {@code oldState} equal to {@code newState}
 * (e.g. for a terminated execution), so consumers should not assume the two always differ.
 */
@Value
@AllArgsConstructor
@Builder
public class ExecutionStateChange implements HasUID {
// The execution whose state is reported; its id is used as the message uid.
@NotNull
Execution execution;
// State type of the execution before the change.
@NotNull
State.Type oldState;
// State type of the execution after the change.
@NotNull
State.Type newState;
/**
 * @return the id of the wrapped execution, used as the unique identifier of this message.
 */
@Override
public String uid() {
return execution.getId();
}
}

View File

@@ -33,6 +33,14 @@ public class H2QueueFactory implements QueueFactoryInterface {
return new H2Queue<>(Execution.class, applicationContext);
}
/**
 * Creates the H2-backed queue transporting {@link ExecutionStateChange} messages.
 * The bean is exposed under {@link QueueFactoryInterface#EXECUTION_STATE_CHANGE_NAMED}
 * and declares {@code close} as its pre-destroy hook.
 *
 * @return a new queue bound to the {@link ExecutionStateChange} message type
 */
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_STATE_CHANGE_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionStateChange> executionStateChange() {
    final Class<ExecutionStateChange> messageType = ExecutionStateChange.class;
    return new H2Queue<ExecutionStateChange>(messageType, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTOR_NAMED)

View File

@@ -0,0 +1,20 @@
-- Redefine the allowed values of the queues."type" column so that it accepts
-- 'io.kestra.core.runners.ExecutionStateChange' messages.
-- The column type is replaced as a whole, so every value must be restated here.
-- NOTE(review): the list is expected to match the previous definition plus the new value -- confirm against the prior migration.
ALTER TABLE queues ALTER COLUMN "type" ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.ExecutionRunning',
'io.kestra.core.runners.ExecutionStateChange'
) NOT NULL;

View File

@@ -33,6 +33,14 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
return new MysqlQueue<>(Execution.class, applicationContext);
}
/**
 * Creates the MySQL-backed queue transporting {@link ExecutionStateChange} messages.
 * The bean is exposed under {@link QueueFactoryInterface#EXECUTION_STATE_CHANGE_NAMED}
 * and declares {@code close} as its pre-destroy hook.
 *
 * @return a new queue bound to the {@link ExecutionStateChange} message type
 */
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_STATE_CHANGE_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionStateChange> executionStateChange() {
    final Class<ExecutionStateChange> messageType = ExecutionStateChange.class;
    return new MysqlQueue<ExecutionStateChange>(messageType, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTOR_NAMED)

View File

@@ -0,0 +1,20 @@
-- Redefine the allowed values of the queues.`type` column so that it accepts
-- 'io.kestra.core.runners.ExecutionStateChange' messages.
-- MODIFY COLUMN replaces the whole column definition, so every value must be restated here.
-- NOTE(review): the list is expected to match the previous definition plus the new value -- confirm against the prior migration.
ALTER TABLE queues MODIFY COLUMN `type` ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.ExecutionRunning',
'io.kestra.core.runners.ExecutionStateChange'
) NOT NULL;

View File

@@ -33,6 +33,14 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
return new PostgresQueue<>(Execution.class, applicationContext);
}
/**
 * Creates the PostgreSQL-backed queue transporting {@link ExecutionStateChange} messages.
 * The bean is exposed under {@link QueueFactoryInterface#EXECUTION_STATE_CHANGE_NAMED}
 * and declares {@code close} as its pre-destroy hook.
 *
 * @return a new queue bound to the {@link ExecutionStateChange} message type
 */
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_STATE_CHANGE_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionStateChange> executionStateChange() {
    final Class<ExecutionStateChange> messageType = ExecutionStateChange.class;
    return new PostgresQueue<ExecutionStateChange>(messageType, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTOR_NAMED)

View File

@@ -0,0 +1 @@
-- Register the ExecutionStateChange message class as a value of the queue_type enum.
-- IF NOT EXISTS keeps the statement from failing when the value is already present.
ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.core.runners.ExecutionStateChange';

View File

@@ -82,6 +82,10 @@ public class JdbcExecutor implements ExecutorInterface, Service {
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;
@Inject
@Named(QueueFactoryInterface.EXECUTION_STATE_CHANGE_NAMED)
private QueueInterface<ExecutionStateChange> executionStateChangeQueue;
@Inject
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
private QueueInterface<WorkerJob> workerJobQueue;
@@ -308,6 +312,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
));
this.receiveCancellations.addFirst(this.executionStateChangeQueue.receive(Executor.class, this::executionStateChangeQueue));
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, this::subflowExecutionEndQueue));
@@ -1078,11 +1083,12 @@ public class JdbcExecutor implements ExecutorInterface, Service {
}
boolean isTerminated = executor.getFlow() != null && executionService.isTerminated(executor.getFlow(), executor.getExecution());
// purge the executionQueue
// purge the executionQueue and the executionStateChangeQueue
// IMPORTANT: this must be done before emitting the last execution message so that all consumers are notified that the execution ends.
// NOTE: we may also purge ExecutionKilled events, but as there may not be a lot of them, it may not be worth it.
if (cleanExecutionQueue && isTerminated) {
((JdbcQueue<Execution>) executionQueue).deleteByKey(executor.getExecution().getId());
((JdbcQueue<ExecutionStateChange>) executionStateChangeQueue).deleteByKey(executor.getExecution().getId());
}
// emit for other consumers than the executor if no failure
@@ -1093,79 +1099,10 @@ public class JdbcExecutor implements ExecutorInterface, Service {
}
Execution execution = executor.getExecution();
// handle flow triggers on state change
if (!execution.getState().getCurrent().equals(executor.getOriginalState())) {
flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows, Optional.of(multipleConditionStorage))
.forEach(throwConsumer(executionFromFlowTrigger -> this.executionQueue.emit(executionFromFlowTrigger)));
}
// handle actions on terminated state
if (isTerminated) {
// if there is a parent, we send a subflow execution result to it
if (ExecutableUtils.isSubflow(execution)) {
// locate the parent execution to find the parent task run
String parentExecutionId = (String) execution.getTrigger().getVariables().get("executionId");
String taskRunId = (String) execution.getTrigger().getVariables().get("taskRunId");
String taskId = (String) execution.getTrigger().getVariables().get("taskId");
@SuppressWarnings("unchecked")
Map<String, Object> outputs = (Map<String, Object>) execution.getTrigger().getVariables().get("taskRunOutputs");
Variables variables = variablesService.of(StorageContext.forExecution(executor.getExecution()), outputs);
SubflowExecutionEnd subflowExecutionEnd = new SubflowExecutionEnd(executor.getExecution(), parentExecutionId, taskRunId, taskId, execution.getState().getCurrent(), variables);
this.subflowExecutionEndQueue.emit(subflowExecutionEnd);
}
// purge SLA monitors
if (!ListUtils.isEmpty(executor.getFlow().getSla()) && executor.getFlow().getSla().stream().anyMatch(ExecutionMonitoringSLA.class::isInstance)) {
slaMonitorStorage.purge(executor.getExecution().getId());
}
// purge execution running
if (executor.getFlow().getConcurrency() != null) {
executionRunningStorage.remove(execution);
}
// check if there exist a queued execution and submit it to the execution queue
if (executor.getFlow().getConcurrency() != null && executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
executionQueuedStorage.pop(executor.getFlow().getTenantId(),
executor.getFlow().getNamespace(),
executor.getFlow().getId(),
throwConsumer(queued -> {
var newExecution = queued.withState(State.Type.RUNNING);
ExecutionRunning executionRunning = ExecutionRunning.builder()
.tenantId(newExecution.getTenantId())
.namespace(newExecution.getNamespace())
.flowId(newExecution.getFlowId())
.execution(newExecution)
.concurrencyState(ExecutionRunning.ConcurrencyState.RUNNING)
.build();
executionRunningStorage.save(executionRunning);
executionQueue.emit(newExecution);
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
})
);
}
// purge the trigger: reset scheduler trigger at end
if (execution.getTrigger() != null) {
FlowWithSource flow = executor.getFlow();
triggerRepository
.findByExecution(execution)
.ifPresent(trigger -> {
this.triggerState.update(executionService.resetExecution(flow, execution, trigger));
});
}
// Purge the workerTaskResultQueue and the workerJobQueue
// IMPORTANT: this is safe as only the executor is listening to WorkerTaskResult,
// and we are sure at this stage that all WorkerJob has been listened and processed by the Worker.
// If any of these assumptions changed, this code would not be safe anymore.
if (cleanWorkerJobQueue && !ListUtils.isEmpty(executor.getExecution().getTaskRunList())) {
List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
.map(taskRun -> taskRun.getId())
.toList();
((JdbcQueue<WorkerTaskResult>) workerTaskResultQueue).deleteByKeys(taskRunKeys);
((JdbcQueue<WorkerJob>) workerJobQueue).deleteByKeys(taskRunKeys);
}
// send a message to the executionStateChange queue for post-execution actions if the state change or the execution is terminated
// we must always send terminated execution to the queue or afterExecution execution update would not be detected.
if (!execution.getState().getCurrent().equals(executor.getOriginalState()) || isTerminated) {
executionStateChangeQueue.emit(new ExecutionStateChange(execution, executor.getOriginalState(), execution.getState().getCurrent()));
}
} catch (QueueException e) {
if (!ignoreFailure) {
@@ -1323,6 +1260,102 @@ public class JdbcExecutor implements ExecutorInterface, Service {
});
}
/**
 * Consumer of the execution state change queue.
 * <p>
 * Performs the post-processing that follows the handling of an execution message:
 * <ul>
 *   <li>when the state really changed, computes the executions to start from flow triggers and emits them;</li>
 *   <li>when the execution is terminated, notifies the parent of a subflow, purges SLA monitors and the
 *       execution-running entry, pops the next queued execution (concurrency {@code QUEUE} behavior),
 *       resets the trigger and purges the worker queues.</li>
 * </ul>
 * Messages that cannot be deserialized, or whose execution must be skipped, are logged and ignored.
 * A {@link QueueException} raised while emitting is logged and not rethrown.
 *
 * @param either the received message on the left, or the deserialization failure on the right
 */
private void executionStateChangeQueue(Either<ExecutionStateChange, DeserializationException> either) {
    if (either.isRight()) {
        log.error("Unable to deserialize an execution state change: {}", either.getRight().getMessage());
        return;
    }

    final ExecutionStateChange executionStateChange = either.getLeft();
    final Execution execution = executionStateChange.getExecution();
    if (skipExecutionService.skipExecution(execution)) {
        log.warn("Skipping execution {}", execution.getId());
        return;
    }

    try {
        // handle flow triggers on state change only:
        // a message is also sent for a terminated execution whose state did not change,
        // in which case flow triggers must not be evaluated a second time.
        if (executionStateChange.getOldState() != executionStateChange.getNewState()) {
            flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows, Optional.of(multipleConditionStorage))
                .forEach(throwConsumer(executionFromFlowTrigger -> this.executionQueue.emit(executionFromFlowTrigger)));
        }

        final FlowWithSource flow = findFlow(execution);
        // NOTE(review): whether findFlow can return null is not visible here; guard defensively
        // so that a missing flow simply disables the terminated-state actions.
        boolean isTerminated = flow != null && executionService.isTerminated(flow, execution);

        // handle actions on terminated state
        if (isTerminated) {
            // if there is a parent, we send a subflow execution result to it
            if (ExecutableUtils.isSubflow(execution)) {
                // locate the parent execution to find the parent task run
                String parentExecutionId = (String) execution.getTrigger().getVariables().get("executionId");
                String taskRunId = (String) execution.getTrigger().getVariables().get("taskRunId");
                String taskId = (String) execution.getTrigger().getVariables().get("taskId");
                @SuppressWarnings("unchecked")
                Map<String, Object> outputs = (Map<String, Object>) execution.getTrigger().getVariables().get("taskRunOutputs");
                Variables variables = variablesService.of(StorageContext.forExecution(execution), outputs);
                SubflowExecutionEnd subflowExecutionEnd = new SubflowExecutionEnd(execution, parentExecutionId, taskRunId, taskId, execution.getState().getCurrent(), variables);
                this.subflowExecutionEndQueue.emit(subflowExecutionEnd);
            }

            // purge SLA monitors
            if (!ListUtils.isEmpty(flow.getSla()) && flow.getSla().stream().anyMatch(ExecutionMonitoringSLA.class::isInstance)) {
                slaMonitorStorage.purge(execution.getId());
            }

            // purge execution running
            if (flow.getConcurrency() != null) {
                executionRunningStorage.remove(execution);
            }

            // check if there exist a queued execution and submit it to the execution queue
            if (flow.getConcurrency() != null && flow.getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
                executionQueuedStorage.pop(flow.getTenantId(),
                    flow.getNamespace(),
                    flow.getId(),
                    throwConsumer(queued -> {
                        var newExecution = queued.withState(State.Type.RUNNING);
                        ExecutionRunning executionRunning = ExecutionRunning.builder()
                            .tenantId(newExecution.getTenantId())
                            .namespace(newExecution.getNamespace())
                            .flowId(newExecution.getFlowId())
                            .execution(newExecution)
                            .concurrencyState(ExecutionRunning.ConcurrencyState.RUNNING)
                            .build();
                        executionRunningStorage.save(executionRunning);
                        executionQueue.emit(newExecution);
                        metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();

                        // send an execution state change event so we can use a flow trigger on the queued state
                        executionStateChangeQueue.emit(new ExecutionStateChange(newExecution, State.Type.QUEUED, State.Type.RUNNING));
                    })
                );
            }

            // purge the trigger: reset scheduler trigger at end
            if (execution.getTrigger() != null) {
                triggerRepository
                    .findByExecution(execution)
                    .ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
            }

            // Purge the workerTaskResultQueue and the workerJobQueue
            // IMPORTANT: this is safe as only the executor is listening to WorkerTaskResult,
            // and we are sure at this stage that all WorkerJob has been listened and processed by the Worker.
            // If any of these assumptions changed, this code would not be safe anymore.
            if (cleanWorkerJobQueue && !ListUtils.isEmpty(execution.getTaskRunList())) {
                List<String> taskRunKeys = execution.getTaskRunList().stream()
                    .map(TaskRun::getId)
                    .toList();
                ((JdbcQueue<WorkerTaskResult>) workerTaskResultQueue).deleteByKeys(taskRunKeys);
                ((JdbcQueue<WorkerJob>) workerJobQueue).deleteByKeys(taskRunKeys);
            }
        }
    } catch (QueueException e) {
        // Never swallow silently: a lost emit means flow triggers, a subflow end notification
        // or a queued execution may be missing, so leave a trace with the cause.
        // NOTE(review): the message is not retried here -- confirm whether a retry/dead-letter strategy is wanted.
        log.error("Unable to process the state change of execution {}", execution.getId(), e);
    }
}
private boolean deduplicateNexts(Execution execution, ExecutorState executorState, List<TaskRun> taskRuns) {
return taskRuns
.stream()