mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
fix(system): avoid trigger locking after scheduler restart
closes #11434
This commit is contained in:
@@ -301,6 +301,8 @@ public abstract class AbstractScheduler implements Scheduler {
|
||||
// Initialized local trigger state,
|
||||
// and if some flows were created outside the box, for example from the CLI,
|
||||
// then we may have some triggers that are not created yet.
|
||||
/* FIXME: There is a race between Kafka stream consumption & initializedTriggers: we can override a trigger update coming from a stream consumption with an old one because stream consumption is not waiting for trigger initialization
|
||||
* Example: we see a SUCCESS execution so we reset the trigger's executionId but then the initializedTriggers resubmits an old trigger state for some reasons (evaluationDate for eg.) */
|
||||
private void initializedTriggers(List<FlowWithSource> flows) {
|
||||
record FlowAndTrigger(FlowWithSource flow, AbstractTrigger trigger) {
|
||||
@Override
|
||||
@@ -371,10 +373,13 @@ public abstract class AbstractScheduler implements Scheduler {
|
||||
|
||||
this.triggerState.update(lastUpdate);
|
||||
}
|
||||
} else if (recoverMissedSchedules == RecoverMissedSchedules.NONE) {
|
||||
lastUpdate = trigger.get().toBuilder().nextExecutionDate(schedule.nextEvaluationDate()).build();
|
||||
} else {
|
||||
ZonedDateTime nextEvaluationDate = schedule.nextEvaluationDate();
|
||||
if (recoverMissedSchedules == RecoverMissedSchedules.NONE && !Objects.equals(trigger.get().getNextExecutionDate(), nextEvaluationDate)) {
|
||||
lastUpdate = trigger.get().toBuilder().nextExecutionDate(nextEvaluationDate).build();
|
||||
|
||||
this.triggerState.update(lastUpdate);
|
||||
this.triggerState.update(lastUpdate);
|
||||
}
|
||||
}
|
||||
// Used for schedulableNextDate
|
||||
FlowWithWorkerTrigger flowWithWorkerTrigger = FlowWithWorkerTrigger.builder()
|
||||
|
||||
Reference in New Issue
Block a user