chore(core): remove useless method pause for WorkerJobQueueInterface

This commit is contained in:
Florian Hussonnois
2024-06-07 16:32:32 +02:00
committed by Florian Hussonnois
parent 210308f806
commit c06938f155
7 changed files with 37 additions and 42 deletions

View File

@@ -3,13 +3,23 @@ package io.kestra.core.queues;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.utils.Either;
import jakarta.annotation.PreDestroy;
import java.io.Closeable;
import java.io.IOException;
import java.util.function.Consumer;
/**
* Interface for consuming the {@link WorkerJob} queue.
*/
public interface WorkerJobQueueInterface extends Closeable {
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer);
void pause();
/**
* Closes any resources used for the queue consumption.
*/
@Override
void close() throws IOException;
}

View File

@@ -727,7 +727,13 @@ public class Worker implements Service, Runnable, AutoCloseable {
}
setState(ServiceState.TERMINATING);
workerJobQueue.pause();
try {
// close the WorkerJob queue to stop receiving new JobTask execution.
workerJobQueue.close();
} catch (IOException e) {
log.error("Failed to close the WorkerJobQueue");
}
final boolean terminatedGracefully;
if (!skipGracefulTermination.get()) {

View File

@@ -23,11 +23,6 @@ public class H2WorkerJobQueue implements WorkerJobQueueInterface {
return jdbcWorkerJobQueueService.receive(consumerGroup, queueType, consumer);
}
@Override
public void pause() {
jdbcWorkerJobQueueService.pause();
}
@Override
public void close() {
jdbcWorkerJobQueueService.close();

View File

@@ -23,11 +23,6 @@ public class MysqlWorkerJobQueue implements WorkerJobQueueInterface {
return jdbcworkerjobQueueService.receive(consumerGroup, queueType, consumer);
}
@Override
public void pause() {
jdbcworkerjobQueueService.pause();
}
@Override
public void close() {
jdbcworkerjobQueueService.close();

View File

@@ -23,11 +23,6 @@ public class PostgresWorkerJobQueue implements WorkerJobQueueInterface {
return jdbcWorkerJobQueueService.receive(consumerGroup, queueType, consumer);
}
@Override
public void pause() {
jdbcWorkerJobQueueService.pause();
}
@Override
public void close() {
jdbcWorkerJobQueueService.close();

View File

@@ -14,17 +14,19 @@ import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@Singleton
@Slf4j
public class JdbcWorkerJobQueueService {
public class JdbcWorkerJobQueueService implements Closeable {
private final JdbcQueue<WorkerJob> workerTaskQueue;
private final AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository;
private final ServiceRegistry serviceRegistry;
private Runnable queueStop;
private final AtomicReference<Runnable> disposable = new AtomicReference<>();
private final AtomicBoolean isStopped = new AtomicBoolean(false);
@SuppressWarnings("unchecked")
public JdbcWorkerJobQueueService(ApplicationContext applicationContext) {
@@ -38,7 +40,7 @@ public class JdbcWorkerJobQueueService {
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
this.queueStop = workerTaskQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> {
this.disposable.set(workerTaskQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> {
Worker worker = serviceRegistry.waitForServiceAndGet(Service.ServiceType.WORKER).unwrap();
@@ -81,25 +83,22 @@ public class JdbcWorkerJobQueueService {
});
eithers.forEach(consumer);
});
}));
return this.queueStop;
return this.disposable.get();
}
public void pause() {
this.stopQueue();
}
/** {@inheritDoc **/
@Override
public void close() {
if (!isStopped.compareAndSet(true, false)) {
return;
}
private void stopQueue() {
synchronized (this) {
if (this.queueStop != null) {
this.queueStop.run();
this.queueStop = null;
}
Runnable runnable = this.disposable.get();
if (runnable != null) {
runnable.run();
this.disposable.set(null);
}
}
public void close() {
this.stopQueue();
}
}

View File

@@ -27,11 +27,6 @@ public class MemoryWorkerJobQueue implements WorkerJobQueueInterface {
return workerTaskQueue.receive(consumerGroup, queueType, consumer);
}
@Override
public void pause() {
}
@Override
public void close() {