mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
4 Commits
run-develo
...
fix/flaky-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b3e858ffb8 | ||
|
|
94fa411b2b | ||
|
|
91486169e1 | ||
|
|
0ca9a90ebf |
@@ -3,6 +3,7 @@ package io.kestra.core.queues;
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.models.Pauseable;
|
||||
import io.kestra.core.utils.Either;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.List;
|
||||
@@ -54,4 +55,20 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
|
||||
}
|
||||
|
||||
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
|
||||
|
||||
default void deleteByKey(String key) throws QueueException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
default void deleteByKeys(List<String> keys) throws QueueException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
default void emitOnly(String consumerGroup, T message) throws QueueException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
default Runnable receiveBatch(Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ import io.kestra.core.services.FlowListenersInterface;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
@@ -30,8 +31,8 @@ public class FlowListeners implements FlowListenersInterface {
|
||||
private final AtomicBoolean isStarted = new AtomicBoolean(false);
|
||||
private final QueueInterface<FlowInterface> flowQueue;
|
||||
private final List<FlowWithSource> flows;
|
||||
private final List<Consumer<List<FlowWithSource>>> consumers = new ArrayList<>();
|
||||
private final List<BiConsumer<FlowWithSource, FlowWithSource>> consumersEach = new ArrayList<>();
|
||||
private final List<Consumer<List<FlowWithSource>>> consumers = new CopyOnWriteArrayList<>();
|
||||
private final List<BiConsumer<FlowWithSource, FlowWithSource>> consumersEach = new CopyOnWriteArrayList<>();
|
||||
|
||||
private final PluginDefaultService pluginDefaultService;
|
||||
|
||||
|
||||
@@ -2,6 +2,10 @@ package io.kestra.core.utils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class ListUtils {
|
||||
public static <T> List<T> emptyOnNull(List<T> list) {
|
||||
@@ -71,4 +75,13 @@ public class ListUtils {
|
||||
throw new IllegalArgumentException("%s in not an instance of List of String".formatted(object));
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> Predicate<T> distinctByKeyPredicate(Function<? super T,Object> keyExtractor) {
|
||||
Map<Object,Boolean> seen = new ConcurrentHashMap<>();
|
||||
return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
|
||||
}
|
||||
|
||||
public static <T> List<T> distinctByKey(List<T> withDuplicates, Function<? super T, Object> keyExtractor) {
|
||||
return withDuplicates.stream().filter(distinctByKeyPredicate(keyExtractor)).toList();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,13 +14,10 @@ import java.util.Date;
|
||||
import java.util.Map;
|
||||
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
import org.junit.jupiter.api.TestInstance.Lifecycle;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@TestInstance(Lifecycle.PER_CLASS)
|
||||
class DateFilterTest {
|
||||
public static final ZonedDateTime NOW = ZonedDateTime.parse("2013-09-08T16:19:12.123456+01");
|
||||
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
@@ -52,4 +56,19 @@ class ListUtilsTest {
|
||||
assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString("not a list"));
|
||||
assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString(List.of(1, 2, 3)));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void distinctByKey() {
|
||||
List<Execution> executions = List.of(
|
||||
Execution.builder().id("1").flowId("flow1").state(State.of(State.Type.CREATED, Collections.emptyList())).build(),
|
||||
Execution.builder().id("1").flowId("flow1").state(State.of(State.Type.RUNNING, List.of(new State.History(State.Type.CREATED, Instant.now().minus(Duration.ofSeconds(2)))))).build(),
|
||||
Execution.builder().id("2").flowId("flow2").build()
|
||||
);
|
||||
|
||||
List<Execution> distinctExecutions = ListUtils.distinctByKey(executions, Execution::getId);
|
||||
|
||||
assertThat(distinctExecutions.size()).isEqualTo(2);
|
||||
assertThat(distinctExecutions.stream().map(Execution::getId)).containsExactlyInAnyOrder("1", "2");
|
||||
assertThat(distinctExecutions.stream().filter(e -> e.getId().equals("1")).findFirst().get().getState().getCurrent()).isEqualTo(State.Type.CREATED);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,7 @@
|
||||
package io.kestra.runner.mysql;
|
||||
|
||||
import io.kestra.jdbc.runner.JdbcRunnerRetryTest;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
import org.junit.jupiter.api.TestInstance.Lifecycle;
|
||||
|
||||
@TestInstance(Lifecycle.PER_CLASS)
|
||||
public class MysqlRunnerRetryTest extends JdbcRunnerRetryTest {
|
||||
|
||||
}
|
||||
|
||||
@@ -293,7 +293,7 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
|
||||
Await.until(() -> this.allFlows != null, Duration.ofMillis(100), Duration.ofMinutes(5));
|
||||
|
||||
this.receiveCancellations.addFirst(((JdbcQueue<Execution>) this.executionQueue).receiveBatch(
|
||||
this.receiveCancellations.addFirst(this.executionQueue.receiveBatch(
|
||||
Executor.class,
|
||||
executions -> {
|
||||
List<CompletableFuture<Void>> futures = executions.stream()
|
||||
@@ -302,7 +302,7 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||
}
|
||||
));
|
||||
this.receiveCancellations.addFirst(((JdbcQueue<WorkerTaskResult>) this.workerTaskResultQueue).receiveBatch(
|
||||
this.receiveCancellations.addFirst(this.workerTaskResultQueue.receiveBatch(
|
||||
Executor.class,
|
||||
workerTaskResults -> {
|
||||
List<CompletableFuture<Void>> futures = workerTaskResults.stream()
|
||||
@@ -1121,14 +1121,14 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
// IMPORTANT: this must be done before emitting the last execution message so that all consumers are notified that the execution ends.
|
||||
// NOTE: we may also purge ExecutionKilled events, but as there may not be a lot of them, it may not be worth it.
|
||||
if (cleanExecutionQueue && isTerminated) {
|
||||
((JdbcQueue<Execution>) executionQueue).deleteByKey(executor.getExecution().getId());
|
||||
executionQueue.deleteByKey(executor.getExecution().getId());
|
||||
}
|
||||
|
||||
// emit for other consumers than the executor if no failure
|
||||
if (hasFailure) {
|
||||
this.executionQueue.emit(executor.getExecution());
|
||||
} else {
|
||||
((JdbcQueue<Execution>) this.executionQueue).emitOnly(null, executor.getExecution());
|
||||
this.executionQueue.emitOnly(null, executor.getExecution());
|
||||
}
|
||||
|
||||
Execution execution = executor.getExecution();
|
||||
@@ -1206,8 +1206,8 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
|
||||
.map(taskRun -> taskRun.getId())
|
||||
.toList();
|
||||
((JdbcQueue<WorkerTaskResult>) workerTaskResultQueue).deleteByKeys(taskRunKeys);
|
||||
((JdbcQueue<WorkerJob>) workerJobQueue).deleteByKeys(taskRunKeys);
|
||||
workerTaskResultQueue.deleteByKeys(taskRunKeys);
|
||||
workerJobQueue.deleteByKeys(taskRunKeys);
|
||||
}
|
||||
}
|
||||
} catch (QueueException e) {
|
||||
|
||||
@@ -39,10 +39,10 @@ import lombok.extern.slf4j.Slf4j;
|
||||
@JdbcRunnerEnabled
|
||||
public class JdbcIndexer implements Indexer {
|
||||
private final LogRepositoryInterface logRepository;
|
||||
private final JdbcQueue<LogEntry> logQueue;
|
||||
private final QueueInterface<LogEntry> logQueue;
|
||||
|
||||
private final MetricRepositoryInterface metricRepository;
|
||||
private final JdbcQueue<MetricEntry> metricQueue;
|
||||
private final QueueInterface<MetricEntry> metricQueue;
|
||||
private final MetricRegistry metricRegistry;
|
||||
private final List<Runnable> receiveCancellations = new ArrayList<>();
|
||||
|
||||
@@ -67,9 +67,9 @@ public class JdbcIndexer implements Indexer {
|
||||
QueueService queueService
|
||||
) {
|
||||
this.logRepository = logRepository;
|
||||
this.logQueue = (JdbcQueue<LogEntry>) logQueue;
|
||||
this.logQueue = logQueue;
|
||||
this.metricRepository = metricRepositor;
|
||||
this.metricQueue = (JdbcQueue<MetricEntry>) metricQueue;
|
||||
this.metricQueue = metricQueue;
|
||||
this.metricRegistry = metricRegistry;
|
||||
this.eventPublisher = eventPublisher;
|
||||
this.skipExecutionService = skipExecutionService;
|
||||
@@ -91,7 +91,7 @@ public class JdbcIndexer implements Indexer {
|
||||
this.sendBatch(metricQueue, metricRepository);
|
||||
}
|
||||
|
||||
protected <T> void sendBatch(JdbcQueue<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
|
||||
protected <T> void sendBatch(QueueInterface<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
|
||||
this.receiveCancellations.addFirst(queueInterface.receiveBatch(Indexer.class, eithers -> {
|
||||
// first, log all deserialization issues
|
||||
eithers.stream().filter(either -> either.isRight()).forEach(either -> log.error("unable to deserialize an item: {}", either.getRight().getMessage()));
|
||||
|
||||
@@ -34,7 +34,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@KestraTest(rebuildContext = true)
|
||||
@KestraTest
|
||||
abstract public class AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected ApplicationContext applicationContext;
|
||||
|
||||
@@ -42,7 +42,6 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
@TestInstance(Lifecycle.PER_CLASS)
|
||||
public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected FlowListeners flowListenersService;
|
||||
@@ -687,4 +686,4 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
Await.until(() -> this.triggerState.findLast(trigger).map(t -> t.getNextExecutionDate().isAfter(lastTrigger.getNextExecutionDate().plusSeconds(10))).orElse(false).booleanValue(), Duration.ofMillis(100), Duration.ofSeconds(20));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,13 +1,31 @@
|
||||
package io.kestra.core.junit.extensions;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionKilled;
|
||||
import io.kestra.core.models.executions.ExecutionKilledExecution;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.queues.TestQueueFactory;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.runners.TestRunner;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.inject.qualifiers.Qualifiers;
|
||||
import io.micronaut.test.annotation.MicronautTestValue;
|
||||
import io.micronaut.test.context.TestContext;
|
||||
import io.micronaut.test.extensions.junit5.MicronautJunit5Extension;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.extension.ExtensionContext;
|
||||
import org.junit.platform.commons.support.AnnotationSupport;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
@Slf4j
|
||||
public class KestraTestExtension extends MicronautJunit5Extension {
|
||||
private static final ExtensionContext.Namespace NAMESPACE = ExtensionContext.Namespace.create(KestraTestExtension.class);
|
||||
|
||||
@@ -42,14 +60,18 @@ public class KestraTestExtension extends MicronautJunit5Extension {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeAll(ExtensionContext extensionContext) throws Exception {
|
||||
super.beforeAll(extensionContext);
|
||||
KestraTest kestraTest = extensionContext.getTestClass()
|
||||
public void beforeTestExecution(ExtensionContext context) throws Exception {
|
||||
super.beforeTestExecution(context);
|
||||
|
||||
TestQueueFactory.testExecutions.set(new ArrayList<>());
|
||||
|
||||
KestraTest kestraTest = context.getTestClass()
|
||||
.orElseThrow()
|
||||
.getAnnotation(KestraTest.class);
|
||||
if (kestraTest.startRunner()){
|
||||
|
||||
if (kestraTest.startRunner()) {
|
||||
TestRunner runner = applicationContext.getBean(TestRunner.class);
|
||||
if (!runner.isRunning()){
|
||||
if (!runner.isRunning()) {
|
||||
runner.setSchedulerEnabled(kestraTest.startScheduler());
|
||||
runner.setWorkerEnabled(kestraTest.startWorker());
|
||||
runner.run();
|
||||
@@ -62,5 +84,82 @@ public class KestraTestExtension extends MicronautJunit5Extension {
|
||||
super.afterTestExecution(context);
|
||||
|
||||
TestsUtils.queueConsumersCleanup();
|
||||
|
||||
List<Execution> executionsToKill = TestQueueFactory.testExecutions.get();
|
||||
if (!executionsToKill.isEmpty()
|
||||
&& applicationContext.containsBean(ExecutionRepositoryInterface.class)
|
||||
&& applicationContext.containsBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.KILL_NAMED))) {
|
||||
ExecutionRepositoryInterface executionRepository = applicationContext.getBean(ExecutionRepositoryInterface.class);
|
||||
QueueInterface<ExecutionKilled> killQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.KILL_NAMED));
|
||||
|
||||
KestraTest kestraTest = context.getTestClass()
|
||||
.orElseThrow()
|
||||
.getAnnotation(KestraTest.class);
|
||||
// We only wait for KILLED state if the runner is started, otherwise we just emit the kill event and it may be processed upon starting a test with a runner
|
||||
List<Execution> killedExecutions = retryingExecutionKill(executionsToKill, executionRepository, killQueue, 10, kestraTest.startRunner());
|
||||
|
||||
executionsToKill.removeIf(execution -> killedExecutions.stream().anyMatch(killedExecution ->
|
||||
Objects.equals(execution.getTenantId(), killedExecution.getTenantId())
|
||||
&& Objects.equals(execution.getId(), killedExecution.getId())
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private List<Execution> retryingExecutionKill(List<Execution> testExecutions, ExecutionRepositoryInterface executionRepository, QueueInterface<ExecutionKilled> killQueue, int retriesLeft, boolean shouldWaitForKill) throws InterruptedException {
|
||||
try {
|
||||
List<Execution> runningExecutions = ListUtils.distinctByKey(
|
||||
testExecutions.stream().flatMap(launchedExecution -> executionRepository.findById(launchedExecution.getTenantId(), launchedExecution.getId()).stream()).toList(),
|
||||
Execution::getId
|
||||
).stream().filter(inRepository -> !inRepository.getState().isTerminated()).toList();
|
||||
|
||||
runningExecutions.forEach(inRepository -> emitKillMessage(killQueue, inRepository));
|
||||
|
||||
if (shouldWaitForKill) {
|
||||
try {
|
||||
waitForKilled(executionRepository, runningExecutions);
|
||||
} catch (TimeoutException e) {
|
||||
log.warn("Some executions remained in KILLING", e);
|
||||
}
|
||||
}
|
||||
return runningExecutions;
|
||||
} catch (ConcurrentModificationException e) {
|
||||
// We intentionally don't use a CopyOnWriteArrayList to retry on concurrent modification exceptions to make sure to get rid of flakiness due to overflowing executions
|
||||
if (retriesLeft <= 0) {
|
||||
log.warn("Couldn't kill executions after test execution, due to concurrent modifications, this could impact further tests", e);
|
||||
return Collections.emptyList();
|
||||
}
|
||||
Thread.sleep(100);
|
||||
return retryingExecutionKill(testExecutions, executionRepository, killQueue, retriesLeft - 1, shouldWaitForKill);
|
||||
}
|
||||
}
|
||||
|
||||
private void emitKillMessage(QueueInterface<ExecutionKilled> killQueue, Execution inRepository) {
|
||||
log.warn("Execution {} is still running after test execution, killing it", inRepository.getId());
|
||||
try {
|
||||
killQueue.emit(ExecutionKilledExecution.builder()
|
||||
.tenantId(inRepository.getTenantId())
|
||||
.executionId(inRepository.getId())
|
||||
.state(ExecutionKilled.State.REQUESTED)
|
||||
.isOnKillCascade(true)
|
||||
.build()
|
||||
);
|
||||
} catch (QueueException e) {
|
||||
log.warn("Couldn't kill execution {} after test execution", inRepository.getId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private void waitForKilled(ExecutionRepositoryInterface executionRepository, List<Execution> runningExecutions) throws TimeoutException {
|
||||
Await.until(() -> runningExecutions.stream()
|
||||
.map(execution -> executionRepository.findById(execution.getTenantId(), execution.getId()))
|
||||
.allMatch(maybeExecution -> maybeExecution.map(inRepository -> {
|
||||
boolean terminated = inRepository.getState().isTerminated();
|
||||
if (!terminated) {
|
||||
log.warn("Execution {} has a pending KILL request but is still in state {} ", inRepository.getId(), inRepository.getState().getCurrent());
|
||||
}
|
||||
return terminated;
|
||||
})
|
||||
.orElse(true))
|
||||
, Duration.ofMillis(50), Duration.ofSeconds(10));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
package io.kestra.core.queues;
|
||||
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.micronaut.context.annotation.*;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.util.*;
|
||||
|
||||
@Factory
|
||||
@Requires(bean = QueueFactoryInterface.class)
|
||||
public class TestQueueFactory {
|
||||
public static final InheritableThreadLocal<List<Execution>> testExecutions = new InheritableThreadLocal<>();
|
||||
|
||||
private QueueInterface<Execution> delegate;
|
||||
|
||||
public TestQueueFactory(QueueFactoryInterface queueFactoryInterface) {
|
||||
this.delegate = queueFactoryInterface.execution();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Singleton
|
||||
@Replaces(named = QueueFactoryInterface.EXECUTION_NAMED)
|
||||
@Named(QueueFactoryInterface.EXECUTION_NAMED)
|
||||
@Bean(preDestroy = "close")
|
||||
public QueueInterface<Execution> execution() {
|
||||
return (QueueInterface<Execution>) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{QueueInterface.class}, (proxy, method, args) -> {
|
||||
try {
|
||||
if (method.getName().contains("emit")) {
|
||||
Arrays.stream(args).filter(arg -> arg instanceof Execution).forEach(arg -> {
|
||||
synchronized (testExecutions.get()) {
|
||||
testExecutions.get().add((Execution) arg);
|
||||
}
|
||||
});
|
||||
}
|
||||
return method.invoke(this.delegate, args);
|
||||
} catch (Exception e) {
|
||||
throw Optional.ofNullable(e.getCause()).orElse(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user