chore(scheduler): revert back changes at handle() and create failed execution with emitting it directly from the catch block

This commit is contained in:
mustafatarek
2025-12-12 15:59:46 +02:00
committed by Loïc Mathieu
parent 41a11abf16
commit 8e7ad9ae25

View File

@@ -712,10 +712,19 @@ public abstract class AbstractScheduler implements Scheduler {
// This is the Schedule, all other triggers should have an interval.
// So we evaluate it now as there is no need to send it to the worker.
// Schedule didn't use the triggerState to allow backfill.
Optional<SchedulerExecutionWithTrigger> schedulerExecutionWithTrigger = evaluateScheduleTrigger(f, scheduleContext);
Optional<SchedulerExecutionWithTrigger> schedulerExecutionWithTrigger = evaluateScheduleTrigger(f);
if (schedulerExecutionWithTrigger.isPresent()) {
this.handleEvaluateSchedulingTriggerResult(schedule, schedulerExecutionWithTrigger.get(), f.getConditionContext(), scheduleContext);
}
else{
// compute next date and save the trigger to avoid evaluating it each second
Trigger trigger = Trigger.fromEvaluateFailed(
f.getTriggerContext(),
schedule.nextEvaluationDate(f.getConditionContext(), Optional.of(f.getTriggerContext()))
);
trigger = trigger.checkBackfill();
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/schedule");
}
} else {
Logs.logTrigger(
f.getTriggerContext(),
@@ -920,7 +929,7 @@ public abstract class AbstractScheduler implements Scheduler {
return ZonedDateTime.now().truncatedTo(ChronoUnit.SECONDS);
}
private Optional<SchedulerExecutionWithTrigger> evaluateScheduleTrigger(FlowWithWorkerTrigger flowWithTrigger, ScheduleContextInterface scheduleContext) {
private Optional<SchedulerExecutionWithTrigger> evaluateScheduleTrigger(FlowWithWorkerTrigger flowWithTrigger) {
return metricRegistry.timer(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION, MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION_DESCRIPTION, metricRegistry.tags(flowWithTrigger.getAbstractTrigger()))
.record(() -> {
try {
@@ -956,13 +965,13 @@ public abstract class AbstractScheduler implements Scheduler {
));
} catch (Exception e) {
logError(flowWithTrigger, e);
handleFailedEvaluatedTrigger(flowWithTrigger, scheduleContext, e );
Execution failedExecution = createFailedExecution( flowWithTrigger, e);
this.emitExecution(failedExecution, flowWithTrigger.getTriggerContext());
return Optional.empty();
}
});
}
void handleFailedEvaluatedTrigger(FlowWithWorkerTrigger flowWithTrigger, ScheduleContextInterface scheduleContext, Throwable e ){
private Execution createFailedExecution(FlowWithWorkerTrigger flowWithTrigger, Throwable e){
Execution execution = Execution.builder()
.id(IdUtils.create())
.tenantId(flowWithTrigger.getTriggerContext().getTenantId())
@@ -974,6 +983,11 @@ public abstract class AbstractScheduler implements Scheduler {
.build();
Logger logger = runContextFactory.of(flowWithTrigger.getFlow(), execution).logger();
logger.error("[trigger: {}] [date: {}] Evaluate Failed with error '{}'" , flowWithTrigger.getAbstractTrigger().getId(), now(), e.getMessage());
return execution;
}
private void handleFailedEvaluatedTrigger(FlowWithWorkerTrigger flowWithTrigger, ScheduleContextInterface scheduleContext, Throwable e ){
Execution execution = createFailedExecution(flowWithTrigger, e);
ZonedDateTime nextExecutionDate;
try {
nextExecutionDate = this.nextEvaluationDate(flowWithTrigger.getAbstractTrigger());