mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
fix(scheduler): better handling of locked triggers (#3603)
This commit is contained in:
@@ -373,6 +373,11 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
.conditionContext(flowWithTriggers.getConditionContext())
|
||||
.triggerContext(flowWithTriggers.TriggerContext.toBuilder().date(now()).stopAfter(flowWithTriggers.getAbstractTrigger().getStopAfter()).build())
|
||||
.build())
|
||||
.peek(f -> {
|
||||
if (f.getTriggerContext().getEvaluateRunningDate() != null || isExecutionNotRunning(f)) {
|
||||
this.triggerState.unlock(f.getTriggerContext());
|
||||
}
|
||||
})
|
||||
.filter(f -> f.getTriggerContext().getEvaluateRunningDate() == null)
|
||||
.filter(this::isExecutionNotRunning)
|
||||
.map(FlowWithPollingTriggerNextDate::of)
|
||||
|
||||
@@ -26,6 +26,13 @@ public interface SchedulerTriggerStateInterface {
|
||||
|
||||
List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);
|
||||
|
||||
// Required for Kafka
|
||||
/**
|
||||
* Required for Kafka
|
||||
*/
|
||||
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
|
||||
|
||||
/**
|
||||
* Required for Kafka
|
||||
*/
|
||||
void unlock(Trigger trigger);
|
||||
}
|
||||
|
||||
@@ -84,4 +84,7 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
|
||||
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unlock(Trigger trigger) {}
|
||||
}
|
||||
|
||||
@@ -79,4 +79,7 @@ public class MemorySchedulerTriggerState implements SchedulerTriggerStateInterfa
|
||||
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unlock(Trigger trigger) {}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user