fix(scheduler): emit ExecutionKilled event when real-time trigger is deleted

This commit is contained in:
Florian Hussonnois
2025-12-16 17:41:29 +01:00
committed by Florian Hussonnois
parent b4a87c3ef3
commit 6599ce59df
6 changed files with 132 additions and 36 deletions

View File

@@ -42,16 +42,17 @@ public final class TriggerState implements TriggerId {
private final int vnode;
private final boolean locked;
private final String workerId;
private final TriggerType type;
// the last-event id that mutate this state.
private final EventId lastEventId;
@JsonProperty
public Long getNextEvaluationEpoch() {
return Optional.ofNullable(nextEvaluationDate)
.map(Instant::toEpochMilli)
.orElse(null);
}
public TriggerContext context() {
return TriggerContext.builder()
.tenantId(tenantId)
@@ -64,22 +65,22 @@ public final class TriggerState implements TriggerId {
.disabled(disabled)
.build();
}
/**
* Factory method for constructing a new {@link TriggerState}.
*
* @return a new {@link TriggerState}
*/
public static TriggerState of(FlowId flowId, AbstractTrigger trigger, Integer vnode) {
return of(TriggerId.of(flowId, trigger), trigger.getStopAfter(), trigger.isDisabled(), vnode);
return of(TriggerId.of(flowId, trigger), TriggerType.from(trigger), trigger.getStopAfter(), trigger.isDisabled(), vnode);
}
/**
* Factory method for constructing a new {@link TriggerState}.
*
* @return a new {@link TriggerState}
*/
public static TriggerState of(TriggerId id, List<State.Type> stopAfter, Boolean disabled, Integer vnode) {
public static TriggerState of(TriggerId id, TriggerType type, List<State.Type> stopAfter, Boolean disabled, Integer vnode) {
return new TriggerState(
id.getTenantId(),
id.getNamespace(),
@@ -94,10 +95,11 @@ public final class TriggerState implements TriggerId {
vnode,
false,
null,
type,
null
);
}
/**
* Updates this trigger state based on the trigger.
*
@@ -105,9 +107,13 @@ public final class TriggerState implements TriggerId {
* @return a new {@link TriggerState}
*/
public TriggerState update(Clock clock, AbstractTrigger trigger) {
return update(clock).stopAfter(trigger.getStopAfter()).disabled(trigger.isDisabled()).build();
return update(clock)
.stopAfter(trigger.getStopAfter())
.disabled(trigger.isDisabled())
.type(TriggerType.from(trigger))
.build();
}
/**
* Updates the vNode of this trigger state.
*
@@ -127,7 +133,7 @@ public final class TriggerState implements TriggerId {
public TriggerState evaluatedAt(final Clock clock, final ZonedDateTime evaluatedAt) {
return evaluatedAt(clock, evaluatedAt.toInstant());
}
/**
* Updates this trigger state with the given {@literal evaluatedAt}.
*
@@ -137,7 +143,7 @@ public final class TriggerState implements TriggerId {
public TriggerState evaluatedAt(final Clock clock, final Instant evaluatedAt) {
return update(clock).evaluatedAt(evaluatedAt).build();
}
/**
* Disabled this trigger state.
*
@@ -147,7 +153,7 @@ public final class TriggerState implements TriggerId {
public TriggerState disabled(final Clock clock, boolean disabled) {
return update(clock).disabled(disabled).build();
}
/**
* Attaches a worker-id to this trigger state.
*
@@ -157,7 +163,7 @@ public final class TriggerState implements TriggerId {
public TriggerState workerId(final Clock clock, String workerId) {
return update(clock).workerId(workerId).build();
}
/**
* Locks this trigger state.
*
@@ -178,7 +184,7 @@ public final class TriggerState implements TriggerId {
public TriggerState updateForNextEvaluationDate(final Clock clock, final ZonedDateTime nextEvaluationDate) {
return updateForNextEvaluationDate(clock, nextEvaluationDate.toInstant());
}
/**
* Updates this trigger state for the given {@code nextEvaluationDate}.
*
@@ -192,12 +198,12 @@ public final class TriggerState implements TriggerId {
.backfill(getBackFillForNextEvaluationDate(nextEvaluationDate))
.build();
}
/**
* Updates this trigger state for the given {@link Backfill}.
*
* @param clock the scheduler clock.
* @param backfill the backfill.
* @param clock the scheduler clock.
* @param backfill the backfill.
* @return a new {@link TriggerState}
*/
public TriggerState backfill(final Clock clock, Backfill backfill) {
@@ -211,7 +217,7 @@ public final class TriggerState implements TriggerId {
}
return update(clock).backfill(backfill).build();
}
/**
* Updates this trigger state for the given {@link Execution}.
*
@@ -222,18 +228,18 @@ public final class TriggerState implements TriggerId {
public TriggerState updateForExecution(final Clock clock, final Execution execution) {
return updateForExecutionState(clock, execution.getState().getCurrent());
}
/**
* Updates this trigger state for the given executions.
*
* @param clock the scheduler clock.
* @param state the execution state.
* @param clock the scheduler clock.
* @param state the execution state.
* @return a new {@link TriggerState}
*/
public TriggerState updateForExecutionState(final Clock clock, final State.Type state) {
// switch disabled automatically if the executionEndState is one of the stopAfter states
boolean disabled = getStopAfter() != null ? getStopAfter().contains(state) : isDisabled();
return update(clock).disabled(disabled).build();
}
@@ -250,7 +256,7 @@ public final class TriggerState implements TriggerId {
.workerId(null)
.build();
}
/**
* Sets the tenant of this trigger state.
*
@@ -261,6 +267,7 @@ public final class TriggerState implements TriggerId {
.tenantId(tenantId)
.build();
}
/**
* Sets the tenant of this trigger state.
*
@@ -271,7 +278,7 @@ public final class TriggerState implements TriggerId {
.lastEventId(eventId)
.build();
}
private Backfill getBackFillForNextEvaluationDate(final Instant nextEvaluationDate) {
final ZonedDateTime localNextEvaluationDate = toZonedDateTime(nextEvaluationDate);
if (backfill != null && !backfill.getPaused()) {
@@ -284,11 +291,10 @@ public final class TriggerState implements TriggerId {
return backfill;
}
private static ZonedDateTime toZonedDateTime(Instant instant) {
return Optional.ofNullable(instant).map(it -> it.atZone(SchedulerClock.getClock().getZone())).orElse(null);
}
private TriggerStateBuilder update(final Clock clock) {
return TriggerState.builder()
.tenantId(tenantId)
@@ -304,6 +310,7 @@ public final class TriggerState implements TriggerId {
.workerId(workerId)
.vnode(vnode)
.disabled(disabled)
.type(type)
.lastEventId(lastEventId);
}

View File

@@ -0,0 +1,30 @@
package io.kestra.core.scheduler.model;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.models.triggers.RealtimeTriggerInterface;
import io.kestra.core.models.triggers.Schedulable;
/**
* Types of triggers supported by the scheduler.
*/
public enum TriggerType {
SCHEDULE,
POLLING,
REALTIME;
/**
* Resolves the trigger type for the given trigger class.
*
* @param trigger the trigger object.
* @return the {@link TriggerType} of {@code null} if the trigger is not supported.
*/
public static TriggerType from(final AbstractTrigger trigger) {
return switch (trigger) {
case Schedulable unused -> SCHEDULE;
case RealtimeTriggerInterface unused -> REALTIME;
case PollingTriggerInterface unused -> POLLING;
case null, default -> null;
};
}
}

View File

@@ -3,12 +3,17 @@ package io.kestra.scheduler;
import io.kestra.core.events.EventId;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledTrigger;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Backfill;
import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.scheduler.events.CreateBackfillTrigger;
@@ -24,6 +29,7 @@ import io.kestra.core.scheduler.events.TriggerExecutionTerminated;
import io.kestra.core.scheduler.events.TriggerReceived;
import io.kestra.core.scheduler.events.TriggerUpdated;
import io.kestra.core.scheduler.model.TriggerState;
import io.kestra.core.scheduler.model.TriggerType;
import io.kestra.core.services.ConditionService;
import io.kestra.core.utils.Logs;
import io.kestra.scheduler.internals.NextEvaluationDate;
@@ -31,6 +37,7 @@ import io.kestra.scheduler.pubsub.TriggerExecutionPublisher;
import io.kestra.scheduler.stores.FlowMetaStore;
import io.kestra.scheduler.stores.TriggerStateStore;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
@@ -55,18 +62,21 @@ public class TriggerEventHandler {
private final TriggerExecutionPublisher triggerExecutionPublisher;
private final RunContextFactory runContextFactory;
private final ConditionService conditionService;
private final QueueInterface<ExecutionKilled> executionKilledQueue;
@Inject
public TriggerEventHandler(TriggerStateStore triggerStateStore,
FlowMetaStore flowStateStore,
TriggerExecutionPublisher triggerExecutionPublisher,
RunContextFactory runContextFactory,
ConditionService conditionService) {
ConditionService conditionService,
@Named(QueueFactoryInterface.KILL_NAMED) QueueInterface<ExecutionKilled> executionKilledQueue) {
this.triggerStateStore = triggerStateStore;
this.flowStateStore = flowStateStore;
this.triggerExecutionPublisher = triggerExecutionPublisher;
this.conditionService = conditionService;
this.runContextFactory = runContextFactory;
this.executionKilledQueue = executionKilledQueue;
}
/**
@@ -290,7 +300,27 @@ public class TriggerEventHandler {
* @param event the event.
*/
void onTriggerDeleted(TriggerDeleted event) {
triggerStateStore.delete(event.id());
triggerStateStore.find(event.id()).ifPresent(state -> {
triggerStateStore.delete(event.id());
maySendExecutionKilled(event, state);
});
}
private void maySendExecutionKilled(TriggerDeleted event, TriggerState state) {
if (TriggerType.REALTIME.equals(state.getType())) {
try {
this.executionKilledQueue.emit(ExecutionKilledTrigger
.builder()
.tenantId(state.getTenantId())
.namespace(state.getNamespace())
.flowId(state.getFlowId())
.triggerId(state.getTriggerId())
.build()
);
} catch (QueueException e) {
Logs.logTrigger(event.id(), Level.WARN, "Cannot kill a real-time trigger, it will continue processing until Kestra is restarted. Cause: {}", e.getMessage(), e);
}
}
}
/**
@@ -302,7 +332,7 @@ public class TriggerEventHandler {
Pair<Flow, AbstractTrigger> data = findTrigger(event, event.revision());
if (data.getRight() != null) {
TriggerState state = TriggerState
.of(event.id(), data.getRight().getStopAfter(), data.getRight().isDisabled(), vNode)
.of(event.id(), TriggerType.from(data.getRight()), data.getRight().getStopAfter(), data.getRight().isDisabled(), vNode)
.lastEventId(clock, event.eventId());
triggerStateStore.save(state);
}

View File

@@ -3,6 +3,7 @@ package io.kestra.scheduler;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.lock.LockService;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.scheduler.SchedulerClock;
import io.kestra.core.scheduler.SchedulerConfiguration;
@@ -38,6 +39,7 @@ import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.time.Duration;
@@ -293,7 +295,8 @@ class DefaultSchedulerTest {
flowMetaStore,
triggerExecutionPublisher,
runContextFactory,
conditionService
conditionService,
Mockito.mock(QueueInterface.class)
);
}

View File

@@ -2,10 +2,13 @@ package io.kestra.scheduler;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.Backfill;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.scheduler.SchedulerClock;
import io.kestra.core.scheduler.events.CreateBackfillTrigger;
@@ -19,6 +22,7 @@ import io.kestra.core.scheduler.events.TriggerEvaluated;
import io.kestra.core.scheduler.events.TriggerExecutionTerminated;
import io.kestra.core.scheduler.events.TriggerUpdated;
import io.kestra.core.scheduler.model.TriggerState;
import io.kestra.core.scheduler.model.TriggerType;
import io.kestra.core.services.ConditionService;
import io.kestra.core.utils.IdUtils;
import io.kestra.scheduler.utils.CollectorTriggerExecutionPublisher;
@@ -27,6 +31,7 @@ import io.kestra.scheduler.utils.InMemoryTriggerStateStore;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.time.Clock;
import java.time.Instant;
@@ -56,13 +61,15 @@ class TriggerEventHandlerTest {
private InMemoryTriggerStateStore triggerStateStore;
private CollectorTriggerExecutionPublisher triggerExecutionPublisher;
private QueueInterface<ExecutionKilled> executionKilledQueue;
@BeforeEach
void setUp() {
triggerExecutionPublisher = new CollectorTriggerExecutionPublisher();
triggerStateStore = new InMemoryTriggerStateStore();
triggerId = Fixtures.triggerId();
triggerState = TriggerState.of(triggerId, null, false, 0);
triggerState = TriggerState.of(triggerId, TriggerType.SCHEDULE, null, false, 0);
executionKilledQueue = Mockito.mock(QueueInterface.class);
}
TriggerEventHandler newTriggerEventHandler(List<FlowWithSource> flows) {
@@ -71,7 +78,8 @@ class TriggerEventHandlerTest {
new InMemoryFlowMetaStore(TEST_VNODE_COUNT, flows),
triggerExecutionPublisher,
runContextFactory,
conditionService
conditionService,
executionKilledQueue
);
}
@@ -106,7 +114,7 @@ class TriggerEventHandlerTest {
}
@Test
void shouldDeleteTriggerGivenTriggerDeletedEventWhenHandled() {
void shouldDeleteTriggerGivenTriggerDeletedEventWhenHandled() throws QueueException {
// GIVEN
triggerStateStore.save(triggerState);
handler = newTriggerEventHandler(List.of());
@@ -117,8 +125,25 @@ class TriggerEventHandlerTest {
// THEN
assertThat(triggerStateStore.find(triggerId)).isEmpty();
Mockito.verify(executionKilledQueue, Mockito.never()).emit(Mockito.any(ExecutionKilled.class));
}
@Test
void shouldSendKillGivenTriggerDeletedEventForRealTimeTriggerWhenHandled() throws QueueException {
// GIVEN
triggerStateStore.save(TriggerState.of(triggerId, TriggerType.REALTIME, null, false, 0));
handler = newTriggerEventHandler(List.of());
TriggerDeleted event = new TriggerDeleted(triggerId);
// WHEN
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
assertThat(triggerStateStore.find(triggerId)).isEmpty();
Mockito.verify(executionKilledQueue, Mockito.only()).emit(Mockito.any(ExecutionKilled.class));
}
@Test
void shouldUpdateTriggerGivenExistingFlowWhenTriggerUpdated() {
// GIVEN

View File

@@ -19,6 +19,7 @@ import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.scheduler.SchedulerClock;
import io.kestra.core.scheduler.SchedulerConfiguration;
import io.kestra.core.scheduler.model.TriggerState;
import io.kestra.core.scheduler.model.TriggerType;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.plugin.core.condition.DayWeekInMonth;
@@ -311,7 +312,7 @@ class TriggerSchedulerTest {
);
// Create an initial state with a prior evaluation date
TriggerState initialState = TriggerState
.of(Fixtures.triggerId(), List.of(), false, 0)
.of(Fixtures.triggerId(), TriggerType.SCHEDULE, List.of(), false, 0)
.evaluatedAt(SchedulerClock.getClock(), SchedulerClock.now().minusMinutes(15))
.updateForNextEvaluationDate(SchedulerClock.getClock(), SchedulerClock.now());
triggerStateStore.save(initialState);
@@ -371,7 +372,7 @@ class TriggerSchedulerTest {
);
// Create an initial state with a prior evaluation date
TriggerState initialState = TriggerState
.of(Fixtures.triggerId(), List.of(), false, 0)
.of(Fixtures.triggerId(), TriggerType.SCHEDULE, List.of(), false, 0)
.evaluatedAt(SchedulerClock.getClock(), SchedulerClock.now().minusMinutes(15))
.updateForNextEvaluationDate(SchedulerClock.getClock(), SchedulerClock.now());
triggerStateStore.save(initialState);
@@ -433,7 +434,7 @@ class TriggerSchedulerTest {
);
// Create an initial state with a prior evaluation date
TriggerState initialState = TriggerState
.of(Fixtures.triggerId(), List.of(), false, 0)
.of(Fixtures.triggerId(), TriggerType.SCHEDULE, List.of(), false, 0)
.evaluatedAt(SchedulerClock.getClock(), SchedulerClock.now().minusMinutes(15))
.updateForNextEvaluationDate(SchedulerClock.getClock(), SchedulerClock.now());
triggerStateStore.save(initialState);