mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 14:00:23 -05:00
Compare commits
1 Commits
dependabot
...
feat/simul
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2adb008dfb |
@@ -293,19 +293,43 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
|||||||
this.receiveCancellations.addFirst(((JdbcQueue<Execution>) this.executionQueue).receiveBatch(
|
this.receiveCancellations.addFirst(((JdbcQueue<Execution>) this.executionQueue).receiveBatch(
|
||||||
Executor.class,
|
Executor.class,
|
||||||
executions -> {
|
executions -> {
|
||||||
List<CompletableFuture<Void>> futures = executions.stream()
|
// We need to simulate FIFO by executionId as processing concurrently messages for the same execution may erase outputs of one message by another.
|
||||||
|
// Moreover, as we lock by execution, this is suboptimal to process them concurrently
|
||||||
|
String deserializationExceptionId = "__kestra_deserializationExceptionId__";
|
||||||
|
Map<String, List<Either<Execution, DeserializationException>>> executionById = executions.stream()
|
||||||
|
.collect(Collectors.groupingBy(either -> either.isLeft() ? either.getLeft().getId() : deserializationExceptionId));
|
||||||
|
List<Either<Execution, DeserializationException>> concurrent = executionById.values().stream()
|
||||||
|
.flatMap(l -> l.stream().limit(1)).toList();
|
||||||
|
List<Either<Execution, DeserializationException>> executionMultiple = executionById.values().stream().flatMap(l -> l.stream().skip(1)).toList();
|
||||||
|
|
||||||
|
// execute concurrently "singles"
|
||||||
|
List<CompletableFuture<Void>> futures = concurrent.stream()
|
||||||
.map(execution -> CompletableFuture.runAsync(() -> executionQueue(execution), executionExecutorService))
|
.map(execution -> CompletableFuture.runAsync(() -> executionQueue(execution), executionExecutorService))
|
||||||
.toList();
|
.toList();
|
||||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||||
|
|
||||||
|
executionMultiple.forEach(exec -> executionQueue(exec));
|
||||||
}
|
}
|
||||||
));
|
));
|
||||||
this.receiveCancellations.addFirst(((JdbcQueue<WorkerTaskResult>) this.workerTaskResultQueue).receiveBatch(
|
this.receiveCancellations.addFirst(((JdbcQueue<WorkerTaskResult>) this.workerTaskResultQueue).receiveBatch(
|
||||||
Executor.class,
|
Executor.class,
|
||||||
workerTaskResults -> {
|
workerTaskResults -> {
|
||||||
List<CompletableFuture<Void>> futures = workerTaskResults.stream()
|
// We need to simulate FIFO by executionId as processing concurrently messages for the same execution may erase outputs of one message by another.
|
||||||
.map(workerTaskResult -> CompletableFuture.runAsync(() -> workerTaskResultQueue(workerTaskResult), workerTaskResultExecutorService))
|
// Moreover, as we lock by execution, this is suboptimal to process them concurrently
|
||||||
|
String deserializationExceptionId = "__kestra_deserializationExceptionId__";
|
||||||
|
Map<String, List<Either<WorkerTaskResult, DeserializationException>>> executionById = workerTaskResults.stream()
|
||||||
|
.collect(Collectors.groupingBy(either -> either.isLeft() ? either.getLeft().getTaskRun().getExecutionId() : deserializationExceptionId));
|
||||||
|
List<Either<WorkerTaskResult, DeserializationException>> concurrent = executionById.values().stream()
|
||||||
|
.flatMap(l -> l.stream().limit(1)).toList();
|
||||||
|
List<Either<WorkerTaskResult, DeserializationException>> executionMultiple = executionById.values().stream().flatMap(l -> l.stream().skip(1)).toList();
|
||||||
|
|
||||||
|
// execute concurrently "singles"
|
||||||
|
List<CompletableFuture<Void>> futures = concurrent.stream()
|
||||||
|
.map(execution -> CompletableFuture.runAsync(() -> workerTaskResultQueue(execution), workerTaskResultExecutorService))
|
||||||
.toList();
|
.toList();
|
||||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||||
|
|
||||||
|
executionMultiple.forEach(exec -> workerTaskResultQueue(exec));
|
||||||
}
|
}
|
||||||
));
|
));
|
||||||
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
|
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
|
||||||
|
|||||||
Reference in New Issue
Block a user