Compare commits

...

4 Commits

Author SHA1 Message Date
brian.mulier
b3e858ffb8 fix(tests): wait for KILL in TestExecution cleaner 2025-11-07 20:24:10 +01:00
brian.mulier
94fa411b2b fix(tests): remove ThreadLocal + add distinct on TestExecution cleaner 2025-11-07 16:47:58 +01:00
brian.mulier
91486169e1 fix(tests): add TestQueueFactory setup in beforeEach
Otherwise the rebuildContext was creating a new applicationContext that doesn't contain the testExecutions
2025-11-07 16:15:02 +01:00
brian.mulier
0ca9a90ebf feat(tests): intercept created executions through queue proxy & kill them if running after test 2025-11-07 16:15:02 +01:00
12 changed files with 213 additions and 28 deletions

View File

@@ -3,6 +3,7 @@ package io.kestra.core.queues;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.models.Pauseable;
import io.kestra.core.utils.Either;
import org.apache.commons.lang3.NotImplementedException;
import java.io.Closeable;
import java.util.List;
@@ -54,4 +55,20 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
}
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
default void deleteByKey(String key) throws QueueException {
throw new NotImplementedException();
}
default void deleteByKeys(List<String> keys) throws QueueException {
throw new NotImplementedException();
}
default void emitOnly(String consumerGroup, T message) throws QueueException {
throw new NotImplementedException();
}
default Runnable receiveBatch(Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
throw new NotImplementedException();
}
}

View File

@@ -15,6 +15,7 @@ import io.kestra.core.services.FlowListenersInterface;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -30,8 +31,8 @@ public class FlowListeners implements FlowListenersInterface {
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final QueueInterface<FlowInterface> flowQueue;
private final List<FlowWithSource> flows;
private final List<Consumer<List<FlowWithSource>>> consumers = new ArrayList<>();
private final List<BiConsumer<FlowWithSource, FlowWithSource>> consumersEach = new ArrayList<>();
private final List<Consumer<List<FlowWithSource>>> consumers = new CopyOnWriteArrayList<>();
private final List<BiConsumer<FlowWithSource, FlowWithSource>> consumersEach = new CopyOnWriteArrayList<>();
private final PluginDefaultService pluginDefaultService;

View File

@@ -2,6 +2,10 @@ package io.kestra.core.utils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
public class ListUtils {
public static <T> List<T> emptyOnNull(List<T> list) {
@@ -71,4 +75,13 @@ public class ListUtils {
throw new IllegalArgumentException("%s in not an instance of List of String".formatted(object));
}
}
private static <T> Predicate<T> distinctByKeyPredicate(Function<? super T,Object> keyExtractor) {
Map<Object,Boolean> seen = new ConcurrentHashMap<>();
return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
}
public static <T> List<T> distinctByKey(List<T> withDuplicates, Function<? super T, Object> keyExtractor) {
return withDuplicates.stream().filter(distinctByKeyPredicate(keyExtractor)).toList();
}
}

View File

@@ -14,13 +14,10 @@ import java.util.Date;
import java.util.Map;
import jakarta.inject.Inject;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@TestInstance(Lifecycle.PER_CLASS)
class DateFilterTest {
public static final ZonedDateTime NOW = ZonedDateTime.parse("2013-09-08T16:19:12.123456+01");

View File

@@ -1,7 +1,11 @@
package io.kestra.core.utils;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
@@ -52,4 +56,19 @@ class ListUtilsTest {
assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString("not a list"));
assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString(List.of(1, 2, 3)));
}
}
@Test
void distinctByKey() {
List<Execution> executions = List.of(
Execution.builder().id("1").flowId("flow1").state(State.of(State.Type.CREATED, Collections.emptyList())).build(),
Execution.builder().id("1").flowId("flow1").state(State.of(State.Type.RUNNING, List.of(new State.History(State.Type.CREATED, Instant.now().minus(Duration.ofSeconds(2)))))).build(),
Execution.builder().id("2").flowId("flow2").build()
);
List<Execution> distinctExecutions = ListUtils.distinctByKey(executions, Execution::getId);
assertThat(distinctExecutions.size()).isEqualTo(2);
assertThat(distinctExecutions.stream().map(Execution::getId)).containsExactlyInAnyOrder("1", "2");
assertThat(distinctExecutions.stream().filter(e -> e.getId().equals("1")).findFirst().get().getState().getCurrent()).isEqualTo(State.Type.CREATED);
}
}

View File

@@ -1,10 +1,7 @@
package io.kestra.runner.mysql;
import io.kestra.jdbc.runner.JdbcRunnerRetryTest;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
@TestInstance(Lifecycle.PER_CLASS)
public class MysqlRunnerRetryTest extends JdbcRunnerRetryTest {
}

View File

@@ -293,7 +293,7 @@ public class JdbcExecutor implements ExecutorInterface {
Await.until(() -> this.allFlows != null, Duration.ofMillis(100), Duration.ofMinutes(5));
this.receiveCancellations.addFirst(((JdbcQueue<Execution>) this.executionQueue).receiveBatch(
this.receiveCancellations.addFirst(this.executionQueue.receiveBatch(
Executor.class,
executions -> {
List<CompletableFuture<Void>> futures = executions.stream()
@@ -302,7 +302,7 @@ public class JdbcExecutor implements ExecutorInterface {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
));
this.receiveCancellations.addFirst(((JdbcQueue<WorkerTaskResult>) this.workerTaskResultQueue).receiveBatch(
this.receiveCancellations.addFirst(this.workerTaskResultQueue.receiveBatch(
Executor.class,
workerTaskResults -> {
List<CompletableFuture<Void>> futures = workerTaskResults.stream()
@@ -1121,14 +1121,14 @@ public class JdbcExecutor implements ExecutorInterface {
// IMPORTANT: this must be done before emitting the last execution message so that all consumers are notified that the execution ends.
// NOTE: we may also purge ExecutionKilled events, but as there may not be a lot of them, it may not be worth it.
if (cleanExecutionQueue && isTerminated) {
((JdbcQueue<Execution>) executionQueue).deleteByKey(executor.getExecution().getId());
executionQueue.deleteByKey(executor.getExecution().getId());
}
// emit for other consumers than the executor if no failure
if (hasFailure) {
this.executionQueue.emit(executor.getExecution());
} else {
((JdbcQueue<Execution>) this.executionQueue).emitOnly(null, executor.getExecution());
this.executionQueue.emitOnly(null, executor.getExecution());
}
Execution execution = executor.getExecution();
@@ -1206,8 +1206,8 @@ public class JdbcExecutor implements ExecutorInterface {
List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
.map(taskRun -> taskRun.getId())
.toList();
((JdbcQueue<WorkerTaskResult>) workerTaskResultQueue).deleteByKeys(taskRunKeys);
((JdbcQueue<WorkerJob>) workerJobQueue).deleteByKeys(taskRunKeys);
workerTaskResultQueue.deleteByKeys(taskRunKeys);
workerJobQueue.deleteByKeys(taskRunKeys);
}
}
} catch (QueueException e) {

View File

@@ -39,10 +39,10 @@ import lombok.extern.slf4j.Slf4j;
@JdbcRunnerEnabled
public class JdbcIndexer implements Indexer {
private final LogRepositoryInterface logRepository;
private final JdbcQueue<LogEntry> logQueue;
private final QueueInterface<LogEntry> logQueue;
private final MetricRepositoryInterface metricRepository;
private final JdbcQueue<MetricEntry> metricQueue;
private final QueueInterface<MetricEntry> metricQueue;
private final MetricRegistry metricRegistry;
private final List<Runnable> receiveCancellations = new ArrayList<>();
@@ -67,9 +67,9 @@ public class JdbcIndexer implements Indexer {
QueueService queueService
) {
this.logRepository = logRepository;
this.logQueue = (JdbcQueue<LogEntry>) logQueue;
this.logQueue = logQueue;
this.metricRepository = metricRepositor;
this.metricQueue = (JdbcQueue<MetricEntry>) metricQueue;
this.metricQueue = metricQueue;
this.metricRegistry = metricRegistry;
this.eventPublisher = eventPublisher;
this.skipExecutionService = skipExecutionService;
@@ -91,7 +91,7 @@ public class JdbcIndexer implements Indexer {
this.sendBatch(metricQueue, metricRepository);
}
protected <T> void sendBatch(JdbcQueue<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
protected <T> void sendBatch(QueueInterface<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
this.receiveCancellations.addFirst(queueInterface.receiveBatch(Indexer.class, eithers -> {
// first, log all deserialization issues
eithers.stream().filter(either -> either.isRight()).forEach(either -> log.error("unable to deserialize an item: {}", either.getRight().getMessage()));

View File

@@ -34,7 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
@KestraTest(rebuildContext = true)
@KestraTest
abstract public class AbstractSchedulerTest {
@Inject
protected ApplicationContext applicationContext;

View File

@@ -42,7 +42,6 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.*;
@TestInstance(Lifecycle.PER_CLASS)
public class SchedulerScheduleTest extends AbstractSchedulerTest {
@Inject
protected FlowListeners flowListenersService;
@@ -687,4 +686,4 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
Await.until(() -> this.triggerState.findLast(trigger).map(t -> t.getNextExecutionDate().isAfter(lastTrigger.getNextExecutionDate().plusSeconds(10))).orElse(false).booleanValue(), Duration.ofMillis(100), Duration.ofSeconds(20));
}
}
}
}

View File

@@ -1,13 +1,31 @@
package io.kestra.core.junit.extensions;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledExecution;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.TestQueueFactory;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.TestRunner;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.test.annotation.MicronautTestValue;
import io.micronaut.test.context.TestContext;
import io.micronaut.test.extensions.junit5.MicronautJunit5Extension;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.platform.commons.support.AnnotationSupport;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeoutException;
@Slf4j
public class KestraTestExtension extends MicronautJunit5Extension {
private static final ExtensionContext.Namespace NAMESPACE = ExtensionContext.Namespace.create(KestraTestExtension.class);
@@ -42,14 +60,18 @@ public class KestraTestExtension extends MicronautJunit5Extension {
}
@Override
public void beforeAll(ExtensionContext extensionContext) throws Exception {
super.beforeAll(extensionContext);
KestraTest kestraTest = extensionContext.getTestClass()
public void beforeTestExecution(ExtensionContext context) throws Exception {
super.beforeTestExecution(context);
TestQueueFactory.testExecutions.set(new ArrayList<>());
KestraTest kestraTest = context.getTestClass()
.orElseThrow()
.getAnnotation(KestraTest.class);
if (kestraTest.startRunner()){
if (kestraTest.startRunner()) {
TestRunner runner = applicationContext.getBean(TestRunner.class);
if (!runner.isRunning()){
if (!runner.isRunning()) {
runner.setSchedulerEnabled(kestraTest.startScheduler());
runner.setWorkerEnabled(kestraTest.startWorker());
runner.run();
@@ -62,5 +84,82 @@ public class KestraTestExtension extends MicronautJunit5Extension {
super.afterTestExecution(context);
TestsUtils.queueConsumersCleanup();
List<Execution> executionsToKill = TestQueueFactory.testExecutions.get();
if (!executionsToKill.isEmpty()
&& applicationContext.containsBean(ExecutionRepositoryInterface.class)
&& applicationContext.containsBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.KILL_NAMED))) {
ExecutionRepositoryInterface executionRepository = applicationContext.getBean(ExecutionRepositoryInterface.class);
QueueInterface<ExecutionKilled> killQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.KILL_NAMED));
KestraTest kestraTest = context.getTestClass()
.orElseThrow()
.getAnnotation(KestraTest.class);
// We only wait for KILLED state if the runner is started, otherwise we just emit the kill event and it may be processed upon starting a test with a runner
List<Execution> killedExecutions = retryingExecutionKill(executionsToKill, executionRepository, killQueue, 10, kestraTest.startRunner());
executionsToKill.removeIf(execution -> killedExecutions.stream().anyMatch(killedExecution ->
Objects.equals(execution.getTenantId(), killedExecution.getTenantId())
&& Objects.equals(execution.getId(), killedExecution.getId())
));
}
}
private List<Execution> retryingExecutionKill(List<Execution> testExecutions, ExecutionRepositoryInterface executionRepository, QueueInterface<ExecutionKilled> killQueue, int retriesLeft, boolean shouldWaitForKill) throws InterruptedException {
try {
List<Execution> runningExecutions = ListUtils.distinctByKey(
testExecutions.stream().flatMap(launchedExecution -> executionRepository.findById(launchedExecution.getTenantId(), launchedExecution.getId()).stream()).toList(),
Execution::getId
).stream().filter(inRepository -> !inRepository.getState().isTerminated()).toList();
runningExecutions.forEach(inRepository -> emitKillMessage(killQueue, inRepository));
if (shouldWaitForKill) {
try {
waitForKilled(executionRepository, runningExecutions);
} catch (TimeoutException e) {
log.warn("Some executions remained in KILLING", e);
}
}
return runningExecutions;
} catch (ConcurrentModificationException e) {
// We intentionally don't use a CopyOnWriteArrayList to retry on concurrent modification exceptions to make sure to get rid of flakiness due to overflowing executions
if (retriesLeft <= 0) {
log.warn("Couldn't kill executions after test execution, due to concurrent modifications, this could impact further tests", e);
return Collections.emptyList();
}
Thread.sleep(100);
return retryingExecutionKill(testExecutions, executionRepository, killQueue, retriesLeft - 1, shouldWaitForKill);
}
}
private void emitKillMessage(QueueInterface<ExecutionKilled> killQueue, Execution inRepository) {
log.warn("Execution {} is still running after test execution, killing it", inRepository.getId());
try {
killQueue.emit(ExecutionKilledExecution.builder()
.tenantId(inRepository.getTenantId())
.executionId(inRepository.getId())
.state(ExecutionKilled.State.REQUESTED)
.isOnKillCascade(true)
.build()
);
} catch (QueueException e) {
log.warn("Couldn't kill execution {} after test execution", inRepository.getId(), e);
}
}
private void waitForKilled(ExecutionRepositoryInterface executionRepository, List<Execution> runningExecutions) throws TimeoutException {
Await.until(() -> runningExecutions.stream()
.map(execution -> executionRepository.findById(execution.getTenantId(), execution.getId()))
.allMatch(maybeExecution -> maybeExecution.map(inRepository -> {
boolean terminated = inRepository.getState().isTerminated();
if (!terminated) {
log.warn("Execution {} has a pending KILL request but is still in state {} ", inRepository.getId(), inRepository.getState().getCurrent());
}
return terminated;
})
.orElse(true))
, Duration.ofMillis(50), Duration.ofSeconds(10));
}
}

View File

@@ -0,0 +1,43 @@
package io.kestra.core.queues;
import io.kestra.core.models.executions.Execution;
import io.micronaut.context.annotation.*;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.lang.reflect.Proxy;
import java.util.*;
@Factory
@Requires(bean = QueueFactoryInterface.class)
public class TestQueueFactory {
public static final InheritableThreadLocal<List<Execution>> testExecutions = new InheritableThreadLocal<>();
private QueueInterface<Execution> delegate;
public TestQueueFactory(QueueFactoryInterface queueFactoryInterface) {
this.delegate = queueFactoryInterface.execution();
}
@SuppressWarnings("unchecked")
@Singleton
@Replaces(named = QueueFactoryInterface.EXECUTION_NAMED)
@Named(QueueFactoryInterface.EXECUTION_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<Execution> execution() {
return (QueueInterface<Execution>) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{QueueInterface.class}, (proxy, method, args) -> {
try {
if (method.getName().contains("emit")) {
Arrays.stream(args).filter(arg -> arg instanceof Execution).forEach(arg -> {
synchronized (testExecutions.get()) {
testExecutions.get().add((Execution) arg);
}
});
}
return method.invoke(this.delegate, args);
} catch (Exception e) {
throw Optional.ofNullable(e.getCause()).orElse(e);
}
});
}
}