mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
3 Commits
develop
...
run-pr-133
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f6ea7c0af | ||
|
|
476b37aabe | ||
|
|
921b05fe5c |
@@ -1,7 +1,10 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.function.*;
|
||||
|
||||
@Slf4j
|
||||
public final class Rethrow {
|
||||
@FunctionalInterface
|
||||
public interface ConsumerChecked<T, E extends Exception> {
|
||||
@@ -117,4 +120,17 @@ public final class Rethrow {
|
||||
private static <E extends Exception, R> R throwException(Exception exception) throws E {
|
||||
throw (E) exception;
|
||||
}
|
||||
|
||||
public static <T> Consumer<T> failAwareConsumer(Consumer<T> consumer, boolean failSilently) {
|
||||
return t -> {
|
||||
try {
|
||||
consumer.accept(t);
|
||||
} catch (Exception exception) {
|
||||
if (!failSilently) {
|
||||
throw exception;
|
||||
}
|
||||
log.warn("Suppressed exception from consumer", exception);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
29
core/src/test/java/io/kestra/core/utils/RethrowTest.java
Normal file
29
core/src/test/java/io/kestra/core/utils/RethrowTest.java
Normal file
@@ -0,0 +1,29 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
class RethrowTest {
|
||||
@Test
|
||||
void failAwareConsumer() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(out));
|
||||
|
||||
RuntimeException runtimeException = assertThrows(RuntimeException.class, () -> Rethrow.failAwareConsumer(c -> {
|
||||
throw new RuntimeException("loud");
|
||||
}, false).accept("anything"));
|
||||
assertThat(runtimeException.getMessage()).isEqualTo("loud");
|
||||
|
||||
assertDoesNotThrow(() -> Rethrow.failAwareConsumer(c -> {
|
||||
throw new RuntimeException("silent");
|
||||
}, true).accept("anything"));
|
||||
assertThat(out.toString()).contains("Suppressed exception from consumer");
|
||||
assertThat(out.toString()).contains("java.lang.RuntimeException: silent");
|
||||
}
|
||||
}
|
||||
@@ -69,6 +69,7 @@ import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.*;
|
||||
import static io.kestra.core.utils.Rethrow.failAwareConsumer;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Singleton
|
||||
@@ -222,6 +223,7 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
private final java.util.concurrent.ExecutorService workerTaskResultExecutorService;
|
||||
private final java.util.concurrent.ExecutorService executionExecutorService;
|
||||
private final int numberOfThreads;
|
||||
private final boolean skipFailure;
|
||||
|
||||
/**
|
||||
* Creates a new {@link JdbcExecutor} instance. Both constructor and field injection are used
|
||||
@@ -241,7 +243,8 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
final TracerFactory tracerFactory,
|
||||
final ExecutorsUtils executorsUtils,
|
||||
final MaintenanceService maintenanceService,
|
||||
@Value("${kestra.jdbc.executor.thread-count:0}") final int threadCount
|
||||
@Value("${kestra.jdbc.executor.thread-count:0}") final int threadCount,
|
||||
@Value("${kestra.jdbc.executor.skip-failure:false}") final boolean skipFailure
|
||||
) {
|
||||
this.serviceLivenessCoordinator = serviceLivenessCoordinator;
|
||||
this.flowMetaStore = flowMetaStore;
|
||||
@@ -256,6 +259,7 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
this.numberOfThreads = threadCount != 0 ? threadCount : Math.max(4, Runtime.getRuntime().availableProcessors());
|
||||
this.workerTaskResultExecutorService = executorsUtils.maxCachedThreadPool(numberOfThreads, "jdbc-worker-task-result-executor");
|
||||
this.executionExecutorService = executorsUtils.maxCachedThreadPool(numberOfThreads, "jdbc-execution-executor");
|
||||
this.skipFailure = skipFailure;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
@@ -295,27 +299,27 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
|
||||
this.receiveCancellations.addFirst(((JdbcQueue<Execution>) this.executionQueue).receiveBatch(
|
||||
Executor.class,
|
||||
executions -> {
|
||||
failAwareConsumer(executions -> {
|
||||
List<CompletableFuture<Void>> futures = executions.stream()
|
||||
.map(execution -> CompletableFuture.runAsync(() -> executionQueue(execution), executionExecutorService))
|
||||
.toList();
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||
}
|
||||
}, skipFailure)
|
||||
));
|
||||
this.receiveCancellations.addFirst(((JdbcQueue<WorkerTaskResult>) this.workerTaskResultQueue).receiveBatch(
|
||||
Executor.class,
|
||||
workerTaskResults -> {
|
||||
failAwareConsumer(workerTaskResults -> {
|
||||
List<CompletableFuture<Void>> futures = workerTaskResults.stream()
|
||||
.map(workerTaskResult -> CompletableFuture.runAsync(() -> workerTaskResultQueue(workerTaskResult), workerTaskResultExecutorService))
|
||||
.toList();
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||
}
|
||||
}, skipFailure)
|
||||
));
|
||||
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
|
||||
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
|
||||
this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, this::subflowExecutionEndQueue));
|
||||
this.receiveCancellations.addFirst(this.multipleConditionEventQueue.receive(Executor.class, this::multipleConditionEventQueue));
|
||||
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));
|
||||
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, failAwareConsumer(this::killQueue, skipFailure)));
|
||||
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, failAwareConsumer(this::subflowExecutionResultQueue, skipFailure)));
|
||||
this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, failAwareConsumer(this::subflowExecutionEndQueue, skipFailure)));
|
||||
this.receiveCancellations.addFirst(this.multipleConditionEventQueue.receive(Executor.class, failAwareConsumer(this::multipleConditionEventQueue, skipFailure)));
|
||||
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(failAwareConsumer(this::clusterEventQueue, skipFailure))));
|
||||
|
||||
executionDelayFuture = scheduledDelay.scheduleAtFixedRate(
|
||||
this::executionDelaySend,
|
||||
@@ -369,9 +373,10 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
}
|
||||
);
|
||||
|
||||
// if we cannot create a FlowWithException, ignore the message
|
||||
this.receiveCancellations.addFirst(flowQueue.receive(
|
||||
FlowTopology.class,
|
||||
either -> {
|
||||
failAwareConsumer(either -> {
|
||||
FlowInterface flow;
|
||||
if (either.isRight()) {
|
||||
log.error("Unable to deserialize a flow: {}", either.getRight().getMessage());
|
||||
@@ -404,8 +409,7 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
} catch (Exception e) {
|
||||
log.error("Unable to save flow topology for flow " + flow.uid(), e);
|
||||
}
|
||||
|
||||
}
|
||||
}, skipFailure)
|
||||
));
|
||||
|
||||
if (this.maintenanceService.isInMaintenanceMode()) {
|
||||
|
||||
@@ -12,6 +12,7 @@ public class KestraTestExtension extends MicronautJunit5Extension {
|
||||
@Override
|
||||
protected MicronautTestValue buildMicronautTestValue(Class<?> testClass) {
|
||||
testProperties.put("kestra.jdbc.executor.thread-count", Runtime.getRuntime().availableProcessors() * 4);
|
||||
testProperties.put("kestra.jdbc.executor.skip-failure", true);
|
||||
return AnnotationSupport
|
||||
.findAnnotation(testClass, KestraTest.class)
|
||||
.map(kestraTestAnnotation -> new MicronautTestValue(
|
||||
|
||||
@@ -148,16 +148,22 @@ class WorkerTest {
|
||||
worker.run();
|
||||
|
||||
List<WorkerTaskResult> workerTaskResult = new ArrayList<>();
|
||||
Flux<WorkerTaskResult> receiveWorkerTaskResults = TestsUtils.receive(workerTaskResultQueue, either -> workerTaskResult.add(either.getLeft()));
|
||||
|
||||
WorkerTask workerTask = workerTask(999000);
|
||||
|
||||
workerTaskQueue.emit(workerTask);
|
||||
workerTaskQueue.emit(workerTask);
|
||||
workerTaskQueue.emit(workerTask);
|
||||
workerTaskQueue.emit(workerTask);
|
||||
|
||||
WorkerTask notKilled = workerTask(2000);
|
||||
|
||||
Flux<WorkerTaskResult> receiveWorkerTaskResults = TestsUtils.receive(workerTaskResultQueue, either -> {
|
||||
if (List.of(workerTask.getTaskRun().getExecutionId(), notKilled.getTaskRun().getExecutionId()).contains(either.getLeft().getTaskRun().getExecutionId())) {
|
||||
workerTaskResult.add(either.getLeft());
|
||||
}
|
||||
});
|
||||
// toto
|
||||
|
||||
workerTaskQueue.emit(workerTask);
|
||||
workerTaskQueue.emit(workerTask);
|
||||
workerTaskQueue.emit(workerTask);
|
||||
workerTaskQueue.emit(workerTask);
|
||||
|
||||
workerTaskQueue.emit(notKilled);
|
||||
|
||||
Thread.sleep(500);
|
||||
|
||||
Reference in New Issue
Block a user