fix(core): add failed execution with logs when scheduled triggers fail during evaluation

This commit is contained in:
mustafatarek
2025-12-11 22:40:54 +02:00
committed by Loïc Mathieu
parent eae5eb80cb
commit 12b89588a6

View File

@@ -712,17 +712,9 @@ public abstract class AbstractScheduler implements Scheduler {
// This is the Schedule, all other triggers should have an interval.
// So we evaluate it now as there is no need to send it to the worker.
// Schedule didn't use the triggerState to allow backfill.
Optional<SchedulerExecutionWithTrigger> schedulerExecutionWithTrigger = evaluateScheduleTrigger(f);
Optional<SchedulerExecutionWithTrigger> schedulerExecutionWithTrigger = evaluateScheduleTrigger(f, scheduleContext);
if (schedulerExecutionWithTrigger.isPresent()) {
this.handleEvaluateSchedulingTriggerResult(schedule, schedulerExecutionWithTrigger.get(), f.getConditionContext(), scheduleContext);
} else {
// compute next date and save the trigger to avoid evaluating it each second
Trigger trigger = Trigger.fromEvaluateFailed(
f.getTriggerContext(),
schedule.nextEvaluationDate(f.getConditionContext(), Optional.of(f.getTriggerContext()))
);
trigger = trigger.checkBackfill();
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/schedule");
}
} else {
Logs.logTrigger(
@@ -750,26 +742,7 @@ public abstract class AbstractScheduler implements Scheduler {
// validate schedule condition can fail to render variables
// in this case, we send a failed execution so the trigger is not evaluated each second.
logger.error("Unable to evaluate the trigger '{}'", f.getAbstractTrigger().getId(), ie);
Execution execution = Execution.builder()
.id(IdUtils.create())
.tenantId(f.getTriggerContext().getTenantId())
.namespace(f.getTriggerContext().getNamespace())
.flowId(f.getTriggerContext().getFlowId())
.flowRevision(f.getFlow().getRevision())
.labels(LabelService.labelsExcludingSystem(f.getFlow()))
.state(new State().withState(State.Type.FAILED))
.build();
ZonedDateTime nextExecutionDate;
try {
nextExecutionDate = this.nextEvaluationDate(f.getAbstractTrigger());
} catch (InvalidTriggerConfigurationException e2) {
logError(f, e2);
disableInvalidTrigger(f, e2);
return;
}
var trigger = f.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
handleFailedEvaluatedTrigger(f, scheduleContext, ie);
}
});
});
@@ -947,7 +920,7 @@ public abstract class AbstractScheduler implements Scheduler {
return ZonedDateTime.now().truncatedTo(ChronoUnit.SECONDS);
}
private Optional<SchedulerExecutionWithTrigger> evaluateScheduleTrigger(FlowWithWorkerTrigger flowWithTrigger) {
private Optional<SchedulerExecutionWithTrigger> evaluateScheduleTrigger(FlowWithWorkerTrigger flowWithTrigger, ScheduleContextInterface scheduleContext) {
return metricRegistry.timer(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION, MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION_DESCRIPTION, metricRegistry.tags(flowWithTrigger.getAbstractTrigger()))
.record(() -> {
try {
@@ -983,11 +956,37 @@ public abstract class AbstractScheduler implements Scheduler {
));
} catch (Exception e) {
logError(flowWithTrigger, e);
handleFailedEvaluatedTrigger(flowWithTrigger, scheduleContext, e );
return Optional.empty();
}
});
}
void handleFailedEvaluatedTrigger(FlowWithWorkerTrigger flowWithTrigger, ScheduleContextInterface scheduleContext, Throwable e ){
Execution execution = Execution.builder()
.id(IdUtils.create())
.tenantId(flowWithTrigger.getTriggerContext().getTenantId())
.namespace(flowWithTrigger.getTriggerContext().getNamespace())
.flowId(flowWithTrigger.getTriggerContext().getFlowId())
.flowRevision(flowWithTrigger.getFlow().getRevision())
.labels(LabelService.labelsExcludingSystem(flowWithTrigger.getFlow()))
.state(new State().withState(State.Type.FAILED))
.build();
Logger logger = runContextFactory.of(flowWithTrigger.getFlow(), execution).logger();
logger.error("[trigger: {}] [date: {}] Evaluate Failed with error '{}'" , flowWithTrigger.getAbstractTrigger().getId(), now(), e.getMessage());
ZonedDateTime nextExecutionDate;
try {
nextExecutionDate = this.nextEvaluationDate(flowWithTrigger.getAbstractTrigger());
} catch (InvalidTriggerConfigurationException e2) {
logError(flowWithTrigger, e2);
disableInvalidTrigger(flowWithTrigger, e2);
return;
}
var trigger = flowWithTrigger.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
}
private void logError(FlowWithWorkerTrigger flowWithWorkerTriggerNextDate, Throwable e) {
Logger logger = flowWithWorkerTriggerNextDate.getConditionContext().getRunContext().logger();