Compare commits

...

1 Commits

Author SHA1 Message Date
Loïc Mathieu
2adb008dfb chore(system): simulate FIFO processing for messages of the same execution inside the JDBC executor
The JDBC executor processes execution and worker task result messages concurrently for performance.
But the first thing it does is to lock the execution to avoid inconsistent processing of the same execution.

By simulating a FIFO queue per execution — avoiding concurrent processing of messages for the same execution while still processing different executions concurrently — we improve performance for concurrent task processing, as we avoid the cost of locking and waiting on the database.
2025-08-25 09:16:07 +02:00

View File

@@ -293,19 +293,43 @@ public class JdbcExecutor implements ExecutorInterface, Service {
this.receiveCancellations.addFirst(((JdbcQueue<Execution>) this.executionQueue).receiveBatch(
Executor.class,
executions -> {
List<CompletableFuture<Void>> futures = executions.stream()
// We need to simulate FIFO by executionId as processing concurrently messages for the same execution may erase outputs of one message by another.
// Moreover, as we lock by execution, this is suboptimal to process them concurrently
String deserializationExceptionId = "__kestra_deserializationExceptionId__";
Map<String, List<Either<Execution, DeserializationException>>> executionById = executions.stream()
.collect(Collectors.groupingBy(either -> either.isLeft() ? either.getLeft().getId() : deserializationExceptionId));
List<Either<Execution, DeserializationException>> concurrent = executionById.values().stream()
.flatMap(l -> l.stream().limit(1)).toList();
List<Either<Execution, DeserializationException>> executionMultiple = executionById.values().stream().flatMap(l -> l.stream().skip(1)).toList();
// execute concurrently "singles"
List<CompletableFuture<Void>> futures = concurrent.stream()
.map(execution -> CompletableFuture.runAsync(() -> executionQueue(execution), executionExecutorService))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
executionMultiple.forEach(exec -> executionQueue(exec));
}
));
this.receiveCancellations.addFirst(((JdbcQueue<WorkerTaskResult>) this.workerTaskResultQueue).receiveBatch(
Executor.class,
workerTaskResults -> {
List<CompletableFuture<Void>> futures = workerTaskResults.stream()
.map(workerTaskResult -> CompletableFuture.runAsync(() -> workerTaskResultQueue(workerTaskResult), workerTaskResultExecutorService))
// We need to simulate FIFO by executionId as processing concurrently messages for the same execution may erase outputs of one message by another.
// Moreover, as we lock by execution, this is suboptimal to process them concurrently
String deserializationExceptionId = "__kestra_deserializationExceptionId__";
Map<String, List<Either<WorkerTaskResult, DeserializationException>>> executionById = workerTaskResults.stream()
.collect(Collectors.groupingBy(either -> either.isLeft() ? either.getLeft().getTaskRun().getExecutionId() : deserializationExceptionId));
List<Either<WorkerTaskResult, DeserializationException>> concurrent = executionById.values().stream()
.flatMap(l -> l.stream().limit(1)).toList();
List<Either<WorkerTaskResult, DeserializationException>> executionMultiple = executionById.values().stream().flatMap(l -> l.stream().skip(1)).toList();
// execute concurrently "singles"
List<CompletableFuture<Void>> futures = concurrent.stream()
.map(execution -> CompletableFuture.runAsync(() -> workerTaskResultQueue(execution), workerTaskResultExecutorService))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
executionMultiple.forEach(exec -> workerTaskResultQueue(exec));
}
));
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));