diff --git a/core/src/main/java/io/kestra/core/models/triggers/AbstractTrigger.java b/core/src/main/java/io/kestra/core/models/triggers/AbstractTrigger.java index 31b2187b2d..110bfb8072 100644 --- a/core/src/main/java/io/kestra/core/models/triggers/AbstractTrigger.java +++ b/core/src/main/java/io/kestra/core/models/triggers/AbstractTrigger.java @@ -82,6 +82,12 @@ abstract public class AbstractTrigger implements TriggerInterface { @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP) private boolean failOnTriggerError = false; + @PluginProperty(group = PluginProperty.CORE_GROUP) + @Schema( + title = "Specifies whether a trigger is allowed to start a new execution even if a previous run is still in progress." + ) + private boolean allowConcurrent = false; + /** * For backward compatibility: we rename minLogLevel to logLevel. * @deprecated use {@link #logLevel} instead diff --git a/core/src/main/java/io/kestra/core/repositories/TriggerRepositoryInterface.java b/core/src/main/java/io/kestra/core/repositories/TriggerRepositoryInterface.java index 3c3fbb757a..1c9d111375 100644 --- a/core/src/main/java/io/kestra/core/repositories/TriggerRepositoryInterface.java +++ b/core/src/main/java/io/kestra/core/repositories/TriggerRepositoryInterface.java @@ -16,8 +16,8 @@ import java.util.function.Function; public interface TriggerRepositoryInterface extends QueryBuilderInterface { Optional findLast(TriggerContext trigger); - Optional findByExecution(Execution execution); - + Optional findByUid(String uid); + List findAll(String tenantId); List findAllForAllTenants(); diff --git a/core/src/test/java/io/kestra/core/docs/JsonSchemaGeneratorTest.java b/core/src/test/java/io/kestra/core/docs/JsonSchemaGeneratorTest.java index b12cfdf476..54e6eca604 100644 --- a/core/src/test/java/io/kestra/core/docs/JsonSchemaGeneratorTest.java +++ b/core/src/test/java/io/kestra/core/docs/JsonSchemaGeneratorTest.java @@ -170,10 +170,11 @@ class JsonSchemaGeneratorTest { Map jsonSchema = jsonSchemaGenerator.generate(AbstractTrigger.class, AbstractTrigger.class); assertThat((Map) jsonSchema.get("properties"), allOf( - Matchers.aMapWithSize(3), + Matchers.aMapWithSize(4), hasKey("conditions"), hasKey("stopAfter"), - hasKey("type") + hasKey("type"), + hasKey("allowConcurrent") )); }); } diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java index 5d2c973a08..d40b840be4 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java @@ -72,12 +72,12 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo @Override public Optional findLast(TriggerContext trigger) { - return findOne(DSL.trueCondition(), field("key").eq(trigger.uid())); + return findByUid(trigger.uid()); } @Override - public Optional findByExecution(Execution execution) { - return findOne(execution.getTenantId(), field("execution_id").eq(execution.getId())); + public Optional findByUid(String uid) { + return findOne(DSL.trueCondition(), field("key").eq(uid)); } public List findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContextInterface) { diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index cac17e7dc4..2d0a1e169a 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -13,6 +13,7 @@ import io.kestra.core.models.tasks.ExecutableTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.WorkerGroup; import io.kestra.core.models.topologies.FlowTopology; +import io.kestra.core.models.triggers.Trigger; import io.kestra.core.models.triggers.multipleflows.MultipleCondition; import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface; import io.kestra.core.queues.QueueException; @@ -1138,9 +1139,7 @@ public class JdbcExecutor implements ExecutorInterface { execution.getTrigger().getId() ); } else { - triggerRepository - .findByExecution(execution) - .ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger))); + triggerRepository.findByUid(Trigger.uid(execution)).ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger))); } } @@ -1241,11 +1240,7 @@ public class JdbcExecutor implements ExecutorInterface { // purge the trigger: reset scheduler trigger at end if (execution.getTrigger() != null) { FlowWithSource flow = executor.getFlow(); - triggerRepository - .findByExecution(execution) - .ifPresent(trigger -> { - this.triggerState.update(executionService.resetExecution(flow, execution, trigger)); - }); + triggerRepository.findByUid(Trigger.uid(execution)).ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger))); } // Purge the workerTaskResultQueue and the workerJobQueue diff --git a/scheduler/src/main/java/io/kestra/scheduler/AbstractScheduler.java b/scheduler/src/main/java/io/kestra/scheduler/AbstractScheduler.java index 00054dd58a..27028f2f7d 100644 --- a/scheduler/src/main/java/io/kestra/scheduler/AbstractScheduler.java +++ b/scheduler/src/main/java/io/kestra/scheduler/AbstractScheduler.java @@ -288,7 +288,7 @@ public abstract class AbstractScheduler implements Scheduler { disableInvalidTrigger(workerTriggerResult.getTriggerContext(), e); return; } - this.handleEvaluateWorkerTriggerResult(triggerExecution, nextExecutionDate); + this.handleEvaluateWorkerTriggerResult(triggerExecution, nextExecutionDate, workerTriggerResult.getTrigger()); } else { ZonedDateTime nextExecutionDate; try { @@ -768,7 +768,7 @@ public abstract class AbstractScheduler implements Scheduler { } private void handleEvaluateWorkerTriggerResult(SchedulerExecutionWithTrigger result, ZonedDateTime - nextExecutionDate) { + nextExecutionDate, AbstractTrigger abstractTrigger) { Optional.ofNullable(result) .ifPresent(executionWithTrigger -> { log(executionWithTrigger); @@ -779,6 +779,12 @@ public abstract class AbstractScheduler implements Scheduler { nextExecutionDate ); + // if the trigger is allowed to run concurrently we do not attached the executio-id to the trigger state + // i.e., the trigger will not be locked + if (abstractTrigger.isAllowConcurrent()) { + trigger = trigger.toBuilder().executionId(null).build(); + } + // Worker triggers result is evaluated in another thread with the workerTriggerResultQueue. // We can then update the trigger directly. this.saveLastTriggerAndEmitExecution(executionWithTrigger.getExecution(), trigger, triggerToSave -> this.triggerState.update(triggerToSave)); @@ -800,6 +806,12 @@ public abstract class AbstractScheduler implements Scheduler { if (result.getExecution().getState().getCurrent() == State.Type.FAILED) { trigger = trigger.resetExecution(State.Type.FAILED); } + + // if the trigger is allowed to run concurrently we do not attached the executio-id to the trigger state + // i.e., the trigger will not be locked + if (((AbstractTrigger)schedule).isAllowConcurrent()) { + trigger = trigger.toBuilder().executionId(null).build(); + } // Schedule triggers are being executed directly from the handle method within the context where triggers are locked. // So we must save them by passing the scheduleContext.