chore(jdbc): cleanup QueueInterface

This commit is contained in:
Florian Hussonnois
2024-06-10 16:48:34 +02:00
committed by Florian Hussonnois
parent 769a21665f
commit bc34aeb16c
9 changed files with 30 additions and 81 deletions

View File

@@ -45,5 +45,4 @@ public interface QueueInterface<T> extends Closeable {
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
void pause();
}

View File

@@ -11,9 +11,6 @@ import java.util.function.Consumer;
* Required for the QueueFactory, to have common interface with JDBC & Kafka
*/
public interface WorkerTriggerResultQueueInterface extends Closeable {
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer);
void pause();
void cleanup();
}

View File

@@ -25,16 +25,6 @@ public class H2WorkerTriggerResultQueue extends H2Queue<WorkerTriggerResult> imp
return jdbcWorkerTriggerResultQueueService.receive(consumerGroup, queueType, consumer);
}
@Override
public void pause() {
jdbcWorkerTriggerResultQueueService.pause();
}
@Override
public void cleanup() {
jdbcWorkerTriggerResultQueueService.cleanup();
}
@Override
public void close() throws IOException {
super.close();

View File

@@ -25,16 +25,6 @@ public class MysqlWorkerTriggerResultQueue extends MysqlQueue<WorkerTriggerResul
return jdbcWorkerTriggerResultQueueService.receive(consumerGroup, queueType, consumer);
}
@Override
public void pause() {
jdbcWorkerTriggerResultQueueService.pause();
}
@Override
public void cleanup() {
jdbcWorkerTriggerResultQueueService.cleanup();
}
@Override
public void close() throws IOException {
super.close();

View File

@@ -25,16 +25,6 @@ public class PostgresWorkerTriggerResultQueue extends PostgresQueue<WorkerTrigge
return jdbcWorkerTriggerResultQueueService.receive(consumerGroup, queueType, consumer);
}
@Override
public void pause() {
jdbcWorkerTriggerResultQueueService.pause();
}
@Override
public void cleanup() {
jdbcWorkerTriggerResultQueueService.cleanup();
}
@Override
public void close() throws IOException {
super.close();

View File

@@ -8,7 +8,6 @@ import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.WorkerTriggerResult;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.jdbc.runner.JdbcQueue;
@@ -18,16 +17,24 @@ import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@Singleton
@Slf4j
public class JdbcWorkerTriggerResultQueueService {
public class JdbcWorkerTriggerResultQueueService implements Closeable {
private final static ObjectMapper MAPPER = JdbcMapper.of();
private final JdbcQueue<WorkerTriggerResult> workerTriggerResultQueue;
@Inject
private AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository;
private final AtomicReference<Runnable> disposable = new AtomicReference<>();
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private Runnable queueStop;
@SuppressWarnings("unchecked")
@@ -38,7 +45,7 @@ public class JdbcWorkerTriggerResultQueueService {
}
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
this.queueStop = workerTriggerResultQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> {
disposable.set(workerTriggerResultQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> {
eithers.forEach(either -> {
if (either.isRight()) {
log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
@@ -58,26 +65,21 @@ public class JdbcWorkerTriggerResultQueueService {
});
eithers.forEach(consumer);
});
return this.queueStop;
}));
return disposable.get();
}
public void pause() {
this.stopQueue();
}
private void stopQueue() {
synchronized (this) {
if (this.queueStop != null) {
this.queueStop.run();
this.queueStop = null;
}
}
}
public void cleanup() { }
/** {@inheritDoc **/
@Override
public void close() {
this.stopQueue();
if (!isClosed.compareAndSet(false, true)) {
return;
}
Runnable runnable = this.disposable.get();
if (runnable != null) {
runnable.run();
this.disposable.set(null);
}
}
}

View File

@@ -61,7 +61,7 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
protected final JdbcQueueIndexer jdbcQueueIndexer;
protected volatile boolean isShutdown = false;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
public JdbcQueue(Class<T> cls, ApplicationContext applicationContext) {
ExecutorsUtils executorsUtils = applicationContext.getBean(ExecutorsUtils.class);
@@ -264,7 +264,7 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
AtomicReference<ZonedDateTime> lastPoll = new AtomicReference<>(ZonedDateTime.now());
poolExecutor.execute(() -> {
while (running.get() && !this.isShutdown) {
while (running.get() && !this.isClosed.get()) {
try {
Integer count = runnable.get();
if (count > 0) {
@@ -289,9 +289,7 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
}
});
return () -> {
running.set(false);
};
return () -> running.set(false);
}
protected List<Either<T, DeserializationException>> map(Result<Record> fetch) {
@@ -310,14 +308,12 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
.forEach(consumer);
}
@Override
public void pause() {
this.isShutdown = true;
}
@Override
public void close() throws IOException {
this.isShutdown = true;
if (!this.isClosed.compareAndSet(false, true)) {
return;
}
poolExecutor.shutdown();
}

View File

@@ -132,11 +132,6 @@ public class MemoryQueue<T> implements QueueInterface<T> {
};
}
@Override
public void pause() {
}
public int getSubscribersCount() {
return this.queues
.values()

View File

@@ -26,16 +26,6 @@ public class MemoryWorkerTriggerResultQueue implements WorkerTriggerResultQueueI
return workerTriggerResultQueue.receive(consumerGroup, queueType, consumer);
}
@Override
public void pause() {
}
@Override
public void cleanup() {
}
@Override
public void close() {