Compare commits

...

1 Commit

Author SHA1 Message Date
Loïc Mathieu
b243716820 feat(system): Executor 2.0 PoC 2025-10-13 12:16:56 +02:00
29 changed files with 578 additions and 1650 deletions

View File

@@ -4,6 +4,7 @@ import io.kestra.core.runners.*;
import io.kestra.core.server.Service;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.executor.DefaultExecutor;
import io.kestra.worker.DefaultWorker;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
@@ -49,7 +50,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
running.set(true);
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
poolExecutor.execute(applicationContext.getBean(DefaultExecutor.class));
if (workerEnabled) {
// FIXME: For backward compatibility with Kestra 0.15.x and earlier, we still use UUID for the Worker ID instead of IdUtils

View File

@@ -11,6 +11,7 @@ import io.kestra.core.runners.*;
public interface QueueFactoryInterface {
String EXECUTION_NAMED = "executionQueue";
String EXECUTION_EVENT_NAMED = "executionEventQueue";
String EXECUTOR_NAMED = "executorQueue";
String WORKERJOB_NAMED = "workerJobQueue";
String WORKERTASKRESULT_NAMED = "workerTaskResultQueue";
@@ -30,6 +31,8 @@ public interface QueueFactoryInterface {
QueueInterface<Execution> execution();
QueueInterface<ExecutionEvent> executionEvent();
QueueInterface<Executor> executor();
WorkerJobQueueInterface workerJob();
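Downstream beans resolve these queues through the named constants; as a hedged illustration (mirroring the injection pattern DefaultExecutor uses further down in this diff, with a hypothetical bean name):

import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionEvent;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

// Hypothetical consumer bean: @Named selects the execution event queue
// registered by the per-database QueueFactory implementations below.
@Singleton
class ExecutionEventAware {
    @Inject
    @Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
    private QueueInterface<ExecutionEvent> executionEventQueue;
}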

View File

@@ -35,6 +35,24 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
void delete(String consumerGroup, T message) throws QueueException;
/**
* Delete all messages of the queue for this key.
* This is used to purge a queue for a specific key.
* A queue implementation may leave it unimplemented and purge records differently.
*/
default void deleteByKey(String key) throws QueueException {
// by default do nothing
}
/**
* Delete all messages of the queue for a set of keys.
* This is used to purge a queue for specific keys.
* A queue implementation may leave it unimplemented and purge records differently.
*/
default void deleteByKeys(List<String> keys) throws QueueException {
// by default do nothing
}
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
return receive(null, consumer, false);
}
@@ -54,4 +72,20 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
}
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
default Runnable receiveBatch(Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
return receiveBatch(null, queueType, consumer);
}
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
return receiveBatch(consumerGroup, queueType, consumer, true);
}
/**
* Consume a batch of messages.
* By default, it consumes messages one at a time; a queue implementation may override this to support true batch consumption.
*/
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer, boolean forUpdate) {
return receive(consumerGroup, either -> consumer.accept(List.of(either)), forUpdate);
}
}
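To make the new default methods concrete, here is a standalone sketch using a hypothetical MiniQueue type (not the real QueueInterface, which carries more methods plus the consumer-group and QueueException machinery): receiveBatch degrades to batches of one by wrapping the single-message receive, and deleteByKey stays a no-op for implementations that purge records differently.

import java.util.List;
import java.util.function.Consumer;

// Hypothetical trimmed-down queue interface mirroring the default-method
// pattern added above; it is not the real io.kestra.core.queues.QueueInterface.
interface MiniQueue<T> {
    Runnable receive(Consumer<T> consumer);

    // Fallback identical in spirit to the default receiveBatch above:
    // every message is delivered as a batch of one.
    default Runnable receiveBatch(Consumer<List<T>> consumer) {
        return receive(message -> consumer.accept(List.of(message)));
    }

    // Implementations that cannot purge by key keep this no-op default.
    default void deleteByKey(String key) {
        // by default do nothing
    }
}

An implementation such as JdbcQueue (further down in this diff) overrides both receiveBatch and deleteByKey with real batch consumption and key-based deletion.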

View File

@@ -19,8 +19,6 @@ public class QueueService {
return ((SubflowExecution<?>) object).getExecution().getId();
} else if (object.getClass() == SubflowExecutionResult.class) {
return ((SubflowExecutionResult) object).getExecutionId();
} else if (object.getClass() == ExecutorState.class) {
return ((ExecutorState) object).getExecutionId();
} else if (object.getClass() == Setting.class) {
return ((Setting) object).getKey();
} else if (object.getClass() == Executor.class) {

View File

@@ -2,12 +2,12 @@ package io.kestra.core.repositories;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.FlowScope;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.Executor;
import io.kestra.core.utils.DateUtils;
import io.kestra.plugin.core.dashboard.data.Executions;
import io.micronaut.data.model.Pageable;
@@ -156,4 +156,6 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
String tenantId,
@Nullable List<FlowFilter> flows
);
Executor lock(String executionId, Function<Execution, Executor> function);
}
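The signature change tightens the lock contract: the callback now receives the locked Execution directly (the ExecutorState half of the old Pair is gone), and the return value drives persistence. A hedged sketch of a caller, assuming the Kestra types from this diff (the wrapper class and method are hypothetical):

import io.kestra.core.models.executions.Execution;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.Executor;

// Hypothetical caller illustrating the contract implemented by
// AbstractJdbcExecutionRepository below: returning an Executor persists its
// execution inside the same transaction; returning null persists nothing.
class LockContractExample {
    Executor touchIfRunning(ExecutionRepositoryInterface repository, String executionId) {
        return repository.lock(executionId, execution -> {
            if (execution.getState().isTerminated()) {
                return null; // leave the row untouched
            }
            return new Executor(execution, null); // persisted on return
        });
    }
}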

View File

@@ -0,0 +1,17 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import java.time.Instant;
public record ExecutionEvent(String tenantId, String executionId, Instant eventDate, ExecutionEventType eventType) implements HasUID {
public ExecutionEvent(Execution execution, ExecutionEventType eventType) {
this(execution.getTenantId(), execution.getId(), Instant.now(), eventType);
}
@Override
public String uid() {
return executionId;
}
}

View File

@@ -0,0 +1,7 @@
package io.kestra.core.runners;
public enum ExecutionEventType {
CREATED,
UPDATED,
TERMINATED,
}
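Together, ExecutionEvent and ExecutionEventType form the new lightweight signal on the event queue: instead of shipping the whole Execution, producers emit a (tenantId, executionId, eventDate, eventType) tuple and consumers re-read the execution from the repository. A minimal emission sketch, assuming the types from this diff (the producer class and helper method are hypothetical):

import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionEvent;
import io.kestra.core.runners.ExecutionEventType;

// Hypothetical producer: the convenience constructor copies tenantId and
// executionId from the execution and stamps the event with Instant.now().
class ExecutionEventProducer {
    void emitCreated(QueueInterface<ExecutionEvent> executionEventQueue,
                     Execution execution) throws QueueException {
        executionEventQueue.emit(new ExecutionEvent(execution, ExecutionEventType.CREATED));
    }
}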

View File

@@ -1,21 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.models.flows.State;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Data
@NoArgsConstructor
public class ExecutorState {
private String executionId;
private Map<String, State.Type> workerTaskDeduplication = new ConcurrentHashMap<>();
private Map<String, String> childDeduplication = new ConcurrentHashMap<>();
private Map<String, State.Type> subflowExecutionDeduplication = new ConcurrentHashMap<>();
public ExecutorState(String executionId) {
this.executionId = executionId;
}
}

View File

@@ -0,0 +1,398 @@
package io.kestra.executor;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.*;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.trace.Tracer;
import io.kestra.core.trace.TracerFactory;
import io.kestra.core.utils.*;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.event.ApplicationEventPublisher;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@Singleton
@Slf4j
public class DefaultExecutor implements ExecutorInterface {
@Inject
private ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;
@Inject
private ExecutionRepositoryInterface executionRepository;
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;
@Inject
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
private QueueInterface<ExecutionEvent> executionEventQueue;
@Inject
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
private QueueInterface<WorkerJob> workerJobQueue;
@Inject
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
private QueueInterface<WorkerTaskResult> workerTaskResultQueue;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
private QueueInterface<LogEntry> logQueue;
@Inject
private SkipExecutionService skipExecutionService;
@Inject
private PluginDefaultService pluginDefaultService;
@Inject
private ExecutorService executorService;
@Inject
private WorkerGroupService workerGroupService;
@Inject
private ExecutionService executionService;
@Inject
private FlowMetaStoreInterface flowMetaStore;
// FIXME change config names
@Value("${kestra.jdbc.executor.clean.execution-queue:true}")
private boolean cleanExecutionQueue;
@Value("${kestra.jdbc.executor.clean.worker-queue:true}")
private boolean cleanWorkerJobQueue;
private final AtomicReference<ServiceState> state = new AtomicReference<>();
private final String id = IdUtils.create();
private final List<Runnable> receiveCancellations = new ArrayList<>();
private final Tracer tracer;
private final java.util.concurrent.ExecutorService workerTaskResultExecutorService;
private final java.util.concurrent.ExecutorService executionExecutorService;
@Inject
public DefaultExecutor(TracerFactory tracerFactory, ExecutorsUtils executorsUtils, @Value("${kestra.jdbc.executor.thread-count:0}") int threadCount) {
this.tracer = tracerFactory.getTracer(DefaultExecutor.class, "EXECUTOR");
// By default, each executor service gets as many threads as there are available processors, with a minimum of 4:
// one for the worker task result queue and one for the execution queue.
// The other queues would not benefit from more consumers.
int numberOfThreads = threadCount != 0 ? threadCount : Math.max(4, Runtime.getRuntime().availableProcessors());
this.workerTaskResultExecutorService = executorsUtils.maxCachedThreadPool(numberOfThreads, "jdbc-worker-task-result-executor");
this.executionExecutorService = executorsUtils.maxCachedThreadPool(numberOfThreads, "jdbc-execution-executor");
}
@Override
public void run() {
setState(ServiceState.CREATED);
// listen to executor related queues
this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, execution -> executionQueue(execution)));
this.receiveCancellations.addFirst(this.executionEventQueue.receiveBatch(
Executor.class,
executionEvents -> {
List<CompletableFuture<Void>> futures = executionEvents.stream()
.map(executionEvent -> CompletableFuture.runAsync(() -> executionEventQueue(executionEvent), executionExecutorService))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
));
this.receiveCancellations.addFirst(this.workerTaskResultQueue.receiveBatch(
Executor.class,
workerTaskResults -> {
List<CompletableFuture<Void>> futures = workerTaskResults.stream()
.map(workerTaskResult -> CompletableFuture.runAsync(() -> workerTaskResultQueue(workerTaskResult), workerTaskResultExecutorService))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
));
setState(ServiceState.RUNNING);
log.info("Executor started");
}
// This serves as a temporary bridge between the old execution queue and the new execution event queue, so that code using the old queue doesn't need to be updated yet
private void executionQueue(Either<Execution, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize an execution: {}", either.getRight().getMessage());
return;
}
Execution message = either.getLeft();
if (skipExecutionService.skipExecution(message)) {
log.warn("Skipping execution {}", message.getId());
return;
}
try {
executionEventQueue.emit(new ExecutionEvent(message, ExecutionEventType.CREATED));
} catch (QueueException e) {
// If we cannot send the execution event, we fail the execution
executionRepository.lock(message.getId(), execution -> {
try {
Execution failed = execution.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED);
ExecutionEvent event = new ExecutionEvent(failed, ExecutionEventType.UPDATED); // TODO terminated
// TODO transaction between repo and queue
this.executionRepository.update(failed);
this.executionEventQueue.emit(event);
} catch (QueueException ex) {
log.error("Unable to emit the execution {}", execution.getId(), ex);
}
return null;
});
}
}
private void executionEventQueue(Either<ExecutionEvent, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize an execution: {}", either.getRight().getMessage());
return;
}
ExecutionEvent message = either.getLeft();
if (skipExecutionService.skipExecution(message.executionId())) { // TODO we may also skip by tenant/namespace/flow
log.warn("Skipping execution {}", message.executionId());
return;
}
Executor result = executionRepository.lock(message.executionId(), execution -> {
return tracer.inCurrentContext(
execution,
FlowId.uidWithoutRevision(execution),
() -> {
final FlowWithSource flow = findFlow(execution);
Executor executor = new Executor(execution, null).withFlow(flow);
// process the execution
if (log.isDebugEnabled()) {
executorService.log(log, true, executor);
}
executor = executorService.process(executor);
if (!executor.getNexts().isEmpty()) {
executor.withExecution(
executorService.onNexts(executor.getExecution(), executor.getNexts()),
"onNexts"
);
}
// worker task
if (!executor.getWorkerTasks().isEmpty()) {
List<WorkerTaskResult> workerTaskResults = new ArrayList<>();
executor
.getWorkerTasks()
.forEach(throwConsumer(workerTask -> {
try {
if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.SKIPPED)));
} else {
if (workerTask.getTask().isSendToWorkerTask()) {
Optional<WorkerGroup> maybeWorkerGroup = workerGroupService.resolveGroupFromJob(flow, workerTask);
String workerGroupKey = maybeWorkerGroup.map(throwFunction(workerGroup -> workerTask.getRunContext().render(workerGroup.getKey())))
.orElse(null);
workerJobQueue.emit(workerGroupKey, workerTask);
}
if (workerTask.getTask().isFlowable()) {
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.RUNNING)));
}
}
} catch (Exception e) {
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.FAILED)));
workerTask.getRunContext().logger().error("Failed to evaluate the runIf condition for task {}. Cause: {}", workerTask.getTask().getId(), e.getMessage(), e);
}
}));
try {
executorService.addWorkerTaskResults(executor, workerTaskResults);
} catch (InternalException e) {
log.error("Unable to add a worker task result to the execution", e);
}
}
return executor;
}
);
});
if (result != null) {
this.toExecution(result);
}
}
private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) {
if (either == null) {
// FIXME it happens in Kafka but should not? or maybe it should...
return;
}
if (either.isRight()) {
log.error("Unable to deserialize a worker task result: {}", either.getRight().getMessage(), either.getRight());
return;
}
WorkerTaskResult message = either.getLeft();
if (skipExecutionService.skipExecution(message.getTaskRun())) {
log.warn("Skipping execution {}", message.getTaskRun().getExecutionId());
return;
}
if (log.isDebugEnabled()) {
executorService.log(log, true, message);
}
Executor executor = executionRepository.lock(message.getTaskRun().getExecutionId(), execution -> {
if (execution == null) {
throw new IllegalStateException("Execution doesn't exist for " + message.getTaskRun().getExecutionId() + ", received " + message);
}
Executor current = new Executor(execution, null);
if (execution.hasTaskRunJoinable(message.getTaskRun())) {
try {
// process worker task result
executorService.addWorkerTaskResult(current, () -> findFlow(execution), message);
// join worker result
return current;
} catch (InternalException e) {
return handleFailedExecutionFromExecutor(current, e);
}
}
return null;
});
if (executor != null) {
this.toExecution(executor);
}
}
private void toExecution(Executor executor) {
try {
boolean shouldSend = false;
if (executor.getException() != null) {
executor = handleFailedExecutionFromExecutor(executor, executor.getException());
shouldSend = true;
} else if (executor.isExecutionUpdated()) {
shouldSend = true;
}
if (!shouldSend) {
// delete the execution from the state storage if ended
// IMPORTANT: it must be done here as it's when the execution arrives 'again' with a terminated state,
// so we are sure at this point that no new executions will be created; otherwise the state storage would be re-created by the execution queue.
if (executorService.canBePurged(executor)) {
// TODO executorStateStorage.delete(executor.getExecution());
}
return;
}
if (log.isDebugEnabled()) {
executorService.log(log, false, executor);
}
// the terminated state can come from the execution queue, in which case we always have a flow in the executor,
// or from a worker task in an afterExecution block, in which case we need to load the flow
if (executor.getFlow() == null && executor.getExecution().getState().isTerminated()) {
executor = executor.withFlow(findFlow(executor.getExecution()));
}
boolean isTerminated = executor.getFlow() != null && executionService.isTerminated(executor.getFlow(), executor.getExecution());
// IMPORTANT: this must be done before emitting the last execution message so that all consumers are notified that the execution ends.
// NOTE: we may also purge ExecutionKilled events, but as there may not be a lot of them, it may not be worth it.
if (isTerminated) {
if (cleanExecutionQueue) {
executionEventQueue.deleteByKey(executor.getExecution().getId());
executionQueue.deleteByKey(executor.getExecution().getId());
}
// Purge the workerTaskResultQueue and the workerJobQueue
// IMPORTANT: this is safe as only the executor is listening to WorkerTaskResult,
// and we are sure at this stage that all WorkerJobs have been consumed and processed by the Worker.
// If any of these assumptions changed, this code would not be safe anymore.
if (cleanWorkerJobQueue && !ListUtils.isEmpty(executor.getExecution().getTaskRunList())) {
List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
.map(TaskRun::getId)
.toList();
workerTaskResultQueue.deleteByKeys(taskRunKeys);
workerJobQueue.deleteByKeys(taskRunKeys);
}
ExecutionEvent event = new ExecutionEvent(executor.getExecution(), ExecutionEventType.TERMINATED);
this.executionEventQueue.emit(event);
} else {
ExecutionEvent event = new ExecutionEvent(executor.getExecution(), ExecutionEventType.UPDATED);
this.executionEventQueue.emit(event);
}
} catch (QueueException e) {
// If we cannot add the new worker task result to the execution, we fail it
executionRepository.lock(executor.getExecution().getId(), execution -> {
try {
Execution failed = execution.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED);
ExecutionEvent event = new ExecutionEvent(failed, ExecutionEventType.TERMINATED);
this.executionEventQueue.emit(event);
} catch (QueueException ex) {
log.error("Unable to emit the execution {}", execution.getId(), ex);
}
return null;
});
}
}
private FlowWithSource findFlow(Execution execution) {
FlowInterface flow = flowMetaStore.findByExecution(execution).orElseThrow();
return pluginDefaultService.injectDefaults(flow, execution);
}
private Executor handleFailedExecutionFromExecutor(Executor executor, Exception e) {
Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);
try {
logQueue.emitAsync(failedExecutionWithLog.getLogs());
} catch (QueueException ex) {
// fail silently
}
return executor.withExecution(failedExecutionWithLog.getExecution(), "exception");
}
@Override
public String getId() {
return id;
}
@Override
public ServiceType getType() {
return ServiceType.EXECUTOR;
}
@Override
public ServiceState getState() {
return state.get();
}
private void setState(final ServiceState state) {
this.state.set(state);
eventPublisher.publishEvent(new ServiceStateChangeEvent(this));
}
}
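The run() method above applies the same fan-out shape to both batched queues: every message in a batch is dispatched to a bounded thread pool, and the batch handler blocks until the whole batch completes, so the queue only advances batch by batch. A standalone sketch of that pattern with illustrative names (none of them Kestra API):

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class BatchFanOut {
    // Wraps a per-message handler into a batch handler that processes all
    // messages concurrently on the given pool and waits for all of them.
    static <T> Consumer<List<T>> fanOut(Consumer<T> handler, ExecutorService pool) {
        return batch -> {
            List<CompletableFuture<Void>> futures = batch.stream()
                .map(message -> CompletableFuture.runAsync(() -> handler.accept(message), pool))
                .toList();
            // Block until every message of the batch has been handled.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        };
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        fanOut((String message) -> System.out.println("handled " + message), pool)
            .accept(List.of("a", "b", "c"));
        pool.shutdown();
    }
}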

View File

@@ -3,7 +3,6 @@ package io.kestra.repository.h2;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.services.JdbcFilterService;
import io.micronaut.context.ApplicationContext;
import io.kestra.core.utils.DateUtils;
@@ -23,9 +22,8 @@ public class H2ExecutionRepository extends AbstractJdbcExecutionRepository {
@Inject
public H2ExecutionRepository(@Named("executions") H2Repository<Execution> repository,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage,
JdbcFilterService filterService) {
super(repository, applicationContext, executorStateStorage, filterService);
super(repository, applicationContext, filterService);
}
@Override

View File

@@ -1,15 +0,0 @@
package io.kestra.runner.h2;
import io.kestra.core.runners.ExecutorState;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.repository.h2.H2Repository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2QueueEnabled
public class H2ExecutorStateStorage extends AbstractJdbcExecutorStateStorage {
public H2ExecutorStateStorage(@Named("executorstate") H2Repository<ExecutorState> repository) {
super(repository);
}
}

View File

@@ -33,6 +33,14 @@ public class H2QueueFactory implements QueueFactoryInterface {
return new H2Queue<>(Execution.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionEvent> executionEvent() {
return new H2Queue<>(ExecutionEvent.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTOR_NAMED)

View File

@@ -0,0 +1,22 @@
ALTER TABLE queues ALTER COLUMN "type" ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.MultipleConditionEvent',
'io.kestra.core.runners.ExecutionEvent'
) NOT NULL;
DROP TABLE IF EXISTS executorstate;

View File

@@ -4,7 +4,6 @@ import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.services.JdbcFilterService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -24,9 +23,8 @@ public class MysqlExecutionRepository extends AbstractJdbcExecutionRepository {
@Inject
public MysqlExecutionRepository(@Named("executions") MysqlRepository<Execution> repository,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage,
JdbcFilterService filterService) {
super(repository, applicationContext, executorStateStorage, filterService);
super(repository, applicationContext, filterService);
}
@Override

View File

@@ -1,16 +0,0 @@
package io.kestra.runner.mysql;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.core.runners.ExecutorState;
import io.kestra.repository.mysql.MysqlRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@MysqlQueueEnabled
public class MysqlExecutorStateStorage extends AbstractJdbcExecutorStateStorage {
public MysqlExecutorStateStorage(@Named("executorstate") MysqlRepository<ExecutorState> repository) {
super(repository);
}
}

View File

@@ -33,6 +33,14 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
return new MysqlQueue<>(Execution.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionEvent> executionEvent() {
return new MysqlQueue<>(ExecutionEvent.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTOR_NAMED)

View File

@@ -0,0 +1,22 @@
ALTER TABLE queues MODIFY COLUMN `type` ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.MultipleConditionEvent',
'io.kestra.core.runners.ExecutionEvent'
) NOT NULL;
DROP TABLE IF EXISTS executorstate;

View File

@@ -5,7 +5,6 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.services.JdbcFilterService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -26,9 +25,8 @@ public class PostgresExecutionRepository extends AbstractJdbcExecutionRepository
@Inject
public PostgresExecutionRepository(@Named("executions") PostgresRepository<Execution> repository,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage,
JdbcFilterService filterService) {
super(repository, applicationContext, executorStateStorage, filterService);
super(repository, applicationContext, filterService);
}
@Override

View File

@@ -1,15 +0,0 @@
package io.kestra.runner.postgres;
import io.kestra.core.runners.ExecutorState;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.repository.postgres.PostgresRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@PostgresQueueEnabled
public class PostgresExecutorStateStorage extends AbstractJdbcExecutorStateStorage {
public PostgresExecutorStateStorage(@Named("executorstate") PostgresRepository<ExecutorState> repository) {
super(repository);
}
}

View File

@@ -33,6 +33,14 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
return new PostgresQueue<>(Execution.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionEvent> executionEvent() {
return new PostgresQueue<>(ExecutionEvent.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTOR_NAMED)

View File

@@ -0,0 +1,3 @@
ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.core.runners.ExecutionEvent';
DROP TABLE IF EXISTS executorstate;

View File

@@ -71,12 +71,6 @@ public class JdbcTableConfigsFactory {
return new InstantiableJdbcTableConfig("multipleconditions", MultipleConditionWindow.class, "multipleconditions");
}
@Bean
@Named("executorstate")
public InstantiableJdbcTableConfig executorState() {
return new InstantiableJdbcTableConfig("executorstate", ExecutorState.class, "executorstate");
}
@Bean
@Named("executordelayed")
public InstantiableJdbcTableConfig executorDelayed() {

View File

@@ -20,11 +20,9 @@ import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorState;
import io.kestra.core.utils.DateUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.NamespaceUtils;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.runner.JdbcQueueIndexerInterface;
import io.kestra.jdbc.services.JdbcFilterService;
import io.kestra.plugin.core.dashboard.data.Executions;
@@ -35,7 +33,6 @@ import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.annotation.Nullable;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.commons.lang3.tuple.Pair;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
@@ -66,7 +63,6 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
protected final io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository;
private final ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
private final ApplicationContext applicationContext;
protected final AbstractJdbcExecutorStateStorage executorStateStorage;
private QueueInterface<Execution> executionQueue;
private final NamespaceUtils namespaceUtils;
@@ -100,11 +96,9 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
public AbstractJdbcExecutionRepository(
io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage,
JdbcFilterService filterService
) {
this.jdbcRepository = jdbcRepository;
this.executorStateStorage = executorStateStorage;
this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.namespaceUtils = applicationContext.getBean(NamespaceUtils.class);
@@ -980,7 +974,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
});
}
public Executor lock(String executionId, Function<Pair<Execution, ExecutorState>, Pair<Executor, ExecutorState>> function) {
@Override
public Executor lock(String executionId, Function<Execution, Executor> function) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
@@ -1000,14 +995,11 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
return null;
}
ExecutorState executorState = executorStateStorage.get(context, execution.get());
Pair<Executor, ExecutorState> pair = function.apply(Pair.of(execution.get(), executorState));
Executor executor = function.apply(execution.get());
if (pair != null) {
this.jdbcRepository.persist(pair.getKey().getExecution(), context, null);
this.executorStateStorage.save(context, pair.getRight());
return pair.getKey();
if (executor != null) {
this.jdbcRepository.persist(executor.getExecution(), context, null);
return executor;
}
return null;

View File

@@ -1,40 +0,0 @@
package io.kestra.jdbc.runner;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.runners.ExecutorState;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Record1;
import org.jooq.SelectConditionStep;
import java.util.Map;
public abstract class AbstractJdbcExecutorStateStorage {
protected io.kestra.jdbc.AbstractJdbcRepository<ExecutorState> jdbcRepository;
public AbstractJdbcExecutorStateStorage(io.kestra.jdbc.AbstractJdbcRepository<ExecutorState> jdbcRepository) {
this.jdbcRepository = jdbcRepository;
}
public ExecutorState get(DSLContext dslContext, Execution execution) {
SelectConditionStep<Record1<Object>> select = dslContext
.select(AbstractJdbcRepository.field("value"))
.from(this.jdbcRepository.getTable())
.where(
AbstractJdbcRepository.field("key").eq(execution.getId())
);
return this.jdbcRepository.fetchOne(select)
.orElse(new ExecutorState(execution.getId()));
}
public void save(DSLContext dslContext, ExecutorState executorState) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(executorState);
this.jdbcRepository.persist(executorState, dslContext, fields);
}
public void delete(Execution execution) {
this.jdbcRepository.delete(new ExecutorState(execution.getId()));
}
}

File diff suppressed because it is too large.

View File

@@ -185,10 +185,7 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
// and the queue has its own cleaner, which we better not mess with, as the 'queues' table is selected with a lock.
}
/**
* Delete all messages of the queue for this key.
* This is used to purge a queue for a specific key.
*/
@Override
public void deleteByKey(String key) throws QueueException {
dslContextWrapper.transaction(configuration -> {
int deleted = DSL
@@ -336,14 +333,7 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
);
}
public Runnable receiveBatch(Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
return receiveBatch(null, queueType, consumer);
}
public Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
return receiveBatch(consumerGroup, queueType, consumer, true);
}
@Override
public Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer, boolean forUpdate) {
return this.receiveImpl(
consumerGroup,

View File

@@ -8,6 +8,7 @@ import io.kestra.core.server.ServiceInstance;
import io.kestra.core.server.ServiceRegistry;
import io.kestra.core.server.ServiceType;
import io.kestra.core.server.WorkerTaskRestartStrategy;
import io.kestra.executor.DefaultExecutor;
import io.kestra.jdbc.repository.AbstractJdbcServiceInstanceRepository;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
@@ -37,7 +38,7 @@ public final class JdbcServiceLivenessCoordinator extends AbstractServiceLivenes
private final static Logger log = LoggerFactory.getLogger(JdbcServiceLivenessCoordinator.class);
private final AtomicReference<JdbcExecutor> executor = new AtomicReference<>();
private final AtomicReference<DefaultExecutor> executor = new AtomicReference<>();
private final AbstractJdbcServiceInstanceRepository serviceInstanceRepository;
private final Duration purgeRetention;
@@ -89,7 +90,8 @@ public final class JdbcServiceLivenessCoordinator extends AbstractServiceLivenes
.toList();
if (!ids.isEmpty()) {
log.info("Trigger task restart for non-responding workers after termination grace period: {}.", ids);
executor.get().reEmitWorkerJobsForWorkers(configuration, ids);
// FIXME
//executor.get().reEmitWorkerJobsForWorkers(configuration, ids);
}
}
@@ -143,7 +145,8 @@ public final class JdbcServiceLivenessCoordinator extends AbstractServiceLivenes
if (!workerIdsHavingTasksToRestart.isEmpty()) {
log.info("Trigger task restart for non-responding workers after timeout: {}.", workerIdsHavingTasksToRestart);
executor.get().reEmitWorkerJobsForWorkers(configuration, workerIdsHavingTasksToRestart);
// FIXME
//executor.get().reEmitWorkerJobsForWorkers(configuration, workerIdsHavingTasksToRestart);
}
});
}
@@ -155,7 +158,7 @@ public final class JdbcServiceLivenessCoordinator extends AbstractServiceLivenes
}
synchronized void setExecutor(final JdbcExecutor executor) {
synchronized void setExecutor(final DefaultExecutor executor) {
this.executor.set(executor);
}

View File

@@ -595,6 +595,8 @@ public class ExecutionController {
propagator.get().inject(Context.current(), result, ExecutionTextMapSetter.INSTANCE);
}
// TODO this is temporary until we create a queue indexer for it like in JDBC
executionRepository.save(result);
executionQueue.emit(result);
eventPublisher.publishEvent(new CrudEvent<>(result, CrudEventType.CREATE));
@@ -709,6 +711,9 @@ public class ExecutionController {
.map(ContextPropagators::getTextMapPropagator)
.ifPresent(propagator -> propagator.inject(Context.current(), executionWithInputs, ExecutionTextMapSetter.INSTANCE));
// FIXME we should have, as for JDBC, a "queue indexer" that indexes sync executions;
// for now we store it via the executionRepository instead
executionRepository.save(executionWithInputs);
executionQueue.emit(executionWithInputs);
eventPublisher.publishEvent(new CrudEvent<>(executionWithInputs, CrudEventType.CREATE));
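Both endpoints in this controller now persist the execution before emitting it on the queue, since event consumers (such as the streaming service below) re-read executions from the repository rather than deserializing them off the message. A minimal sketch of that ordering, with a hypothetical helper class:

import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;

class SaveThenEmit {
    // Persist first, then emit: a consumer woken by the queue must already
    // be able to find the execution in the repository.
    void publish(ExecutionRepositoryInterface repository,
                 QueueInterface<Execution> executionQueue,
                 Execution execution) throws QueueException {
        repository.save(execution);
        executionQueue.emit(execution);
    }
}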

View File

@@ -4,6 +4,8 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.ExecutionEvent;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
@@ -18,6 +20,7 @@ import org.apache.commons.lang3.tuple.Pair;
import reactor.core.publisher.FluxSink;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -34,18 +37,21 @@ public class ExecutionStreamingService {
private final Map<String, Map<String, Pair<FluxSink<Event<Execution>>, Flow>>> subscribers = new ConcurrentHashMap<>();
private final Object subscriberLock = new Object();
private final QueueInterface<Execution> executionQueue;
private final QueueInterface<ExecutionEvent> executionQueue;
private final ExecutionService executionService;
private final ExecutionRepositoryInterface executionRepository;
private Runnable queueConsumer;
@Inject
public ExecutionStreamingService(
@Named(QueueFactoryInterface.EXECUTION_NAMED) QueueInterface<Execution> executionQueue,
ExecutionService executionService
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED) QueueInterface<ExecutionEvent> executionQueue,
ExecutionService executionService,
ExecutionRepositoryInterface executionRepository
) {
this.executionQueue = executionQueue;
this.executionService = executionService;
this.executionRepository = executionRepository;
}
@PostConstruct
@@ -57,22 +63,27 @@ public class ExecutionStreamingService {
return;
}
Execution execution = either.getLeft();
String executionId = execution.getId();
ExecutionEvent event = either.getLeft();
// Get all subscribers for this execution
Map<String, Pair<FluxSink<Event<Execution>>, Flow>> executionSubscribers = subscribers.get(executionId);
Map<String, Pair<FluxSink<Event<Execution>>, Flow>> executionSubscribers = subscribers.get(event.executionId());
if (!MapUtils.isEmpty(executionSubscribers)) {
Optional<Execution> execution = executionRepository.findById(event.tenantId(), event.executionId());
if (execution.isEmpty()) {
log.error("Unable to find the execution id {}", event.executionId());
return;
}
executionSubscribers.values().forEach(pair -> {
var sink = pair.getLeft();
var flow = pair.getRight();
try {
if (isStopFollow(flow, execution)) {
sink.next(Event.of(execution).id("end"));
if (isStopFollow(flow, execution.get())) {
sink.next(Event.of(execution.get()).id("end"));
sink.complete();
} else {
sink.next(Event.of(execution).id("progress"));
sink.next(Event.of(execution.get()).id("progress"));
}
} catch (Exception e) {
log.error("Error sending execution update", e);