fix(queues): add forUpdate boolean to receive method to allow non-locking receiving

This commit is contained in:
brian.mulier
2024-06-05 11:25:41 +02:00
committed by brian-mulier-p
parent cc64e526cc
commit e6b89ab77c
7 changed files with 96 additions and 43 deletions

View File

@@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.hash.Hashing;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.utils.Either;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import io.kestra.core.queues.QueueService;
import io.kestra.core.queues.QueueException;
@@ -102,12 +101,12 @@ public class MemoryQueue<T> implements QueueInterface<T> {
}
@Override
public Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer) {
return this.receive(consumerGroup, null, consumer);
public Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate) {
return this.receive(consumerGroup, null, consumer, forUpdate);
}
@Override
public synchronized Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer) {
public synchronized Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate) {
String queueName;
if (queueType == null) {
queueName = UUID.randomUUID().toString();