Compare commits

...

1 Commit

Author: Loïc Mathieu
SHA1: 485f9a3669
Date: 2025-04-07 17:52:05 +02:00

feat(jdbc): Improve internal queue cleaning

Instead of cleaning queues via the JdbcCleaner or via queues.deleteByIds(), clean some queues directly after processing.
We only do this for queues that are known to have a single consumer; for these queues, instead of updating the offsets after consumption, we delete the records directly.
9 changed files with 84 additions and 32 deletions
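
The gist of the change, before the per-file diffs: for a queue that has exactly one consumer, acknowledging a batch no longer marks the rows as consumed, it removes them. The sketch below illustrates the idea with plain JDBC; the queues table, the consumed flag and the poll() helper are invented for illustration only, while the real implementation uses jOOQ against Kestra's queue schema (see the deleteGroupOffsets methods and the delete flag threaded into receiveImpl further down).

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;

// Illustration only: a single-consumer queue backed by a JDBC table.
// Table and column names ("queues", "offset", "value", "consumed") are simplified
// stand-ins, not Kestra's actual schema; the real code goes through jOOQ.
class SingleConsumerQueueSketch {

    // Poll a batch and acknowledge it either by flagging rows as consumed
    // (safe with multiple consumers) or by deleting them outright
    // (only safe when this is the queue's single consumer).
    List<String> poll(Connection conn, boolean delete) throws Exception {
        List<String> messages = new ArrayList<>();
        List<Integer> offsets = new ArrayList<>();

        try (PreparedStatement select = conn.prepareStatement(
                "SELECT \"offset\", \"value\" FROM queues WHERE consumed = FALSE ORDER BY \"offset\" LIMIT 100 FOR UPDATE");
             ResultSet rs = select.executeQuery()) {
            while (rs.next()) {
                offsets.add(rs.getInt("offset"));
                messages.add(rs.getString("value"));
            }
        }

        if (offsets.isEmpty()) {
            return messages;
        }

        String placeholders = String.join(",", offsets.stream().map(o -> "?").toList());
        String ack = delete
            // single consumer: the row will never be read again, so drop it
            ? "DELETE FROM queues WHERE \"offset\" IN (" + placeholders + ")"
            // shared queue: keep the row and only record that it was consumed
            : "UPDATE queues SET consumed = TRUE WHERE \"offset\" IN (" + placeholders + ")";

        try (PreparedStatement statement = conn.prepareStatement(ack)) {
            for (int i = 0; i < offsets.size(); i++) {
                statement.setInt(i + 1, offsets.get(i));
            }
            statement.executeUpdate();
        }

        return messages;
    }
}

The trade-off is the one the commit message states: dropping rows on consumption is only correct when no other consumer will ever need to read them again, which is why the new delete flag defaults to false and is switched on only for known single-consumer queues.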

View File: QueueInterface.java

@@ -44,5 +44,9 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
         return receive(consumerGroup, queueType, consumer, true);
     }
 
-    Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
+    default Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate) {
+        return receive(consumerGroup, queueType, consumer, forUpdate, false);
+    }
+
+    Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate, boolean delete);
 }
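
For orientation, this is roughly how a caller opts in to the new flag; it mirrors the call the executor now makes for its worker task result queue (shown in the JdbcExecutor hunk below), with handleResult as a placeholder for the consuming method:

// Existing four-argument callers are unaffected: the new default method forwards
// to the five-argument variant with delete = false.
// `handleResult` is a placeholder name, not taken from this diff.
Runnable cancellation = workerTaskResultQueue.receive(
    null,                // consumerGroup
    Executor.class,      // queueType
    this::handleResult,  // Consumer<Either<T, DeserializationException>>
    true,                // forUpdate
    true                 // delete: single consumer, so rows are removed once processed
);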

View File: H2Queue.java

@@ -76,4 +76,18 @@ public class H2Queue<T> extends JdbcQueue<T> {
         update.execute();
     }
 
+    @Override
+    protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets) {
+        var update = ctx.delete(DSL.table(table.getName()))
+            .where(AbstractJdbcRepository.field("offset").in(offsets.toArray(Integer[]::new)));
+
+        if (consumerGroup != null) {
+            update = update.and(AbstractJdbcRepository.field("consumer_group").eq(consumerGroup));
+        } else {
+            update = update.and(AbstractJdbcRepository.field("consumer_group").isNull());
+        }
+
+        update.execute();
+    }
 }

View File: MysqlQueue.java

@@ -83,6 +83,22 @@ public class MysqlQueue<T> extends JdbcQueue<T> {
         update.execute();
     }
 
+    @Override
+    protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets) {
+        var update = ctx
+            .delete(DSL.table(table.getName()))
+            .where(AbstractJdbcRepository.field("offset").in(offsets));
+
+        if (consumerGroup != null) {
+            update = update.and(AbstractJdbcRepository.field("consumer_group").eq(consumerGroup));
+        } else {
+            update = update.and(AbstractJdbcRepository.field("consumer_group").isNull());
+        }
+
+        update.execute();
+    }
+
     private static final class MysqlQueueConsumers {
         private static final Set<String> CONSUMERS;

View File: PostgresQueue.java

@@ -98,6 +98,20 @@ public class PostgresQueue<T> extends JdbcQueue<T> {
         update.execute();
     }
 
+    @Override
+    protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets) {
+        var update = ctx.delete(DSL.table(table.getName()))
+            .where(AbstractJdbcRepository.field("offset").in(offsets));
+
+        if (consumerGroup != null) {
+            update = update.and(AbstractJdbcRepository.field("consumer_group").eq(consumerGroup));
+        } else {
+            update = update.and(AbstractJdbcRepository.field("consumer_group").isNull());
+        }
+
+        update.execute();
+    }
+
     @Override
     protected List<Either<T, DeserializationException>> map(Result<Record> fetch) {
         return fetch

View File: JdbcWorkerJobQueueService.java

@@ -80,7 +80,8 @@ public class JdbcWorkerJobQueueService implements Closeable, Pauseable {
                     });
 
                     eithers.forEach(consumer);
-                }));
+                },
+                true));
 
         return this.disposable.get();
     }

View File: JdbcWorkerTriggerResultQueueService.java

@@ -63,7 +63,8 @@ public class JdbcWorkerTriggerResultQueueService implements Closeable {
                 }
                 consumer.accept(either);
             });
-        }));
+        },
+        true));
 
         return disposable.get();
     }

View File: JdbcExecutor.java

@@ -186,9 +186,6 @@ public class JdbcExecutor implements ExecutorInterface, Service {
     @Value("${kestra.jdbc.executor.clean.execution-queue:true}")
     private boolean cleanExecutionQueue;
 
-    @Value("${kestra.jdbc.executor.clean.worker-queue:true}")
-    private boolean cleanWorkerJobQueue;
-
     private final Tracer tracer;
 
     private final FlowRepositoryInterface flowRepository;
@@ -251,7 +248,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
         int numberOfThreads = threadCount != 0 ? threadCount : Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
         for (int i = 0; i < numberOfThreads; i++) {
             this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, this::executionQueue));
-            this.receiveCancellations.addFirst(this.workerTaskResultQueue.receive(Executor.class, this::workerTaskResultQueue));
+            this.receiveCancellations.addFirst(this.workerTaskResultQueue.receive(null, Executor.class, this::workerTaskResultQueue, true, true));
         }
         this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
         this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
@@ -1038,18 +1035,6 @@ public class JdbcExecutor implements ExecutorInterface, Service {
                         this.triggerState.update(executionService.resetExecution(flow, execution, trigger));
                     });
                 }
-
-                // Purge the workerTaskResultQueue and the workerJobQueue
-                // IMPORTANT: this is safe as only the executor is listening to WorkerTaskResult,
-                // and we are sure at this stage that all WorkerJob has been listened and processed by the Worker.
-                // If any of these assumptions changed, this code would not be safe anymore.
-                if (cleanWorkerJobQueue && !ListUtils.isEmpty(executor.getExecution().getTaskRunList())) {
-                    List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
-                        .map(taskRun -> taskRun.getId())
-                        .toList();
-                    ((JdbcQueue<WorkerTaskResult>) workerTaskResultQueue).deleteByKeys(taskRunKeys);
-                    ((JdbcQueue<WorkerJob>) workerJobQueue).deleteByKeys(taskRunKeys);
-                }
             }
         } catch (QueueException e) {
             if (!ignoreFailure) {
View File: JdbcQueue.java

@@ -255,6 +255,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
     abstract protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets);
 
+    abstract protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets);
+
     protected abstract Condition buildTypeCondition(String type);
 
     @Override
@@ -302,7 +304,7 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
     }
 
     @Override
-    public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate) {
+    public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate, boolean delete) {
         return this.receiveImpl(
             consumerGroup,
             queueType,
@@ -310,7 +312,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
                 eithers.forEach(consumer);
             },
             false,
-            forUpdate
+            forUpdate,
+            delete
         );
     }
@@ -330,17 +333,23 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
                 consumer.accept(eithers);
             },
             false,
-            forUpdate
+            forUpdate,
+            false
         );
     }
 
     public Runnable receiveTransaction(String consumerGroup, Class<?> queueType, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer) {
+        return this.receiveTransaction(consumerGroup, queueType, consumer, false);
+    }
+
+    public Runnable receiveTransaction(String consumerGroup, Class<?> queueType, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer, boolean delete) {
         return this.receiveImpl(
             consumerGroup,
             queueType,
             consumer,
             true,
-            true
+            true,
+            delete
         );
     }
@@ -349,7 +358,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
         Class<?> queueType,
         BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer,
         Boolean inTransaction,
-        boolean forUpdate
+        boolean forUpdate,
+        boolean delete
     ) {
         String queueName = queueName(queueType);
@@ -364,12 +374,21 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
                     consumer.accept(ctx, this.map(result));
                 }
 
-                this.updateGroupOffsets(
-                    ctx,
-                    consumerGroup,
-                    queueName,
-                    result.map(record -> record.get("offset", Integer.class))
-                );
+                if (delete) {
+                    this.deleteGroupOffsets(
+                        ctx,
+                        consumerGroup,
+                        queueName,
+                        result.map(record -> record.get("offset", Integer.class))
+                    );
+                } else {
+                    this.updateGroupOffsets(
+                        ctx,
+                        consumerGroup,
+                        queueName,
+                        result.map(record -> record.get("offset", Integer.class))
+                    );
+                }
             }
 
             return result;

View File: JdbcScheduler.java

@@ -25,7 +25,6 @@ import java.util.function.BiConsumer;
 public class JdbcScheduler extends AbstractScheduler {
     private final TriggerRepositoryInterface triggerRepository;
     private final JooqDSLContextWrapper dslContextWrapper;
-    private final PluginDefaultService pluginDefaultService;
 
     @Inject
     public JdbcScheduler(
@@ -38,7 +37,6 @@ public class JdbcScheduler extends AbstractScheduler {
         triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
         executionState = applicationContext.getBean(SchedulerExecutionState.class);
         dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
-        pluginDefaultService = applicationContext.getBean(PluginDefaultService.class);
     }
 
     @Override