fix(executions): evaluate multiple conditions in a separate queue

By evaluating multiple condition in a separate queue, we serialize their evaluation which avoir races when we compute the outputs for flow triggers.
This is because evaluation is a multi step process: first you get the existing condtion, then you evaluate, then you store the result. As this is not guarded by a lock you must not do it concurrently.

The race can still occurs if muiltiple executors run but this is less probable. A re-implementation would be needed probably in 2.0 for that.

Fixes https://github.com/kestra-io/kestra-ee/issues/4602
This commit is contained in:
Loïc Mathieu
2025-10-02 12:12:48 +02:00
parent 8fee5fc172
commit 5c24308e71
15 changed files with 212 additions and 5 deletions

View File

@@ -27,6 +27,7 @@ public interface QueueFactoryInterface {
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
String MULTIPLE_CONDITION_EVENT_NAMED = "multipleConditionEventQueue";
QueueInterface<Execution> execution();
@@ -59,4 +60,6 @@ public interface QueueFactoryInterface {
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
QueueInterface<ExecutionRunning> executionRunning();
QueueInterface<MultipleConditionEvent> multipleConditionEvent();
}

View File

@@ -0,0 +1,13 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.utils.IdUtils;
public record MultipleConditionEvent(Flow flow, Execution execution) implements HasUID {
@Override
public String uid() {
return IdUtils.fromParts(flow.uidWithoutRevision(), execution.getId());
}
}

View File

@@ -257,6 +257,12 @@ public abstract class AbstractRunnerTest {
multipleConditionTriggerCaseTest.flowTriggerOnPaused();
}
@Test
@LoadFlows({"flows/valids/flow-trigger-for-each-item-parent.yaml", "flows/valids/flow-trigger-for-each-item-child.yaml", "flows/valids/flow-trigger-for-each-item-grandchild.yaml"})
void forEachItemWithFlowTrigger() throws Exception {
multipleConditionTriggerCaseTest.forEachItemWithFlowTrigger();
}
@Test
@LoadFlows({"flows/valids/each-null.yaml"})
void eachWithNull() throws Exception {

View File

@@ -10,6 +10,8 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.micronaut.data.model.Pageable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@@ -166,4 +168,26 @@ public class MultipleConditionTriggerCaseTest {
assertThat(triggerExecution.getTaskRunList().size()).isEqualTo(1);
assertThat(triggerExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
public void forEachItemWithFlowTrigger() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests.trigger.foreachitem",
"flow-trigger-for-each-item-parent");
assertThat(execution.getTaskRunList().size()).isEqualTo(5);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
// trigger is done
List<Execution> childExecutions = runnerUtils.awaitFlowExecutionNumber(5, MAIN_TENANT, "io.kestra.tests.trigger.foreachitem", "flow-trigger-for-each-item-child");
assertThat(childExecutions).hasSize(5);
childExecutions.forEach(exec -> {
assertThat(exec.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(exec.getTaskRunList().size()).isEqualTo(1);
});
List<Execution> grandchildExecutions = runnerUtils.awaitFlowExecutionNumber(5, MAIN_TENANT, "io.kestra.tests.trigger.foreachitem", "flow-trigger-for-each-item-grandchild");
assertThat(grandchildExecutions).hasSize(5);
grandchildExecutions.forEach(exec -> {
assertThat(exec.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(exec.getTaskRunList().size()).isEqualTo(2);
});
}
}

View File

@@ -0,0 +1,13 @@
id: flow-trigger-for-each-item-child
namespace: io.kestra.tests.trigger.foreachitem
tasks:
- id: write_file
type: io.kestra.plugin.core.storage.Write
content: Hello World
extension: .txt
outputs:
- id: myFile
type: FILE
value: "{{ outputs.write_file.uri }}"

View File

@@ -0,0 +1,30 @@
id: flow-trigger-for-each-item-grandchild
namespace: io.kestra.tests.trigger.foreachitem
inputs:
- id: testFile
type: FILE
tasks:
- id: test_if_empty
type: io.kestra.plugin.core.flow.If
condition: "{{ isFileEmpty(inputs.testFile) }}"
then:
- id: empty_file
type: io.kestra.plugin.core.log.Log
message: "I am empty inside"
else:
- id: not_empty_file
type: io.kestra.plugin.core.log.Log
message: "{{ read(inputs.testFile) }}"
triggers:
- id: 01_complete
type: io.kestra.plugin.core.trigger.Flow
inputs:
testFile: "{{ trigger.outputs.myFile }}"
preconditions:
id: output_01_success
flows:
- namespace: io.kestra.tests.trigger.foreachitem
flowId: flow-trigger-for-each-item-child
states: [SUCCESS]

View File

@@ -0,0 +1,18 @@
id: flow-trigger-for-each-item-parent
namespace: io.kestra.tests.trigger.foreachitem
tasks:
- id: manifest
type: io.kestra.plugin.core.storage.Write
content: |-
0
1
2
3
4
extension: .txt
- id: forEachItem
type: io.kestra.plugin.core.flow.ForEachItem
items: "{{ outputs.manifest.uri }}"
namespace: io.kestra.tests.trigger.foreachitem
flowId: flow-trigger-for-each-item-child

View File

@@ -35,7 +35,6 @@ public class FlowTriggerService {
this.flowService = flowService;
}
// used in EE only
public Stream<FlowWithFlowTrigger> withFlowTriggersOnly(Stream<FlowWithSource> allFlows) {
return allFlows
.filter(flow -> !flow.isDisabled())

View File

@@ -152,4 +152,12 @@ public class H2QueueFactory implements QueueFactoryInterface {
public QueueInterface<ExecutionRunning> executionRunning() {
return new H2Queue<>(ExecutionRunning.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<MultipleConditionEvent> multipleConditionEvent() {
return new H2Queue<>(MultipleConditionEvent.class, applicationContext);
}
}

View File

@@ -0,0 +1,20 @@
ALTER TABLE queues ALTER COLUMN "type" ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.ExecutionRunning',
'io.kestra.core.runners.MultipleConditionEvent'
) NOT NULL

View File

@@ -152,4 +152,12 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
public QueueInterface<ExecutionRunning> executionRunning() {
return new MysqlQueue<>(ExecutionRunning.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<MultipleConditionEvent> multipleConditionEvent() {
return new MysqlQueue<>(MultipleConditionEvent.class, applicationContext);
}
}

View File

@@ -0,0 +1,20 @@
ALTER TABLE queues MODIFY COLUMN `type` ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.ExecutionRunning',
'io.kestra.core.runners.MultipleConditionEvent'
) NOT NULL;

View File

@@ -152,4 +152,12 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
public QueueInterface<ExecutionRunning> executionRunning() {
return new PostgresQueue<>(ExecutionRunning.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<MultipleConditionEvent> multipleConditionEvent() {
return new PostgresQueue<>(MultipleConditionEvent.class, applicationContext);
}
}

View File

@@ -0,0 +1 @@
ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.core.runners.MultipleConditionEvent';

View File

@@ -12,6 +12,7 @@ import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.multipleflows.MultipleCondition;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
@@ -120,6 +121,10 @@ public class JdbcExecutor implements ExecutorInterface {
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
private QueueInterface<ExecutionRunning> executionRunningQueue;
@Inject
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
private QueueInterface<MultipleConditionEvent> multipleConditionEventQueue;
@Inject
private RunContextFactory runContextFactory;
@@ -314,6 +319,7 @@ public class JdbcExecutor implements ExecutorInterface {
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, this::subflowExecutionEndQueue));
this.receiveCancellations.addFirst(this.executionRunningQueue.receive(Executor.class, this::executionRunningQueue));
this.receiveCancellations.addFirst(this.multipleConditionEventQueue.receive(Executor.class, this::multipleConditionEventQueue));
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));
executionDelayFuture = scheduledDelay.scheduleAtFixedRate(
@@ -415,6 +421,24 @@ public class JdbcExecutor implements ExecutorInterface {
log.info("Executor started with {} thread(s)", numberOfThreads);
}
private void multipleConditionEventQueue(Either<MultipleConditionEvent, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize a multiple condition event: {}", either.getRight().getMessage());
return;
}
MultipleConditionEvent multipleConditionEvent = either.getLeft();
flowTriggerService.computeExecutionsFromFlowTriggers(multipleConditionEvent.execution(), List.of(multipleConditionEvent.flow()), Optional.of(multipleConditionStorage))
.forEach(exec -> {
try {
executionQueue.emit(exec);
} catch (QueueException e) {
log.error("Unable to emit the execution {}", exec.getId(), e);
}
});
}
private void clusterEventQueue(Either<ClusterEvent, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize a cluster event: {}", either.getRight().getMessage());
@@ -1111,8 +1135,7 @@ public class JdbcExecutor implements ExecutorInterface {
Execution execution = executor.getExecution();
// handle flow triggers on state change
if (!execution.getState().getCurrent().equals(executor.getOriginalState())) {
flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows, Optional.of(multipleConditionStorage))
.forEach(throwConsumer(executionFromFlowTrigger -> this.executionQueue.emit(executionFromFlowTrigger)));
processFlowTriggers(execution);
}
// handle actions on terminated state
@@ -1160,8 +1183,7 @@ public class JdbcExecutor implements ExecutorInterface {
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
// process flow triggers to allow listening on RUNNING state after a QUEUED state
flowTriggerService.computeExecutionsFromFlowTriggers(newExecution, allFlows, Optional.of(multipleConditionStorage))
.forEach(throwConsumer(executionFromFlowTrigger -> this.executionQueue.emit(executionFromFlowTrigger)));
processFlowTriggers(newExecution);
})
);
}
@@ -1206,6 +1228,20 @@ public class JdbcExecutor implements ExecutorInterface {
}
}
private void processFlowTriggers(Execution execution) throws QueueException {
// directly process simple conditions
flowTriggerService.withFlowTriggersOnly(allFlows.stream())
.filter(f ->ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().noneMatch(c -> c instanceof MultipleCondition) && f.getTrigger().getPreconditions() == null)
.flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggers(execution, List.of(f.getFlow()), Optional.empty()).stream())
.forEach(throwConsumer(exec -> executionQueue.emit(exec)));
// send multiple conditions to the multiple condition queue for later processing
flowTriggerService.withFlowTriggersOnly(allFlows.stream())
.filter(f -> ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().anyMatch(c -> c instanceof MultipleCondition) || f.getTrigger().getPreconditions() != null)
.map(f -> new MultipleConditionEvent(f.getFlow(), execution))
.forEach(throwConsumer(multipleCondition -> multipleConditionEventQueue.emit(multipleCondition)));
}
private FlowWithSource findFlow(Execution execution) {
FlowInterface flow = this.flowMetaStore.findByExecution(execution).orElseThrow();
FlowWithSource flowWithSource = pluginDefaultService.injectDefaults(flow, execution);