fix(executions): evaluate multiple conditions in a separate queue

By evaluating multiple conditions in a separate queue, we serialize their evaluation, which avoids races when we compute the outputs for flow triggers.
This is because evaluation is a multi-step process: first you get the existing condition, then you evaluate it, then you store the result. As this sequence is not guarded by a lock, it must not run concurrently.

The race can still occur if multiple executors run, but this is less likely. Fixing that case would require a re-implementation, probably in 2.0.

Fixes https://github.com/kestra-io/kestra-ee/issues/4602
Loïc Mathieu
2025-10-03 11:08:05 +02:00
parent 650e8fac17
commit 5efeae2a94
13 changed files with 181 additions and 3 deletions
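
To make the race concrete, here is a minimal sketch, assuming a simple in-memory store (the class and member names below are illustrative only, not the actual Kestra types). Two threads interleaving the get/evaluate/store steps for the same key can both read the same initial state, so one update is lost; funnelling events through a queue with a single consumer forces the three steps to run for one event at a time.

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: ConditionState stands in for the multiple-condition window
// kept by MultipleConditionStorageInterface.
class MultipleConditionSerializationSketch {
    record ConditionState(int matchedConditions) {
        ConditionState evaluate() {
            return new ConditionState(matchedConditions + 1);
        }
    }

    private final Map<String, ConditionState> store = new ConcurrentHashMap<>();
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // The racy shape: get, evaluate, store, with no lock around the three steps.
    // Two executors calling this concurrently for the same uid can both read the
    // same state, and one evaluation result is silently overwritten.
    void evaluateDirectly(String uid) {
        ConditionState current = store.getOrDefault(uid, new ConditionState(0)); // 1. get the existing condition
        ConditionState updated = current.evaluate();                             // 2. evaluate
        store.put(uid, updated);                                                 // 3. store the result
    }

    // The fix in miniature: publish to a queue and let a single consumer run the
    // same three steps, so evaluations are serialized instead of interleaved.
    void startConsumer() {
        Thread consumer = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    evaluateDirectly(queue.take()); // one event at a time
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.setDaemon(true);
        consumer.start();
    }

    void submit(String uid) {
        queue.add(uid);
    }
}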

@@ -29,6 +29,7 @@ public interface QueueFactoryInterface {
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
String MULTIPLE_CONDITION_EVENT_NAMED = "multipleConditionEventQueue";
QueueInterface<Execution> execution();
@@ -61,4 +62,6 @@ public interface QueueFactoryInterface {
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
QueueInterface<ExecutionRunning> executionRunning();
QueueInterface<MultipleConditionEvent> multipleConditionEvent();
}

@@ -0,0 +1,13 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.utils.IdUtils;
public record MultipleConditionEvent(Flow flow, Execution execution) implements HasUID {
@Override
public String uid() {
return IdUtils.fromParts(flow.uidWithoutRevision(), execution.getId());
}
}
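
A hedged producer-side sketch of how such an event is published (illustrative only; the variables multipleConditionEventQueue, flowWithSource, execution and log are assumed to be in scope, and the real emission happens in JdbcExecutor.processFlowTriggers further down):

// One event per (triggered flow, source execution) pair; uid() combines both identifiers.
MultipleConditionEvent event = new MultipleConditionEvent(flowWithSource, execution);
try {
    multipleConditionEventQueue.emit(event);
} catch (QueueException e) {
    log.error("Unable to emit the multiple condition event {}", event.uid(), e);
}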

@@ -34,7 +34,6 @@ public class FlowTriggerService {
this.flowService = flowService;
}
// used in EE only
public Stream<FlowWithFlowTrigger> withFlowTriggersOnly(Stream<FlowWithSource> allFlows) {
return allFlows
.filter(flow -> !flow.isDisabled())

@@ -0,0 +1,13 @@
id: flow-trigger-for-each-item-child
namespace: io.kestra.tests.trigger.foreachitem
tasks:
- id: write_file
type: io.kestra.plugin.core.storage.Write
content: Hello World
extension: .txt
outputs:
- id: myFile
type: FILE
value: "{{ outputs.write_file.uri }}"

@@ -0,0 +1,30 @@
id: flow-trigger-for-each-item-grandchild
namespace: io.kestra.tests.trigger.foreachitem
inputs:
- id: testFile
type: FILE
tasks:
- id: test_if_empty
type: io.kestra.plugin.core.flow.If
condition: "{{ isFileEmpty(inputs.testFile) }}"
then:
- id: empty_file
type: io.kestra.plugin.core.log.Log
message: "I am empty inside"
else:
- id: not_empty_file
type: io.kestra.plugin.core.log.Log
message: "{{ read(inputs.testFile) }}"
triggers:
- id: 01_complete
type: io.kestra.plugin.core.trigger.Flow
inputs:
testFile: "{{ trigger.outputs.myFile }}"
preconditions:
id: output_01_success
flows:
- namespace: io.kestra.tests.trigger.foreachitem
flowId: flow-trigger-for-each-item-child
states: [SUCCESS]

@@ -0,0 +1,18 @@
id: flow-trigger-for-each-item-parent
namespace: io.kestra.tests.trigger.foreachitem
tasks:
- id: manifest
type: io.kestra.plugin.core.storage.Write
content: |-
0
1
2
3
4
extension: .txt
- id: forEachItem
type: io.kestra.plugin.core.flow.ForEachItem
items: "{{ outputs.manifest.uri }}"
namespace: io.kestra.tests.trigger.foreachitem
flowId: flow-trigger-for-each-item-child

@@ -152,4 +152,12 @@ public class H2QueueFactory implements QueueFactoryInterface {
public QueueInterface<ExecutionRunning> executionRunning() {
return new H2Queue<>(ExecutionRunning.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<MultipleConditionEvent> multipleConditionEvent() {
return new H2Queue<>(MultipleConditionEvent.class, applicationContext);
}
}

@@ -0,0 +1,20 @@
ALTER TABLE queues ALTER COLUMN "type" ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.ExecutionRunning',
'io.kestra.core.runners.MultipleConditionEvent'
) NOT NULL

@@ -152,4 +152,12 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
public QueueInterface<ExecutionRunning> executionRunning() {
return new MysqlQueue<>(ExecutionRunning.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<MultipleConditionEvent> multipleConditionEvent() {
return new MysqlQueue<>(MultipleConditionEvent.class, applicationContext);
}
}

@@ -0,0 +1,20 @@
ALTER TABLE queues MODIFY COLUMN `type` ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.ExecutionRunning',
'io.kestra.core.runners.MultipleConditionEvent'
) NOT NULL;

@@ -152,4 +152,12 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
public QueueInterface<ExecutionRunning> executionRunning() {
return new PostgresQueue<>(ExecutionRunning.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<MultipleConditionEvent> multipleConditionEvent() {
return new PostgresQueue<>(MultipleConditionEvent.class, applicationContext);
}
}

@@ -0,0 +1 @@
ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.core.runners.MultipleConditionEvent';

@@ -12,6 +12,7 @@ import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.multipleflows.MultipleCondition;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
@@ -119,6 +120,10 @@ public class JdbcExecutor implements ExecutorInterface, Service {
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
private QueueInterface<ExecutionRunning> executionRunningQueue;
@Inject
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
private QueueInterface<MultipleConditionEvent> multipleConditionEventQueue;
@Inject
private RunContextFactory runContextFactory;
@@ -313,6 +318,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, this::subflowExecutionEndQueue));
this.receiveCancellations.addFirst(this.executionRunningQueue.receive(Executor.class, this::executionRunningQueue));
this.receiveCancellations.addFirst(this.multipleConditionEventQueue.receive(Executor.class, this::multipleConditionEventQueue));
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));
executionDelayFuture = scheduledDelay.scheduleAtFixedRate(
@@ -414,6 +420,24 @@ public class JdbcExecutor implements ExecutorInterface, Service {
log.info("Executor started with {} thread(s)", numberOfThreads);
}
private void multipleConditionEventQueue(Either<MultipleConditionEvent, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize a multiple condition event: {}", either.getRight().getMessage());
return;
}
MultipleConditionEvent multipleConditionEvent = either.getLeft();
flowTriggerService.computeExecutionsFromFlowTriggers(multipleConditionEvent.execution(), List.of(multipleConditionEvent.flow()), Optional.of(multipleConditionStorage))
.forEach(exec -> {
try {
executionQueue.emit(exec);
} catch (QueueException e) {
log.error("Unable to emit the execution {}", exec.getId(), e);
}
});
}
private void clusterEventQueue(Either<ClusterEvent, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize a cluster event: {}", either.getRight().getMessage());
@@ -1096,8 +1120,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
Execution execution = executor.getExecution();
// handle flow triggers on state change
if (!execution.getState().getCurrent().equals(executor.getOriginalState())) {
flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows, Optional.of(multipleConditionStorage))
.forEach(throwConsumer(executionFromFlowTrigger -> this.executionQueue.emit(executionFromFlowTrigger)));
processFlowTriggers(execution);
}
// handle actions on terminated state
@@ -1184,6 +1207,20 @@ public class JdbcExecutor implements ExecutorInterface, Service {
}
}
private void processFlowTriggers(Execution execution) throws QueueException {
// directly process simple conditions
flowTriggerService.withFlowTriggersOnly(allFlows.stream())
.filter(f -> ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().noneMatch(c -> c instanceof MultipleCondition) && f.getTrigger().getPreconditions() == null)
.flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggers(execution, List.of(f.getFlow()), Optional.empty()).stream())
.forEach(throwConsumer(exec -> executionQueue.emit(exec)));
// send multiple conditions to the multiple condition queue for later processing
flowTriggerService.withFlowTriggersOnly(allFlows.stream())
.filter(f -> ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().anyMatch(c -> c instanceof MultipleCondition) || f.getTrigger().getPreconditions() != null)
.map(f -> new MultipleConditionEvent(f.getFlow(), execution))
.forEach(throwConsumer(multipleCondition -> multipleConditionEventQueue.emit(multipleCondition)));
}
private FlowWithSource findFlow(Execution execution) {
FlowInterface flow = this.flowMetaStore.findByExecution(execution).orElseThrow();
FlowWithSource flowWithSource = pluginDefaultService.injectDefaults(flow, execution);