fix(scheduler): remove mutation methods from TriggerRepositoryInterface

Make sure any change made on a trigger state is performed by
the scheduler.

Move the TriggerStateStore interface to core, and remove default
implementation to use the repository implementation
This commit is contained in:
Florian Hussonnois
2025-12-18 18:44:41 +01:00
parent ea36532aa4
commit 4631868298
19 changed files with 313 additions and 321 deletions

View File

@@ -7,6 +7,7 @@ import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.scheduler.SchedulerConfiguration;
import io.kestra.core.scheduler.model.TriggerState;
import io.kestra.core.scheduler.store.TriggerStateStore;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import picocli.CommandLine;
@@ -37,6 +38,7 @@ public class V2TriggerMigrationCommand extends AbstractCommand {
Log.info("🔁 Starting trigger states migration...");
TriggerRepositoryInterface repository = applicationContext.getBean(TriggerRepositoryInterface.class);
TriggerStateStore store = applicationContext.getBean(TriggerStateStore.class);
SchedulerConfiguration configuration = applicationContext.getBean(SchedulerConfiguration.class);
List<Trigger> triggers = repository.findAllForAllTenantsV1();
Log.info("Found [{}] triggers to migrate.");
@@ -44,7 +46,7 @@ public class V2TriggerMigrationCommand extends AbstractCommand {
try {
TriggerState migrated = trigger.toTriggerState(configuration.vnodes());
if (!dryRun) {
repository.save(migrated);
store.save(migrated);
}
System.out.println("✅ Migration complete for: " + TriggerId.of(trigger));
} catch (Exception e) {

View File

@@ -3,33 +3,60 @@ package io.kestra.core.repositories;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.plugin.core.dashboard.data.Triggers;
import io.kestra.core.scheduler.model.TriggerState;
import io.kestra.plugin.core.dashboard.data.Triggers;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable;
import reactor.core.publisher.Flux;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
/**
* Repository interface for searching for trigger states.
*/
public interface TriggerRepositoryInterface extends QueryBuilderInterface<Triggers.Fields> {
/**
* Finds the trigger state for the given identifier.
*
* @param trigger the identifier.
* @return an optional {@link TriggerState}.
*/
Optional<TriggerState> findById(TriggerId trigger);
Optional<TriggerState> findByUid(String uid);
/**
* Finds all trigger states for the given tenant id
*
* @param tenantId the tenant identifier - cannot be {@code null}
* @return the list of trigger states.
*/
List<TriggerState> findAll(String tenantId);
/**
* Finds all trigger states across all tenants.
*
* @return the list of trigger states.
*/
List<TriggerState> findAllForAllTenants();
TriggerState save(TriggerState trigger);
void delete(TriggerState trigger);
/**
* Searches for all trigger states matching the given criterion.
*
* @param from the pageable.
* @param tenantId the tenant identifier - cannot be {@code null}
* @return the list of matching trigger states.
*/
ArrayListTotal<TriggerState> find(Pageable from, String query, String tenantId, String namespace, String flowId, String workerId);
/**
* Searches for all trigger states matching the given tenant and filters.
*
* @param from the pageable.
* @param tenantId the tenant identifier - cannot be {@code null}
* @param filters the query filters.
* @return the list of matching trigger states.
*/
ArrayListTotal<TriggerState> find(Pageable from, String tenantId, List<QueryFilter> filters);
/**
@@ -50,22 +77,6 @@ public interface TriggerRepositoryInterface extends QueryBuilderInterface<Trigge
return Function.identity();
}
/**
* Finds all {@link TriggerState} instances that are eligible to be scheduled as of the specified timestamp.
*
* @param now
* the current timestamp used to evaluate scheduling eligibility;
* triggers with a next execution time less than or equal to this
* value are considered eligible
* @param vNodes
* the set of virtual node identifiers used to restrict the search scope;
* @param locked
* if {@code true}, only locked triggers are returned;
* if {@code false}, only unlocked triggers are returned
* @return a list of triggers that are eligible for scheduling at the given time
*/
List<TriggerState> findTriggersEligibleForScheduling(ZonedDateTime now, Set<Integer> vNodes, boolean locked);
/**
* FOR KESTRA 2.0 MIGRATION
*/

View File

@@ -1,26 +1,30 @@
package io.kestra.scheduler.stores;
package io.kestra.core.scheduler.store;
import io.kestra.core.scheduler.model.TriggerState;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.scheduler.model.TriggerState;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
* Service interface providing read-write access to trigger states.
* <p>
* This service is attended to be used only the {@link io.kestra.core.runners.Scheduler}.
*
* @see TriggerState
*/
public interface TriggerStateStore {
/**
* Finds all {@link TriggerState} instances that are eligible to be scheduled as of the specified timestamp.
*
* @param now
* the current timestamp used to evaluate scheduling eligibility;
* @param now the current timestamp used to evaluate scheduling eligibility;
* triggers with a next execution time less than or equal to this
* value are considered eligible
* @param vNodes
* the set of virtual node identifiers used to restrict the search scope;
* @param locked
* if {@code true}, only locked triggers are returned;
* @param vNodes the set of virtual node identifiers used to restrict the search scope;
* @param locked if {@code true}, only locked triggers are returned;
* if {@code false}, only unlocked triggers are returned
* @return a list of triggers that are eligible for scheduling at the given time
*/
@@ -40,14 +44,14 @@ public interface TriggerStateStore {
* @param triggerId The trigger identifier.
* @return an optional {@link TriggerState}
*/
Optional<TriggerState> find(TriggerId triggerId);
Optional<TriggerState> findById(TriggerId triggerId);
/**
* Saves the given {@link TriggerState}.
*
* @param triggerState The trigger state.
*/
void save(TriggerState triggerState);
TriggerState save(TriggerState triggerState);
/**
* Deletes the state for the specified identifier.

View File

@@ -19,6 +19,7 @@ import io.kestra.core.scheduler.events.DeleteBackfillTrigger;
import io.kestra.core.scheduler.events.ResetTrigger;
import io.kestra.core.scheduler.events.SetDisableTrigger;
import io.kestra.core.scheduler.events.SetPauseBackfillTrigger;
import io.kestra.core.scheduler.events.TriggerDeleted;
import io.kestra.core.scheduler.model.TriggerState;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@@ -51,6 +52,51 @@ public class TriggerStateService {
this.flowRepository = flowRepository;
}
/**
* Deletes the trigger for the identifier.
*
* @param trigger the trigger identifier.
* @throws NotFoundException if trigger can be found.
*/
public void deleteById(TriggerId trigger) throws NotFoundException {
getTriggerState(trigger); // check if state exists.
triggerEventQueue.send(new TriggerDeleted(trigger));
}
/**
* Deletes all triggers for the given identifiers.
*
* @param triggers the trigger identifiers.
* @return the number of deleted triggers.
* @throws NotFoundException if trigger can be found.
*/
public int deleteByIdyIds(List<TriggerId> triggers) throws NotFoundException {
return triggers.stream()
.map(trigger -> {
deleteById(trigger);
return 1;
}).reduce(Integer::sum).orElse(0);
}
/**
* Deletes all triggers matching the given filters.
*
* @param tenant the tenant identifier.
* @param filters the filters to match triggers.
* @return the number of deleted triggers.
* @throws NotFoundException if trigger can be found.
*/
public int deleteAllTriggersMatching(String tenant, List<QueryFilter> filters) throws NotFoundException {
return triggerRepository.find(tenant, filters).map(throwFunction(trigger -> {
deleteById(trigger);
return 1;
}))
.reduce(Integer::sum)
.blockOptional()
.orElse(0);
}
/**
* Toggles all triggers matching the given filters.
*

View File

@@ -8,6 +8,7 @@ import io.kestra.core.models.QueryFilter.Op;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.scheduler.store.TriggerStateStore;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.core.scheduler.model.TriggerState;
@@ -43,7 +44,7 @@ public abstract class AbstractTriggerRepositoryTest {
private static final Pageable TEST_DEFAULT_PAGED = Pageable.from(1, 100, Sort.of(Sort.Order.asc("namespace")));
@Inject
protected TriggerRepositoryInterface triggerRepository;
protected TriggerStateStore triggerStateStore;
private static TriggerState.TriggerStateBuilder trigger(String tenantId) {
return TriggerState.builder()
@@ -70,9 +71,9 @@ public abstract class AbstractTriggerRepositoryTest {
@MethodSource("filterCombinations")
void should_find_all(QueryFilter filter){
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(generateDefaultTrigger(tenant));
triggerStateStore.save(generateDefaultTrigger(tenant));
ArrayListTotal<TriggerState> entries = triggerRepository.find(Pageable.UNPAGED, tenant, List.of(filter));
ArrayListTotal<TriggerState> entries = triggerStateStore.find(Pageable.UNPAGED, tenant, List.of(filter));
assertThat(entries).hasSize(1);
}
@@ -81,9 +82,9 @@ public abstract class AbstractTriggerRepositoryTest {
@MethodSource("filterCombinations")
void should_find_all_async(QueryFilter filter){
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(generateDefaultTrigger(tenant));
triggerStateStore.save(generateDefaultTrigger(tenant));
List<TriggerState> entries = triggerRepository.find(tenant, List.of(filter)).collectList().block();
List<TriggerState> entries = triggerStateStore.find(tenant, List.of(filter)).collectList().block();
assertThat(entries).withFailMessage(filter.toString()).hasSize(1);
}
@@ -104,7 +105,7 @@ public abstract class AbstractTriggerRepositoryTest {
@ParameterizedTest
@MethodSource("errorFilterCombinations")
void should_fail_to_find_all(QueryFilter filter){
assertThrows(InvalidQueryFiltersException.class, () -> triggerRepository.find(Pageable.UNPAGED, TestsUtils.randomTenant(this.getClass().getSimpleName()), List.of(filter)));
assertThrows(InvalidQueryFiltersException.class, () -> triggerStateStore.find(Pageable.UNPAGED, TestsUtils.randomTenant(this.getClass().getSimpleName()), List.of(filter)));
}
static Stream<QueryFilter> errorFilterCombinations() {
@@ -127,7 +128,7 @@ public abstract class AbstractTriggerRepositoryTest {
TriggerState.TriggerStateBuilder builder = trigger(tenant);
// WHEN
Optional<TriggerState> result = triggerRepository.findById(builder.build());
Optional<TriggerState> result = triggerStateStore.findById(builder.build());
// THEN
assertThat(result).isEmpty();
@@ -138,10 +139,10 @@ public abstract class AbstractTriggerRepositoryTest {
// GIVEN
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
TriggerState state = trigger(tenant).build();
triggerRepository.save(state);
triggerStateStore.save(state);
// WHEN
Optional<TriggerState> result = triggerRepository.findById(state);
Optional<TriggerState> result = triggerStateStore.findById(state);
// THEN
assertThat(result).isNotEmpty();
@@ -154,15 +155,15 @@ public abstract class AbstractTriggerRepositoryTest {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
TriggerState.TriggerStateBuilder builder = trigger(tenant);
triggerRepository.save(builder.build());
triggerStateStore.save(builder.build());
// WHEN
Instant updatedAt = Instant.now().truncatedTo(ChronoUnit.MILLIS);
TriggerState updated = builder.updatedAt(updatedAt).build();
triggerRepository.save(updated);
triggerStateStore.save(updated);
// THEN
Optional<TriggerState> result = triggerRepository.findById(updated);
Optional<TriggerState> result = triggerStateStore.findById(updated);
assertThat(result).isNotEmpty();
assertThat(TriggerId.of(result.get())).isEqualTo(TriggerId.of(updated));
assertThat(result.get().getUpdatedAt()).isEqualTo(updatedAt);
@@ -175,12 +176,12 @@ public abstract class AbstractTriggerRepositoryTest {
String tenant2 = TestsUtils.randomTenant(this.getClass().getSimpleName());
String tenant3 = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(trigger(tenant1).build());
triggerRepository.save(trigger(tenant2).build());
triggerRepository.save(trigger(tenant3).build());
triggerStateStore.save(trigger(tenant1).build());
triggerStateStore.save(trigger(tenant2).build());
triggerStateStore.save(trigger(tenant3).build());
// WHEN
List<TriggerState> all = triggerRepository.findAllForAllTenants()
List<TriggerState> all = triggerStateStore.findAllForAllTenants()
.stream().filter(it -> Set.of(tenant1, tenant2, tenant3).contains(it.getTenantId())).toList();
// THEN
@@ -194,12 +195,12 @@ public abstract class AbstractTriggerRepositoryTest {
String tenant2 = TestsUtils.randomTenant(this.getClass().getSimpleName());
String tenant3 = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(trigger(tenant1).build());
triggerRepository.save(trigger(tenant2).build());
triggerRepository.save(trigger(tenant3).build());
triggerStateStore.save(trigger(tenant1).build());
triggerStateStore.save(trigger(tenant2).build());
triggerStateStore.save(trigger(tenant3).build());
// WHEN
List<TriggerState> all = triggerRepository.findAll(tenant1)
List<TriggerState> all = triggerStateStore.findAll(tenant1)
.stream().filter(it -> Set.of(tenant1, tenant2, tenant3).contains(it.getTenantId())).toList();
// THEN
@@ -211,13 +212,13 @@ public abstract class AbstractTriggerRepositoryTest {
// GIVEN
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(trigger(tenant).triggerId("trigger1").build());
triggerRepository.save(trigger(tenant).triggerId("trigger2").build());
triggerRepository.save(trigger(tenant).triggerId("trigger3").build());
triggerRepository.save(trigger(tenant).triggerId("trigger4").build());
triggerStateStore.save(trigger(tenant).triggerId("trigger1").build());
triggerStateStore.save(trigger(tenant).triggerId("trigger2").build());
triggerStateStore.save(trigger(tenant).triggerId("trigger3").build());
triggerStateStore.save(trigger(tenant).triggerId("trigger4").build());
// WHEN
List<TriggerState> find = triggerRepository.find(TEST_DEFAULT_PAGED, null, tenant, null, null, null);
List<TriggerState> find = triggerStateStore.find(TEST_DEFAULT_PAGED, null, tenant, null, null, null);
// THEN
assertThat(find.size()).isEqualTo(4);
@@ -228,11 +229,11 @@ public abstract class AbstractTriggerRepositoryTest {
// GIVEN
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(trigger(tenant).namespace("io.kestra.unittest.1").flowId("my-flow1").triggerId("trigger1").build());
triggerRepository.save(trigger(tenant).namespace("io.kestra.unittest.2").flowId("my-flow1").triggerId("trigger1").build());
triggerStateStore.save(trigger(tenant).namespace("io.kestra.unittest.1").flowId("my-flow1").triggerId("trigger1").build());
triggerStateStore.save(trigger(tenant).namespace("io.kestra.unittest.2").flowId("my-flow1").triggerId("trigger1").build());
// WHEN
List<TriggerState> find = triggerRepository.find(TEST_DEFAULT_PAGED, null, tenant, null, "my-flow1", null);
List<TriggerState> find = triggerStateStore.find(TEST_DEFAULT_PAGED, null, tenant, null, "my-flow1", null);
// THEN
assertThat(find.size()).isEqualTo(2);
@@ -243,11 +244,11 @@ public abstract class AbstractTriggerRepositoryTest {
// GIVEN
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(trigger(tenant).namespace("io.kestra.unittest.1").flowId("my-flow1").triggerId("trigger1").build());
triggerRepository.save(trigger(tenant).namespace("io.kestra.unittest.2").flowId("my-flow1").triggerId("trigger1").build());
triggerStateStore.save(trigger(tenant).namespace("io.kestra.unittest.1").flowId("my-flow1").triggerId("trigger1").build());
triggerStateStore.save(trigger(tenant).namespace("io.kestra.unittest.2").flowId("my-flow1").triggerId("trigger1").build());
// WHEN
List<TriggerState> find = triggerRepository.find(TEST_DEFAULT_PAGED, null, tenant, "io.kestra.unittest.1", null, null);
List<TriggerState> find = triggerStateStore.find(TEST_DEFAULT_PAGED, null, tenant, "io.kestra.unittest.1", null, null);
// THEN
assertThat(find.size()).isEqualTo(1);
@@ -259,12 +260,12 @@ public abstract class AbstractTriggerRepositoryTest {
// GIVEN
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(trigger(tenant).namespace("io.kestra.unittest.1").flowId(ANY_VALUE).triggerId(ANY_VALUE).build());
triggerRepository.save(trigger(tenant).namespace("io.kestra.unittest.2").flowId(ANY_VALUE).triggerId(ANY_VALUE).build());
triggerRepository.save(trigger(tenant).namespace(ANY_VALUE).flowId(ANY_VALUE).triggerId(ANY_VALUE).build());
triggerStateStore.save(trigger(tenant).namespace("io.kestra.unittest.1").flowId(ANY_VALUE).triggerId(ANY_VALUE).build());
triggerStateStore.save(trigger(tenant).namespace("io.kestra.unittest.2").flowId(ANY_VALUE).triggerId(ANY_VALUE).build());
triggerStateStore.save(trigger(tenant).namespace(ANY_VALUE).flowId(ANY_VALUE).triggerId(ANY_VALUE).build());
// WHEN
List<TriggerState> find = triggerRepository.find(TEST_DEFAULT_PAGED, null, tenant, "io.kestra", null, null);
List<TriggerState> find = triggerStateStore.find(TEST_DEFAULT_PAGED, null, tenant, "io.kestra", null, null);
// THEN
assertThat(find.size()).isEqualTo(2);
@@ -277,12 +278,12 @@ public abstract class AbstractTriggerRepositoryTest {
// GIVEN
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(trigger(tenant).namespace(ANY_VALUE).flowId(ANY_VALUE).workerId("worker1").build());
triggerRepository.save(trigger(tenant).namespace(ANY_VALUE).flowId(ANY_VALUE).workerId("worker2").build());
triggerRepository.save(trigger(tenant).namespace(ANY_VALUE).flowId(ANY_VALUE).triggerId(ANY_VALUE).build());
triggerStateStore.save(trigger(tenant).namespace(ANY_VALUE).flowId(ANY_VALUE).workerId("worker1").build());
triggerStateStore.save(trigger(tenant).namespace(ANY_VALUE).flowId(ANY_VALUE).workerId("worker2").build());
triggerStateStore.save(trigger(tenant).namespace(ANY_VALUE).flowId(ANY_VALUE).triggerId(ANY_VALUE).build());
// WHEN
List<TriggerState> find = triggerRepository.find(TEST_DEFAULT_PAGED, null, tenant, null, null, "worker1");
List<TriggerState> find = triggerStateStore.find(TEST_DEFAULT_PAGED, null, tenant, null, null, "worker1");
// THEN
assertThat(find.size()).isEqualTo(1);
@@ -295,11 +296,11 @@ public abstract class AbstractTriggerRepositoryTest {
// GIVEN
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(trigger(tenant).namespace("io.kestra.unittest1").flowId("myflow1").triggerId("mytrigger1").build());
triggerRepository.save(trigger(tenant).namespace("io.kestra.unittest2").flowId("myflow2").triggerId("mytrigger2").build());
triggerStateStore.save(trigger(tenant).namespace("io.kestra.unittest1").flowId("myflow1").triggerId("mytrigger1").build());
triggerStateStore.save(trigger(tenant).namespace("io.kestra.unittest2").flowId("myflow2").triggerId("mytrigger2").build());
// WHEN
List<TriggerState> find = triggerRepository.find(TEST_DEFAULT_PAGED, query, tenant, null, null, null);
List<TriggerState> find = triggerStateStore.find(TEST_DEFAULT_PAGED, query, tenant, null, null, null);
// THEN
assertThat(find.size()).isEqualTo(1);
@@ -309,7 +310,7 @@ public abstract class AbstractTriggerRepositoryTest {
void shouldCountForNullTenant() {
// Given
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(TriggerState
triggerStateStore.save(TriggerState
.builder()
.tenantId(tenant)
.triggerId(IdUtils.create())
@@ -318,7 +319,7 @@ public abstract class AbstractTriggerRepositoryTest {
.build()
);
// When
long count = triggerRepository.countAll(tenant);
long count = triggerStateStore.countAll(tenant);
// Then
assertThat(count).isEqualTo(1);
}
@@ -330,11 +331,11 @@ public abstract class AbstractTriggerRepositoryTest {
TriggerState.TriggerStateBuilder builderA = trigger(tenant).flowId("flowA").triggerId("tA");
TriggerState.TriggerStateBuilder builderB = trigger(tenant).flowId("flowB").triggerId("tB");
TriggerState savedA = triggerRepository.save(builderA.build());
TriggerState savedB = triggerRepository.save(builderB.build());
TriggerState savedA = triggerStateStore.save(builderA.build());
TriggerState savedB = triggerStateStore.save(builderB.build());
try {
List<TriggerState> all = triggerRepository.find(tenant, null).collectList().block();
List<TriggerState> all = triggerStateStore.find(tenant, null).collectList().block();
assertThat(all).isNotNull();
assertThat(all.stream().map(TriggerState::getTriggerId).toList())
.containsExactlyInAnyOrder(savedA.getTriggerId(), savedB.getTriggerId());
@@ -345,12 +346,12 @@ public abstract class AbstractTriggerRepositoryTest {
.value("flowA")
.build());
List<TriggerState> filtered = triggerRepository.find(tenant, filters).collectList().block();
List<TriggerState> filtered = triggerStateStore.find(tenant, filters).collectList().block();
assertThat(filtered).hasSize(1);
assertThat(filtered.get(0).getFlowId()).isEqualTo("flowA");
} finally {
triggerRepository.delete(savedA);
triggerRepository.delete(savedB);
triggerStateStore.delete(savedA);
triggerStateStore.delete(savedB);
}
}
@@ -359,11 +360,11 @@ public abstract class AbstractTriggerRepositoryTest {
// GIVEN
String tenant1 = TestsUtils.randomTenant(this.getClass().getSimpleName());
String tenant2 = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(trigger(tenant1).triggerId("A").locked(false).vnode(0).nextEvaluationDate(null).build());
triggerRepository.save(trigger(tenant1).triggerId("B").locked(false).vnode(1).nextEvaluationDate(null).build());
triggerRepository.save(trigger(tenant2).triggerId("C").locked(false).vnode(2).nextEvaluationDate(null).build());
triggerStateStore.save(trigger(tenant1).triggerId("A").locked(false).vnode(0).nextEvaluationDate(null).build());
triggerStateStore.save(trigger(tenant1).triggerId("B").locked(false).vnode(1).nextEvaluationDate(null).build());
triggerStateStore.save(trigger(tenant2).triggerId("C").locked(false).vnode(2).nextEvaluationDate(null).build());
// WHEN
List<TriggerState> results = triggerRepository.findTriggersEligibleForScheduling(ZonedDateTime.now(), Set.of(0, 1), false)
List<TriggerState> results = triggerStateStore.findTriggersEligibleForScheduling(ZonedDateTime.now(), Set.of(0, 1), false)
.stream().filter(it -> Set.of(tenant1, tenant2).contains(it.getTenantId())).toList();
// THEN
@@ -376,11 +377,11 @@ public abstract class AbstractTriggerRepositoryTest {
// GIVEN
String tenant1 = TestsUtils.randomTenant(this.getClass().getSimpleName());
String tenant2 = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(trigger(tenant1).triggerId("A").locked(false).vnode(0).nextEvaluationDate(null).build());
triggerRepository.save(trigger(tenant2).triggerId("B").locked(false).vnode(1).nextEvaluationDate(null).build());
triggerRepository.save(trigger(tenant2).triggerId("C").locked(false).vnode(2).nextEvaluationDate(null).build());
triggerStateStore.save(trigger(tenant1).triggerId("A").locked(false).vnode(0).nextEvaluationDate(null).build());
triggerStateStore.save(trigger(tenant2).triggerId("B").locked(false).vnode(1).nextEvaluationDate(null).build());
triggerStateStore.save(trigger(tenant2).triggerId("C").locked(false).vnode(2).nextEvaluationDate(null).build());
// WHEN
List<TriggerState> results = triggerRepository.findTriggersEligibleForScheduling(ZonedDateTime.now(), Set.of(3), false)
List<TriggerState> results = triggerStateStore.findTriggersEligibleForScheduling(ZonedDateTime.now(), Set.of(3), false)
.stream().filter(it -> Set.of(tenant1, tenant2).contains(it.getTenantId())).toList();
// THEN
@@ -392,11 +393,11 @@ public abstract class AbstractTriggerRepositoryTest {
// GIVEN
String tenant1 = TestsUtils.randomTenant(this.getClass().getSimpleName());
String tenant2 = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(trigger(tenant1).triggerId("A").locked(false).vnode(0).nextEvaluationDate(null).build());
triggerRepository.save(trigger(tenant1).triggerId("B").locked(true).vnode(1).nextEvaluationDate(null).build());
triggerRepository.save(trigger(tenant2).triggerId("C").locked(false).vnode(2).nextEvaluationDate(null).build());
triggerStateStore.save(trigger(tenant1).triggerId("A").locked(false).vnode(0).nextEvaluationDate(null).build());
triggerStateStore.save(trigger(tenant1).triggerId("B").locked(true).vnode(1).nextEvaluationDate(null).build());
triggerStateStore.save(trigger(tenant2).triggerId("C").locked(false).vnode(2).nextEvaluationDate(null).build());
// WHEN
List<TriggerState> results = triggerRepository.findTriggersEligibleForScheduling(ZonedDateTime.now(), Set.of(1), true)
List<TriggerState> results = triggerStateStore.findTriggersEligibleForScheduling(ZonedDateTime.now(), Set.of(1), true)
.stream().filter(it -> Set.of(tenant1, tenant2).contains(it.getTenantId())).toList();
// THEN
@@ -410,13 +411,13 @@ public abstract class AbstractTriggerRepositoryTest {
// GIVEN
String tenant1 = TestsUtils.randomTenant(this.getClass().getSimpleName());
String tenant2 = TestsUtils.randomTenant(this.getClass().getSimpleName());
triggerRepository.save(trigger(tenant1).triggerId("A").locked(false).vnode(0).nextEvaluationDate(now).build());
triggerRepository.save(trigger(tenant1).triggerId("B").locked(false).vnode(1).nextEvaluationDate(now.plus(Duration.ofMinutes(5))).build());
triggerRepository.save(trigger(tenant2).triggerId("C").locked(false).vnode(2).nextEvaluationDate(now.minus(Duration.ofMinutes(5))).build());
triggerRepository.save(trigger(tenant2).triggerId("D").locked(false).vnode(3).nextEvaluationDate(null).build());
triggerStateStore.save(trigger(tenant1).triggerId("A").locked(false).vnode(0).nextEvaluationDate(now).build());
triggerStateStore.save(trigger(tenant1).triggerId("B").locked(false).vnode(1).nextEvaluationDate(now.plus(Duration.ofMinutes(5))).build());
triggerStateStore.save(trigger(tenant2).triggerId("C").locked(false).vnode(2).nextEvaluationDate(now.minus(Duration.ofMinutes(5))).build());
triggerStateStore.save(trigger(tenant2).triggerId("D").locked(false).vnode(3).nextEvaluationDate(null).build());
// WHEN
List<TriggerState> results = triggerRepository.findTriggersEligibleForScheduling(now.atZone(ZoneId.systemDefault()), Set.of(0, 1, 2, 3), false)
List<TriggerState> results = triggerStateStore.findTriggersEligibleForScheduling(now.atZone(ZoneId.systemDefault()), Set.of(0, 1, 2, 3), false)
.stream().filter(it -> Set.of(tenant1, tenant2).contains(it.getTenantId())).toList();
// THEN

View File

@@ -14,6 +14,8 @@ import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.runners.QueueIndexerRepository;
import io.kestra.core.runners.TransactionContext;
import io.kestra.core.scheduler.model.TriggerState;
import io.kestra.core.scheduler.store.TriggerStateStore;
import io.kestra.core.utils.DateUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.jdbc.JdbcMapper;
@@ -21,11 +23,9 @@ import io.kestra.jdbc.runner.JdbcTransactionContext;
import io.kestra.jdbc.services.JdbcFilterService;
import io.kestra.plugin.core.dashboard.data.ITriggers;
import io.kestra.plugin.core.dashboard.data.Triggers;
import io.kestra.core.scheduler.model.TriggerState;
import io.micronaut.data.model.Pageable;
import lombok.Getter;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
import reactor.core.publisher.Flux;
@@ -35,7 +35,7 @@ import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepository<TriggerState> implements TriggerRepositoryInterface, QueueIndexerRepository<TriggerState> {
public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepository<TriggerState> implements TriggerRepositoryInterface, QueueIndexerRepository<TriggerState>, TriggerStateStore {
private static final Field<Object> NAMESPACE_FIELD = field("namespace");
private static final Field<Long> NEXT_EVALUATION_EPOCH_FIELD = field("next_evaluation_epoch", Long.class);
@@ -45,12 +45,12 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo
private static final Field<Object> WORKER_ID_FIELD = field("worker_id");
private static final Field<Object> VALUE_FIELD = field("value");
private static final String NEXT_EVALUATION_DATE_COLUMN = "next_evaluation_date";
private static final Field<Object> KEY_FIELD = DSL.field(DSL.quotedName("key"));
private final JdbcFilterService filterService;
@Getter
private final Map<Triggers.Fields, String> fieldsMapping = Map.of(
Triggers.Fields.ID, "key",
Triggers.Fields.ID, KEY_FIELD.getName(),
Triggers.Fields.NAMESPACE, NAMESPACE_FIELD.getName(),
Triggers.Fields.FLOW_ID, FLOW_ID_FIELD.getName(),
Triggers.Fields.TRIGGER_ID, "trigger_id",
@@ -78,12 +78,7 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo
@Override
public Optional<TriggerState> findById(TriggerId trigger) {
return findByUid(trigger.uid());
}
@Override
public Optional<TriggerState> findByUid(String uid) {
return findOne(DSL.trueCondition(), field("key").eq(uid));
return findOne(DSL.trueCondition(), KEY_FIELD.eq(trigger.uid()));
}
@Override
@@ -137,7 +132,7 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo
.transactionResult(configuration -> {
DSL.using(configuration)
.insertInto(this.jdbcRepository.getTable())
.set(AbstractJdbcRepository.field("key"), this.jdbcRepository.key(trigger))
.set(KEY_FIELD, trigger.uid())
.set(this.jdbcRepository.persistFields(trigger))
.execute();
@@ -146,8 +141,15 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo
}
@Override
public void delete(TriggerState trigger) {
this.jdbcRepository.delete(trigger);
public void delete(TriggerId trigger) {
this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSL.using(configuration)
.delete(this.jdbcRepository.getTable())
.where(KEY_FIELD.eq(trigger.uid()))
.execute();
});
}
@Override
@@ -161,7 +163,7 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo
var condition = this.fullTextCondition(query).and(this.defaultFilter());
if (namespace != null) {
condition = condition.and(DSL.or(NAMESPACE_FIELD.eq(namespace), NAMESPACE_FIELD.likeIgnoreCase(namespace + ".%")));
condition = condition.and(org.jooq.impl.DSL.or(NAMESPACE_FIELD.eq(namespace), NAMESPACE_FIELD.likeIgnoreCase(namespace + ".%")));
}
if (flowId != null) {
@@ -305,6 +307,22 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo
});
}
/**
* {@inheritDoc}
**/
@Override
public List<TriggerState> findAllForVNodes(Set<Integer> vNodes) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> DSL.using(configuration)
.select(VALUE_FIELD)
.from(this.jdbcRepository.getTable())
.where(VNODE_FIELD.in(vNodes))
.fetch()
)
.map(r -> this.jdbcRepository.deserialize(r.get("value", String.class)));
}
/**
* {@inheritDoc}
**/

View File

@@ -13,7 +13,7 @@ import io.kestra.core.services.MaintenanceService;
import io.kestra.core.utils.Disposable;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.scheduler.pubsub.TriggerWorkerJobResultSubscriber;
import io.kestra.scheduler.stores.TriggerStateStore;
import io.kestra.core.scheduler.store.TriggerStateStore;
import io.micronaut.context.annotation.Primary;
import io.micronaut.context.event.ApplicationEventPublisher;
import jakarta.inject.Inject;

View File

@@ -35,7 +35,7 @@ import io.kestra.core.utils.Logs;
import io.kestra.scheduler.internals.NextEvaluationDate;
import io.kestra.scheduler.pubsub.TriggerExecutionPublisher;
import io.kestra.scheduler.stores.FlowMetaStore;
import io.kestra.scheduler.stores.TriggerStateStore;
import io.kestra.core.scheduler.store.TriggerStateStore;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@@ -300,7 +300,7 @@ public class TriggerEventHandler {
* @param event the event.
*/
void onTriggerDeleted(TriggerDeleted event) {
triggerStateStore.find(event.id()).ifPresent(state -> {
triggerStateStore.findById(event.id()).ifPresent(state -> {
triggerStateStore.delete(event.id());
maySendExecutionKilled(event, state);
});
@@ -367,7 +367,7 @@ public class TriggerEventHandler {
}
private Optional<TriggerState> findTriggerState(final TriggerEvent event) {
Optional<TriggerState> state = triggerStateStore.find(event.id());
Optional<TriggerState> state = triggerStateStore.findById(event.id());
if (state.isEmpty()) {
Logs.logTrigger(event.id(), Level.WARN, "Cannot process event {}. Cause: Trigger state not found.", event.type());
return Optional.empty();

View File

@@ -34,7 +34,7 @@ import io.kestra.scheduler.models.TriggerEvaluationContext;
import io.kestra.scheduler.pubsub.TriggerExecutionPublisher;
import io.kestra.scheduler.pubsub.TriggerWorkerJobPublisher;
import io.kestra.scheduler.stores.FlowMetaStore;
import io.kestra.scheduler.stores.TriggerStateStore;
import io.kestra.core.scheduler.store.TriggerStateStore;
import io.kestra.core.scheduler.vnodes.VNodes;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;

View File

@@ -6,7 +6,7 @@ import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.scheduler.SchedulerClock;
import io.kestra.core.scheduler.model.TriggerState;
import io.kestra.core.utils.Logs;
import io.kestra.scheduler.stores.TriggerStateStore;
import io.kestra.core.scheduler.store.TriggerStateStore;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.CollectionUtils;
import io.micronaut.scheduling.annotation.Scheduled;

View File

@@ -1,15 +1,15 @@
package io.kestra.scheduler.beans;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.scheduler.SchedulerConfiguration;
import io.kestra.core.scheduler.store.TriggerStateStore;
import io.kestra.scheduler.stores.CachedFlowMetaStore;
import io.kestra.scheduler.stores.CachedTriggerStateStore;
import io.kestra.scheduler.stores.DefaultFlowMetaStore;
import io.kestra.scheduler.stores.DefaultTriggerStateStore;
import io.kestra.scheduler.stores.FlowMetaStore;
import io.kestra.scheduler.stores.TriggerStateStore;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.event.BeanCreatedEvent;
import io.micronaut.context.event.BeanCreatedEventListener;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@@ -19,21 +19,25 @@ public class BeanFactory {
@Inject
SchedulerConfiguration schedulerConfiguration;
@Inject
TriggerRepositoryInterface triggerRepository;
@Inject
FlowRepositoryInterface flowRepositoryInterface;
@Singleton
public TriggerStateStore triggerStateStore() {
DefaultTriggerStateStore store = new DefaultTriggerStateStore(schedulerConfiguration, triggerRepository);
return schedulerConfiguration.isCacheDisable() ? store : new CachedTriggerStateStore(store, schedulerConfiguration);
}
@Singleton
public FlowMetaStore flowMetaStore() {
DefaultFlowMetaStore store = new DefaultFlowMetaStore(schedulerConfiguration, flowRepositoryInterface);
return schedulerConfiguration.isCacheDisable() ? store: new CachedFlowMetaStore(store, schedulerConfiguration);
}
@Singleton
public static class TriggerStateStoreBeanDecorator implements BeanCreatedEventListener<TriggerStateStore> {
@Inject
SchedulerConfiguration configuration;
@Override
public TriggerStateStore onCreated(BeanCreatedEvent<TriggerStateStore> event) {
TriggerStateStore bean = event.getBean();
return configuration.isCacheDisable() ? bean : new CachedTriggerStateStore(bean, configuration);
}
}
}

View File

@@ -15,7 +15,7 @@ import io.kestra.core.utils.Logs;
import io.kestra.scheduler.SchedulableTriggerFetcher;
import io.kestra.scheduler.models.TriggerEvaluationContext;
import io.kestra.scheduler.stores.FlowMetaStore;
import io.kestra.scheduler.stores.TriggerStateStore;
import io.kestra.core.scheduler.store.TriggerStateStore;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@@ -5,10 +5,9 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.scheduler.SchedulerConfiguration;
import io.kestra.core.scheduler.vnodes.VNodes;
import io.kestra.core.scheduler.model.TriggerState;
import jakarta.inject.Inject;
import org.jspecify.annotations.NonNull;
import io.kestra.core.scheduler.store.TriggerStateStore;
import io.kestra.core.scheduler.vnodes.VNodes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -17,6 +16,11 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A decorator class adding caching capabilities on top of {@link TriggerStateStore}.
*
* @see SchedulerConfiguration
*/
public class CachedTriggerStateStore implements TriggerStateStore {
private static final Logger LOG = LoggerFactory.getLogger(CachedTriggerStateStore.class);
@@ -25,7 +29,6 @@ public class CachedTriggerStateStore implements TriggerStateStore {
private final SchedulerConfiguration schedulerConfiguration;
private final Map<Integer, Cache<String, TriggerState>> partitionedCache = new ConcurrentHashMap<>();
@Inject
public CachedTriggerStateStore(TriggerStateStore delegate, SchedulerConfiguration schedulerConfiguration) {
this.delegate = delegate;
this.schedulerConfiguration = schedulerConfiguration;
@@ -67,7 +70,7 @@ public class CachedTriggerStateStore implements TriggerStateStore {
return findAllForNodesFromCache(vNodes);
}
private List<@NonNull TriggerState> findAllForNodesFromCache(final Set<Integer> vNodes) {
private List<TriggerState> findAllForNodesFromCache(final Set<Integer> vNodes) {
return vNodes.stream()
.map(partitionedCache::get)
.filter(Objects::nonNull)
@@ -79,7 +82,7 @@ public class CachedTriggerStateStore implements TriggerStateStore {
* {@inheritDoc}
*/
@Override
public Optional<TriggerState> find(TriggerId triggerId) {
public Optional<TriggerState> findById(TriggerId triggerId) {
int vnode = VNodes.computeVNodeFromTrigger(triggerId, schedulerConfiguration.vnodes());
Cache<String, TriggerState> cache = partitionedCache.get(vnode);
@@ -90,7 +93,7 @@ public class CachedTriggerStateStore implements TriggerStateStore {
}
}
Optional<TriggerState> state = delegate.find(triggerId);
Optional<TriggerState> state = delegate.findById(triggerId);
state.ifPresent(s -> {
partitionedCache
.computeIfAbsent(vnode, k -> newCache())
@@ -103,12 +106,13 @@ public class CachedTriggerStateStore implements TriggerStateStore {
* {@inheritDoc}
*/
@Override
public void save(TriggerState triggerState) {
delegate.save(triggerState);
public TriggerState save(TriggerState triggerState) {
TriggerState saved = delegate.save(triggerState);
int vnode = triggerState.getVnode();
partitionedCache
.computeIfAbsent(vnode, k -> newCache())
.put(triggerState.uid(), triggerState);
return saved;
}
/**

View File

@@ -1,74 +0,0 @@
package io.kestra.scheduler.stores;
import io.kestra.core.scheduler.model.TriggerState;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.scheduler.SchedulerConfiguration;
import io.kestra.core.scheduler.vnodes.VNodes;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
* The {@link TriggerStateStore} implementation.
* <p>
* Implementation based on the {@link TriggerRepositoryInterface}.
*/
public class DefaultTriggerStateStore implements TriggerStateStore {
private final TriggerRepositoryInterface triggerRepository;
private final SchedulerConfiguration schedulerConfiguration;
public DefaultTriggerStateStore(SchedulerConfiguration schedulerConfiguration,
TriggerRepositoryInterface triggerRepository) {
this.triggerRepository = triggerRepository;
this.schedulerConfiguration = schedulerConfiguration;
}
/**
* {@inheritDoc}
**/
@Override
public List<TriggerState> findTriggersEligibleForScheduling(ZonedDateTime now, Set<Integer> vNodes, boolean locked) {
return triggerRepository.findTriggersEligibleForScheduling(now, vNodes, locked)
.stream()
.toList();
}
/**
* {@inheritDoc}
**/
@Override
public List<TriggerState> findAllForVNodes(final Set<Integer> vNodes) {
return this.triggerRepository.findAllForAllTenants()
.stream()
.filter(f -> vNodes.contains(VNodes.computeVNodeFromTrigger(TriggerId.of(f), schedulerConfiguration.vnodes())))
.toList();
}
/**
* {@inheritDoc}
**/
@Override
public Optional<TriggerState> find(TriggerId triggerId) {
return triggerRepository.findById(triggerId);
}
/**
* {@inheritDoc}
**/
@Override
public void save(TriggerState triggerState) {
triggerRepository.save(triggerState);
}
/**
* {@inheritDoc}
**/
@Override
public void delete(TriggerId triggerId) {
triggerRepository.findById(triggerId).ifPresent(triggerRepository::delete);
}
}

View File

@@ -27,7 +27,7 @@ import io.kestra.scheduler.pubsub.TriggerWorkerJobResultSubscriber;
import io.kestra.scheduler.stores.CachedFlowMetaStore;
import io.kestra.scheduler.stores.CachedTriggerStateStore;
import io.kestra.scheduler.stores.FlowMetaStore;
import io.kestra.scheduler.stores.TriggerStateStore;
import io.kestra.core.scheduler.store.TriggerStateStore;
import io.kestra.scheduler.utils.CollectorTriggerExecutionPublisher;
import io.kestra.scheduler.utils.InMemoryFlowMetaStore;
import io.kestra.scheduler.utils.InMemorySchedulerEventQueue;

View File

@@ -93,7 +93,7 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
Optional<TriggerState> saved = triggerStateStore.find(triggerId);
Optional<TriggerState> saved = triggerStateStore.findById(triggerId);
assertThat(saved).isPresent();
assertThat(TriggerId.of(saved.get())).isEqualTo(triggerId);
assertThat(saved.get().getLastEventId()).isNotNull();
@@ -109,7 +109,7 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
Optional<TriggerState> saved = triggerStateStore.find(triggerId);
Optional<TriggerState> saved = triggerStateStore.findById(triggerId);
assertThat(saved).isEmpty();
}
@@ -124,7 +124,7 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
assertThat(triggerStateStore.find(triggerId)).isEmpty();
assertThat(triggerStateStore.findById(triggerId)).isEmpty();
Mockito.verify(executionKilledQueue, Mockito.never()).emit(Mockito.any(ExecutionKilled.class));
}
@@ -139,7 +139,7 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
assertThat(triggerStateStore.find(triggerId)).isEmpty();
assertThat(triggerStateStore.findById(triggerId)).isEmpty();
Mockito.verify(executionKilledQueue, Mockito.only()).emit(Mockito.any(ExecutionKilled.class));
}
@@ -158,7 +158,7 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
Optional<TriggerState> updated = triggerStateStore.find(triggerId);
Optional<TriggerState> updated = triggerStateStore.findById(triggerId);
assertThat(updated).isPresent();
assertThat(updated.get().isDisabled()).isTrue();
assertThat(updated.get().getUpdatedAt()).isAfter(triggerState.getUpdatedAt());
@@ -176,7 +176,7 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
Optional<TriggerState> updated = triggerStateStore.find(triggerId);
Optional<TriggerState> updated = triggerStateStore.findById(triggerId);
assertThat(updated).isPresent();
assertThat(updated.get().isLocked()).isFalse();
assertThat(updated.get().getUpdatedAt()).isAfter(triggerState.getUpdatedAt());
@@ -194,7 +194,7 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
Optional<TriggerState> updated = triggerStateStore.find(triggerId);
Optional<TriggerState> updated = triggerStateStore.findById(triggerId);
assertThat(updated).isPresent();
assertThat(updated.get().isDisabled()).isTrue();
assertThat(updated.get().getUpdatedAt()).isAfter(triggerState.getUpdatedAt());
@@ -214,7 +214,7 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
Optional<TriggerState> updated = triggerStateStore.find(triggerId);
Optional<TriggerState> updated = triggerStateStore.findById(triggerId);
assertThat(updated).isPresent();
assertThat(updated.get().isDisabled()).isFalse();
assertThat(updated.get().getUpdatedAt()).isAfter(triggerState.getUpdatedAt());
@@ -238,7 +238,7 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
Optional<TriggerState> updated = triggerStateStore.find(triggerId);
Optional<TriggerState> updated = triggerStateStore.findById(triggerId);
assertThat(updated)
.get()
.extracting(t -> t.getBackfill().getPaused())
@@ -262,7 +262,7 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
Optional<TriggerState> updated = triggerStateStore.find(triggerId);
Optional<TriggerState> updated = triggerStateStore.findById(triggerId);
assertThat(updated)
.get()
.extracting(t -> t.getBackfill().getPaused())
@@ -281,7 +281,7 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
Optional<TriggerState> updated = triggerStateStore.find(triggerId);
Optional<TriggerState> updated = triggerStateStore.findById(triggerId);
assertThat(updated).isPresent();
assertThat(updated.get().getLastEventId()).isEqualTo(event.eventId());
}
@@ -315,7 +315,7 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
Optional<TriggerState> updated = triggerStateStore.find(triggerId);
Optional<TriggerState> updated = triggerStateStore.findById(triggerId);
assertThat(updated).isPresent();
assertThat(updated.get().getBackfill()).isNotNull();
assertThat(updated.get().getLastEventId()).isEqualTo(event.eventId());
@@ -333,7 +333,7 @@ class TriggerEventHandlerTest {
// THEN
// no exception expected, handled gracefully
assertThat(triggerStateStore.find(triggerId)).isPresent();
assertThat(triggerStateStore.findById(triggerId)).isPresent();
}
@Test
@@ -349,7 +349,7 @@ class TriggerEventHandlerTest {
// THEN
TriggerState updated;
updated = triggerStateStore.find(triggerId).orElseThrow();
updated = triggerStateStore.findById(triggerId).orElseThrow();
assertThat(updated.getLastEventId()).isEqualTo(event.eventId());
Instant updatedAt = updated.getUpdatedAt();
assertThat(updatedAt).isAfter(triggerState.getUpdatedAt());
@@ -358,7 +358,7 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
updated = triggerStateStore.find(triggerId).orElseThrow();
updated = triggerStateStore.findById(triggerId).orElseThrow();
assertThat(updated.getLastEventId()).isEqualTo(event.eventId());
assertThat(updated.getUpdatedAt()).isEqualTo(updatedAt); // not updated
}
@@ -379,7 +379,7 @@ class TriggerEventHandlerTest {
// THEN
TriggerState updated;
updated = triggerStateStore.find(triggerId).orElseThrow();
updated = triggerStateStore.findById(triggerId).orElseThrow();
assertThat(updated.getLastEventId()).isEqualTo(event2.eventId());
assertThat(updated.getUpdatedAt()).isEqualTo(state.getUpdatedAt()); // not updated
}
@@ -401,7 +401,7 @@ class TriggerEventHandlerTest {
handler.handle(CLOCK, TEST_VNODE, event);
// THEN
Optional<TriggerState> updated = triggerStateStore.find(triggerId);
Optional<TriggerState> updated = triggerStateStore.findById(triggerId);
assertThat(updated).get().extracting(TriggerState::getBackfill).isNull();
assertThat(updated).get().extracting(TriggerState::getNextEvaluationDate).isEqualTo(previousNextEvaluationDate.toInstant());
assertThat(updated).get().extracting(TriggerState::getLastEventId).isEqualTo(event.eventId());

View File

@@ -101,7 +101,7 @@ class TriggerSchedulerTest {
scheduler.onStart(SchedulerClock.getClock(), SchedulerClock.now().toInstant(), NODES_ASSIGNMENTS); // vNode are 0-based
// THEN
TriggerState state = triggerStateStore.find(Fixtures.triggerId()).orElse(null);
TriggerState state = triggerStateStore.findById(Fixtures.triggerId()).orElse(null);
assertThat(state).isNotNull();
assertThat(state.isLocked()).isFalse();
assertThat(state.getEvaluatedAt()).isNull();
@@ -120,7 +120,7 @@ class TriggerSchedulerTest {
scheduler.onSchedule(SchedulerClock.getClock(), SchedulerClock.now().toInstant(), NODES_ASSIGNMENTS);
// THEN
TriggerState state = triggerStateStore.find(Fixtures.triggerId()).orElse(null);
TriggerState state = triggerStateStore.findById(Fixtures.triggerId()).orElse(null);
assertThat(state).isNotNull();
assertThat(state.isLocked()).isTrue();
@@ -155,7 +155,7 @@ class TriggerSchedulerTest {
scheduler.onSchedule(SchedulerClock.getClock(), SchedulerClock.now().toInstant(), NODES_ASSIGNMENTS);
// THEN
TriggerState state = triggerStateStore.find(Fixtures.triggerId("polling")).orElse(null);
TriggerState state = triggerStateStore.findById(Fixtures.triggerId("polling")).orElse(null);
assertThat(state).isNotNull();
assertThat(state.isLocked()).isTrue();
@@ -181,7 +181,7 @@ class TriggerSchedulerTest {
scheduler.onSchedule(SchedulerClock.getClock(), SchedulerClock.now().toInstant(), NODES_ASSIGNMENTS);
// THEN
TriggerState state = triggerStateStore.find(Fixtures.triggerId("polling")).orElse(null);
TriggerState state = triggerStateStore.findById(Fixtures.triggerId("polling")).orElse(null);
assertThat(state).isNotNull();
assertThat(state.isLocked()).isFalse();
@@ -205,7 +205,7 @@ class TriggerSchedulerTest {
scheduler.onSchedule(SchedulerClock.getClock(), SchedulerClock.now().toInstant(), NODES_ASSIGNMENTS);
// THEN
TriggerState state = triggerStateStore.find(Fixtures.triggerId("realtime")).orElse(null);
TriggerState state = triggerStateStore.findById(Fixtures.triggerId("realtime")).orElse(null);
assertThat(state).isNotNull();
assertThat(state.isLocked()).isTrue();
@@ -329,7 +329,7 @@ class TriggerSchedulerTest {
scheduler.onSchedule(SchedulerClock.getClock(), SchedulerClock.now().toInstant(), NODES_ASSIGNMENTS);
// Assertions on TriggerState
TriggerState currentTriggerState = triggerStateStore.find(Fixtures.triggerId()).orElse(null);
TriggerState currentTriggerState = triggerStateStore.findById(Fixtures.triggerId()).orElse(null);
assertThat(currentTriggerState).isNotNull();
// [1-4 Calls] onSchedule
@@ -389,7 +389,7 @@ class TriggerSchedulerTest {
scheduler.onSchedule(SchedulerClock.getClock(), SchedulerClock.now().toInstant(), NODES_ASSIGNMENTS);
// Assertions on TriggerState
TriggerState currentTriggerState = triggerStateStore.find(Fixtures.triggerId()).orElse(null);
TriggerState currentTriggerState = triggerStateStore.findById(Fixtures.triggerId()).orElse(null);
assertThat(currentTriggerState).isNotNull();
// [1st Call] onSchedule
@@ -453,7 +453,7 @@ class TriggerSchedulerTest {
// [THEN]
final ZonedDateTime expectedNextEvaluationNDate = ZonedDateTime.now(Clock.offset(initialSchedulerClock, Duration.ofHours(1)));
// Assert TriggerState
TriggerState currentTriggerState = triggerStateStore.find(Fixtures.triggerId()).orElse(null);
TriggerState currentTriggerState = triggerStateStore.findById(Fixtures.triggerId()).orElse(null);
assertThat(currentTriggerState).isNotNull();
assertThat(currentTriggerState.getEvaluatedAt()).isEqualTo(initialState.getEvaluatedAt());
@@ -474,7 +474,7 @@ class TriggerSchedulerTest {
// endregion [GIVEN]
// WHEN
TriggerState initialState = triggerStateStore.find(Fixtures.triggerId()).orElse(null);
TriggerState initialState = triggerStateStore.findById(Fixtures.triggerId()).orElse(null);
triggerStateStore.save(initialState
.locked(SchedulerClock.getClock(), false)
.disabled(SchedulerClock.getClock(), true)
@@ -484,7 +484,7 @@ class TriggerSchedulerTest {
scheduler.onSchedule(SchedulerClock.getClock(), SchedulerClock.now().toInstant(), NODES_ASSIGNMENTS);
// THEN
TriggerState state = triggerStateStore.find(Fixtures.triggerId()).orElse(null);
TriggerState state = triggerStateStore.findById(Fixtures.triggerId()).orElse(null);
assertThat(state).isNotNull();
assertThat(state.isLocked()).isFalse();
@@ -506,7 +506,7 @@ class TriggerSchedulerTest {
// Trigger an initial execution
scheduler.onSchedule(SchedulerClock.getClock(), SchedulerClock.now().toInstant(), NODES_ASSIGNMENTS);
TriggerState triggerState = triggerStateStore.find(Fixtures.triggerId()).orElse(null);
TriggerState triggerState = triggerStateStore.findById(Fixtures.triggerId()).orElse(null);
assertThat(triggerState).isNotNull();
assertThat(triggerState.getNextEvaluationDate()).isEqualTo(SchedulerClock.now().plusMinutes(15).toInstant());
@@ -527,7 +527,7 @@ class TriggerSchedulerTest {
scheduler.onSchedule(SchedulerClock.getClock(), SchedulerClock.now().toInstant(), NODES_ASSIGNMENTS);
// Assertions on TriggerState
TriggerState currentTriggerState = triggerStateStore.find(Fixtures.triggerId()).orElse(null);
TriggerState currentTriggerState = triggerStateStore.findById(Fixtures.triggerId()).orElse(null);
assertThat(currentTriggerState).isNotNull();
assertThat(currentTriggerState.isLocked()).isTrue();
assertThat(currentTriggerState.getUpdatedAt()).isEqualTo(SchedulerClock.now().toInstant());
@@ -572,12 +572,12 @@ class TriggerSchedulerTest {
scheduler.onStart(SchedulerClock.getClock(), SchedulerClock.now().toInstant(), NODES_ASSIGNMENTS); // vNode are 0-based
// THEN
TriggerState state = triggerStateStore.find(Fixtures.triggerId()).orElse(null);
TriggerState state = triggerStateStore.findById(Fixtures.triggerId()).orElse(null);
assertThat(state).isNull();
}
private void completeExecution() {
triggerStateStore.find(Fixtures.triggerId()).ifPresent(state -> {
triggerStateStore.findById(Fixtures.triggerId()).ifPresent(state -> {
TriggerState newState = state
.updateForExecutionState(SchedulerClock.getClock(), State.Type.SUCCESS)
.locked(SchedulerClock.getClock(), false);

View File

@@ -2,7 +2,7 @@ package io.kestra.scheduler.utils;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.scheduler.model.TriggerState;
import io.kestra.scheduler.stores.TriggerStateStore;
import io.kestra.core.scheduler.store.TriggerStateStore;
import java.time.ZonedDateTime;
import java.util.*;
@@ -44,7 +44,7 @@ public class InMemoryTriggerStateStore implements TriggerStateStore {
}
@Override
public Optional<TriggerState> find(TriggerId triggerId) {
public Optional<TriggerState> findById(TriggerId triggerId) {
return Optional.ofNullable(store.get(triggerId));
}

View File

@@ -356,16 +356,13 @@ public class TriggerController {
@ExecuteOn(TaskExecutors.IO)
@Delete(uri = "/{namespace}/{flowId}/{triggerId}")
@Operation(tags = {"Triggers"}, summary = "Delete a trigger")
public MutableHttpResponse<?> deleteTrigger(
public HttpResponse<Void> deleteTrigger(
@Parameter(description = "The namespace") @PathVariable String namespace,
@Parameter(description = "The flow id") @PathVariable String flowId,
@Parameter(description = "The trigger id") @PathVariable String triggerId
) throws HttpStatusException {
return triggerRepository.findById(TriggerId.of(tenantService.resolveTenant(), namespace, flowId, triggerId))
.map(it -> {
triggerRepository.delete(it);
triggerStateService.deleteById(TriggerId.of(tenantService.resolveTenant(), namespace, flowId, triggerId));
return HttpResponse.noContent();
}).orElseGet(HttpResponse::noContent);
}
@ExecuteOn(TaskExecutors.IO)
@@ -374,17 +371,8 @@ public class TriggerController {
public MutableHttpResponse<?> deleteTriggersByIds(
@Parameter(description = "The triggers to delete") @Body List<ApiTriggerId> triggers
) {
Integer count = triggers.stream().map(it -> it.toTriggerId(tenantService.resolveTenant()))
.map(trigger -> {
Optional<TriggerState> maybe = triggerRepository.findById(trigger);
if (maybe.isPresent()) {
triggerRepository.delete(maybe.get());
return 1;
}
return 0;
})
.reduce(Integer::sum)
.orElse(0);
List<TriggerId> ids = triggers.stream().map(it -> it.toTriggerId(tenantService.resolveTenant())).toList();
int count = triggerStateService.deleteByIdyIds(ids);
return HttpResponse.ok(BulkResponse.builder().count(count).build());
}
@@ -394,19 +382,7 @@ public class TriggerController {
public MutableHttpResponse<?> deleteTriggersByQuery(
@Parameter(description = "Filters") @QueryFilterFormat List<QueryFilter> filters
) {
Integer count = triggerRepository
.find(tenantService.resolveTenant(), filters)
.map(trigger -> {
try {
triggerRepository.delete(trigger);
return 1;
} catch (Exception ignored) {
return 0;
}
})
.reduce(Integer::sum)
.block();
int count = triggerStateService.deleteAllTriggersMatching(tenantService.resolveTenant(), filters);
return HttpResponse.ok(BulkResponse.builder().count(count).build());
}
// region [Disabled APIs]