fix(scheduler): ensure events are not handled twice on queue re-consumption

Adds EventId to all trigger events and keep track of the last event
that modified a trigger state
This commit is contained in:
Florian Hussonnois
2025-12-03 09:50:07 +01:00
committed by Loïc Mathieu
parent 3252b695bc
commit 638d9979fd
21 changed files with 401 additions and 87 deletions

View File

@@ -0,0 +1,83 @@
package io.kestra.core.events;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.uuid.Generators;
import com.fasterxml.uuid.impl.TimeBasedEpochGenerator;
import java.util.UUID;
/**
* Strongly-typed wrapper around a UUIDv7 identifier used for Kestra events.
* <p>
* UUIDv7 values are time-ordered, which allows lexicographic and unsigned
* 128-bit comparison to reflect chronological ordering.
*/
public record EventId(UUID value) implements Comparable<EventId> {
// Generator that generates UUID using version 7 (Unix Epoch time+random based).
private static final TimeBasedEpochGenerator GENERATOR = Generators.timeBasedEpochGenerator();
public EventId {
if (value == null) {
throw new IllegalArgumentException("EventId UUID cannot be null");
}
}
/**
* Factory method for creating a new {@link EventId}.
*
* @return a new {@link EventId}.
*/
public static EventId create() {
return new EventId(GENERATOR.generate());
}
@JsonCreator
public static EventId fromString(String str) {
return new EventId(UUID.fromString(str));
}
/**
* Compares two UUIDv7 values chronologically. UUIDv7 ordering corresponds
* to treating the UUID as a 128-bit unsigned integer.
*
* @param other the other {@code EventId} to compare against
* @return a negative value if this ID is older; zero if equal; positive if newer
*/
@Override
public int compareTo(EventId other) {
int cmp = Long.compareUnsigned(this.value.getMostSignificantBits(), other.value.getMostSignificantBits());
if (cmp != 0) return cmp;
return Long.compareUnsigned(this.value.getLeastSignificantBits(), other.value.getLeastSignificantBits());
}
/**
* Checks whether this ID is chronologically newer (greater) than the given ID.
*
* @param other the ID to compare against
* @return {@code true} if this ID is newer; {@code false} otherwise
*/
public boolean isNewerThan(final EventId other) {
return this.compareTo(other) > 0;
}
/**
* Checks whether this ID is chronologically older (less) than the given ID.
*
* @param other the ID to compare against
* @return {@code true} if this ID is older; {@code false} otherwise
*/
public boolean isOlderThan(final EventId other) {
return this.compareTo(other) < 0;
}
/**
* Returns the string representation of the underlying UUID.
*
* @return the UUID string
*/
@Override
public String toString() {
return value.toString();
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.core.scheduler.events;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.kestra.core.events.EventId;
import io.kestra.core.models.Label;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
@@ -19,11 +20,12 @@ import java.util.Map;
public record CreateBackfillTrigger(
TriggerId id,
Backfill backfill,
Instant timestamp
Instant timestamp,
EventId eventId
) implements TriggerEvent {
public CreateBackfillTrigger(TriggerId id, Backfill backfill) {
this(id, backfill, Instant.now());
this(id, backfill, Instant.now(), EventId.create());
}
public record Backfill(

View File

@@ -1,5 +1,6 @@
package io.kestra.core.scheduler.events;
import io.kestra.core.events.EventId;
import io.kestra.core.models.triggers.TriggerId;
import java.time.Instant;
@@ -9,7 +10,11 @@ import java.time.Instant;
*/
public record DeleteBackfillTrigger(
TriggerId id,
Instant timestamp
Instant timestamp,
EventId eventId
) implements TriggerEvent {
public DeleteBackfillTrigger(TriggerId id) {
this(id, Instant.now(), EventId.create());
}
}

View File

@@ -1,6 +1,6 @@
package io.kestra.core.scheduler.events;
import io.kestra.core.events.EventId;
import io.kestra.core.models.triggers.TriggerId;
import java.time.Instant;
@@ -10,10 +10,11 @@ import java.time.Instant;
*/
public record ResetTrigger(
TriggerId id,
Instant timestamp
Instant timestamp,
EventId eventId
) implements TriggerEvent {
public ResetTrigger(TriggerId id) {
this(id, Instant.now());
this(id, Instant.now(), EventId.create());
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.scheduler.events;
import io.kestra.core.events.EventId;
import io.kestra.core.models.triggers.TriggerId;
import java.time.Instant;
@@ -9,11 +10,12 @@ import java.time.Instant;
*/
public record SetDisableTrigger(
TriggerId id,
boolean disabled,
Instant timestamp,
boolean disabled
EventId eventId
) implements TriggerEvent {
public SetDisableTrigger(TriggerId id, Boolean disabled) {
this(id, Instant.now(), disabled);
this(id, disabled, Instant.now(), EventId.create());
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.scheduler.events;
import io.kestra.core.events.EventId;
import io.kestra.core.models.triggers.TriggerId;
import java.time.Instant;
@@ -9,11 +10,12 @@ import java.time.Instant;
*/
public record SetPauseBackfillTrigger(
TriggerId id,
boolean pause,
Instant timestamp,
boolean pause
EventId eventId
) implements TriggerEvent {
public SetPauseBackfillTrigger(TriggerId id, boolean pause) {
this(id, Instant.now(), pause);
this(id, pause, Instant.now(), EventId.create());
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.scheduler.events;
import io.kestra.core.events.EventId;
import io.kestra.core.models.triggers.TriggerId;
import java.time.Instant;
@@ -9,11 +10,12 @@ import java.time.Instant;
*/
public record TriggerCreated(
TriggerId id,
int revision,
Instant timestamp,
int revision
EventId eventId
) implements TriggerEvent {
public TriggerCreated(TriggerId id, int revision) {
this(id, Instant.now(), revision);
this(id, revision, Instant.now(), EventId.create());
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.scheduler.events;
import io.kestra.core.events.EventId;
import io.kestra.core.models.triggers.TriggerId;
import java.time.Instant;
@@ -9,9 +10,10 @@ import java.time.Instant;
*/
public record TriggerDeleted(
TriggerId id,
Instant timestamp
Instant timestamp,
EventId eventId
) implements TriggerEvent {
public TriggerDeleted(TriggerId id) {
this(id, Instant.now());
this(id, Instant.now(), EventId.create());
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.scheduler.events;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.TriggerId;
@@ -12,11 +13,12 @@ public record TriggerEvaluated(
TriggerId id,
// TODO we could have a dedicated class to simplify the model
Execution execution,
Instant timestamp
Instant timestamp,
EventId eventId
) implements TriggerEvent {
public TriggerEvaluated(TriggerId id, Execution execution) {
this(id, execution, Instant.now());
this(id, execution, Instant.now(), EventId.create());
}
}

View File

@@ -8,6 +8,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.kestra.core.events.EventId;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.utils.Enums;
@@ -33,20 +34,30 @@ import java.util.Map;
@JsonSubTypes.Type(value = TriggerEvent.Invalid.class, name = "INVALID"),
})
public interface TriggerEvent extends HasUID {
/**
* @return the trigger identifier.
*/
@JsonProperty
@JsonDeserialize(as = TriggerId.Default.class)
TriggerId id();
/**
* @return the event timestamp.
*/
@JsonProperty
Instant timestamp();
/**
* The event unique identifier.
* <p>
* Can be used to de-duplicate events.
*
* @return the event identifier.
*/
@JsonProperty
EventId eventId();
/**
* @return the event type.
*/
@@ -54,7 +65,7 @@ public interface TriggerEvent extends HasUID {
default TriggerEventType type() {
return Enums.fromClassName(this, TriggerEventType.class);
}
/**
* {@inheritDoc}
*/
@@ -63,18 +74,20 @@ public interface TriggerEvent extends HasUID {
default String uid() {
return this.id().uid();
}
record Invalid(TriggerId id,
Instant timestamp,
EventId eventId,
Map<String, Object> properties
) implements TriggerEvent {
@JsonCreator
public Invalid(@JsonProperty("id") TriggerId id,
@JsonProperty("timestamp") Instant timestamp) {
this(id, timestamp, new HashMap<>());
@JsonProperty("timestamp") Instant timestamp,
@JsonProperty("eventId") EventId eventId) {
this(id, timestamp, eventId, new HashMap<>());
}
@JsonAnySetter
public void addProperty(String key, Object value) {
this.properties.put(key, value);

View File

@@ -1,5 +1,6 @@
package io.kestra.core.scheduler.events;
import io.kestra.core.events.EventId;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.TriggerId;
@@ -12,10 +13,11 @@ public record TriggerExecutionTerminated(
TriggerId id,
String executionId,
State.Type executionState,
Instant timestamp
Instant timestamp,
EventId eventId
) implements TriggerEvent {
public TriggerExecutionTerminated(TriggerId id, String executionId, State.Type executionState){
this(id, executionId, executionState, Instant.now());
this(id, executionId, executionState, Instant.now(), EventId.create());
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.scheduler.events;
import io.kestra.core.events.EventId;
import io.kestra.core.models.triggers.TriggerId;
import java.time.Instant;
@@ -9,7 +10,12 @@ import java.time.Instant;
*/
public record TriggerReceived(
TriggerId id,
String workerId,
Instant timestamp,
String workerId
EventId eventId
) implements TriggerEvent{
public TriggerReceived(TriggerId id, String workerId) {
this(id, workerId, Instant.now(), EventId.create());
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.scheduler.events;
import io.kestra.core.events.EventId;
import io.kestra.core.models.triggers.TriggerId;
import java.time.Instant;
@@ -10,9 +11,10 @@ import java.time.Instant;
public record TriggerUpdated(
TriggerId id,
int revision,
Instant timestamp
Instant timestamp,
EventId eventId
) implements TriggerEvent {
public TriggerUpdated(TriggerId id, int revision) {
this(id, revision, Instant.now());
this(id, revision, Instant.now(), EventId.create());
}
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.scheduler.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.events.EventId;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.models.flows.State;
@@ -41,6 +42,8 @@ public final class TriggerState implements TriggerId {
private final int vnode;
private final boolean locked;
private final String workerId;
// the last-event id that mutate this state.
private final EventId lastEventId;
@JsonProperty
public Long getNextEvaluationEpoch() {
@@ -90,6 +93,7 @@ public final class TriggerState implements TriggerId {
disabled,
vnode,
false,
null,
null
);
}
@@ -257,6 +261,16 @@ public final class TriggerState implements TriggerId {
.tenantId(tenantId)
.build();
}
/**
* Sets the tenant of this trigger state.
*
* @return a new {@link TriggerState}
*/
public TriggerState lastEventId(Clock clock, EventId eventId) {
return update(clock)
.lastEventId(eventId)
.build();
}
private Backfill getBackFillForNextEvaluationDate(final Instant nextEvaluationDate) {
final ZonedDateTime localNextEvaluationDate = toZonedDateTime(nextEvaluationDate);
@@ -289,7 +303,8 @@ public final class TriggerState implements TriggerId {
.locked(locked)
.workerId(workerId)
.vnode(vnode)
.disabled(disabled);
.disabled(disabled)
.lastEventId(lastEventId);
}
// Lombok hack to properly generate Javadoc

View File

@@ -25,7 +25,6 @@ import jakarta.inject.Singleton;
import reactor.core.publisher.Flux;
import java.time.Clock;
import java.time.Instant;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -35,12 +34,12 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
*/
@Singleton
public class TriggerStateService {
private final TriggerRepositoryInterface triggerRepository;
private final FlowRepositoryInterface flowRepository;
private final TriggerEventQueue triggerEventQueue;
private final QueueInterface<ExecutionKilled> executionKilledQueue;
@Inject
public TriggerStateService(final TriggerRepositoryInterface triggerRepository,
final FlowRepositoryInterface flowRepository,
@@ -51,7 +50,7 @@ public class TriggerStateService {
this.executionKilledQueue = executionKilledQueue;
this.flowRepository = flowRepository;
}
/**
* Toggles all triggers matching the given filters.
*
@@ -72,7 +71,7 @@ public class TriggerStateService {
.blockOptional()
.orElse(0);
}
/**
* Toggles the trigger for given identifier.
*
@@ -82,21 +81,21 @@ public class TriggerStateService {
public void toggleTriggerById(TriggerId trigger, boolean disabled) throws NotFoundException, ConflictException {
Flow flow = this.flowRepository.findById(trigger.getTenantId(), trigger.getNamespace(), trigger.getFlowId())
.orElseThrow(() -> new NotFoundException("Flow not found for trigger: %s".formatted(trigger)));
AbstractTrigger abstractTrigger = flow.getTriggers().stream().filter(t -> t.getId().equals(trigger.getTriggerId())).findFirst()
.orElseThrow(() -> new NotFoundException("Trigger not found: %s".formatted(trigger)));
if (abstractTrigger instanceof RealtimeTriggerInterface) {
throw new ConflictException("Realtime triggers can not be updated through the API, please edit the trigger from the flow.");
}
triggerEventQueue.send(new SetDisableTrigger(TriggerId.of(trigger), Instant.now(), disabled));
triggerEventQueue.send(new SetDisableTrigger(TriggerId.of(trigger), disabled));
}
public int deleteAllBackfills(List<TriggerId> triggers) {
return backfillsAction(Flux.fromIterable(triggers), BackfillAction.DELETE);
}
/**
* Deletes all backfills for triggers matching the given filters.
*
@@ -107,28 +106,28 @@ public class TriggerStateService {
public int deleteBackfillMatching(String tenant, List<QueryFilter> filters) {
return backfillsAction(triggerRepository.find(tenant, filters), BackfillAction.DELETE);
}
/**
* Pauses all backfills for the given triggers.
*
* @param triggers the triggers.
* @param triggers the triggers.
* @return the number of backfill paused.
*/
public int pauseAllBackfillByIds(List<TriggerId> triggers) {
return backfillsAction(Flux.fromIterable(triggers), BackfillAction.PAUSE);
}
/**
* Resumes all backfills for the given triggers.
*
* @param triggers the triggers.
* @param triggers the triggers.
* @return the number of backfill resumed.
*/
public int resumeAllBackfillByIds(List<TriggerId> triggers) {
return backfillsAction(Flux.fromIterable(triggers), BackfillAction.RESUME);
}
/**
* Pauses all backfills for triggers matching the given filters.
*
@@ -139,7 +138,7 @@ public class TriggerStateService {
public int pauseAllBackfillMatching(String tenant, List<QueryFilter> filters) {
return backfillsAction(triggerRepository.find(tenant, filters), BackfillAction.PAUSE);
}
/**
* Resumes all backfills for triggers matching the given filters.
*
@@ -150,7 +149,7 @@ public class TriggerStateService {
public int resumeAllBackfillMatching(String tenant, List<QueryFilter> filters) {
return backfillsAction(triggerRepository.find(tenant, filters), BackfillAction.RESUME);
}
/**
* Create a trigger backfill for the given identifier.
*
@@ -161,7 +160,7 @@ public class TriggerStateService {
getTriggerState(triggerId); // check if state exists.
triggerEventQueue.send(new CreateBackfillTrigger(TriggerId.of(triggerId), backfill));
}
/**
* Deletes the trigger backfill for the given identifier.
*
@@ -170,9 +169,9 @@ public class TriggerStateService {
*/
public void deleteBackfill(TriggerId triggerId) {
getTriggerState(triggerId); // check if state exists.
triggerEventQueue.send(new DeleteBackfillTrigger(TriggerId.of(triggerId), Instant.now()));
triggerEventQueue.send(new DeleteBackfillTrigger(TriggerId.of(triggerId)));
}
/**
* Pauses the trigger backfill for the given identifier.
*
@@ -181,9 +180,9 @@ public class TriggerStateService {
*/
public void setBackfillPaused(TriggerId triggerId, boolean paused) {
getTriggerState(triggerId); // check if state exists.
triggerEventQueue.send(new SetPauseBackfillTrigger(TriggerId.of(triggerId), Instant.now(), paused));
triggerEventQueue.send(new SetPauseBackfillTrigger(TriggerId.of(triggerId), paused));
}
/**
* Resets a trigger for the given identifier.
*
@@ -200,9 +199,9 @@ public class TriggerStateService {
.triggerId(triggerId.getTriggerId())
.build()
);
triggerEventQueue.send(new ResetTrigger(TriggerId.of(triggerId), Instant.now()));
triggerEventQueue.send(new ResetTrigger(TriggerId.of(triggerId)));
}
/**
* Unlocks a trigger.
*
@@ -215,13 +214,13 @@ public class TriggerStateService {
return triggerRepository.findById(trigger)
.map(state -> {
if (state.isLocked()) {
triggerEventQueue.send(new ResetTrigger(TriggerId.of(trigger), Instant.now()));
triggerEventQueue.send(new ResetTrigger(TriggerId.of(trigger)));
return state.locked(Clock.systemDefaultZone(), false);
}
throw new ConflictException("trigger %s is already unlocked".formatted(trigger));
}).orElseThrow(() -> new NotFoundException("Trigger %s not found".formatted(trigger)));
}
/**
* Unlocks multiple triggers.
*
@@ -240,11 +239,11 @@ public class TriggerStateService {
* @return the number of triggers successfully unlocked.
* @throws NotFoundException if a trigger can't be found.
*/
public int unlockAllTriggersMatching(String tenant, List<QueryFilter> filters) throws NotFoundException{
public int unlockAllTriggersMatching(String tenant, List<QueryFilter> filters) throws NotFoundException {
Flux<TriggerId> flux = triggerRepository.find(tenant, filters).filter(TriggerState::isLocked).map(TriggerId::of);
return unlockAllTriggersByIds(flux);
}
private int unlockAllTriggersByIds(final Flux<TriggerId> triggers) {
return triggers.map(trigger -> {
try {
@@ -258,7 +257,7 @@ public class TriggerStateService {
.blockOptional()
.orElse(0);
}
private int backfillsAction(Flux<? extends TriggerId> triggers, BackfillAction action) {
return triggers.map(trigger -> {
try {
@@ -280,11 +279,11 @@ public class TriggerStateService {
}
}).reduce(Integer::sum).blockOptional().orElse(0);
}
private TriggerState getTriggerState(TriggerId triggerId) {
return triggerRepository.findById(triggerId).orElseThrow(() -> new NotFoundException("Trigger %s not found".formatted(triggerId)));
}
private enum BackfillAction {
PAUSE,
RESUME,

View File

@@ -0,0 +1,83 @@
package io.kestra.core.events;
import org.junit.jupiter.api.Test;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
class EventIdTest {
@Test
void shouldCreateNewEventIdGivenFactoryMethod() {
// Given / When
EventId id = EventId.create();
// Then
assertThat(id).isNotNull();
assertThat(id.value()).isNotNull();
assertThat(id.toString()).isEqualTo(id.value().toString());
}
@Test
void shouldFailGivenNullUuid() {
// When / Then
assertThatThrownBy(() -> new EventId(null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("cannot be null");
}
@Test
void shouldDeserializeGivenValidUuidString() {
// Given
String uuidStr = UUID.randomUUID().toString();
// When
EventId id = EventId.fromString(uuidStr);
// Then
assertThat(id.value().toString()).isEqualTo(uuidStr);
}
@Test
void shouldCompareCorrectlyGivenChronologicalOrdering() {
// Given two generated IDs (UUIDv7 ensures creation order is chronological)
EventId first = EventId.create();
EventId second = EventId.create();
// When / Then
assertThat(first.compareTo(second)).isLessThan(0);
assertThat(second.compareTo(first)).isGreaterThan(0);
assertThat(second.isNewerThan(first)).isTrue();
assertThat(first.isOlderThan(second)).isTrue();
}
@Test
void shouldBeEqualGivenSameUnderlyingUuid() {
// Given
UUID uuid = UUID.randomUUID();
EventId id1 = new EventId(uuid);
EventId id2 = new EventId(uuid);
// When / Then
assertThat(id1).isEqualTo(id2);
assertThat(id1.compareTo(id2)).isZero();
assertThat(id1.isNewerThan(id2)).isFalse();
assertThat(id1.isOlderThan(id2)).isFalse();
}
@Test
void shouldReturnStringRepresentationGivenToStringCall() {
// Given
UUID uuid = UUID.randomUUID();
EventId id = new EventId(uuid);
// When
String str = id.toString();
// Then
assertThat(str).isEqualTo(uuid.toString());
}
}

View File

@@ -20,7 +20,7 @@ class TriggerEventTest {
void shouldSerializeEvent() throws JsonProcessingException {
// Given
TriggerId id = new TriggerId.Default("tenant", "namespace", "flow", "trigger");
TriggerCreated event = new TriggerCreated(id, Instant.now(), 1);
TriggerCreated event = new TriggerCreated(id, 1);
// When - then
String serialized = JacksonMapper.ofJson().writeValueAsString(event);
@@ -29,6 +29,6 @@ class TriggerEventTest {
@Test
void shouldGetTriggerEventType() {
assertThat(Enums.fromClassName(new ResetTrigger(null, null), TriggerEventType.class)).isEqualTo(TriggerEventType.RESET_TRIGGER);
assertThat(Enums.fromClassName(new ResetTrigger(null), TriggerEventType.class)).isEqualTo(TriggerEventType.RESET_TRIGGER);
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.scheduler;
import io.kestra.core.events.EventId;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
@@ -78,7 +79,7 @@ public class TriggerEventHandler {
LOG.debug("Received event {} for {} at {}", event.type(), event.id(), event.timestamp());
switch (event) {
// Events
case TriggerCreated evt -> onTriggerCreated(evt, vNode);
case TriggerCreated evt -> onTriggerCreated(clock, evt, vNode);
case TriggerDeleted evt -> onTriggerDeleted(evt);
case TriggerUpdated evt -> onTriggerUpdated(clock, evt);
case TriggerExecutionTerminated evt -> onTriggerExecutionTerminated(clock, evt);
@@ -101,6 +102,7 @@ public class TriggerEventHandler {
void onCreateBackfill(Clock clock, CreateBackfillTrigger event) {
findTriggerState(event).ifPresent(state -> {
state = state
.lastEventId(clock, event.eventId())
.backfill(clock, Backfill
.builder()
.start(event.backfill().start())
@@ -138,7 +140,10 @@ public class TriggerEventHandler {
void onSetTriggerDisable(Clock clock, SetPauseBackfillTrigger event) {
findTriggerState(event).ifPresent(state -> {
if (state.getBackfill() != null) {
triggerStateStore.save(state.backfill(clock, state.getBackfill().toBuilder().paused(event.pause()).build()));
state = state
.lastEventId(clock, event.eventId())
.backfill(clock, state.getBackfill().toBuilder().paused(event.pause()).build());
triggerStateStore.save(state);
}
});
}
@@ -151,7 +156,10 @@ public class TriggerEventHandler {
*/
void onSetTriggerDisable(Clock clock, SetDisableTrigger event) {
findTriggerState(event).ifPresent(state -> {
triggerStateStore.save(state.disabled(clock, event.disabled()));
state = state
.lastEventId(clock, event.eventId())
.disabled(clock, event.disabled());
triggerStateStore.save(state);
});
}
@@ -163,6 +171,7 @@ public class TriggerEventHandler {
void onTriggerExecutionTerminated(Clock clock, TriggerExecutionTerminated event) {
findTriggerState(event).ifPresent(state -> {
triggerStateStore.save(state
.lastEventId(clock, event.eventId())
.locked(clock, false)
.updateForExecutionState(clock, event.executionState())
);
@@ -190,6 +199,7 @@ public class TriggerEventHandler {
newState.updateForExecution(clock, event.execution());
}
newState = state.lastEventId(clock, event.eventId());
triggerStateStore.save(newState);
if (event.execution() != null) {
@@ -206,7 +216,10 @@ public class TriggerEventHandler {
*/
void onTriggerReceived(Clock clock, TriggerReceived event) {
findTriggerState(event).ifPresent(state -> {
triggerStateStore.save(state.workerId(clock, event.workerId()));
state = state
.lastEventId(clock, event.eventId())
.workerId(clock, event.workerId());
triggerStateStore.save(state);
});
}
@@ -217,7 +230,10 @@ public class TriggerEventHandler {
*/
void onResetTrigger(Clock clock, ResetTrigger event) {
findTriggerState(event).ifPresent(state -> {
triggerStateStore.save(state.reset(clock));
state = state
.lastEventId(clock, event.eventId())
.reset(clock);
triggerStateStore.save(state);
});
}
@@ -230,7 +246,10 @@ public class TriggerEventHandler {
findTriggerState(event).ifPresent(state -> {
Pair<Flow, AbstractTrigger> data = findTrigger(event, event.revision());
if (data.getRight() != null) {
triggerStateStore.save(state.update(clock, data.getRight()));
state = state
.lastEventId(clock, event.eventId())
.update(clock, data.getRight());
triggerStateStore.save(state);
}
});
}
@@ -249,10 +268,13 @@ public class TriggerEventHandler {
*
* @param event the event.
*/
void onTriggerCreated(TriggerCreated event, Integer vNode) {
void onTriggerCreated(Clock clock, TriggerCreated event, Integer vNode) {
Pair<Flow, AbstractTrigger> data = findTrigger(event, event.revision());
if (data.getRight() != null) {
triggerStateStore.save(TriggerState.of(event.id(), data.getRight().getStopAfter(), data.getRight().isDisabled(), vNode));
TriggerState state = TriggerState
.of(event.id(), data.getRight().getStopAfter(), data.getRight().isDisabled(), vNode)
.lastEventId(clock, event.eventId());
triggerStateStore.save(state);
}
}
@@ -288,7 +310,18 @@ public class TriggerEventHandler {
Optional<TriggerState> state = triggerStateStore.find(event.id());
if (state.isEmpty()) {
Logs.logTrigger(event.id(), Level.WARN, "Cannot process event {}. Cause: Trigger state not found.", event.type());
return Optional.empty();
}
return state;
// Ensure event can't be process twice - most queuing systems provide at-least once semantic
TriggerState current = state.get();
EventId lastEventId = current.getLastEventId();
if (lastEventId == null || event.eventId().isNewerThan(lastEventId)) {
return state;
}
// Ignore because it's an older or duplicate event
Logs.logTrigger(event.id(), Level.WARN, "Skipping event {}. Cause: Event is older than last applied event.", event.type());
return Optional.empty();
}
}

View File

@@ -82,7 +82,7 @@ public class TriggerEventPublisher implements AutoCloseable {
FlowService.findRemovedTrigger(flow, previous);
triggersDeleted.forEach(trigger ->
sendEvent(new TriggerDeleted(TriggerId.of(flow, trigger), Instant.now()))
sendEvent(new TriggerDeleted(TriggerId.of(flow, trigger)))
);
}
@@ -91,7 +91,7 @@ public class TriggerEventPublisher implements AutoCloseable {
.stream()
.filter(trigger -> trigger instanceof WorkerTriggerInterface)
.forEach(trigger ->
sendEvent(new TriggerUpdated(TriggerId.of(flow, trigger), flow.getRevision(), Instant.now()))
sendEvent(new TriggerUpdated(TriggerId.of(flow, trigger), flow.getRevision()))
);
return;
}
@@ -101,7 +101,7 @@ public class TriggerEventPublisher implements AutoCloseable {
.stream()
.filter(trigger -> trigger instanceof WorkerTriggerInterface)
.forEach(trigger ->
sendEvent(new TriggerCreated(TriggerId.of(flow, trigger), Instant.now(), flow.getRevision()))
sendEvent(new TriggerCreated(TriggerId.of(flow, trigger), flow.getRevision()))
);
}
}));

View File

@@ -28,6 +28,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Clock;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
@@ -77,7 +78,7 @@ class TriggerEventHandlerTest {
void shouldCreateTriggerGivenTriggerCreatedEventWhenFlowDoesExist() {
// GIVEN
handler = newTriggerEventHandler(List.of(Fixtures.defaultFlow()));
TriggerCreated event = new TriggerCreated(triggerId, 1);
TriggerCreated event = new TriggerCreated(triggerId, 1);
// WHEN
handler.handle(CLOCK, TEST_VNODE, event);
@@ -86,6 +87,7 @@ class TriggerEventHandlerTest {
Optional<TriggerState> saved = triggerStateStore.find(triggerId);
assertThat(saved).isPresent();
assertThat(TriggerId.of(saved.get())).isEqualTo(triggerId);
assertThat(saved.get().getLastEventId()).isNotNull();
}
@Test
@@ -133,7 +135,8 @@ class TriggerEventHandlerTest {
Optional<TriggerState> updated = triggerStateStore.find(triggerId);
assertThat(updated).isPresent();
assertThat(updated.get().isDisabled()).isTrue();
assertThat(updated.get().getUpdatedAt()).isNotEqualTo(triggerState.getUpdatedAt());
assertThat(updated.get().getUpdatedAt()).isAfter(triggerState.getUpdatedAt());
assertThat(updated.get().getLastEventId()).isEqualTo(event.eventId());
}
@Test
@@ -150,7 +153,8 @@ class TriggerEventHandlerTest {
Optional<TriggerState> updated = triggerStateStore.find(triggerId);
assertThat(updated).isPresent();
assertThat(updated.get().isLocked()).isFalse();
assertThat(updated.get().getUpdatedAt()).isNotEqualTo(triggerState.getUpdatedAt());
assertThat(updated.get().getUpdatedAt()).isAfter(triggerState.getUpdatedAt());
assertThat(updated.get().getLastEventId()).isEqualTo(event.eventId());
}
@Test
@@ -167,7 +171,8 @@ class TriggerEventHandlerTest {
Optional<TriggerState> updated = triggerStateStore.find(triggerId);
assertThat(updated).isPresent();
assertThat(updated.get().isDisabled()).isTrue();
assertThat(updated.get().getUpdatedAt()).isNotEqualTo(triggerState.getUpdatedAt());
assertThat(updated.get().getUpdatedAt()).isAfter(triggerState.getUpdatedAt());
assertThat(updated.get().getLastEventId()).isEqualTo(event.eventId());
}
@Test
@@ -186,10 +191,12 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
assertThat(triggerStateStore.find(triggerId))
Optional<TriggerState> updated = triggerStateStore.find(triggerId);
assertThat(updated)
.get()
.extracting(t -> t.getBackfill().getPaused())
.isEqualTo(true);
assertThat(updated.get().getLastEventId()).isEqualTo(event.eventId());
}
@Test
@@ -208,10 +215,12 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
assertThat(triggerStateStore.find(triggerId))
Optional<TriggerState> updated = triggerStateStore.find(triggerId);
assertThat(updated)
.get()
.extracting(t -> t.getBackfill().getPaused())
.isEqualTo(false);
assertThat(updated.get().getLastEventId()).isEqualTo(event.eventId());
}
@Test
@@ -225,7 +234,9 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
assertThat(triggerStateStore.find(triggerId)).isPresent();
Optional<TriggerState> updated = triggerStateStore.find(triggerId);
assertThat(updated).isPresent();
assertThat(updated.get().getLastEventId()).isEqualTo(event.eventId());
}
@Test
@@ -260,6 +271,7 @@ class TriggerEventHandlerTest {
Optional<TriggerState> updated = triggerStateStore.find(triggerId);
assertThat(updated).isPresent();
assertThat(updated.get().getBackfill()).isNotNull();
assertThat(updated.get().getLastEventId()).isEqualTo(event.eventId());
}
@Test
@@ -276,4 +288,52 @@ class TriggerEventHandlerTest {
// no exception expected, handled gracefully
assertThat(triggerStateStore.find(triggerId)).isPresent();
}
@Test
void shouldNotUpdateGivenTriggerEventTwice() {
// GIVEN
triggerStateStore.save(triggerState);
handler = newTriggerEventHandler(List.of(Fixtures.defaultFlow()));
TriggerUpdated event = new TriggerUpdated(triggerId, Fixtures.defaultFlow().getRevision());
// WHEN (first)
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
TriggerState updated;
updated = triggerStateStore.find(triggerId).orElseThrow();
assertThat(updated.getLastEventId()).isEqualTo(event.eventId());
Instant updatedAt = updated.getUpdatedAt();
assertThat(updatedAt).isAfter(triggerState.getUpdatedAt());
// WHEN (second)
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
updated = triggerStateStore.find(triggerId).orElseThrow();
assertThat(updated.getLastEventId()).isEqualTo(event.eventId());
assertThat(updated.getUpdatedAt()).isEqualTo(updatedAt); // not updated
}
@Test
void shouldNotUpdateGivenOldTriggerEvent() {
// GIVEN
TriggerUpdated event1 = new TriggerUpdated(triggerId, Fixtures.defaultFlow().getRevision());
TriggerUpdated event2 = new TriggerUpdated(triggerId, Fixtures.defaultFlow().getRevision());
TriggerState state = triggerState.lastEventId(CLOCK, event2.eventId());
triggerStateStore.save(state);
handler = newTriggerEventHandler(List.of(Fixtures.defaultFlow()));
// WHEN (first)
handler.handle(CLOCK, TEST_VNODE, event1);
// THEN
TriggerState updated;
updated = triggerStateStore.find(triggerId).orElseThrow();
assertThat(updated.getLastEventId()).isEqualTo(event2.eventId());
assertThat(updated.getUpdatedAt()).isEqualTo(state.getUpdatedAt()); // not updated
}
}

View File

@@ -566,7 +566,7 @@ public class DefaultWorker implements Worker {
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_STARTED_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_STARTED_COUNT_DESCRIPTION, metricRegistry.tags(workerTrigger, workerGroup))
.increment();
triggerEventQueue.send(new TriggerReceived(TriggerId.of(workerTrigger.getTriggerContext()), Instant.now(), getId()));
triggerEventQueue.send(new TriggerReceived(TriggerId.of(workerTrigger.getTriggerContext()), getId()));
this.metricRegistry
.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, MetricRegistry.METRIC_WORKER_TRIGGER_DURATION_DESCRIPTION, metricRegistry.tags(workerTrigger, workerGroup))