refactor(executor): use MaintenanceMode listener

This commit is contained in:
Florian Hussonnois
2025-12-19 14:42:55 +01:00
parent 72e7e2e190
commit 3eaab5d9c9

View File

@@ -18,7 +18,6 @@ import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.*;
import io.kestra.core.runners.Executor;
import io.kestra.core.server.AbstractService;
import io.kestra.core.server.ClusterEvent;
import io.kestra.core.server.Metric;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
@@ -79,10 +78,6 @@ public class DefaultExecutor extends AbstractService implements Executor {
@Inject
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
private QueueInterface<MultipleConditionEvent> multipleConditionEventQueue;
@Inject
@Named(QueueFactoryInterface.CLUSTER_EVENT_NAMED)
private Optional<QueueInterface<ClusterEvent>> clusterEventQueue;
@Inject
private SkipExecutionService skipExecutionService;
@Inject
@@ -216,7 +211,19 @@ public class DefaultExecutor extends AbstractService implements Executor {
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, this::subflowExecutionEndQueue));
this.receiveCancellations.addFirst(this.multipleConditionEventQueue.receive(Executor.class, this::multipleConditionEventQueue));
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));
// Register maintenance listener
this.receiveCancellations.add(this.maintenanceService.listen(new MaintenanceService.MaintenanceListener() {
@Override
public void onMaintenanceModeEnter() {
DefaultExecutor.this.enterMaintenance();
}
@Override
public void onMaintenanceModeExit() {
DefaultExecutor.this.exitMaintenance();
}
})::dispose);
// Start delay and monitoring loops
executionDelayFuture = scheduledExecutorService.scheduleAtFixedRate(
@@ -428,20 +435,6 @@ public class DefaultExecutor extends AbstractService implements Executor {
multipleConditionEventMessageHandler.handle(multipleConditionEvent);
}
private void clusterEventQueue(Either<ClusterEvent, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize a cluster event: {}", either.getRight().getMessage());
return;
}
ClusterEvent clusterEvent = either.getLeft();
log.info("Cluster event received: {}", clusterEvent);
switch (clusterEvent.eventType()) {
case MAINTENANCE_ENTER -> enterMaintenance();
case MAINTENANCE_EXIT -> exitMaintenance();
}
}
/**
* ExecutionDelay is currently two types of execution:
* <br/>