chore(system): extract the executor to its own module

This commit is contained in:
Loïc Mathieu
2025-09-01 16:54:13 +02:00
parent f3057d2d57
commit a5724bcb18
29 changed files with 154 additions and 25 deletions

View File

@@ -34,6 +34,7 @@ dependencies {
implementation project(":storage-local")
// Kestra server components
implementation project(":executor")
implementation project(":scheduler")
implementation project(":webserver")
implementation project(":worker")

View File

@@ -3,7 +3,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.executor.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;

View File

@@ -6,8 +6,8 @@ import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.StandAloneRunner;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.cli.StandAloneRunner;
import io.kestra.executor.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;

View File

@@ -75,6 +75,7 @@ dependencies {
testImplementation project(':runner-memory')
testImplementation project(':storage-local')
testImplementation project(':worker')
testImplementation project(':executor')
testImplementation "io.micronaut:micronaut-http-client"
testImplementation "io.micronaut:micronaut-http-server-netty"

View File

@@ -3,7 +3,6 @@ package io.kestra.core.models.triggers.multipleflows;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.TimeWindow;
import io.kestra.core.utils.Rethrow;
import org.slf4j.Logger;
@@ -24,7 +23,7 @@ public interface MultipleCondition extends Rethrow.PredicateChecked<ConditionCon
/**
* This conditions will only validate previously calculated value on
* {@link io.kestra.core.services.FlowTriggerService#computeExecutionsFromFlowTriggers(Execution, List, Optional)}} and {@link MultipleConditionStorageInterface#save(List)} by the executor.
* io.kestra.executor.FlowTriggerService#computeExecutionsFromFlowTriggers(Execution, List, Optional) and {@link MultipleConditionStorageInterface#save(List)} by the executor.
* The real validation is done here.
*/
@Override

View File

@@ -11,6 +11,10 @@ import lombok.Getter;
import java.util.ArrayList;
import java.util.List;
// TODO for 2.0: this class is used as a queue consumer (which should have been the ExecutorInterface instead),
// a queue message (only in Kafka) and an execution context.
// At some point, we should rename it to ExecutorContext and move it to the executor module,
// then rename the ExecutorInterface to just Executor (to be used as a queue consumer)
@Getter
@AllArgsConstructor
public class Executor {

View File

@@ -1,7 +1,7 @@
package io.kestra.core.runners;
import java.io.Closeable;
import io.kestra.core.server.Service;
public interface ExecutorInterface extends Closeable, Runnable {
public interface ExecutorInterface extends Service, Runnable {
}

View File

@@ -130,7 +130,7 @@ public class ConditionService {
return this.conditionContext(runContext, flow, execution, null);
}
boolean valid(FlowInterface flow, List<Condition> list, ConditionContext conditionContext) {
public boolean valid(FlowInterface flow, List<Condition> list, ConditionContext conditionContext) {
return list
.stream()
.allMatch(condition -> {

View File

@@ -408,7 +408,7 @@ public class FlowService {
return latestFlows.values().stream().filter(flow -> !flow.isDeleted());
}
protected boolean removeUnwanted(Flow f, Execution execution) {
public boolean removeUnwanted(Flow f, Execution execution) {
// we don't allow recursive
return !f.uidWithoutRevision().equals(FlowId.uidWithoutRevision(execution));
}

View File

@@ -8,7 +8,7 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.executor.SkipExecutionService;
import io.kestra.core.utils.Await;
import io.kestra.plugin.core.debug.Return;
import io.kestra.core.utils.IdUtils;

19
executor/build.gradle Normal file
View File

@@ -0,0 +1,19 @@
configurations {
implementation.extendsFrom(micronaut)
}
dependencies {
annotationProcessor project(':processor')
implementation project(":core")
// test
testAnnotationProcessor project(':processor')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':storage-local')
testImplementation project(':tests')
testImplementation project(':jdbc')
testImplementation project(':jdbc').sourceSets.test.output
testImplementation project(':jdbc-h2')
testImplementation("io.micronaut.sql:micronaut-jooq")
}

View File

@@ -1,4 +1,4 @@
package io.kestra.core.runners;
package io.kestra.executor;
import io.kestra.core.debug.Breakpoint;
import io.kestra.core.exceptions.InternalException;
@@ -14,6 +14,7 @@ import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.*;
import io.kestra.core.services.*;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.test.flow.TaskFixture;
@@ -380,7 +381,7 @@ public class ExecutorService {
if (flow.getOutputs() != null) {
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
try {
Map<String, Object> outputs = FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext);
outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);

View File

@@ -1,7 +1,6 @@
package io.kestra.core.services;
package io.kestra.executor;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
@@ -10,6 +9,8 @@ import io.kestra.core.models.triggers.multipleflows.MultipleCondition;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.FlowService;
import io.kestra.core.utils.ListUtils;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;

View File

@@ -1,4 +1,4 @@
package io.kestra.core.services;
package io.kestra.executor;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;

View File

@@ -1,4 +1,4 @@
package io.kestra.core.services;
package io.kestra.executor;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.executions.Execution;

View File

@@ -1,4 +1,7 @@
package io.kestra.core.runners;
package io.kestra.executor;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerTask;
/**
* State store containing all workers' jobs in RUNNING state.

View File

@@ -1,4 +1,4 @@
package io.kestra.core.services;
package io.kestra.executor;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest;
@@ -8,6 +8,8 @@ import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.FlowService;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.core.log.Log;
import jakarta.inject.Inject;

View File

@@ -1,4 +1,4 @@
package io.kestra.core.services;
package io.kestra.executor;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;

View File

@@ -0,0 +1 @@
allure.results.directory=build/allure-results

View File

@@ -0,0 +1,80 @@
micronaut:
router:
static-resources:
ui:
paths: classpath:ui
mapping: /ui/**
http:
client:
read-idle-timeout: 60s
connect-timeout: 30s
read-timeout: 60s
http-version: HTTP_1_1
services:
api:
url: http://localhost:28181
server:
cors:
enabled: true
configurations:
all:
allowedOrigins:
- http://bad-origin
jackson:
serialization:
writeDatesAsTimestamps: false
writeDurationsAsTimestamps: false
serialization-inclusion: non_null
deserialization:
FAIL_ON_UNKNOWN_PROPERTIES: false
kestra:
url: http://localhost:8081
encryption:
secret-key: I6EGNzRESu3X3pKZidrqCGOHQFUFC0yK
server-type: STANDALONE
storage:
type: local
local:
base-path: /tmp/unittest
anonymous-usage-report:
enabled: true
uri: https://api.kestra.io/v1/reports/usages
initial-delay: 5m
fixed-delay: 1h
server:
access-log:
enabled: false
basic-auth:
username: admin@kestra.io
password: Kestra123
open-urls:
- "/ping"
- "/api/v1/executions/webhook/"
liveness:
enabled: false
service:
purge:
initial-delay: 1h
fixed-delay: 1d
retention: 30d
queue:
type: h2
repository:
type: h2
datasources:
h2:
url: jdbc:h2:mem:public;TIME ZONE=UTC;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
username: sa
password: ""
driverClassName: org.h2.Driver
flyway:
datasources:
h2:
enabled: true
locations:
- classpath:migrations/h2
ignore-migration-patterns: "*:missing,*:future"
out-of-order: true

View File

@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<include resource="logback/base.xml" />
<include resource="logback/text.xml" />
<include resource="logback/test.xml" />
<root level="WARN">
<appender-ref ref="STDOUT" />
<appender-ref ref="STDERR" />
</root>
</configuration>

View File

@@ -5,6 +5,7 @@ configurations {
dependencies {
implementation project(":core")
implementation project(":jdbc")
implementation project(":executor")
implementation("io.micronaut.sql:micronaut-jooq")
runtimeOnly("com.h2database:h2")

View File

@@ -5,6 +5,7 @@
dependencies {
implementation project(":core")
implementation project(":jdbc")
implementation project(":executor")
implementation("io.micronaut.sql:micronaut-jooq")
runtimeOnly("com.mysql:mysql-connector-j")

View File

@@ -5,6 +5,7 @@ configurations {
dependencies {
implementation project(":core")
implementation project(":jdbc")
implementation project(":executor")
implementation("io.micronaut.sql:micronaut-jooq")
runtimeOnly("org.postgresql:postgresql")

View File

@@ -5,7 +5,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.runners.WorkerJobRunningStateStore;
import io.kestra.executor.WorkerJobRunningStateStore;
import io.kestra.core.runners.WorkerTriggerResult;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.runner.JdbcQueue;

View File

@@ -3,7 +3,7 @@ package io.kestra.jdbc.repository;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.repositories.WorkerJobRunningRepositoryInterface;
import io.kestra.core.runners.WorkerJobRunning;
import io.kestra.core.runners.WorkerJobRunningStateStore;
import io.kestra.executor.WorkerJobRunningStateStore;
import lombok.extern.slf4j.Slf4j;
import org.jooq.DSLContext;
import org.jooq.Record1;

View File

@@ -19,8 +19,6 @@ import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.runners.*;
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorService;
import io.kestra.core.schedulers.SchedulerTriggerStateInterface;
import io.kestra.core.server.*;
import io.kestra.core.services.*;
import io.kestra.core.storages.StorageContext;
@@ -28,6 +26,10 @@ import io.kestra.core.topologies.FlowTopologyService;
import io.kestra.core.trace.Tracer;
import io.kestra.core.trace.TracerFactory;
import io.kestra.core.utils.*;
import io.kestra.executor.ExecutorService;
import io.kestra.executor.FlowTriggerService;
import io.kestra.executor.SLAService;
import io.kestra.executor.SkipExecutionService;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
@@ -68,7 +70,7 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
@Singleton
@JdbcRunnerEnabled
@Slf4j
public class JdbcExecutor implements ExecutorInterface, Service {
public class JdbcExecutor implements ExecutorInterface {
private static final ObjectMapper MAPPER = JdbcMapper.of();
private final ScheduledExecutorService scheduledDelay = Executors.newSingleThreadScheduledExecutor();

View File

@@ -14,7 +14,7 @@ import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.*;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.executor.SkipExecutionService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.tasks.test.SleepTrigger;
import io.kestra.core.utils.IdUtils;

View File

@@ -17,6 +17,7 @@ include 'jdbc-mysql'
include 'jdbc-postgres'
include 'webserver'
include 'executor'
include 'worker'
include 'ui'