fix(system): trigger an execution once per condition on flow triggers

Fixes #12560
This commit is contained in:
Loïc Mathieu
2025-11-05 10:58:26 +01:00
parent ca3e765e58
commit 9d3d40ade8
6 changed files with 64 additions and 3 deletions

View File

@@ -267,6 +267,12 @@ public abstract class AbstractRunnerTest {
multipleConditionTriggerCaseTest.flowTriggerMultiplePreconditions();
}
@Test
@LoadFlows({"flows/valids/flow-trigger-multiple-conditions-flow-a.yaml", "flows/valids/flow-trigger-multiple-conditions-flow-listen.yaml"})
void flowTriggerMultipleConditions() throws Exception {
multipleConditionTriggerCaseTest.flowTriggerMultipleConditions();
}
@Test
@LoadFlows({"flows/valids/each-null.yaml"})
void eachWithNull() throws Exception {

View File

@@ -212,4 +212,24 @@ public class MultipleConditionTriggerCaseTest {
e -> e.getState().getCurrent().equals(Type.SUCCESS),
MAIN_TENANT, "io.kestra.tests.trigger.multiple.preconditions", "flow-trigger-multiple-preconditions-flow-listen", Duration.ofSeconds(1)));
}
public void flowTriggerMultipleConditions() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests.trigger.multiple.conditions",
"flow-trigger-multiple-conditions-flow-a");
assertThat(execution.getTaskRunList().size()).isEqualTo(1);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
// trigger is done
Execution triggerExecution = runnerUtils.awaitFlowExecution(
e -> e.getState().getCurrent().equals(Type.SUCCESS),
MAIN_TENANT, "io.kestra.tests.trigger.multiple.conditions", "flow-trigger-multiple-conditions-flow-listen");
executionRepository.delete(triggerExecution);
assertThat(triggerExecution.getTaskRunList().size()).isEqualTo(1);
assertThat(triggerExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
// we assert that we didn't have any other flow triggered
assertThrows(RuntimeException.class, () -> runnerUtils.awaitFlowExecution(
e -> e.getState().getCurrent().equals(Type.SUCCESS),
MAIN_TENANT, "io.kestra.tests.trigger.multiple.conditions", "flow-trigger-multiple-conditions-flow-listen", Duration.ofSeconds(1)));
}
}

View File

@@ -0,0 +1,10 @@
id: flow-trigger-multiple-conditions-flow-a
namespace: io.kestra.tests.trigger.multiple.conditions
labels:
some: label
tasks:
- id: only
type: io.kestra.plugin.core.debug.Return
format: "from parents: {{execution.id}}"

View File

@@ -0,0 +1,23 @@
id: flow-trigger-multiple-conditions-flow-listen
namespace: io.kestra.tests.trigger.multiple.conditions
triggers:
- id: on_completion
type: io.kestra.plugin.core.trigger.Flow
states: [ SUCCESS ]
conditions:
- type: io.kestra.plugin.core.condition.ExecutionFlow
namespace: io.kestra.tests.trigger.multiple.conditions
flowId: flow-trigger-multiple-conditions-flow-a
- id: on_failure
type: io.kestra.plugin.core.trigger.Flow
states: [ FAILED ]
conditions:
- type: io.kestra.plugin.core.condition.ExecutionFlow
namespace: io.kestra.tests.trigger.multiple.conditions
flowId: flow-trigger-multiple-conditions-flow-a
tasks:
- id: only
type: io.kestra.plugin.core.debug.Return
format: "It works"

View File

@@ -1230,8 +1230,10 @@ public class JdbcExecutor implements ExecutorInterface {
private void processFlowTriggers(Execution execution) throws QueueException {
// directly process simple conditions
flowTriggerService.withFlowTriggersOnly(allFlows.stream())
.filter(f ->ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().noneMatch(c -> c instanceof MultipleCondition) && f.getTrigger().getPreconditions() == null)
.flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggers(execution, List.of(f.getFlow()), Optional.empty()).stream())
.filter(f -> ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().noneMatch(c -> c instanceof MultipleCondition) && f.getTrigger().getPreconditions() == null)
.map(f -> f.getFlow())
.distinct() // as computeExecutionsFromFlowTriggers is based on flow, we must map FlowWithFlowTrigger to a flow and distinct to avoid multiple execution for the same flow
.flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggers(execution, List.of(f), Optional.empty()).stream())
.forEach(throwConsumer(exec -> executionQueue.emit(exec)));
// send multiple conditions to the multiple condition queue for later processing

View File

@@ -531,7 +531,7 @@ class FlowControllerTest {
List<String> namespaces = client.toBlocking().retrieve(
HttpRequest.GET("/api/v1/main/flows/distinct-namespaces"), Argument.listOf(String.class));
assertThat(namespaces.size()).isEqualTo(11);
assertThat(namespaces.size()).isEqualTo(12);
}
@Test