Compare commits

...

3 Commits

Author SHA1 Message Date
brian.mulier
9f6ea7c0af WIP 2025-12-05 17:33:00 +01:00
brian.mulier
476b37aabe fix(tests): remove flakiness from WorkerTest.killed 2025-12-05 17:29:06 +01:00
brian.mulier
921b05fe5c fix(executions): ability to skip messages failure in executor 2025-12-03 15:05:27 +01:00
5 changed files with 76 additions and 20 deletions

View File

@@ -1,7 +1,10 @@
package io.kestra.core.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.function.*;
@Slf4j
public final class Rethrow {
@FunctionalInterface
public interface ConsumerChecked<T, E extends Exception> {
@@ -117,4 +120,17 @@ public final class Rethrow {
private static <E extends Exception, R> R throwException(Exception exception) throws E {
throw (E) exception;
}
public static <T> Consumer<T> failAwareConsumer(Consumer<T> consumer, boolean failSilently) {
return t -> {
try {
consumer.accept(t);
} catch (Exception exception) {
if (!failSilently) {
throw exception;
}
log.warn("Suppressed exception from consumer", exception);
}
};
}
}

View File

@@ -0,0 +1,29 @@
package io.kestra.core.utils;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
class RethrowTest {
@Test
void failAwareConsumer() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
RuntimeException runtimeException = assertThrows(RuntimeException.class, () -> Rethrow.failAwareConsumer(c -> {
throw new RuntimeException("loud");
}, false).accept("anything"));
assertThat(runtimeException.getMessage()).isEqualTo("loud");
assertDoesNotThrow(() -> Rethrow.failAwareConsumer(c -> {
throw new RuntimeException("silent");
}, true).accept("anything"));
assertThat(out.toString()).contains("Suppressed exception from consumer");
assertThat(out.toString()).contains("java.lang.RuntimeException: silent");
}
}

View File

@@ -69,6 +69,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.*;
import static io.kestra.core.utils.Rethrow.failAwareConsumer;
@SuppressWarnings("deprecation")
@Singleton
@@ -222,6 +223,7 @@ public class JdbcExecutor implements ExecutorInterface {
private final java.util.concurrent.ExecutorService workerTaskResultExecutorService;
private final java.util.concurrent.ExecutorService executionExecutorService;
private final int numberOfThreads;
private final boolean skipFailure;
/**
* Creates a new {@link JdbcExecutor} instance. Both constructor and field injection are used
@@ -241,7 +243,8 @@ public class JdbcExecutor implements ExecutorInterface {
final TracerFactory tracerFactory,
final ExecutorsUtils executorsUtils,
final MaintenanceService maintenanceService,
@Value("${kestra.jdbc.executor.thread-count:0}") final int threadCount
@Value("${kestra.jdbc.executor.thread-count:0}") final int threadCount,
@Value("${kestra.jdbc.executor.skip-failure:false}") final boolean skipFailure
) {
this.serviceLivenessCoordinator = serviceLivenessCoordinator;
this.flowMetaStore = flowMetaStore;
@@ -256,6 +259,7 @@ public class JdbcExecutor implements ExecutorInterface {
this.numberOfThreads = threadCount != 0 ? threadCount : Math.max(4, Runtime.getRuntime().availableProcessors());
this.workerTaskResultExecutorService = executorsUtils.maxCachedThreadPool(numberOfThreads, "jdbc-worker-task-result-executor");
this.executionExecutorService = executorsUtils.maxCachedThreadPool(numberOfThreads, "jdbc-execution-executor");
this.skipFailure = skipFailure;
}
@PostConstruct
@@ -295,27 +299,27 @@ public class JdbcExecutor implements ExecutorInterface {
this.receiveCancellations.addFirst(((JdbcQueue<Execution>) this.executionQueue).receiveBatch(
Executor.class,
executions -> {
failAwareConsumer(executions -> {
List<CompletableFuture<Void>> futures = executions.stream()
.map(execution -> CompletableFuture.runAsync(() -> executionQueue(execution), executionExecutorService))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
}, skipFailure)
));
this.receiveCancellations.addFirst(((JdbcQueue<WorkerTaskResult>) this.workerTaskResultQueue).receiveBatch(
Executor.class,
workerTaskResults -> {
failAwareConsumer(workerTaskResults -> {
List<CompletableFuture<Void>> futures = workerTaskResults.stream()
.map(workerTaskResult -> CompletableFuture.runAsync(() -> workerTaskResultQueue(workerTaskResult), workerTaskResultExecutorService))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
}, skipFailure)
));
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, this::subflowExecutionEndQueue));
this.receiveCancellations.addFirst(this.multipleConditionEventQueue.receive(Executor.class, this::multipleConditionEventQueue));
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, failAwareConsumer(this::killQueue, skipFailure)));
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, failAwareConsumer(this::subflowExecutionResultQueue, skipFailure)));
this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, failAwareConsumer(this::subflowExecutionEndQueue, skipFailure)));
this.receiveCancellations.addFirst(this.multipleConditionEventQueue.receive(Executor.class, failAwareConsumer(this::multipleConditionEventQueue, skipFailure)));
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(failAwareConsumer(this::clusterEventQueue, skipFailure))));
executionDelayFuture = scheduledDelay.scheduleAtFixedRate(
this::executionDelaySend,
@@ -369,9 +373,10 @@ public class JdbcExecutor implements ExecutorInterface {
}
);
// if we cannot create a FlowWithException, ignore the message
this.receiveCancellations.addFirst(flowQueue.receive(
FlowTopology.class,
either -> {
failAwareConsumer(either -> {
FlowInterface flow;
if (either.isRight()) {
log.error("Unable to deserialize a flow: {}", either.getRight().getMessage());
@@ -404,8 +409,7 @@ public class JdbcExecutor implements ExecutorInterface {
} catch (Exception e) {
log.error("Unable to save flow topology for flow " + flow.uid(), e);
}
}
}, skipFailure)
));
if (this.maintenanceService.isInMaintenanceMode()) {

View File

@@ -12,6 +12,7 @@ public class KestraTestExtension extends MicronautJunit5Extension {
@Override
protected MicronautTestValue buildMicronautTestValue(Class<?> testClass) {
testProperties.put("kestra.jdbc.executor.thread-count", Runtime.getRuntime().availableProcessors() * 4);
testProperties.put("kestra.jdbc.executor.skip-failure", true);
return AnnotationSupport
.findAnnotation(testClass, KestraTest.class)
.map(kestraTestAnnotation -> new MicronautTestValue(

View File

@@ -148,16 +148,22 @@ class WorkerTest {
worker.run();
List<WorkerTaskResult> workerTaskResult = new ArrayList<>();
Flux<WorkerTaskResult> receiveWorkerTaskResults = TestsUtils.receive(workerTaskResultQueue, either -> workerTaskResult.add(either.getLeft()));
WorkerTask workerTask = workerTask(999000);
workerTaskQueue.emit(workerTask);
workerTaskQueue.emit(workerTask);
workerTaskQueue.emit(workerTask);
workerTaskQueue.emit(workerTask);
WorkerTask notKilled = workerTask(2000);
Flux<WorkerTaskResult> receiveWorkerTaskResults = TestsUtils.receive(workerTaskResultQueue, either -> {
if (List.of(workerTask.getTaskRun().getExecutionId(), notKilled.getTaskRun().getExecutionId()).contains(either.getLeft().getTaskRun().getExecutionId())) {
workerTaskResult.add(either.getLeft());
}
});
// toto
workerTaskQueue.emit(workerTask);
workerTaskQueue.emit(workerTask);
workerTaskQueue.emit(workerTask);
workerTaskQueue.emit(workerTask);
workerTaskQueue.emit(notKilled);
Thread.sleep(500);