feat(trigger): add support for concurrent trigger execution (#311)

Fixes: #311
This commit is contained in:
Florian Hussonnois
2025-12-08 10:58:10 +01:00
committed by Florian Hussonnois
parent b6e4df8de2
commit 216b124294
6 changed files with 31 additions and 17 deletions

View File

@@ -82,6 +82,12 @@ abstract public class AbstractTrigger implements TriggerInterface {
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
private boolean failOnTriggerError = false;
@PluginProperty(group = PluginProperty.CORE_GROUP)
@Schema(
title = "Specifies whether a trigger is allowed to start a new execution even if a previous run is still in progress."
)
private boolean allowConcurrent = false;
/**
* For backward compatibility: we rename minLogLevel to logLevel.
* @deprecated use {@link #logLevel} instead

View File

@@ -16,8 +16,8 @@ import java.util.function.Function;
public interface TriggerRepositoryInterface extends QueryBuilderInterface<Triggers.Fields> {
Optional<Trigger> findLast(TriggerContext trigger);
Optional<Trigger> findByExecution(Execution execution);
Optional<Trigger> findByUid(String uid);
List<Trigger> findAll(String tenantId);
List<Trigger> findAllForAllTenants();

View File

@@ -170,10 +170,11 @@ class JsonSchemaGeneratorTest {
Map<String, Object> jsonSchema = jsonSchemaGenerator.generate(AbstractTrigger.class, AbstractTrigger.class);
assertThat((Map<String, Object>) jsonSchema.get("properties"), allOf(
Matchers.aMapWithSize(3),
Matchers.aMapWithSize(4),
hasKey("conditions"),
hasKey("stopAfter"),
hasKey("type")
hasKey("type"),
hasKey("allowConcurrent")
));
});
}

View File

@@ -72,12 +72,12 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo
@Override
public Optional<Trigger> findLast(TriggerContext trigger) {
    // The trigger's uid is the storage key, so a last-trigger lookup is
    // exactly a lookup by uid; delegate to keep the query logic in one place.
    // (The diff artifact left the superseded findOne(...) call as a second,
    // unreachable return statement — only the delegation survives.)
    return findByUid(trigger.uid());
}
@Override
public Optional<Trigger> findByExecution(Execution execution) {
return findOne(execution.getTenantId(), field("execution_id").eq(execution.getId()));
public Optional<Trigger> findByUid(String uid) {
return findOne(DSL.trueCondition(), field("key").eq(uid));
}
public List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContextInterface) {

View File

@@ -13,6 +13,7 @@ import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.multipleflows.MultipleCondition;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueException;
@@ -1138,9 +1139,7 @@ public class JdbcExecutor implements ExecutorInterface {
execution.getTrigger().getId()
);
} else {
triggerRepository
.findByExecution(execution)
.ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
triggerRepository.findByUid(Trigger.uid(execution)).ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
}
}
@@ -1241,11 +1240,7 @@ public class JdbcExecutor implements ExecutorInterface {
// purge the trigger: reset scheduler trigger at end
if (execution.getTrigger() != null) {
FlowWithSource flow = executor.getFlow();
triggerRepository
.findByExecution(execution)
.ifPresent(trigger -> {
this.triggerState.update(executionService.resetExecution(flow, execution, trigger));
});
triggerRepository.findByUid(Trigger.uid(execution)).ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
}
// Purge the workerTaskResultQueue and the workerJobQueue

View File

@@ -288,7 +288,7 @@ public abstract class AbstractScheduler implements Scheduler {
disableInvalidTrigger(workerTriggerResult.getTriggerContext(), e);
return;
}
this.handleEvaluateWorkerTriggerResult(triggerExecution, nextExecutionDate);
this.handleEvaluateWorkerTriggerResult(triggerExecution, nextExecutionDate, workerTriggerResult.getTrigger());
} else {
ZonedDateTime nextExecutionDate;
try {
@@ -768,7 +768,7 @@ public abstract class AbstractScheduler implements Scheduler {
}
private void handleEvaluateWorkerTriggerResult(SchedulerExecutionWithTrigger result, ZonedDateTime
nextExecutionDate) {
nextExecutionDate, AbstractTrigger abstractTrigger) {
Optional.ofNullable(result)
.ifPresent(executionWithTrigger -> {
log(executionWithTrigger);
@@ -779,6 +779,12 @@ public abstract class AbstractScheduler implements Scheduler {
nextExecutionDate
);
// if the trigger is allowed to run concurrently we do not attach the execution-id to the trigger state
// i.e., the trigger will not be locked
if (abstractTrigger.isAllowConcurrent()) {
trigger = trigger.toBuilder().executionId(null).build();
}
// Worker triggers result is evaluated in another thread with the workerTriggerResultQueue.
// We can then update the trigger directly.
this.saveLastTriggerAndEmitExecution(executionWithTrigger.getExecution(), trigger, triggerToSave -> this.triggerState.update(triggerToSave));
@@ -800,6 +806,12 @@ public abstract class AbstractScheduler implements Scheduler {
if (result.getExecution().getState().getCurrent() == State.Type.FAILED) {
trigger = trigger.resetExecution(State.Type.FAILED);
}
// if the trigger is allowed to run concurrently we do not attach the execution-id to the trigger state
// i.e., the trigger will not be locked
if (((AbstractTrigger)schedule).isAllowConcurrent()) {
trigger = trigger.toBuilder().executionId(null).build();
}
// Schedule triggers are being executed directly from the handle method within the context where triggers are locked.
// So we must save them by passing the scheduleContext.