refactor(executor): extends AbstractService

This commit is contained in:
Florian Hussonnois
2025-12-19 14:30:45 +01:00
parent ea36532aa4
commit 72e7e2e190

View File

@@ -17,6 +17,7 @@ import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.*;
import io.kestra.core.runners.Executor;
import io.kestra.core.server.AbstractService;
import io.kestra.core.server.ClusterEvent;
import io.kestra.core.server.Metric;
import io.kestra.core.server.ServiceStateChangeEvent;
@@ -30,7 +31,6 @@ import io.kestra.core.scheduler.events.TriggerExecutionTerminated;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.event.ApplicationEventPublisher;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@@ -41,7 +41,6 @@ import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -49,13 +48,10 @@ import static io.kestra.core.utils.Rethrow.*;
@Singleton
@Slf4j
public class DefaultExecutor implements Executor {
public class DefaultExecutor extends AbstractService implements Executor {
private static final String UNABLE_TO_DESERIALIZE_AN_EXECUTION = "Unable to deserialize an execution: {}";
private static final String SKIPPING_EXECUTION = "Skipping execution {}";
@Inject
private ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;
@@ -149,9 +145,7 @@ public class DefaultExecutor implements Executor {
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> executionDelayFuture;
private ScheduledFuture<?> monitorSLAFuture;
private final AtomicReference<ServiceState> state = new AtomicReference<>();
private final String id = IdUtils.create();
private final List<Runnable> receiveCancellations = new ArrayList<>();
private final AtomicBoolean isPaused = new AtomicBoolean(false);
private final AtomicBoolean shutdown = new AtomicBoolean(false);
@@ -162,7 +156,8 @@ public class DefaultExecutor implements Executor {
@Inject
public DefaultExecutor(ExecutorsUtils executorsUtils, @Value("${kestra.executor.thread-count:0}") int threadCount) {
public DefaultExecutor(ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher, ExecutorsUtils executorsUtils, @Value("${kestra.executor.thread-count:0}") int threadCount) {
super(ServiceType.EXECUTOR, eventPublisher);
// By default, we start available processors count threads with a minimum of 4 by executor service
// for the worker task result queue and the execution queue.
// Other queues would not benefit from more consumers.
@@ -196,8 +191,6 @@ public class DefaultExecutor implements Executor {
@Override
public void run() {
setState(ServiceState.CREATED);
// listen to executor related queues
this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, this::executionQueue));
this.receiveCancellations.addFirst(this.executionCommandQueue.receive(Executor.class, this::executionCommandQueue));
@@ -765,41 +758,9 @@ public class DefaultExecutor implements Executor {
}
@Override
@PreDestroy
public void close() {
if (shutdown.compareAndSet(false, true)) {
if (log.isDebugEnabled()) {
log.debug("Terminating");
}
setState(ServiceState.TERMINATING);
this.receiveCancellations.forEach(Runnable::run);
ExecutorsUtils.closeScheduledThreadPool(scheduledExecutorService, Duration.ofSeconds(5), List.of(executionDelayFuture, monitorSLAFuture));
setState(ServiceState.TERMINATED_GRACEFULLY);
if (log.isDebugEnabled()) {
log.debug("Closed ({})", state.get().name());
}
}
}
@Override
public String getId() {
return id;
}
@Override
public ServiceType getType() {
return ServiceType.EXECUTOR;
}
@Override
public ServiceState getState() {
return state.get();
}
private void setState(final ServiceState state) {
this.state.set(state);
eventPublisher.publishEvent(new ServiceStateChangeEvent(this));
protected ServiceState doStop() {
this.receiveCancellations.forEach(Runnable::run);
ExecutorsUtils.closeScheduledThreadPool(scheduledExecutorService, Duration.ofSeconds(5), List.of(executionDelayFuture, monitorSLAFuture));
return ServiceState.TERMINATED_GRACEFULLY;
}
}