From be5e24217bbdb95708b259de6223ee59360a6114 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Mon, 1 Sep 2025 16:57:34 +0200 Subject: [PATCH] chore(system): extract the scheduler to its own module --- .../commands/servers/SchedulerCommand.java | 2 +- core/build.gradle | 1 + .../kestra/core/metrics/MetricRegistry.java | 14 ---- .../ScheduleContextInterface.java | 2 +- .../{IndexerInterface.java => Scheduler.java} | 3 +- .../SchedulerTriggerStateInterface.java | 2 +- .../kestra/core/runners/StandAloneRunner.java | 3 +- .../io/kestra/core/schedulers/Scheduler.java | 9 --- .../AbstractFlowRepositoryTest.java | 61 ++++++++++++-- .../plugin/core/trigger/ToggleTest.java | 2 +- jdbc-h2/build.gradle | 1 + .../h2/H2SchedulerScheduleTest.java | 17 ---- jdbc-mysql/build.gradle | 2 + .../mysql/MysqlSchedulerScheduleTest.java | 6 +- jdbc-postgres/build.gradle | 2 + .../PostgresSchedulerScheduleTest.java | 6 +- .../AbstractJdbcTriggerRepository.java | 2 +- .../io/kestra/jdbc/runner/JdbcScheduler.java | 6 +- .../jdbc/runner/JdbcSchedulerContext.java | 2 +- .../runner/JdbcSchedulerTriggerState.java | 4 +- platform/build.gradle | 1 + scheduler/build.gradle | 20 +++++ .../kestra/scheduler}/AbstractScheduler.java | 9 +-- .../scheduler}/SchedulerExecutionState.java | 2 +- .../SchedulerExecutionStateInterface.java | 2 +- .../SchedulerExecutionWithTrigger.java | 2 +- .../endpoint}/SchedulerEndpoint.java | 4 +- .../scheduler}/AbstractSchedulerTest.java | 3 +- .../scheduler}/SchedulerConditionTest.java | 3 +- .../SchedulerPollingTriggerTest.java | 3 +- .../SchedulerScheduleOnDatesTest.java | 3 +- .../scheduler}/SchedulerScheduleTest.java | 3 +- .../scheduler}/SchedulerStreamingTest.java | 2 +- .../scheduler}/SchedulerThreadTest.java | 3 +- .../SchedulerTriggerChangeTest.java | 4 +- .../SchedulerTriggerStateInterfaceTest.java | 3 +- .../src/test/resources/allure.properties | 1 + .../src/test/resources/application-test.yml | 80 +++++++++++++++++++ scheduler/src/test/resources/logback.xml | 11 +++ settings.gradle | 1 + 40 files changed, 220 insertions(+), 87 deletions(-) rename core/src/main/java/io/kestra/core/{schedulers => runners}/ScheduleContextInterface.java (92%) rename core/src/main/java/io/kestra/core/runners/{IndexerInterface.java => Scheduler.java} (54%) rename core/src/main/java/io/kestra/core/{schedulers => runners}/SchedulerTriggerStateInterface.java (98%) delete mode 100644 core/src/main/java/io/kestra/core/schedulers/Scheduler.java delete mode 100644 jdbc-h2/src/test/java/io/kestra/schedulers/h2/H2SchedulerScheduleTest.java create mode 100644 scheduler/build.gradle rename {core/src/main/java/io/kestra/core/schedulers => scheduler/src/main/java/io/kestra/scheduler}/AbstractScheduler.java (99%) rename {core/src/main/java/io/kestra/core/schedulers => scheduler/src/main/java/io/kestra/scheduler}/SchedulerExecutionState.java (93%) rename {core/src/main/java/io/kestra/core/schedulers => scheduler/src/main/java/io/kestra/scheduler}/SchedulerExecutionStateInterface.java (84%) rename {core/src/main/java/io/kestra/core/schedulers => scheduler/src/main/java/io/kestra/scheduler}/SchedulerExecutionWithTrigger.java (90%) rename {core/src/main/java/io/kestra/core/endpoints => scheduler/src/main/java/io/kestra/scheduler/endpoint}/SchedulerEndpoint.java (96%) rename {core/src/test/java/io/kestra/core/schedulers => scheduler/src/test/java/io/kestra/scheduler}/AbstractSchedulerTest.java (98%) rename {core/src/test/java/io/kestra/core/schedulers => scheduler/src/test/java/io/kestra/scheduler}/SchedulerConditionTest.java (97%) rename {core/src/test/java/io/kestra/core/schedulers => scheduler/src/test/java/io/kestra/scheduler}/SchedulerPollingTriggerTest.java (99%) rename {core/src/test/java/io/kestra/core/schedulers => scheduler/src/test/java/io/kestra/scheduler}/SchedulerScheduleOnDatesTest.java (99%) rename {core/src/test/java/io/kestra/core/schedulers => scheduler/src/test/java/io/kestra/scheduler}/SchedulerScheduleTest.java (99%) rename {core/src/test/java/io/kestra/core/schedulers => scheduler/src/test/java/io/kestra/scheduler}/SchedulerStreamingTest.java (99%) rename {core/src/test/java/io/kestra/core/schedulers => scheduler/src/test/java/io/kestra/scheduler}/SchedulerThreadTest.java (97%) rename {core/src/test/java/io/kestra/core/schedulers => scheduler/src/test/java/io/kestra/scheduler}/SchedulerTriggerChangeTest.java (97%) rename {core/src/test/java/io/kestra/core/schedulers => scheduler/src/test/java/io/kestra/scheduler}/SchedulerTriggerStateInterfaceTest.java (94%) create mode 100644 scheduler/src/test/resources/allure.properties create mode 100644 scheduler/src/test/resources/application-test.yml create mode 100644 scheduler/src/test/resources/logback.xml diff --git a/cli/src/main/java/io/kestra/cli/commands/servers/SchedulerCommand.java b/cli/src/main/java/io/kestra/cli/commands/servers/SchedulerCommand.java index f32a17c3e5..72827cd70c 100644 --- a/cli/src/main/java/io/kestra/cli/commands/servers/SchedulerCommand.java +++ b/cli/src/main/java/io/kestra/cli/commands/servers/SchedulerCommand.java @@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers; import com.google.common.collect.ImmutableMap; import io.kestra.core.models.ServerType; -import io.kestra.core.schedulers.AbstractScheduler; +import io.kestra.core.scheduler.AbstractScheduler; import io.kestra.core.utils.Await; import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; diff --git a/core/build.gradle b/core/build.gradle index c56d3fcd27..55e319fe4c 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -75,6 +75,7 @@ dependencies { testImplementation project(':runner-memory') testImplementation project(':storage-local') testImplementation project(':worker') + testImplementation project(':scheduler') testImplementation project(':executor') testImplementation "io.micronaut:micronaut-http-client" diff --git a/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java b/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java index 6967b6571f..ebca846a8e 100644 --- a/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java +++ b/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java @@ -6,7 +6,6 @@ import io.kestra.core.models.tasks.Task; import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.TriggerContext; import io.kestra.core.runners.*; -import io.kestra.core.schedulers.SchedulerExecutionWithTrigger; import io.micrometer.core.instrument.*; import io.micrometer.core.instrument.binder.MeterBinder; import io.micrometer.core.instrument.search.Search; @@ -395,19 +394,6 @@ public class MetricRegistry { return triggerContext.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, triggerContext.getTenantId()); } - /** - * Return tags for current {@link SchedulerExecutionWithTrigger}. - * - * @param schedulerExecutionWithTrigger the current SchedulerExecutionWithTrigger - * @return tags to apply to metrics - */ - public String[] tags(SchedulerExecutionWithTrigger schedulerExecutionWithTrigger, String... tags) { - return ArrayUtils.addAll( - this.tags(schedulerExecutionWithTrigger.getExecution()), - tags - ); - } - /** * Return tags for current {@link ExecutionKilled} * diff --git a/core/src/main/java/io/kestra/core/schedulers/ScheduleContextInterface.java b/core/src/main/java/io/kestra/core/runners/ScheduleContextInterface.java similarity index 92% rename from core/src/main/java/io/kestra/core/schedulers/ScheduleContextInterface.java rename to core/src/main/java/io/kestra/core/runners/ScheduleContextInterface.java index 83d2ae8e8c..ce83e5be20 100644 --- a/core/src/main/java/io/kestra/core/schedulers/ScheduleContextInterface.java +++ b/core/src/main/java/io/kestra/core/runners/ScheduleContextInterface.java @@ -1,4 +1,4 @@ -package io.kestra.core.schedulers; +package io.kestra.core.runners; import java.util.function.Consumer; diff --git a/core/src/main/java/io/kestra/core/runners/IndexerInterface.java b/core/src/main/java/io/kestra/core/runners/Scheduler.java similarity index 54% rename from core/src/main/java/io/kestra/core/runners/IndexerInterface.java rename to core/src/main/java/io/kestra/core/runners/Scheduler.java index 141650f1e6..405c1d69dc 100644 --- a/core/src/main/java/io/kestra/core/runners/IndexerInterface.java +++ b/core/src/main/java/io/kestra/core/runners/Scheduler.java @@ -2,6 +2,5 @@ package io.kestra.core.runners; import io.kestra.core.server.Service; -public interface IndexerInterface extends Service, Runnable { - +public interface Scheduler extends Service, Runnable { } diff --git a/core/src/main/java/io/kestra/core/schedulers/SchedulerTriggerStateInterface.java b/core/src/main/java/io/kestra/core/runners/SchedulerTriggerStateInterface.java similarity index 98% rename from core/src/main/java/io/kestra/core/schedulers/SchedulerTriggerStateInterface.java rename to core/src/main/java/io/kestra/core/runners/SchedulerTriggerStateInterface.java index e4f3f9a279..3fa7eb5f04 100644 --- a/core/src/main/java/io/kestra/core/schedulers/SchedulerTriggerStateInterface.java +++ b/core/src/main/java/io/kestra/core/runners/SchedulerTriggerStateInterface.java @@ -1,4 +1,4 @@ -package io.kestra.core.schedulers; +package io.kestra.core.runners; import io.kestra.core.models.conditions.ConditionContext; import io.kestra.core.models.flows.Flow; diff --git a/core/src/main/java/io/kestra/core/runners/StandAloneRunner.java b/core/src/main/java/io/kestra/core/runners/StandAloneRunner.java index ec4eb8ee8e..05743287db 100644 --- a/core/src/main/java/io/kestra/core/runners/StandAloneRunner.java +++ b/core/src/main/java/io/kestra/core/runners/StandAloneRunner.java @@ -1,6 +1,5 @@ package io.kestra.core.runners; -import io.kestra.core.schedulers.AbstractScheduler; import io.kestra.core.server.Service; import io.kestra.core.utils.Await; import io.kestra.core.utils.ExecutorsUtils; @@ -64,7 +63,7 @@ public class StandAloneRunner implements RunnerInterface, AutoCloseable { } if (schedulerEnabled) { - AbstractScheduler scheduler = applicationContext.getBean(AbstractScheduler.class); + Scheduler scheduler = applicationContext.getBean(Scheduler.class); poolExecutor.execute(scheduler); servers.add(scheduler); } diff --git a/core/src/main/java/io/kestra/core/schedulers/Scheduler.java b/core/src/main/java/io/kestra/core/schedulers/Scheduler.java deleted file mode 100644 index def829b292..0000000000 --- a/core/src/main/java/io/kestra/core/schedulers/Scheduler.java +++ /dev/null @@ -1,9 +0,0 @@ -package io.kestra.core.schedulers; - -import jakarta.inject.Singleton; - -@SuppressWarnings("try") -@Singleton -public interface Scheduler extends Runnable, AutoCloseable { - -} diff --git a/core/src/test/java/io/kestra/core/repositories/AbstractFlowRepositoryTest.java b/core/src/test/java/io/kestra/core/repositories/AbstractFlowRepositoryTest.java index f6b8a94279..0a1e14a6c1 100644 --- a/core/src/test/java/io/kestra/core/repositories/AbstractFlowRepositoryTest.java +++ b/core/src/test/java/io/kestra/core/repositories/AbstractFlowRepositoryTest.java @@ -1,5 +1,6 @@ package io.kestra.core.repositories; +import com.google.common.collect.ImmutableMap; import io.kestra.core.Helpers; import io.kestra.core.events.CrudEvent; import io.kestra.core.events.CrudEventType; @@ -10,13 +11,17 @@ import io.kestra.core.models.QueryFilter; import io.kestra.core.models.QueryFilter.Field; import io.kestra.core.models.QueryFilter.Op; import io.kestra.core.models.SearchResult; +import io.kestra.core.models.conditions.ConditionContext; import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.executions.ExecutionTrigger; import io.kestra.core.models.flows.*; import io.kestra.core.models.flows.input.StringInput; import io.kestra.core.models.property.Property; +import io.kestra.core.models.triggers.AbstractTrigger; +import io.kestra.core.models.triggers.PollingTriggerInterface; +import io.kestra.core.models.triggers.TriggerContext; import io.kestra.core.queues.QueueException; import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter; -import io.kestra.core.schedulers.AbstractSchedulerTest; import io.kestra.core.services.FlowService; import io.kestra.core.utils.Await; import io.kestra.core.utils.IdUtils; @@ -28,7 +33,8 @@ import io.micronaut.data.model.Sort; import jakarta.inject.Inject; import jakarta.inject.Singleton; import jakarta.validation.ConstraintViolationException; -import lombok.Getter; +import lombok.*; +import lombok.experimental.SuperBuilder; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -552,9 +558,9 @@ public abstract class AbstractFlowRepositoryTest { .id(flowId) .namespace(TEST_NAMESPACE) .tenantId(MAIN_TENANT) - .triggers(Collections.singletonList(AbstractSchedulerTest.UnitTest.builder() + .triggers(Collections.singletonList(UnitTest.builder() .id("sleep") - .type(AbstractSchedulerTest.UnitTest.class.getName()) + .type(UnitTest.class.getName()) .build())) .tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build())) .build(); @@ -592,9 +598,9 @@ public abstract class AbstractFlowRepositoryTest { .id(flowId) .namespace(TEST_NAMESPACE) .tenantId(MAIN_TENANT) - .triggers(Collections.singletonList(AbstractSchedulerTest.UnitTest.builder() + .triggers(Collections.singletonList(UnitTest.builder() .id("sleep") - .type(AbstractSchedulerTest.UnitTest.class.getName()) + .type(UnitTest.class.getName()) .build())) .tasks(Collections.singletonList(Return.builder().id(TEST_FLOW_ID).type(Return.class.getName()).format(Property.ofValue(TEST_FLOW_ID)).build())) .build(); @@ -911,4 +917,47 @@ public abstract class AbstractFlowRepositoryTest { return GenericFlow.fromYaml(TEST_TENANT_ID, source); } + protected static int COUNTER = 0; + + @SuperBuilder + @ToString + @EqualsAndHashCode + @Getter + @NoArgsConstructor + public static class UnitTest extends AbstractTrigger implements PollingTriggerInterface { + @Builder.Default + private final Duration interval = Duration.ofSeconds(2); + + private String defaultInjected; + + public Optional evaluate(ConditionContext conditionContext, TriggerContext context) throws InterruptedException { + COUNTER++; + + if (COUNTER % 2 == 0) { + Thread.sleep(4000); + + return Optional.empty(); + } else { + Execution execution = Execution.builder() + .id(IdUtils.create()) + .tenantId(context.getTenantId()) + .namespace(context.getNamespace()) + .flowId(context.getFlowId()) + .flowRevision(conditionContext.getFlow().getRevision()) + .state(new State()) + .trigger(ExecutionTrigger.builder() + .id(this.getId()) + .type(this.getType()) + .variables(ImmutableMap.of( + "counter", COUNTER, + "defaultInjected", defaultInjected == null ? "ko" : defaultInjected + )) + .build() + ) + .build(); + + return Optional.of(execution); + } + } + } } diff --git a/core/src/test/java/io/kestra/plugin/core/trigger/ToggleTest.java b/core/src/test/java/io/kestra/plugin/core/trigger/ToggleTest.java index 97d3352c59..eb733b438f 100644 --- a/core/src/test/java/io/kestra/plugin/core/trigger/ToggleTest.java +++ b/core/src/test/java/io/kestra/plugin/core/trigger/ToggleTest.java @@ -12,7 +12,7 @@ import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.repositories.TriggerRepositoryInterface; import io.kestra.core.runners.RunnerUtils; -import io.kestra.core.schedulers.AbstractScheduler; +import io.kestra.core.scheduler.AbstractScheduler; import io.kestra.core.utils.Await; import io.kestra.core.utils.TestsUtils; import jakarta.inject.Inject; diff --git a/jdbc-h2/build.gradle b/jdbc-h2/build.gradle index 986ac44eb5..5b0819bb43 100644 --- a/jdbc-h2/build.gradle +++ b/jdbc-h2/build.gradle @@ -6,6 +6,7 @@ dependencies { implementation project(":core") implementation project(":jdbc") implementation project(":executor") + implementation project(":scheduler") implementation("io.micronaut.sql:micronaut-jooq") runtimeOnly("com.h2database:h2") diff --git a/jdbc-h2/src/test/java/io/kestra/schedulers/h2/H2SchedulerScheduleTest.java b/jdbc-h2/src/test/java/io/kestra/schedulers/h2/H2SchedulerScheduleTest.java deleted file mode 100644 index 2d6211ff55..0000000000 --- a/jdbc-h2/src/test/java/io/kestra/schedulers/h2/H2SchedulerScheduleTest.java +++ /dev/null @@ -1,17 +0,0 @@ -package io.kestra.schedulers.h2; - -import io.kestra.core.runners.FlowListeners; -import io.kestra.core.schedulers.AbstractScheduler; -import io.kestra.core.schedulers.SchedulerExecutionStateInterface; -import io.kestra.core.schedulers.SchedulerScheduleTest; -import io.kestra.jdbc.runner.JdbcScheduler; - -class H2SchedulerScheduleTest extends SchedulerScheduleTest { - @Override - protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) { - return new JdbcScheduler( - applicationContext, - flowListenersServiceSpy - ); - } -} diff --git a/jdbc-mysql/build.gradle b/jdbc-mysql/build.gradle index 21201568cd..886345ace5 100644 --- a/jdbc-mysql/build.gradle +++ b/jdbc-mysql/build.gradle @@ -6,6 +6,7 @@ dependencies { implementation project(":core") implementation project(":jdbc") implementation project(":executor") + implementation project(":scheduler") implementation("io.micronaut.sql:micronaut-jooq") runtimeOnly("com.mysql:mysql-connector-j") @@ -13,6 +14,7 @@ dependencies { testImplementation project(':core').sourceSets.test.output testImplementation project(':jdbc').sourceSets.test.output + testImplementation project(':scheduler').sourceSets.test.output testImplementation project(':storage-local') testImplementation project(':tests') testImplementation("io.micronaut.validation:micronaut-validation") // MysqlServiceLivenessCoordinatorTest fail to init without that diff --git a/jdbc-mysql/src/test/java/io/kestra/schedulers/mysql/MysqlSchedulerScheduleTest.java b/jdbc-mysql/src/test/java/io/kestra/schedulers/mysql/MysqlSchedulerScheduleTest.java index 6e017199eb..1b62d14d39 100644 --- a/jdbc-mysql/src/test/java/io/kestra/schedulers/mysql/MysqlSchedulerScheduleTest.java +++ b/jdbc-mysql/src/test/java/io/kestra/schedulers/mysql/MysqlSchedulerScheduleTest.java @@ -1,10 +1,10 @@ package io.kestra.schedulers.mysql; import io.kestra.core.runners.FlowListeners; -import io.kestra.core.schedulers.AbstractScheduler; -import io.kestra.core.schedulers.SchedulerExecutionStateInterface; -import io.kestra.core.schedulers.SchedulerScheduleTest; import io.kestra.jdbc.runner.JdbcScheduler; +import io.kestra.scheduler.AbstractScheduler; +import io.kestra.scheduler.SchedulerExecutionStateInterface; +import io.kestra.scheduler.SchedulerScheduleTest; class MysqlSchedulerScheduleTest extends SchedulerScheduleTest { @Override diff --git a/jdbc-postgres/build.gradle b/jdbc-postgres/build.gradle index 4b8c1c49fc..94fa5a871f 100644 --- a/jdbc-postgres/build.gradle +++ b/jdbc-postgres/build.gradle @@ -6,6 +6,7 @@ dependencies { implementation project(":core") implementation project(":jdbc") implementation project(":executor") + implementation project(":scheduler") implementation("io.micronaut.sql:micronaut-jooq") runtimeOnly("org.postgresql:postgresql") @@ -13,6 +14,7 @@ dependencies { testImplementation project(':core').sourceSets.test.output testImplementation project(':jdbc').sourceSets.test.output + testImplementation project(':scheduler').sourceSets.test.output testImplementation project(':storage-local') testImplementation project(':tests') testImplementation("io.micronaut.validation:micronaut-validation") // PostgresServiceLivenessCoordinatorTest fail to init without that diff --git a/jdbc-postgres/src/test/java/io/kestra/schedulers/postgres/PostgresSchedulerScheduleTest.java b/jdbc-postgres/src/test/java/io/kestra/schedulers/postgres/PostgresSchedulerScheduleTest.java index 06ad6f687d..50bdde6b87 100644 --- a/jdbc-postgres/src/test/java/io/kestra/schedulers/postgres/PostgresSchedulerScheduleTest.java +++ b/jdbc-postgres/src/test/java/io/kestra/schedulers/postgres/PostgresSchedulerScheduleTest.java @@ -1,10 +1,10 @@ package io.kestra.schedulers.postgres; import io.kestra.core.runners.FlowListeners; -import io.kestra.core.schedulers.AbstractScheduler; -import io.kestra.core.schedulers.SchedulerExecutionStateInterface; -import io.kestra.core.schedulers.SchedulerScheduleTest; import io.kestra.jdbc.runner.JdbcScheduler; +import io.kestra.scheduler.AbstractScheduler; +import io.kestra.scheduler.SchedulerExecutionStateInterface; +import io.kestra.scheduler.SchedulerScheduleTest; class PostgresSchedulerScheduleTest extends SchedulerScheduleTest { @Override diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java index fecadef7d7..dfaba2aa69 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java @@ -14,7 +14,7 @@ import io.kestra.core.models.triggers.Trigger; import io.kestra.core.models.triggers.TriggerContext; import io.kestra.core.repositories.ArrayListTotal; import io.kestra.core.repositories.TriggerRepositoryInterface; -import io.kestra.core.schedulers.ScheduleContextInterface; +import io.kestra.core.runners.ScheduleContextInterface; import io.kestra.core.utils.DateUtils; import io.kestra.core.utils.ListUtils; import io.kestra.jdbc.runner.JdbcQueueIndexerInterface; diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java index d6e3e1340b..f5d7121019 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java @@ -3,13 +3,15 @@ package io.kestra.jdbc.runner; import io.kestra.core.models.flows.FlowWithSource; import io.kestra.core.models.triggers.Trigger; import io.kestra.core.repositories.TriggerRepositoryInterface; -import io.kestra.core.schedulers.*; +import io.kestra.core.runners.ScheduleContextInterface; +import io.kestra.core.runners.SchedulerTriggerStateInterface; import io.kestra.core.services.FlowListenersInterface; import io.kestra.core.services.FlowService; -import io.kestra.core.services.PluginDefaultService; import io.kestra.core.utils.ListUtils; import io.kestra.jdbc.JooqDSLContextWrapper; import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository; +import io.kestra.scheduler.AbstractScheduler; +import io.kestra.scheduler.SchedulerExecutionState; import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; import jakarta.inject.Singleton; diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerContext.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerContext.java index 2dcad9a360..398929ee99 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerContext.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerContext.java @@ -1,6 +1,6 @@ package io.kestra.jdbc.runner; -import io.kestra.core.schedulers.ScheduleContextInterface; +import io.kestra.core.runners.ScheduleContextInterface; import io.kestra.jdbc.JooqDSLContextWrapper; import lombok.Getter; import org.jooq.DSLContext; diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerTriggerState.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerTriggerState.java index 4740e2fa07..db8419d456 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerTriggerState.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerTriggerState.java @@ -7,8 +7,8 @@ import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.Trigger; import io.kestra.core.models.triggers.TriggerContext; import io.kestra.core.queues.QueueException; -import io.kestra.core.schedulers.ScheduleContextInterface; -import io.kestra.core.schedulers.SchedulerTriggerStateInterface; +import io.kestra.core.runners.ScheduleContextInterface; +import io.kestra.core.runners.SchedulerTriggerStateInterface; import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository; import jakarta.annotation.PostConstruct; import jakarta.inject.Singleton; diff --git a/platform/build.gradle b/platform/build.gradle index d5e61a7cdc..4986371604 100644 --- a/platform/build.gradle +++ b/platform/build.gradle @@ -157,5 +157,6 @@ dependencies { api "io.kestra:repository-memory:$version" api "io.kestra:runner-memory:$version" api "io.kestra:storage-local:$version" + api "io.kestra:scheduler:$version" } } diff --git a/scheduler/build.gradle b/scheduler/build.gradle new file mode 100644 index 0000000000..3ffb1d8f18 --- /dev/null +++ b/scheduler/build.gradle @@ -0,0 +1,20 @@ +configurations { + implementation.extendsFrom(micronaut) +} + +dependencies { + annotationProcessor project(':processor') + implementation project(":core") + + // test + testAnnotationProcessor project(':processor') + testImplementation project(':core').sourceSets.test.output + testImplementation project(':storage-local') + testImplementation project(':worker') + + testImplementation project(':tests') + testImplementation project(':jdbc') + testImplementation project(':jdbc').sourceSets.test.output + testImplementation project(':jdbc-h2') + testImplementation("io.micronaut.sql:micronaut-jooq") +} \ No newline at end of file diff --git a/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java b/scheduler/src/main/java/io/kestra/scheduler/AbstractScheduler.java similarity index 99% rename from core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java rename to scheduler/src/main/java/io/kestra/scheduler/AbstractScheduler.java index fb60f08d6e..b32f3f514e 100644 --- a/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java +++ b/scheduler/src/main/java/io/kestra/scheduler/AbstractScheduler.java @@ -1,4 +1,4 @@ -package io.kestra.core.schedulers; +package io.kestra.scheduler; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; @@ -26,7 +26,6 @@ import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.runners.*; import io.kestra.core.server.ClusterEvent; -import io.kestra.core.server.Service; import io.kestra.core.server.ServiceStateChangeEvent; import io.kestra.core.server.ServiceType; import io.kestra.core.services.*; @@ -64,7 +63,7 @@ import java.util.stream.Collectors; @Slf4j @Singleton @SuppressWarnings("this-escape") -public abstract class AbstractScheduler implements Scheduler, Service { +public abstract class AbstractScheduler implements Scheduler { protected final ApplicationContext applicationContext; protected final QueueInterface executionQueue; protected final QueueInterface triggerQueue; @@ -824,7 +823,7 @@ public abstract class AbstractScheduler implements Scheduler, Service { private void log(SchedulerExecutionWithTrigger executionWithTrigger) { metricRegistry - .counter(MetricRegistry.METRIC_SCHEDULER_TRIGGER_COUNT, MetricRegistry.METRIC_SCHEDULER_TRIGGER_COUNT_DESCRIPTION, metricRegistry.tags(executionWithTrigger)) + .counter(MetricRegistry.METRIC_SCHEDULER_TRIGGER_COUNT, MetricRegistry.METRIC_SCHEDULER_TRIGGER_COUNT_DESCRIPTION, metricRegistry.tags(executionWithTrigger.getExecution())) .increment(); ZonedDateTime now = now(); @@ -841,7 +840,7 @@ public abstract class AbstractScheduler implements Scheduler, Service { // FIXME : "late" are not excluded and can increase delay value (false positive) if (next != null && now.isBefore(next)) { metricRegistry - .timer(MetricRegistry.METRIC_SCHEDULER_TRIGGER_DELAY_DURATION, MetricRegistry.METRIC_SCHEDULER_TRIGGER_DELAY_DURATION_DESCRIPTION, metricRegistry.tags(executionWithTrigger)) + .timer(MetricRegistry.METRIC_SCHEDULER_TRIGGER_DELAY_DURATION, MetricRegistry.METRIC_SCHEDULER_TRIGGER_DELAY_DURATION_DESCRIPTION, metricRegistry.tags(executionWithTrigger.getExecution())) .record(Duration.between( executionWithTrigger.getTriggerContext().getDate(), now )); diff --git a/core/src/main/java/io/kestra/core/schedulers/SchedulerExecutionState.java b/scheduler/src/main/java/io/kestra/scheduler/SchedulerExecutionState.java similarity index 93% rename from core/src/main/java/io/kestra/core/schedulers/SchedulerExecutionState.java rename to scheduler/src/main/java/io/kestra/scheduler/SchedulerExecutionState.java index bf8acf5e72..ae355dff67 100644 --- a/core/src/main/java/io/kestra/core/schedulers/SchedulerExecutionState.java +++ b/scheduler/src/main/java/io/kestra/scheduler/SchedulerExecutionState.java @@ -1,4 +1,4 @@ -package io.kestra.core.schedulers; +package io.kestra.scheduler; import io.kestra.core.models.executions.Execution; import io.kestra.core.repositories.ExecutionRepositoryInterface; diff --git a/core/src/main/java/io/kestra/core/schedulers/SchedulerExecutionStateInterface.java b/scheduler/src/main/java/io/kestra/scheduler/SchedulerExecutionStateInterface.java similarity index 84% rename from core/src/main/java/io/kestra/core/schedulers/SchedulerExecutionStateInterface.java rename to scheduler/src/main/java/io/kestra/scheduler/SchedulerExecutionStateInterface.java index 32652b0a99..7279dcd519 100644 --- a/core/src/main/java/io/kestra/core/schedulers/SchedulerExecutionStateInterface.java +++ b/scheduler/src/main/java/io/kestra/scheduler/SchedulerExecutionStateInterface.java @@ -1,4 +1,4 @@ -package io.kestra.core.schedulers; +package io.kestra.scheduler; import io.kestra.core.models.executions.Execution; diff --git a/core/src/main/java/io/kestra/core/schedulers/SchedulerExecutionWithTrigger.java b/scheduler/src/main/java/io/kestra/scheduler/SchedulerExecutionWithTrigger.java similarity index 90% rename from core/src/main/java/io/kestra/core/schedulers/SchedulerExecutionWithTrigger.java rename to scheduler/src/main/java/io/kestra/scheduler/SchedulerExecutionWithTrigger.java index 4660d6962e..2848d1d423 100644 --- a/core/src/main/java/io/kestra/core/schedulers/SchedulerExecutionWithTrigger.java +++ b/scheduler/src/main/java/io/kestra/scheduler/SchedulerExecutionWithTrigger.java @@ -1,4 +1,4 @@ -package io.kestra.core.schedulers; +package io.kestra.scheduler; import lombok.AllArgsConstructor; import lombok.Getter; diff --git a/core/src/main/java/io/kestra/core/endpoints/SchedulerEndpoint.java b/scheduler/src/main/java/io/kestra/scheduler/endpoint/SchedulerEndpoint.java similarity index 96% rename from core/src/main/java/io/kestra/core/endpoints/SchedulerEndpoint.java rename to scheduler/src/main/java/io/kestra/scheduler/endpoint/SchedulerEndpoint.java index 554bf78d41..99d63171fe 100644 --- a/core/src/main/java/io/kestra/core/endpoints/SchedulerEndpoint.java +++ b/scheduler/src/main/java/io/kestra/scheduler/endpoint/SchedulerEndpoint.java @@ -1,9 +1,9 @@ -package io.kestra.core.endpoints; +package io.kestra.scheduler.endpoint; import io.kestra.core.models.flows.FlowWithException; import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.Trigger; -import io.kestra.core.schedulers.AbstractScheduler; +import io.kestra.scheduler.AbstractScheduler; import io.micronaut.context.annotation.Requires; import io.micronaut.management.endpoint.annotation.Endpoint; import io.micronaut.management.endpoint.annotation.Read; diff --git a/core/src/test/java/io/kestra/core/schedulers/AbstractSchedulerTest.java b/scheduler/src/test/java/io/kestra/scheduler/AbstractSchedulerTest.java similarity index 98% rename from core/src/test/java/io/kestra/core/schedulers/AbstractSchedulerTest.java rename to scheduler/src/test/java/io/kestra/scheduler/AbstractSchedulerTest.java index 9151a9f5d9..90d36a3b87 100644 --- a/core/src/test/java/io/kestra/core/schedulers/AbstractSchedulerTest.java +++ b/scheduler/src/test/java/io/kestra/scheduler/AbstractSchedulerTest.java @@ -1,4 +1,4 @@ -package io.kestra.core.schedulers; +package io.kestra.scheduler; import com.google.common.collect.ImmutableMap; import io.kestra.core.models.Label; @@ -16,6 +16,7 @@ import io.kestra.core.models.triggers.TriggerContext; import io.kestra.core.queues.QueueException; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; +import io.kestra.core.runners.SchedulerTriggerStateInterface; import io.kestra.core.services.ExecutionService; import io.kestra.plugin.core.debug.Return; import io.kestra.core.utils.IdUtils; diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerConditionTest.java b/scheduler/src/test/java/io/kestra/scheduler/SchedulerConditionTest.java similarity index 97% rename from core/src/test/java/io/kestra/core/schedulers/SchedulerConditionTest.java rename to scheduler/src/test/java/io/kestra/scheduler/SchedulerConditionTest.java index 11524769bc..cabe85c813 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerConditionTest.java +++ b/scheduler/src/test/java/io/kestra/scheduler/SchedulerConditionTest.java @@ -1,4 +1,4 @@ -package io.kestra.core.schedulers; +package io.kestra.scheduler; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.flows.FlowWithSource; @@ -8,6 +8,7 @@ import io.kestra.core.models.flows.GenericFlow; import io.kestra.core.models.triggers.Trigger; import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.runners.FlowListeners; +import io.kestra.core.runners.SchedulerTriggerStateInterface; import io.kestra.core.utils.TestsUtils; import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.core.condition.DayWeekInMonth; diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerPollingTriggerTest.java b/scheduler/src/test/java/io/kestra/scheduler/SchedulerPollingTriggerTest.java similarity index 99% rename from core/src/test/java/io/kestra/core/schedulers/SchedulerPollingTriggerTest.java rename to scheduler/src/test/java/io/kestra/scheduler/SchedulerPollingTriggerTest.java index 14ade1bc71..83325f53e9 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerPollingTriggerTest.java +++ b/scheduler/src/test/java/io/kestra/scheduler/SchedulerPollingTriggerTest.java @@ -1,4 +1,4 @@ -package io.kestra.core.schedulers; +package io.kestra.scheduler; import io.kestra.core.models.Label; import io.kestra.core.models.flows.FlowWithSource; @@ -6,6 +6,7 @@ import io.kestra.core.models.property.Property; import io.kestra.core.models.flows.GenericFlow; import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.repositories.FlowRepositoryInterface; +import io.kestra.core.runners.SchedulerTriggerStateInterface; import io.kestra.core.tasks.test.FailingPollingTrigger; import io.kestra.core.utils.TestsUtils; import io.kestra.jdbc.runner.JdbcScheduler; diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleOnDatesTest.java b/scheduler/src/test/java/io/kestra/scheduler/SchedulerScheduleOnDatesTest.java similarity index 99% rename from core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleOnDatesTest.java rename to scheduler/src/test/java/io/kestra/scheduler/SchedulerScheduleOnDatesTest.java index 0c606b6cdb..703ab98474 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleOnDatesTest.java +++ b/scheduler/src/test/java/io/kestra/scheduler/SchedulerScheduleOnDatesTest.java @@ -1,4 +1,4 @@ -package io.kestra.core.schedulers; +package io.kestra.scheduler; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.flows.Flow; @@ -10,6 +10,7 @@ import io.kestra.core.models.triggers.RecoverMissedSchedules; import io.kestra.core.models.triggers.Trigger; import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.runners.FlowListeners; +import io.kestra.core.runners.SchedulerTriggerStateInterface; import io.kestra.core.utils.Await; import io.kestra.core.utils.TestsUtils; import io.kestra.jdbc.runner.JdbcScheduler; diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleTest.java b/scheduler/src/test/java/io/kestra/scheduler/SchedulerScheduleTest.java similarity index 99% rename from core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleTest.java rename to scheduler/src/test/java/io/kestra/scheduler/SchedulerScheduleTest.java index e59cf6e10a..0067d6905e 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerScheduleTest.java +++ b/scheduler/src/test/java/io/kestra/scheduler/SchedulerScheduleTest.java @@ -1,4 +1,4 @@ -package io.kestra.core.schedulers; +package io.kestra.scheduler; import com.devskiller.friendly_id.FriendlyId; import io.kestra.core.models.executions.TaskRun; @@ -7,6 +7,7 @@ import io.kestra.core.models.flows.PluginDefault; import io.kestra.core.models.property.Property; import io.kestra.core.models.flows.GenericFlow; import io.kestra.core.repositories.FlowRepositoryInterface; +import io.kestra.core.runners.SchedulerTriggerStateInterface; import io.kestra.core.utils.TestsUtils; import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.core.condition.Expression; diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerStreamingTest.java b/scheduler/src/test/java/io/kestra/scheduler/SchedulerStreamingTest.java similarity index 99% rename from core/src/test/java/io/kestra/core/schedulers/SchedulerStreamingTest.java rename to scheduler/src/test/java/io/kestra/scheduler/SchedulerStreamingTest.java index bc87aa4d5c..455250aac0 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerStreamingTest.java +++ b/scheduler/src/test/java/io/kestra/scheduler/SchedulerStreamingTest.java @@ -1,4 +1,4 @@ -package io.kestra.core.schedulers; +package io.kestra.scheduler; import com.google.common.collect.ImmutableMap; import io.kestra.core.models.Label; diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerThreadTest.java b/scheduler/src/test/java/io/kestra/scheduler/SchedulerThreadTest.java similarity index 97% rename from core/src/test/java/io/kestra/core/schedulers/SchedulerThreadTest.java rename to scheduler/src/test/java/io/kestra/scheduler/SchedulerThreadTest.java index 88d820a3a3..09bf8e72b0 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerThreadTest.java +++ b/scheduler/src/test/java/io/kestra/scheduler/SchedulerThreadTest.java @@ -1,8 +1,7 @@ -package io.kestra.core.schedulers; +package io.kestra.scheduler; import io.kestra.core.models.Label; import io.kestra.core.models.executions.Execution; -import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.FlowWithSource; import io.kestra.core.models.flows.State; import io.kestra.core.models.triggers.Trigger; diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerTriggerChangeTest.java b/scheduler/src/test/java/io/kestra/scheduler/SchedulerTriggerChangeTest.java similarity index 97% rename from core/src/test/java/io/kestra/core/schedulers/SchedulerTriggerChangeTest.java rename to scheduler/src/test/java/io/kestra/scheduler/SchedulerTriggerChangeTest.java index 828592c226..ce6ad4e553 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerTriggerChangeTest.java +++ b/scheduler/src/test/java/io/kestra/scheduler/SchedulerTriggerChangeTest.java @@ -1,4 +1,4 @@ -package io.kestra.core.schedulers; +package io.kestra.scheduler; import io.kestra.core.models.conditions.ConditionContext; import io.kestra.core.models.executions.Execution; @@ -113,7 +113,7 @@ public class SchedulerTriggerChangeTest extends AbstractSchedulerTest { applicationContext, flowListenersService ); - Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null) + DefaultWorker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null) ) { // start the worker as it execute polling triggers worker.run(); diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerTriggerStateInterfaceTest.java b/scheduler/src/test/java/io/kestra/scheduler/SchedulerTriggerStateInterfaceTest.java similarity index 94% rename from core/src/test/java/io/kestra/core/schedulers/SchedulerTriggerStateInterfaceTest.java rename to scheduler/src/test/java/io/kestra/scheduler/SchedulerTriggerStateInterfaceTest.java index 17ecdaa01d..1c91a87542 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerTriggerStateInterfaceTest.java +++ b/scheduler/src/test/java/io/kestra/scheduler/SchedulerTriggerStateInterfaceTest.java @@ -1,6 +1,7 @@ -package io.kestra.core.schedulers; +package io.kestra.scheduler; import io.kestra.core.models.triggers.Trigger; +import io.kestra.core.runners.SchedulerTriggerStateInterface; import io.kestra.core.utils.IdUtils; import io.kestra.core.junit.annotations.KestraTest; import jakarta.inject.Inject; diff --git a/scheduler/src/test/resources/allure.properties b/scheduler/src/test/resources/allure.properties new file mode 100644 index 0000000000..4873f6d0c5 --- /dev/null +++ b/scheduler/src/test/resources/allure.properties @@ -0,0 +1 @@ +allure.results.directory=build/allure-results diff --git a/scheduler/src/test/resources/application-test.yml b/scheduler/src/test/resources/application-test.yml new file mode 100644 index 0000000000..47d0ba8601 --- /dev/null +++ b/scheduler/src/test/resources/application-test.yml @@ -0,0 +1,80 @@ +micronaut: + router: + static-resources: + ui: + paths: classpath:ui + mapping: /ui/** + + http: + client: + read-idle-timeout: 60s + connect-timeout: 30s + read-timeout: 60s + http-version: HTTP_1_1 + services: + api: + url: http://localhost:28181 + server: + cors: + enabled: true + configurations: + all: + allowedOrigins: + - http://bad-origin +jackson: + serialization: + writeDatesAsTimestamps: false + writeDurationsAsTimestamps: false + serialization-inclusion: non_null + deserialization: + FAIL_ON_UNKNOWN_PROPERTIES: false + +kestra: + url: http://localhost:8081 + encryption: + secret-key: I6EGNzRESu3X3pKZidrqCGOHQFUFC0yK + server-type: STANDALONE + storage: + type: local + local: + base-path: /tmp/unittest + anonymous-usage-report: + enabled: true + uri: https://api.kestra.io/v1/reports/usages + initial-delay: 5m + fixed-delay: 1h + server: + access-log: + enabled: false + basic-auth: + username: admin@kestra.io + password: Kestra123 + open-urls: + - "/ping" + - "/api/v1/executions/webhook/" + liveness: + enabled: false + service: + purge: + initial-delay: 1h + fixed-delay: 1d + retention: 30d + queue: + type: h2 + repository: + type: h2 + +datasources: + h2: + url: jdbc:h2:mem:public;TIME ZONE=UTC;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE + username: sa + password: "" + driverClassName: org.h2.Driver +flyway: + datasources: + h2: + enabled: true + locations: + - classpath:migrations/h2 + ignore-migration-patterns: "*:missing,*:future" + out-of-order: true \ No newline at end of file diff --git a/scheduler/src/test/resources/logback.xml b/scheduler/src/test/resources/logback.xml new file mode 100644 index 0000000000..803c82e3f4 --- /dev/null +++ b/scheduler/src/test/resources/logback.xml @@ -0,0 +1,11 @@ + + + + + + + + + + + diff --git a/settings.gradle b/settings.gradle index 0ef38d7261..35f0c5861d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -18,6 +18,7 @@ include 'jdbc-postgres' include 'webserver' include 'executor' +include 'scheduler' include 'worker' include 'ui'