mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
3 Commits
run-develo
...
feat/execu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
09a16d16c2 | ||
|
|
f500276894 | ||
|
|
868e88679f |
@@ -11,6 +11,7 @@ import io.kestra.core.runners.*;
|
||||
|
||||
public interface QueueFactoryInterface {
|
||||
String EXECUTION_NAMED = "executionQueue";
|
||||
String EXECUTION_STATE_CHANGE_NAMED = "executionStateChangeQueue";
|
||||
String EXECUTOR_NAMED = "executorQueue";
|
||||
String WORKERJOB_NAMED = "workerJobQueue";
|
||||
String WORKERTASKRESULT_NAMED = "workerTaskResultQueue";
|
||||
@@ -30,6 +31,8 @@ public interface QueueFactoryInterface {
|
||||
|
||||
QueueInterface<Execution> execution();
|
||||
|
||||
QueueInterface<ExecutionStateChange> executionStateChange();
|
||||
|
||||
QueueInterface<Executor> executor();
|
||||
|
||||
WorkerJobQueueInterface workerJob();
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Value;
|
||||
|
||||
@Value
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class ExecutionStateChange implements HasUID {
|
||||
@NotNull
|
||||
Execution execution;
|
||||
|
||||
@NotNull
|
||||
State.Type oldState;
|
||||
|
||||
@NotNull
|
||||
State.Type newState;
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return execution.getId();
|
||||
}
|
||||
}
|
||||
@@ -33,6 +33,14 @@ public class H2QueueFactory implements QueueFactoryInterface {
|
||||
return new H2Queue<>(Execution.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTION_STATE_CHANGE_NAMED)
|
||||
@Bean(preDestroy = "close")
|
||||
public QueueInterface<ExecutionStateChange> executionStateChange() {
|
||||
return new H2Queue<>(ExecutionStateChange.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTOR_NAMED)
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
-- Redefine the "type" column of the queues table so that its ENUM also accepts
-- 'io.kestra.core.runners.ExecutionStateChange' (last value of the list).
ALTER TABLE queues ALTER COLUMN "type" ENUM(
    'io.kestra.core.models.executions.Execution',
    'io.kestra.core.models.templates.Template',
    'io.kestra.core.models.executions.ExecutionKilled',
    'io.kestra.core.runners.WorkerJob',
    'io.kestra.core.runners.WorkerTaskResult',
    'io.kestra.core.runners.WorkerInstance',
    'io.kestra.core.runners.WorkerTaskRunning',
    'io.kestra.core.models.executions.LogEntry',
    'io.kestra.core.models.triggers.Trigger',
    'io.kestra.ee.models.audits.AuditLog',
    'io.kestra.core.models.executions.MetricEntry',
    'io.kestra.core.runners.WorkerTriggerResult',
    'io.kestra.core.runners.SubflowExecutionResult',
    'io.kestra.core.server.ClusterEvent',
    'io.kestra.core.runners.SubflowExecutionEnd',
    'io.kestra.core.models.flows.FlowInterface',
    'io.kestra.core.runners.ExecutionRunning',
    'io.kestra.core.runners.ExecutionStateChange'
) NOT NULL;
|
||||
@@ -33,6 +33,14 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
|
||||
return new MysqlQueue<>(Execution.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTION_STATE_CHANGE_NAMED)
|
||||
@Bean(preDestroy = "close")
|
||||
public QueueInterface<ExecutionStateChange> executionStateChange() {
|
||||
return new MysqlQueue<>(ExecutionStateChange.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTOR_NAMED)
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
-- Redefine the `type` column of the queues table so that its ENUM also accepts
-- 'io.kestra.core.runners.ExecutionStateChange' (last value of the list).
ALTER TABLE queues MODIFY COLUMN `type` ENUM(
    'io.kestra.core.models.executions.Execution',
    'io.kestra.core.models.templates.Template',
    'io.kestra.core.models.executions.ExecutionKilled',
    'io.kestra.core.runners.WorkerJob',
    'io.kestra.core.runners.WorkerTaskResult',
    'io.kestra.core.runners.WorkerInstance',
    'io.kestra.core.runners.WorkerTaskRunning',
    'io.kestra.core.models.executions.LogEntry',
    'io.kestra.core.models.triggers.Trigger',
    'io.kestra.ee.models.audits.AuditLog',
    'io.kestra.core.models.executions.MetricEntry',
    'io.kestra.core.runners.WorkerTriggerResult',
    'io.kestra.core.runners.SubflowExecutionResult',
    'io.kestra.core.server.ClusterEvent',
    'io.kestra.core.runners.SubflowExecutionEnd',
    'io.kestra.core.models.flows.FlowInterface',
    'io.kestra.core.runners.ExecutionRunning',
    'io.kestra.core.runners.ExecutionStateChange'
) NOT NULL;
|
||||
@@ -33,6 +33,14 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
|
||||
return new PostgresQueue<>(Execution.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTION_STATE_CHANGE_NAMED)
|
||||
@Bean(preDestroy = "close")
|
||||
public QueueInterface<ExecutionStateChange> executionStateChange() {
|
||||
return new PostgresQueue<>(ExecutionStateChange.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTOR_NAMED)
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
-- Register the ExecutionStateChange message class in the queue_type enum;
-- IF NOT EXISTS makes the statement a no-op when the value is already present.
ALTER TYPE queue_type
    ADD VALUE IF NOT EXISTS 'io.kestra.core.runners.ExecutionStateChange';
|
||||
@@ -82,6 +82,10 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
||||
@Named(QueueFactoryInterface.EXECUTION_NAMED)
|
||||
private QueueInterface<Execution> executionQueue;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.EXECUTION_STATE_CHANGE_NAMED)
|
||||
private QueueInterface<ExecutionStateChange> executionStateChangeQueue;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
|
||||
private QueueInterface<WorkerJob> workerJobQueue;
|
||||
@@ -308,6 +312,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||
}
|
||||
));
|
||||
this.receiveCancellations.addFirst(this.executionStateChangeQueue.receive(Executor.class, this::executionStateChangeQueue));
|
||||
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
|
||||
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
|
||||
this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, this::subflowExecutionEndQueue));
|
||||
@@ -1078,11 +1083,12 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
||||
}
|
||||
boolean isTerminated = executor.getFlow() != null && executionService.isTerminated(executor.getFlow(), executor.getExecution());
|
||||
|
||||
// purge the executionQueue
|
||||
// purge the executionQueue and the executionStateChangeQueue
|
||||
// IMPORTANT: this must be done before emitting the last execution message so that all consumers are notified that the execution ends.
|
||||
// NOTE: we may also purge ExecutionKilled events, but as there may not be a lot of them, it may not be worth it.
|
||||
if (cleanExecutionQueue && isTerminated) {
|
||||
((JdbcQueue<Execution>) executionQueue).deleteByKey(executor.getExecution().getId());
|
||||
((JdbcQueue<ExecutionStateChange>) executionStateChangeQueue).deleteByKey(executor.getExecution().getId());
|
||||
}
|
||||
|
||||
// emit for other consumers than the executor if no failure
|
||||
@@ -1093,79 +1099,10 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
||||
}
|
||||
|
||||
Execution execution = executor.getExecution();
|
||||
// handle flow triggers on state change
|
||||
if (!execution.getState().getCurrent().equals(executor.getOriginalState())) {
|
||||
flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows, Optional.of(multipleConditionStorage))
|
||||
.forEach(throwConsumer(executionFromFlowTrigger -> this.executionQueue.emit(executionFromFlowTrigger)));
|
||||
}
|
||||
|
||||
// handle actions on terminated state
|
||||
if (isTerminated) {
|
||||
// if there is a parent, we send a subflow execution result to it
|
||||
if (ExecutableUtils.isSubflow(execution)) {
|
||||
// locate the parent execution to find the parent task run
|
||||
String parentExecutionId = (String) execution.getTrigger().getVariables().get("executionId");
|
||||
String taskRunId = (String) execution.getTrigger().getVariables().get("taskRunId");
|
||||
String taskId = (String) execution.getTrigger().getVariables().get("taskId");
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> outputs = (Map<String, Object>) execution.getTrigger().getVariables().get("taskRunOutputs");
|
||||
Variables variables = variablesService.of(StorageContext.forExecution(executor.getExecution()), outputs);
|
||||
SubflowExecutionEnd subflowExecutionEnd = new SubflowExecutionEnd(executor.getExecution(), parentExecutionId, taskRunId, taskId, execution.getState().getCurrent(), variables);
|
||||
this.subflowExecutionEndQueue.emit(subflowExecutionEnd);
|
||||
}
|
||||
|
||||
// purge SLA monitors
|
||||
if (!ListUtils.isEmpty(executor.getFlow().getSla()) && executor.getFlow().getSla().stream().anyMatch(ExecutionMonitoringSLA.class::isInstance)) {
|
||||
slaMonitorStorage.purge(executor.getExecution().getId());
|
||||
}
|
||||
|
||||
// purge execution running
|
||||
if (executor.getFlow().getConcurrency() != null) {
|
||||
executionRunningStorage.remove(execution);
|
||||
}
|
||||
|
||||
// check if there exist a queued execution and submit it to the execution queue
|
||||
if (executor.getFlow().getConcurrency() != null && executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
|
||||
executionQueuedStorage.pop(executor.getFlow().getTenantId(),
|
||||
executor.getFlow().getNamespace(),
|
||||
executor.getFlow().getId(),
|
||||
throwConsumer(queued -> {
|
||||
var newExecution = queued.withState(State.Type.RUNNING);
|
||||
ExecutionRunning executionRunning = ExecutionRunning.builder()
|
||||
.tenantId(newExecution.getTenantId())
|
||||
.namespace(newExecution.getNamespace())
|
||||
.flowId(newExecution.getFlowId())
|
||||
.execution(newExecution)
|
||||
.concurrencyState(ExecutionRunning.ConcurrencyState.RUNNING)
|
||||
.build();
|
||||
executionRunningStorage.save(executionRunning);
|
||||
executionQueue.emit(newExecution);
|
||||
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
// purge the trigger: reset scheduler trigger at end
|
||||
if (execution.getTrigger() != null) {
|
||||
FlowWithSource flow = executor.getFlow();
|
||||
triggerRepository
|
||||
.findByExecution(execution)
|
||||
.ifPresent(trigger -> {
|
||||
this.triggerState.update(executionService.resetExecution(flow, execution, trigger));
|
||||
});
|
||||
}
|
||||
|
||||
// Purge the workerTaskResultQueue and the workerJobQueue
|
||||
// IMPORTANT: this is safe as only the executor is listening to WorkerTaskResult,
|
||||
// and we are sure at this stage that all WorkerJob has been listened and processed by the Worker.
|
||||
// If any of these assumptions changed, this code would not be safe anymore.
|
||||
if (cleanWorkerJobQueue && !ListUtils.isEmpty(executor.getExecution().getTaskRunList())) {
|
||||
List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
|
||||
.map(taskRun -> taskRun.getId())
|
||||
.toList();
|
||||
((JdbcQueue<WorkerTaskResult>) workerTaskResultQueue).deleteByKeys(taskRunKeys);
|
||||
((JdbcQueue<WorkerJob>) workerJobQueue).deleteByKeys(taskRunKeys);
|
||||
}
|
||||
// send a message to the executionStateChange queue for post-execution actions if the state changed or the execution is terminated
|
||||
// we must always send terminated execution to the queue or afterExecution execution update would not be detected.
|
||||
if (!execution.getState().getCurrent().equals(executor.getOriginalState()) || isTerminated) {
|
||||
executionStateChangeQueue.emit(new ExecutionStateChange(execution, executor.getOriginalState(), execution.getState().getCurrent()));
|
||||
}
|
||||
} catch (QueueException e) {
|
||||
if (!ignoreFailure) {
|
||||
@@ -1323,6 +1260,102 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
||||
});
|
||||
}
|
||||
|
||||
private void executionStateChangeQueue(Either<ExecutionStateChange, DeserializationException> either) {
|
||||
if (either.isRight()) {
|
||||
log.error("Unable to deserialize an execution state change: {}", either.getRight().getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
final ExecutionStateChange executionStateChange = either.getLeft();
|
||||
final Execution execution = executionStateChange.getExecution();
|
||||
|
||||
if (skipExecutionService.skipExecution(execution)) {
|
||||
log.warn("Skipping execution {}", execution.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows, Optional.of(multipleConditionStorage))
|
||||
.forEach(throwConsumer(executionFromFlowTrigger -> this.executionQueue.emit(executionFromFlowTrigger)));
|
||||
|
||||
FlowWithSource flow = findFlow(execution);
|
||||
boolean isTerminated = executionService.isTerminated(flow, execution);
|
||||
|
||||
// handle actions on terminated state
|
||||
if (isTerminated) {
|
||||
// if there is a parent, we send a subflow execution result to it
|
||||
if (ExecutableUtils.isSubflow(execution)) {
|
||||
// locate the parent execution to find the parent task run
|
||||
String parentExecutionId = (String) execution.getTrigger().getVariables().get("executionId");
|
||||
String taskRunId = (String) execution.getTrigger().getVariables().get("taskRunId");
|
||||
String taskId = (String) execution.getTrigger().getVariables().get("taskId");
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> outputs = (Map<String, Object>) execution.getTrigger().getVariables().get("taskRunOutputs");
|
||||
Variables variables = variablesService.of(StorageContext.forExecution(execution), outputs);
|
||||
SubflowExecutionEnd subflowExecutionEnd = new SubflowExecutionEnd(execution, parentExecutionId, taskRunId, taskId, execution.getState().getCurrent(), variables);
|
||||
this.subflowExecutionEndQueue.emit(subflowExecutionEnd);
|
||||
}
|
||||
|
||||
// purge SLA monitors
|
||||
if (!ListUtils.isEmpty(flow.getSla()) && flow.getSla().stream().anyMatch(ExecutionMonitoringSLA.class::isInstance)) {
|
||||
slaMonitorStorage.purge(execution.getId());
|
||||
}
|
||||
|
||||
// purge execution running
|
||||
if (flow.getConcurrency() != null) {
|
||||
executionRunningStorage.remove(execution);
|
||||
}
|
||||
|
||||
// check if there exist a queued execution and submit it to the execution queue
|
||||
if (flow.getConcurrency() != null && flow.getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
|
||||
executionQueuedStorage.pop(flow.getTenantId(),
|
||||
flow.getNamespace(),
|
||||
flow.getId(),
|
||||
throwConsumer(queued -> {
|
||||
var newExecution = queued.withState(State.Type.RUNNING);
|
||||
ExecutionRunning executionRunning = ExecutionRunning.builder()
|
||||
.tenantId(newExecution.getTenantId())
|
||||
.namespace(newExecution.getNamespace())
|
||||
.flowId(newExecution.getFlowId())
|
||||
.execution(newExecution)
|
||||
.concurrencyState(ExecutionRunning.ConcurrencyState.RUNNING)
|
||||
.build();
|
||||
executionRunningStorage.save(executionRunning);
|
||||
executionQueue.emit(newExecution);
|
||||
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
|
||||
|
||||
// send an execution state change event so we can use a flow trigger on the queued state
|
||||
executionStateChangeQueue.emit(new ExecutionStateChange(newExecution, State.Type.QUEUED, State.Type.RUNNING));
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
// purge the trigger: reset scheduler trigger at end
|
||||
if (execution.getTrigger() != null) {
|
||||
triggerRepository
|
||||
.findByExecution(execution)
|
||||
.ifPresent(trigger -> {
|
||||
this.triggerState.update(executionService.resetExecution(flow, execution, trigger));
|
||||
});
|
||||
}
|
||||
|
||||
// Purge the workerTaskResultQueue and the workerJobQueue
|
||||
// IMPORTANT: this is safe as only the executor is listening to WorkerTaskResult,
|
||||
// and we are sure at this stage that all WorkerJob has been listened and processed by the Worker.
|
||||
// If any of these assumptions changed, this code would not be safe anymore.
|
||||
if (cleanWorkerJobQueue && !ListUtils.isEmpty(execution.getTaskRunList())) {
|
||||
List<String> taskRunKeys = execution.getTaskRunList().stream()
|
||||
.map(taskRun -> taskRun.getId())
|
||||
.toList();
|
||||
((JdbcQueue<WorkerTaskResult>) workerTaskResultQueue).deleteByKeys(taskRunKeys);
|
||||
((JdbcQueue<WorkerJob>) workerJobQueue).deleteByKeys(taskRunKeys);
|
||||
}
|
||||
}
|
||||
} catch (QueueException e) {
|
||||
//FIXME
|
||||
}
|
||||
}
|
||||
|
||||
private boolean deduplicateNexts(Execution execution, ExecutorState executorState, List<TaskRun> taskRuns) {
|
||||
return taskRuns
|
||||
.stream()
|
||||
|
||||
Reference in New Issue
Block a user