fix(system): avoid starting two queues for WorkerJob and WorkerTriggerResult

We previously starts 2 queues: one for emit, and a specific one for receive wich handle WorkerJobRunning under the cover.

We can replace by using a single queue that will transparently handle WorkerJobRunning. As queues starts thread pools their cost is not negligeable.

Fixes #9007
This commit is contained in:
Loïc Mathieu
2025-05-28 18:05:53 +02:00
parent 69bf11c935
commit 5304630b30
16 changed files with 59 additions and 195 deletions

View File

@@ -5,13 +5,10 @@ import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
import io.kestra.core.runners.*;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Bean;
@@ -45,11 +42,11 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
}
@Override
@Singleton
@Prototype // must be prototype so we can create two Worker in the same application context for testing purpose.
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<WorkerJob> workerJob() {
return new MysqlQueue<>(WorkerJob.class, applicationContext);
return new MysqlWorkerJobQueue(applicationContext);
}
@Override
@@ -65,7 +62,7 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
@Named(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<WorkerTriggerResult> workerTriggerResult() {
return new MysqlQueue<>(WorkerTriggerResult.class, applicationContext);
return new MysqlWorkerTriggerResultQueue(applicationContext);
}
@Override
@@ -132,20 +129,6 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
return new MysqlQueue<>(Trigger.class, applicationContext);
}
@Override
@Prototype // must be prototype so we can create two Worker in the same application context for testing purpose.
@Bean(preDestroy = "close")
public WorkerJobQueueInterface workerJobQueue() {
return new MysqlWorkerJobQueue(applicationContext);
}
@Override
@Singleton
@Bean(preDestroy = "close")
public WorkerTriggerResultQueueInterface workerTriggerResultQueue() {
return new MysqlWorkerTriggerResultQueue(applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.SUBFLOWEXECUTIONRESULT_NAMED)

View File

@@ -1,40 +1,35 @@
package io.kestra.runner.mysql;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.JdbcWorkerJobQueueService;
import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.function.Consumer;
/**
* This specific queue is used to be able to save WorkerJobRunning for each WorkerJob
*/
@Slf4j
public class MysqlWorkerJobQueue implements WorkerJobQueueInterface {
public class MysqlWorkerJobQueue extends MysqlQueue<WorkerJob> {
private final JdbcWorkerJobQueueService jdbcWorkerJobQueueService;
public MysqlWorkerJobQueue(ApplicationContext applicationContext) {
super(WorkerJob.class, applicationContext);
this.jdbcWorkerJobQueueService = applicationContext.getBean(JdbcWorkerJobQueueService.class);
}
@Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
return jdbcWorkerJobQueueService.receive(consumerGroup, queueType, consumer);
return jdbcWorkerJobQueueService.receive(this, consumerGroup, queueType, consumer);
}
@Override
public void close() {
public void close() throws IOException {
super.close();
jdbcWorkerJobQueueService.close();
}
@Override
public void pause() {
jdbcWorkerJobQueueService.pause();
}
@Override
public void resume() {
jdbcWorkerJobQueueService.resume();
}
}

View File

@@ -1,7 +1,6 @@
package io.kestra.runner.mysql;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
import io.kestra.core.runners.WorkerTriggerResult;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.JdbcWorkerTriggerResultQueueService;
@@ -11,8 +10,11 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.function.Consumer;
/**
* This specific queue is used to be able to purge WorkerJobRunning for triggers
*/
@Slf4j
public class MysqlWorkerTriggerResultQueue extends MysqlQueue<WorkerTriggerResult> implements WorkerTriggerResultQueueInterface {
public class MysqlWorkerTriggerResultQueue extends MysqlQueue<WorkerTriggerResult> {
private final JdbcWorkerTriggerResultQueueService jdbcWorkerTriggerResultQueueService;
public MysqlWorkerTriggerResultQueue(ApplicationContext applicationContext) {
@@ -22,7 +24,7 @@ public class MysqlWorkerTriggerResultQueue extends MysqlQueue<WorkerTriggerResul
@Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
return jdbcWorkerTriggerResultQueueService.receive(consumerGroup, queueType, consumer);
return jdbcWorkerTriggerResultQueueService.receive(this, consumerGroup, queueType, consumer);
}
@Override