Compare commits

...

1 Commits

Author SHA1 Message Date
brian.mulier
a120f67b7f fix(core): JDBC Executor will consume executions from queue per executionId in order 2025-12-01 12:16:34 +01:00

View File

@@ -296,10 +296,25 @@ public class JdbcExecutor implements ExecutorInterface {
this.receiveCancellations.addFirst(((JdbcQueue<Execution>) this.executionQueue).receiveBatch(
Executor.class,
executions -> {
List<CompletableFuture<Void>> futures = executions.stream()
.map(execution -> CompletableFuture.runAsync(() -> executionQueue(execution), executionExecutorService))
List<CompletableFuture<Void>> exceptionHandlingFutures = executions.stream()
.filter(Either::isRight)
.map(either -> CompletableFuture.runAsync(() -> executionQueue(either), executionExecutorService))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
List<CompletableFuture<Void>> perExecutionFutures = executions.stream()
.filter(Either::isLeft)
.collect(Collectors.groupingBy(either -> either.getLeft().getId()))
.values()
.stream()
.map(eithers -> CompletableFuture.runAsync(() -> {
eithers.forEach(this::executionQueue);
}, executionExecutorService))
.toList();
CompletableFuture.allOf(Stream.concat(
perExecutionFutures.stream(),
exceptionHandlingFutures.stream()
).toArray(CompletableFuture[]::new)).join();
}
));
this.receiveCancellations.addFirst(((JdbcQueue<WorkerTaskResult>) this.workerTaskResultQueue).receiveBatch(