fix(system): avoid creating multiple worker job queue

We created miltiple worker job queue because the bean was in the prototype scope.
This was needed only for tests as they are closing it.
Switching to singleton and rebuilding the context of the test that needs it fixes the issue.
This commit is contained in:
Loïc Mathieu
2025-07-03 16:43:47 +02:00
parent 2109fa8116
commit 6a4397fdfd
8 changed files with 22 additions and 23 deletions

View File

@@ -1066,18 +1066,16 @@ public class Worker implements Service, Runnable, AutoCloseable {
}
}
/**
* This method should only be used on tests.
* It shut down the worker without waiting for tasks to end,
* and without closing the queue, so tests can launch and shutdown a worker manually without closing the queue.
*/
@VisibleForTesting
public void shutdown() {
// initiate shutdown
shutdown.compareAndSet(false, true);
try {
// close the WorkerJob queue to stop receiving new JobTask execution.
workerJobQueue.close();
} catch (IOException e) {
log.error("Failed to close the WorkerJobQueue");
}
// close all queues and shutdown now
this.receiveCancellations.forEach(Runnable::run);
this.executorService.shutdownNow();

View File

@@ -23,6 +23,7 @@ import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import reactor.core.publisher.Flux;
import java.time.Duration;
@@ -35,7 +36,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static io.kestra.core.utils.Rethrow.throwSupplier;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@KestraTest(rebuildContext = true)
class WorkerTest {
@Inject
ApplicationContext applicationContext;

View File

@@ -33,7 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
@KestraTest
@KestraTest(rebuildContext = true)
abstract public class AbstractSchedulerTest {
@Inject
protected ApplicationContext applicationContext;

View File

@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertTrue;
@KestraTest
@KestraTest(rebuildContext = true)
class TriggerTest {
@Inject
private ApplicationContext applicationContext;

View File

@@ -42,7 +42,7 @@ public class H2QueueFactory implements QueueFactoryInterface {
}
@Override
@Prototype // must be prototype so we can create two Worker in the same application context for testing purpose.
@Singleton
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<WorkerJob> workerJob() {

View File

@@ -42,7 +42,7 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
}
@Override
@Prototype // must be prototype so we can create two Worker in the same application context for testing purpose.
@Singleton
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<WorkerJob> workerJob() {

View File

@@ -42,7 +42,7 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
}
@Override
@Prototype // must be prototype so we can create two Worker in the same application context for testing purpose.
@Singleton
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<WorkerJob> workerJob() {

View File

@@ -111,7 +111,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
CountDownLatch resubmitLatch = new CountDownLatch(1);
// create first worker
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 1, null);
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 1, null);
worker.run();
Flux<WorkerTaskResult> receive = TestsUtils.receive(workerTaskResultQueue, either -> {
@@ -130,7 +130,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
worker.shutdown(); // stop processing task
// create second worker (this will revoke previously one).
Worker newWorker = applicationContext.createBean(Worker.class, IdUtils.create(), 1, null);
Worker newWorker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 1, null);
newWorker.run();
boolean resubmitLatchAwait = resubmitLatch.await(10, TimeUnit.SECONDS);
assertThat(resubmitLatchAwait).isTrue();
@@ -147,7 +147,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
CountDownLatch resubmitLatch = new CountDownLatch(1);
// create first worker
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 1, "workerGroupKey");
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 1, "workerGroupKey");
worker.run();
Flux<WorkerTaskResult> receive = TestsUtils.receive(workerTaskResultQueue, either -> {
@@ -166,7 +166,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
worker.shutdown(); // stop processing task
// create second worker (this will revoke previously one).
Worker newWorker = applicationContext.createBean(Worker.class, IdUtils.create(), 1, "workerGroupKey");
Worker newWorker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 1, "workerGroupKey");
newWorker.run();
boolean resubmitLatchAwait = resubmitLatch.await(10, TimeUnit.SECONDS);
assertThat(resubmitLatchAwait).isTrue();
@@ -181,7 +181,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
void taskResubmitSkipExecution() throws Exception {
CountDownLatch runningLatch = new CountDownLatch(1);
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 8, null);
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null);
worker.run();
WorkerTask workerTask = workerTask(Duration.ofSeconds(5));
@@ -203,7 +203,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
assertThat(runningLatchAwait).isTrue();
worker.shutdown();
Worker newWorker = applicationContext.createBean(Worker.class, IdUtils.create(), 1, null);
Worker newWorker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 1, null);
newWorker.run();
// wait a little to be sure there is no resubmit
@@ -215,7 +215,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
@Test
void shouldReEmitTriggerWhenWorkerIsDetectedAsNonResponding() throws Exception {
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 1, null);
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 1, null);
worker.run();
WorkerTrigger workerTrigger = workerTrigger(Duration.ofSeconds(5));
@@ -236,7 +236,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
receiveTrigger.blockLast();
worker.shutdown();
Worker newWorker = applicationContext.createBean(Worker.class, IdUtils.create(), 1, null);
Worker newWorker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 1, null);
newWorker.run();
assertThat(countDownLatch.await(30, TimeUnit.SECONDS)).isTrue();
@@ -246,7 +246,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
@Test
void shouldReEmitTriggerToTheSameWorkerGroup() throws Exception {
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 1, "workerGroupKey");
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 1, "workerGroupKey");
worker.run();
WorkerTrigger workerTrigger = workerTrigger(Duration.ofSeconds(5), "workerGroupKey");
@@ -267,7 +267,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
receiveTrigger.blockLast();
worker.shutdown();
Worker newWorker = applicationContext.createBean(Worker.class, IdUtils.create(), 1, "workerGroupKey");
Worker newWorker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 1, "workerGroupKey");
newWorker.run();
assertThat(countDownLatch.await(30, TimeUnit.SECONDS)).isTrue();