chore(system): add WorkerJobQueueInterface to properly pass workerId on subscribe

This commit is contained in:
Florian Hussonnois
2025-08-11 18:24:54 +02:00
committed by Florian Hussonnois
parent 31dbecec77
commit 194ae826e5
10 changed files with 50 additions and 44 deletions

View File

@@ -5,11 +5,9 @@ import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.runners.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.templates.Template;
public interface QueueFactoryInterface {
String EXECUTION_NAMED = "executionQueue";
@@ -34,7 +32,7 @@ public interface QueueFactoryInterface {
QueueInterface<Executor> executor();
QueueInterface<WorkerJob> workerJob();
WorkerJobQueueInterface workerJob();
QueueInterface<WorkerTaskResult> workerTaskResult();

View File

@@ -0,0 +1,12 @@
package io.kestra.core.queues;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.utils.Either;
import java.util.function.Consumer;
public interface WorkerJobQueueInterface extends QueueInterface<WorkerJob> {
Runnable subscribe(String workerId, String workerGroup, Consumer<Either<WorkerJob, DeserializationException>> consumer);
}

View File

@@ -85,7 +85,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
@Inject
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
private QueueInterface<WorkerJob> workerJobQueue;
private WorkerJobQueueInterface workerJobQueue;
@Inject
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
@@ -274,12 +274,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
}
}));
this.receiveCancellations.addFirst(this.workerJobQueue.receive(
this.receiveCancellations.addFirst(this.workerJobQueue.subscribe(
this.id,
this.workerGroup,
Worker.class,
either -> {
pendingJobCount.incrementAndGet();
executorService.execute(() -> {
pendingJobCount.decrementAndGet();
runningJobCount.incrementAndGet();

View File

@@ -9,11 +9,11 @@ import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.runners.*;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Prototype;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@@ -45,7 +45,7 @@ public class H2QueueFactory implements QueueFactoryInterface {
@Singleton
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<WorkerJob> workerJob() {
public WorkerJobQueueInterface workerJob() {
return new H2WorkerJobQueue(applicationContext);
}

View File

@@ -1,6 +1,7 @@
package io.kestra.runner.h2;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.JdbcWorkerJobQueueService;
@@ -14,17 +15,17 @@ import java.util.function.Consumer;
* This specific queue is used to be able to save WorkerJobRunning for each WorkerJob
*/
@Slf4j
public class H2WorkerJobQueue extends H2Queue<WorkerJob> {
public class H2WorkerJobQueue extends H2Queue<WorkerJob> implements WorkerJobQueueInterface {
private final JdbcWorkerJobQueueService jdbcWorkerJobQueueService;
public H2WorkerJobQueue(ApplicationContext applicationContext) {
super(WorkerJob.class, applicationContext);
this.jdbcWorkerJobQueueService = applicationContext.getBean(JdbcWorkerJobQueueService.class);
}
@Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
return jdbcWorkerJobQueueService.receive(this, consumerGroup, queueType, consumer);
public Runnable subscribe(String workerId, String workerGroup, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
return jdbcWorkerJobQueueService.subscribe(this, workerId, workerGroup, consumer);
}
@Override

View File

@@ -9,11 +9,11 @@ import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.runners.*;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Prototype;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@@ -45,7 +45,7 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
@Singleton
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<WorkerJob> workerJob() {
public WorkerJobQueueInterface workerJob() {
return new MysqlWorkerJobQueue(applicationContext);
}

View File

@@ -1,6 +1,7 @@
package io.kestra.runner.mysql;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.JdbcWorkerJobQueueService;
@@ -14,22 +15,22 @@ import java.util.function.Consumer;
* This specific queue is used to be able to save WorkerJobRunning for each WorkerJob
*/
@Slf4j
public class MysqlWorkerJobQueue extends MysqlQueue<WorkerJob> {
public class MysqlWorkerJobQueue extends MysqlQueue<WorkerJob> implements WorkerJobQueueInterface {
private final JdbcWorkerJobQueueService jdbcWorkerJobQueueService;
public MysqlWorkerJobQueue(ApplicationContext applicationContext) {
super(WorkerJob.class, applicationContext);
this.jdbcWorkerJobQueueService = applicationContext.getBean(JdbcWorkerJobQueueService.class);
}
@Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
return jdbcWorkerJobQueueService.receive(this, consumerGroup, queueType, consumer);
}
@Override
public void close() throws IOException {
super.close();
jdbcWorkerJobQueueService.close();
}
@Override
public Runnable subscribe(String workerId, String workerGroup, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
return jdbcWorkerJobQueueService.subscribe(this, workerId, workerGroup, consumer);
}
}

View File

@@ -9,11 +9,11 @@ import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.runners.*;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Prototype;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@@ -45,7 +45,7 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
@Singleton
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<WorkerJob> workerJob() {
public WorkerJobQueueInterface workerJob() {
return new PostgresWorkerJobQueue(applicationContext);
}

View File

@@ -2,6 +2,7 @@ package io.kestra.runner.postgres;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.JdbcWorkerJobQueueService;
import io.micronaut.context.ApplicationContext;
@@ -14,19 +15,19 @@ import java.util.function.Consumer;
* This specific queue is used to be able to save WorkerJobRunning for each WorkerJob
*/
@Slf4j
public class PostgresWorkerJobQueue extends PostgresQueue<WorkerJob> {
public class PostgresWorkerJobQueue extends PostgresQueue<WorkerJob> implements WorkerJobQueueInterface {
private final JdbcWorkerJobQueueService jdbcWorkerJobQueueService;
public PostgresWorkerJobQueue(ApplicationContext applicationContext) {
super(WorkerJob.class, applicationContext);
this.jdbcWorkerJobQueueService = applicationContext.getBean(JdbcWorkerJobQueueService.class);
}
@Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
return jdbcWorkerJobQueueService.receive(this, consumerGroup, queueType, consumer);
public Runnable subscribe(String workerId, String workerGroup, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
return jdbcWorkerJobQueueService.subscribe(this, workerId, workerGroup, consumer);
}
@Override
public void close() throws IOException {
super.close();

View File

@@ -2,12 +2,11 @@ package io.kestra.jdbc;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.runners.*;
import io.kestra.core.server.ServiceRegistry;
import io.kestra.core.server.ServiceType;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.jdbc.runner.JdbcQueue;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
@@ -16,26 +15,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@Singleton
@Slf4j
@Singleton
public class JdbcWorkerJobQueueService implements Closeable {
private final AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository;
private final ServiceRegistry serviceRegistry;
private final AtomicReference<Runnable> disposable = new AtomicReference<>();
private final AtomicBoolean isStopped = new AtomicBoolean(false);
@Inject
public JdbcWorkerJobQueueService(ApplicationContext applicationContext) {
this.serviceRegistry = applicationContext.getBean(ServiceRegistry.class);
this.jdbcWorkerJobRunningRepository = applicationContext.getBean(AbstractJdbcWorkerJobRunningRepository.class);
}
public Runnable receive(JdbcQueue<WorkerJob> workerJobQueue, String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
this.disposable.set(workerJobQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> {
Worker worker = serviceRegistry.waitForServiceAndGet(ServiceType.WORKER).unwrap();
final WorkerInstance workerInstance = new WorkerInstance(worker.getId(), worker.getWorkerGroup());
public Runnable subscribe(JdbcQueue<WorkerJob> workerJobQueue, String workerId, String workerGroup, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
this.disposable.set(workerJobQueue.receiveTransaction(workerGroup, Worker.class, (dslContext, eithers) -> {
final WorkerInstance workerInstance = new WorkerInstance(workerId, workerGroup);
eithers.forEach(either -> {
if (either.isRight()) {
@@ -61,7 +55,7 @@ public class JdbcWorkerJobQueueService implements Closeable {
} else {
throw new IllegalArgumentException("Message is of type " + workerJob.getClass() + " which should never occurs");
}
jdbcWorkerJobRunningRepository.save(workerJobRunning, dslContext);
if (log.isTraceEnabled()) {