Revert "fix(executions): evaluate multiple conditions in a separate queue"

This reverts commit e8cc0fe2e5.
This commit is contained in:
Loïc Mathieu
2025-10-07 10:15:06 +02:00
parent 826d245fad
commit 0e81417ccc
14 changed files with 3 additions and 179 deletions

View File

@@ -27,7 +27,6 @@ public interface QueueFactoryInterface {
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
String MULTIPLE_CONDITION_EVENT_NAMED = "multipleConditionEventQueue";
QueueInterface<Execution> execution();
@@ -62,5 +61,4 @@ public interface QueueFactoryInterface {
QueueInterface<SubflowExecutionResult> subflowExecutionResult();
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
QueueInterface<MultipleConditionEvent> multipleConditionEvent();
}

View File

@@ -1,13 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.utils.IdUtils;
public record MultipleConditionEvent(Flow flow, Execution execution) implements HasUID {
@Override
public String uid() {
return IdUtils.fromParts(flow.uidWithoutRevision(), execution.getId());
}
}

View File

@@ -33,6 +33,7 @@ public class FlowTriggerService {
@Inject
private FlowService flowService;
// used in EE only
public Stream<FlowWithFlowTrigger> withFlowTriggersOnly(Stream<FlowWithSource> allFlows) {
return allFlows
.filter(flow -> !flow.isDisabled())

View File

@@ -10,8 +10,6 @@ import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.micronaut.data.model.Pageable;
import java.time.Duration;
import java.util.List;
import java.util.Map;

View File

@@ -1,13 +0,0 @@
id: flow-trigger-for-each-item-child
namespace: io.kestra.tests.trigger.foreachitem
tasks:
- id: write_file
type: io.kestra.plugin.core.storage.Write
content: Hello World
extension: .txt
outputs:
- id: myFile
type: FILE
value: "{{ outputs.write_file.uri }}"

View File

@@ -1,30 +0,0 @@
id: flow-trigger-for-each-item-grandchild
namespace: io.kestra.tests.trigger.foreachitem
inputs:
- id: testFile
type: FILE
tasks:
- id: test_if_empty
type: io.kestra.plugin.core.flow.If
condition: "{{ isFileEmpty(inputs.testFile) }}"
then:
- id: empty_file
type: io.kestra.plugin.core.log.Log
message: "I am empty inside"
else:
- id: not_empty_file
type: io.kestra.plugin.core.log.Log
message: "{{ read(inputs.testFile) }}"
triggers:
- id: 01_complete
type: io.kestra.plugin.core.trigger.Flow
inputs:
testFile: "{{ trigger.outputs.myFile }}"
preconditions:
id: output_01_success
flows:
- namespace: io.kestra.tests.trigger.foreachitem
flowId: flow-trigger-for-each-item-child
states: [SUCCESS]

View File

@@ -1,18 +0,0 @@
id: flow-trigger-for-each-item-parent
namespace: io.kestra.tests.trigger.foreachitem
tasks:
- id: manifest
type: io.kestra.plugin.core.storage.Write
content: |-
0
1
2
3
4
extension: .txt
- id: forEachItem
type: io.kestra.plugin.core.flow.ForEachItem
items: "{{ outputs.manifest.uri }}"
namespace: io.kestra.tests.trigger.foreachitem
flowId: flow-trigger-for-each-item-child

View File

@@ -160,11 +160,4 @@ public class H2QueueFactory implements QueueFactoryInterface {
public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
return new H2Queue<>(SubflowExecutionEnd.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<MultipleConditionEvent> multipleConditionEvent() {
return new H2Queue<>(MultipleConditionEvent.class, applicationContext);
}
}

View File

@@ -1,20 +0,0 @@
ALTER TABLE queues ALTER COLUMN "type" ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.ExecutionRunning',
'io.kestra.core.runners.MultipleConditionEvent'
) NOT NULL

View File

@@ -160,11 +160,4 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
return new MysqlQueue<>(SubflowExecutionEnd.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<MultipleConditionEvent> multipleConditionEvent() {
return new MysqlQueue<>(MultipleConditionEvent.class, applicationContext);
}
}

View File

@@ -1,20 +0,0 @@
ALTER TABLE queues MODIFY COLUMN `type` ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.ExecutionRunning',
'io.kestra.core.runners.MultipleConditionEvent'
) NOT NULL;

View File

@@ -160,11 +160,4 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
return new PostgresQueue<>(SubflowExecutionEnd.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<MultipleConditionEvent> multipleConditionEvent() {
return new PostgresQueue<>(MultipleConditionEvent.class, applicationContext);
}
}

View File

@@ -1 +0,0 @@
ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.core.runners.MultipleConditionEvent';

View File

@@ -14,7 +14,6 @@ import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.multipleflows.MultipleCondition;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
@@ -117,10 +116,6 @@ public class JdbcExecutor implements ExecutorInterface, Service {
@Named(QueueFactoryInterface.CLUSTER_EVENT_NAMED)
private Optional<QueueInterface<ClusterEvent>> clusterEventQueue;
@Inject
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
private QueueInterface<MultipleConditionEvent> multipleConditionEventQueue;
@Inject
private RunContextFactory runContextFactory;
@@ -287,7 +282,6 @@ public class JdbcExecutor implements ExecutorInterface, Service {
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, this::subflowExecutionEndQueue));
this.receiveCancellations.addFirst(this.multipleConditionEventQueue.receive(Executor.class, this::multipleConditionEventQueue));
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));
ScheduledFuture<?> scheduledDelayFuture = scheduledDelay.scheduleAtFixedRate(
@@ -385,24 +379,6 @@ public class JdbcExecutor implements ExecutorInterface, Service {
log.info("Executor started with {} thread(s)", numberOfThreads);
}
private void multipleConditionEventQueue(Either<MultipleConditionEvent, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize a multiple condition event: {}", either.getRight().getMessage());
return;
}
MultipleConditionEvent multipleConditionEvent = either.getLeft();
flowTriggerService.computeExecutionsFromFlowTriggers(multipleConditionEvent.execution(), List.of(multipleConditionEvent.flow()), Optional.of(multipleConditionStorage))
.forEach(exec -> {
try {
executionQueue.emit(exec);
} catch (QueueException e) {
log.error("Unable to emit the execution {}", exec.getId(), e);
}
});
}
private void clusterEventQueue(Either<ClusterEvent, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize a cluster event: {}", either.getRight().getMessage());
@@ -1034,7 +1010,8 @@ public class JdbcExecutor implements ExecutorInterface, Service {
Execution execution = executor.getExecution();
// handle flow triggers on state change
if (!execution.getState().getCurrent().equals(executor.getOriginalState())) {
processFlowTriggers(execution);
flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows.stream().map(flow -> flow.toFlow()).toList(), Optional.of(multipleConditionStorage))
.forEach(throwConsumer(executionFromFlowTrigger -> this.executionQueue.emit(executionFromFlowTrigger)));
}
// handle actions on terminated state
@@ -1103,20 +1080,6 @@ public class JdbcExecutor implements ExecutorInterface, Service {
}
}
private void processFlowTriggers(Execution execution) throws QueueException {
// directly process simple conditions
flowTriggerService.withFlowTriggersOnly(allFlows.stream())
.filter(f ->ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().noneMatch(c -> c instanceof MultipleCondition) && f.getTrigger().getPreconditions() == null)
.flatMap(f -> flowTriggerService.computeExecutionsFromFlowTriggers(execution, List.of(f.getFlow()), Optional.empty()).stream())
.forEach(throwConsumer(exec -> executionQueue.emit(exec)));
// send multiple conditions to the multiple condition queue for later processing
flowTriggerService.withFlowTriggersOnly(allFlows.stream())
.filter(f -> ListUtils.emptyOnNull(f.getTrigger().getConditions()).stream().anyMatch(c -> c instanceof MultipleCondition) || f.getTrigger().getPreconditions() != null)
.map(f -> new MultipleConditionEvent(f.getFlow(), execution))
.forEach(throwConsumer(multipleCondition -> multipleConditionEventQueue.emit(multipleCondition)));
}
private FlowWithSource transform(FlowWithSource flow, Execution execution) {
if (templateExecutorInterface.isPresent()) {
try {