Compare commits

...

1 Commits

Author SHA1 Message Date
Loïc Mathieu
485f9a3669 feat(jdbc): Improve internal queue cleaning
Instead of cleaning queues via the JdbcCleaner, or via queues.deleteByIds(), directly clean some queues after processing.
We only do this for queues that are known to have a single consumer, for these queues, instead of updating the offsets after consumption, we remove directly the records.
2025-04-07 17:52:05 +02:00
9 changed files with 84 additions and 32 deletions

View File

@@ -44,5 +44,9 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
return receive(consumerGroup, queueType, consumer, true); return receive(consumerGroup, queueType, consumer, true);
} }
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate); default Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate) {
return receive(consumerGroup, queueType, consumer, forUpdate, false);
}
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate, boolean delete);
} }

View File

@@ -76,4 +76,18 @@ public class H2Queue<T> extends JdbcQueue<T> {
update.execute(); update.execute();
} }
@Override
protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets) {
var update = ctx.delete(DSL.table(table.getName()))
.where(AbstractJdbcRepository.field("offset").in(offsets.toArray(Integer[]::new)));
if (consumerGroup != null) {
update = update.and(AbstractJdbcRepository.field("consumer_group").eq(consumerGroup));
} else {
update = update.and(AbstractJdbcRepository.field("consumer_group").isNull());
}
update.execute();
}
} }

View File

@@ -83,6 +83,22 @@ public class MysqlQueue<T> extends JdbcQueue<T> {
update.execute(); update.execute();
} }
@Override
protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets) {
var update = ctx
.delete(DSL.table(table.getName()))
.where(AbstractJdbcRepository.field("offset").in(offsets));
if (consumerGroup != null) {
update = update.and(AbstractJdbcRepository.field("consumer_group").eq(consumerGroup));
} else {
update = update.and(AbstractJdbcRepository.field("consumer_group").isNull());
}
update.execute();
}
private static final class MysqlQueueConsumers { private static final class MysqlQueueConsumers {
private static final Set<String> CONSUMERS; private static final Set<String> CONSUMERS;

View File

@@ -98,6 +98,20 @@ public class PostgresQueue<T> extends JdbcQueue<T> {
update.execute(); update.execute();
} }
@Override
protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets) {
var update = ctx.delete(DSL.table(table.getName()))
.where(AbstractJdbcRepository.field("offset").in(offsets));
if (consumerGroup != null) {
update = update.and(AbstractJdbcRepository.field("consumer_group").eq(consumerGroup));
} else {
update = update.and(AbstractJdbcRepository.field("consumer_group").isNull());
}
update.execute();
}
@Override @Override
protected List<Either<T, DeserializationException>> map(Result<Record> fetch) { protected List<Either<T, DeserializationException>> map(Result<Record> fetch) {
return fetch return fetch

View File

@@ -80,7 +80,8 @@ public class JdbcWorkerJobQueueService implements Closeable, Pauseable {
}); });
eithers.forEach(consumer); eithers.forEach(consumer);
})); },
true));
return this.disposable.get(); return this.disposable.get();
} }

View File

@@ -63,7 +63,8 @@ public class JdbcWorkerTriggerResultQueueService implements Closeable {
} }
consumer.accept(either); consumer.accept(either);
}); });
})); },
true));
return disposable.get(); return disposable.get();
} }

View File

@@ -186,9 +186,6 @@ public class JdbcExecutor implements ExecutorInterface, Service {
@Value("${kestra.jdbc.executor.clean.execution-queue:true}") @Value("${kestra.jdbc.executor.clean.execution-queue:true}")
private boolean cleanExecutionQueue; private boolean cleanExecutionQueue;
@Value("${kestra.jdbc.executor.clean.worker-queue:true}")
private boolean cleanWorkerJobQueue;
private final Tracer tracer; private final Tracer tracer;
private final FlowRepositoryInterface flowRepository; private final FlowRepositoryInterface flowRepository;
@@ -251,7 +248,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
int numberOfThreads = threadCount != 0 ? threadCount : Math.max(2, Runtime.getRuntime().availableProcessors() / 2); int numberOfThreads = threadCount != 0 ? threadCount : Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
for (int i = 0; i < numberOfThreads; i++) { for (int i = 0; i < numberOfThreads; i++) {
this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, this::executionQueue)); this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, this::executionQueue));
this.receiveCancellations.addFirst(this.workerTaskResultQueue.receive(Executor.class, this::workerTaskResultQueue)); this.receiveCancellations.addFirst(this.workerTaskResultQueue.receive(null, Executor.class, this::workerTaskResultQueue, true, true));
} }
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue)); this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue)); this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
@@ -1038,18 +1035,6 @@ public class JdbcExecutor implements ExecutorInterface, Service {
this.triggerState.update(executionService.resetExecution(flow, execution, trigger)); this.triggerState.update(executionService.resetExecution(flow, execution, trigger));
}); });
} }
// Purge the workerTaskResultQueue and the workerJobQueue
// IMPORTANT: this is safe as only the executor is listening to WorkerTaskResult,
// and we are sure at this stage that all WorkerJob has been listened and processed by the Worker.
// If any of these assumptions changed, this code would not be safe anymore.
if (cleanWorkerJobQueue && !ListUtils.isEmpty(executor.getExecution().getTaskRunList())) {
List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
.map(taskRun -> taskRun.getId())
.toList();
((JdbcQueue<WorkerTaskResult>) workerTaskResultQueue).deleteByKeys(taskRunKeys);
((JdbcQueue<WorkerJob>) workerJobQueue).deleteByKeys(taskRunKeys);
}
} }
} catch (QueueException e) { } catch (QueueException e) {
if (!ignoreFailure) { if (!ignoreFailure) {

View File

@@ -255,6 +255,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
abstract protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets); abstract protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets);
abstract protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets);
protected abstract Condition buildTypeCondition(String type); protected abstract Condition buildTypeCondition(String type);
@Override @Override
@@ -302,7 +304,7 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
} }
@Override @Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate) { public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate, boolean delete) {
return this.receiveImpl( return this.receiveImpl(
consumerGroup, consumerGroup,
queueType, queueType,
@@ -310,7 +312,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
eithers.forEach(consumer); eithers.forEach(consumer);
}, },
false, false,
forUpdate forUpdate,
delete
); );
} }
@@ -330,17 +333,23 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
consumer.accept(eithers); consumer.accept(eithers);
}, },
false, false,
forUpdate forUpdate,
false
); );
} }
public Runnable receiveTransaction(String consumerGroup, Class<?> queueType, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer) { public Runnable receiveTransaction(String consumerGroup, Class<?> queueType, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer) {
return this.receiveTransaction(consumerGroup, queueType, consumer, false);
}
public Runnable receiveTransaction(String consumerGroup, Class<?> queueType, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer, boolean delete) {
return this.receiveImpl( return this.receiveImpl(
consumerGroup, consumerGroup,
queueType, queueType,
consumer, consumer,
true, true,
true true,
delete
); );
} }
@@ -349,7 +358,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
Class<?> queueType, Class<?> queueType,
BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer,
Boolean inTransaction, Boolean inTransaction,
boolean forUpdate boolean forUpdate,
boolean delete
) { ) {
String queueName = queueName(queueType); String queueName = queueName(queueType);
@@ -364,12 +374,21 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
consumer.accept(ctx, this.map(result)); consumer.accept(ctx, this.map(result));
} }
this.updateGroupOffsets( if (delete) {
ctx, this.deleteGroupOffsets(
consumerGroup, ctx,
queueName, consumerGroup,
result.map(record -> record.get("offset", Integer.class)) queueName,
); result.map(record -> record.get("offset", Integer.class))
);
} else {
this.updateGroupOffsets(
ctx,
consumerGroup,
queueName,
result.map(record -> record.get("offset", Integer.class))
);
}
} }
return result; return result;

View File

@@ -25,7 +25,6 @@ import java.util.function.BiConsumer;
public class JdbcScheduler extends AbstractScheduler { public class JdbcScheduler extends AbstractScheduler {
private final TriggerRepositoryInterface triggerRepository; private final TriggerRepositoryInterface triggerRepository;
private final JooqDSLContextWrapper dslContextWrapper; private final JooqDSLContextWrapper dslContextWrapper;
private final PluginDefaultService pluginDefaultService;
@Inject @Inject
public JdbcScheduler( public JdbcScheduler(
@@ -38,7 +37,6 @@ public class JdbcScheduler extends AbstractScheduler {
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class); triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
executionState = applicationContext.getBean(SchedulerExecutionState.class); executionState = applicationContext.getBean(SchedulerExecutionState.class);
dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class); dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
pluginDefaultService = applicationContext.getBean(PluginDefaultService.class);
} }
@Override @Override