Compare commits

...

4 Commits

Author        SHA1        Message                                            Date
brian.mulier  811af5c5a1  WIP                                                2025-12-22 12:10:41 +01:00
brian.mulier  1b73ddd097  WIP                                                2025-12-22 12:10:41 +01:00
brian.mulier  9803ecc6d0  fix(core): close FlowListeners properly            2025-12-22 12:10:41 +01:00
brian.mulier  b4871fcb15  fix(tests): remove test UncaughtExceptionHandler   2025-12-22 12:10:41 +01:00
15 changed files with 161 additions and 182 deletions

View File

@@ -24,6 +24,9 @@ dependencies {
// reactor
api "io.projectreactor:reactor-core"
// awaitility
api 'org.awaitility:awaitility'
// micronaut
api "io.micronaut.data:micronaut-data-model"
implementation "io.micronaut:micronaut-http-server-netty"

View File

@@ -5,6 +5,8 @@ import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.services.PluginDefaultService;
import io.micronaut.context.annotation.Bean;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import io.kestra.core.queues.QueueFactoryInterface;
@@ -15,7 +17,6 @@ import io.kestra.core.services.FlowListenersInterface;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -26,15 +27,14 @@ import jakarta.inject.Singleton;
@Singleton
@Slf4j
public class FlowListeners implements FlowListenersInterface {
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final QueueInterface<FlowInterface> flowQueue;
private final List<FlowWithSource> flows;
private final List<Consumer<List<FlowWithSource>>> consumers = new ArrayList<>();
private final List<BiConsumer<FlowWithSource, FlowWithSource>> consumersEach = new ArrayList<>();
private final PluginDefaultService pluginDefaultService;
private Runnable queueListenerCancellation;
@Inject
public FlowListeners(
FlowRepositoryInterface flowRepository,
@@ -49,8 +49,9 @@ public class FlowListeners implements FlowListenersInterface {
@Override
public void run() {
synchronized (this) {
if (this.isStarted.compareAndSet(false, true)) {
this.flowQueue.receive(either -> {
if (queueListenerCancellation == null) {
log.info("STARTING FLOW LISTENER: {}", this);
queueListenerCancellation = this.flowQueue.receive(either -> {
FlowWithSource flow;
if (either.isRight()) {
flow = FlowWithException.from(either.getRight().getRecord(), either.getRight(), log).orElse(null);
@@ -154,4 +155,18 @@ public class FlowListeners implements FlowListenersInterface {
// we force a deep clone to avoid concurrency issues where instances are changed during iteration (especially by the scheduler).
return new ArrayList<>(this.flows);
}
@PreDestroy
@Override
public void close() throws Exception {
synchronized (this) {
boolean b = queueListenerCancellation != null;
log.info("THREAD STACKTRACE: {}", (Object) Thread.currentThread().getStackTrace());
log.info("LISTENER NOT NULL : {}", b);
log.info("LISTENER THIS : {}", this);
if (b) {
queueListenerCancellation.run();
}
}
}
}
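In short, the diff above swaps the isStarted AtomicBoolean for a cancellation handle: run() keeps the Runnable returned by flowQueue.receive(...) and the new @PreDestroy close() invokes it, so the queue subscription is torn down with the bean. A standalone sketch of that handle pattern, using illustrative names rather than Kestra's actual API:

    import java.util.function.Consumer;

    // Hypothetical queue whose receive() returns a Runnable that cancels the subscription.
    interface SimpleQueue<T> {
        Runnable receive(Consumer<T> consumer);
    }

    class ListenerSketch implements AutoCloseable {
        private final SimpleQueue<String> queue;
        private Runnable cancellation;            // null until run() has subscribed

        ListenerSketch(SimpleQueue<String> queue) {
            this.queue = queue;
        }

        public synchronized void run() {
            if (cancellation == null) {           // subscribe at most once, like the null check above
                cancellation = queue.receive(msg -> System.out.println("got " + msg));
            }
        }

        @Override
        public synchronized void close() {
            if (cancellation != null) {           // idempotent: only cancel an active subscription
                cancellation.run();
                cancellation = null;
            }
        }
    }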

View File

@@ -6,7 +6,7 @@ import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public interface FlowListenersInterface {
public interface FlowListenersInterface extends AutoCloseable {
void run();
void listen(Consumer<List<FlowWithSource>> consumer);

View File

@@ -11,21 +11,17 @@ public final class ThreadUncaughtExceptionHandler implements UncaughtExceptionHandler
@Override
public void uncaughtException(Thread t, Throwable e) {
boolean isTest = KestraContext.getContext().getEnvironments().contains("test");
try {
// cannot use FormattingLogger due to a dependency loop
log.error("Caught an exception in {}. {}", t, isTest ? "Keeping it running for test." : "Shutting down.", e);
log.error("Caught an exception in {}. Shutting down.", t, e);
} catch (Throwable errorInLogging) {
// If logging fails, e.g. due to insufficient memory, at least try to log the
// message and the cause of the logging failure.
System.err.println(e.getMessage());
System.err.println(errorInLogging.getMessage());
} finally {
if (!isTest) {
KestraContext.getContext().shutdown();
Runtime.getRuntime().exit(1);
}
KestraContext.getContext().shutdown();
Runtime.getRuntime().exit(1);
}
}
}
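The simplification above removes the test-only branch, so an uncaught exception now always shuts the context down and exits. For context, a minimal sketch of how a default handler of this kind is installed and triggered (the Kestra context calls are omitted and the handler body is illustrative):

    public class UncaughtHandlerSketch {
        public static void main(String[] args) throws InterruptedException {
            Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
                // same fail-fast policy as the handler above: log, then terminate the JVM
                System.err.println("Caught an exception in " + thread + ". Shutting down.");
                throwable.printStackTrace();
                Runtime.getRuntime().exit(1);
            });

            Thread worker = new Thread(() -> { throw new IllegalStateException("boom"); });
            worker.start();
            worker.join();   // the handler runs on the worker thread before the JVM exits
        }
    }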

View File

@@ -1,25 +1,25 @@
package io.kestra.core.runners;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.property.Property;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.plugin.core.debug.Return;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.IdUtils;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.debug.Return;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -27,6 +27,9 @@ abstract public class FlowListenersTest {
@Inject
protected FlowRepositoryInterface flowRepository;
@Inject
protected FlowListenersInterface flowListenersService;
protected static FlowWithSource create(String tenantId, String flowId, String taskId) {
FlowWithSource flow = FlowWithSource.builder()
.id(flowId)
@@ -44,60 +47,56 @@ abstract public class FlowListenersTest {
private static final Logger LOG = LoggerFactory.getLogger(FlowListenersTest.class);
public void suite(FlowListenersInterface flowListenersService) throws TimeoutException {
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
flowListenersService.run();
@Test
public void all() throws Exception {
FlowListenersInterface finalFlowListenersService = flowListenersService;
try (finalFlowListenersService) {
finalFlowListenersService.run();
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
AtomicInteger count = new AtomicInteger();
AtomicInteger count = new AtomicInteger();
flowListenersService.listen(flows -> count.set(getFlowsForTenant(flowListenersService, tenant).size()));
flowListenersService.listen(flows -> count.set(getFlowsForTenant(flowListenersService, tenant).size()));
// initial state
LOG.info("-----------> wait for zero");
Await.until(() -> count.get() == 0, Duration.ofMillis(10), Duration.ofSeconds(5));
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isZero();
// resend on startup done for kafka
LOG.info("-----------> wait for zero kafka");
if (flowListenersService.getClass().getName().equals("io.kestra.ee.runner.kafka.KafkaFlowListeners")) {
// initial state
LOG.info("-----------> wait for zero");
Await.until(() -> count.get() == 0, Duration.ofMillis(10), Duration.ofSeconds(5));
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isZero();
// create first
LOG.info("-----------> create first flow");
FlowWithSource first = create(tenant, "first_" + IdUtils.create(), "test");
FlowWithSource firstUpdated = create(tenant, first.getId(), "test2");
flowRepository.create(GenericFlow.of(first));
Await.until(() -> "Expected to have 1 flow but got " + count.get(), () -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
// create the same id as first, no additional flows
first = flowRepository.update(GenericFlow.of(firstUpdated), first);
Await.until(() -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
FlowWithSource second = create(tenant, "second_" + IdUtils.create(), "test");
// create a new one
flowRepository.create(GenericFlow.of(second));
Await.until(() -> count.get() == 2, Duration.ofMillis(10), Duration.ofSeconds(5));
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(2);
// delete first
FlowWithSource deleted = flowRepository.delete(first);
Await.until(() -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
// restore must work
flowRepository.create(GenericFlow.of(first));
Await.until(() -> count.get() == 2, Duration.ofMillis(10), Duration.ofSeconds(5));
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(2);
}
// create first
LOG.info("-----------> create fist flow");
FlowWithSource first = create(tenant, "first_" + IdUtils.create(), "test");
FlowWithSource firstUpdated = create(tenant, first.getId(), "test2");
flowRepository.create(GenericFlow.of(first));
Await.until(() -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
// create the same id as first, no additional flows
first = flowRepository.update(GenericFlow.of(firstUpdated), first);
Await.until(() -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
FlowWithSource second = create(tenant, "second_" + IdUtils.create(), "test");
// create a new one
flowRepository.create(GenericFlow.of(second));
Await.until(() -> count.get() == 2, Duration.ofMillis(10), Duration.ofSeconds(5));
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(2);
// delete first
FlowWithSource deleted = flowRepository.delete(first);
Await.until(() -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
// restore must work
flowRepository.create(GenericFlow.of(first));
Await.until(() -> count.get() == 2, Duration.ofMillis(10), Duration.ofSeconds(5));
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(2);
}
public List<FlowWithSource> getFlowsForTenant(FlowListenersInterface flowListenersService, String tenantId){
public List<FlowWithSource> getFlowsForTenant(FlowListenersInterface flowListenersService, String tenantId) {
return flowListenersService.flows().stream()
.filter(f -> tenantId.equals(f.getTenantId()))
.toList();

View File

@@ -44,6 +44,6 @@ public class SleepTrigger extends AbstractTrigger implements PollingTriggerInter
@Override
public Duration getInterval() {
return null;
return Duration.ofSeconds(1);
}
}

View File

@@ -1,32 +1,6 @@
package io.kestra.runner.h2;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.FlowListenersTest;
import io.kestra.core.services.PluginDefaultService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Test;
class H2FlowListenersTest extends FlowListenersTest {
@Inject
FlowRepositoryInterface flowRepository;
@Inject
@Named(QueueFactoryInterface.FLOW_NAMED)
QueueInterface<FlowInterface> flowQueue;
@Inject
PluginDefaultService pluginDefaultService;
@Test
public void all() throws TimeoutException {
// we don't inject FlowListeners to remove a flaky test
this.suite(new FlowListeners(flowRepository, flowQueue, pluginDefaultService));
}
}
}

View File

@@ -1,17 +1,6 @@
package io.kestra.runner.mysql;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.FlowListenersTest;
import jakarta.inject.Inject;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Test;
class MysqlFlowListenersTest extends FlowListenersTest {
@Inject
FlowListeners flowListenersService;
@Test
public void all() throws TimeoutException {
this.suite(flowListenersService);
}
}
}

View File

@@ -1,17 +1,6 @@
package io.kestra.schedulers.mysql;
import io.kestra.core.runners.FlowListeners;
import io.kestra.jdbc.runner.JdbcScheduler;
import io.kestra.scheduler.AbstractScheduler;
import io.kestra.scheduler.SchedulerExecutionStateInterface;
import io.kestra.scheduler.SchedulerScheduleTest;
class MysqlSchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy
);
}
}

View File

@@ -1,17 +1,6 @@
package io.kestra.runner.postgres;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.FlowListenersTest;
import jakarta.inject.Inject;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Test;
class PostgresFlowListenersTest extends FlowListenersTest {
@Inject
FlowListeners flowListenersService;
@Test
public void all() throws TimeoutException {
this.suite(flowListenersService);
}
}
}

View File

@@ -1,17 +1,6 @@
package io.kestra.schedulers.postgres;
import io.kestra.core.runners.FlowListeners;
import io.kestra.jdbc.runner.JdbcScheduler;
import io.kestra.scheduler.AbstractScheduler;
import io.kestra.scheduler.SchedulerExecutionStateInterface;
import io.kestra.scheduler.SchedulerScheduleTest;
class PostgresSchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy
);
}
}

View File

@@ -1056,7 +1056,9 @@ public class JdbcExecutor implements ExecutorInterface {
log.error("Unable to kill the execution {}", killedExecution.getExecutionId(), e);
}
log.error("KILLING FROM EXECUTOR");
Executor executor = killingOrAfterKillState(killedExecution.getExecutionId(), Optional.ofNullable(killedExecution.getExecutionState()));
log.error("KILLED FROM EXECUTOR");
// Check whether the kill event should be propagated to downstream executions.
// By default, always propagate the ExecutionKill to sub-flows (for backward compatibility).
@@ -1083,20 +1085,24 @@ public class JdbcExecutor implements ExecutorInterface {
}
private Executor killingOrAfterKillState(final String executionId, Optional<State.Type> afterKillState) {
return executionRepository.lock(executionId, pair -> {
Execution currentExecution = pair.getLeft();
FlowInterface flow = flowMetaStore.findByExecution(currentExecution).orElseThrow();
try {
return executionRepository.lock(executionId, pair -> {
Execution currentExecution = pair.getLeft();
FlowInterface flow = flowMetaStore.findByExecution(currentExecution).orElseThrow();
// remove it from the queued store if it was queued so it would not be restarted
if (currentExecution.getState().isQueued()) {
executionQueuedStorage.remove(currentExecution);
}
// remove it from the queued store if it was queued so it would not be restarted
if (currentExecution.getState().isQueued()) {
executionQueuedStorage.remove(currentExecution);
}
Execution killing = executionService.kill(currentExecution, flow, afterKillState);
Executor current = new Executor(currentExecution, null)
.withExecution(killing, "joinKillingExecution");
return Pair.of(current, pair.getRight());
});
Execution killing = executionService.kill(currentExecution, flow, afterKillState);
Executor current = new Executor(currentExecution, null)
.withExecution(killing, "joinKillingExecution");
return Pair.of(current, pair.getRight());
});
} catch (Exception e) {
throw new RuntimeException("ISSUE WHILE KILLING EXECUTION " + executionId, e);
}
}
private void toExecution(Executor executor) {

View File

@@ -23,6 +23,7 @@ import io.micronaut.context.annotation.Value;
import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.exception.DataException;
@@ -301,20 +302,27 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
Timer timer = this.metricRegistry
.timer(MetricRegistry.METRIC_QUEUE_RECEIVE_DURATION, MetricRegistry.METRIC_QUEUE_RECEIVE_DURATION_DESCRIPTION, tags);
StackTraceElement[] parentStackTrace = Thread.currentThread().getStackTrace();
return this.poll(() -> timer.record(() -> {
Result<Record> fetch = dslContextWrapper.transactionResult(configuration -> {
DSLContext ctx = DSL.using(configuration);
Result<Record> fetch;
try {
fetch = dslContextWrapper.transactionResult(configuration -> {
DSLContext ctx = DSL.using(configuration);
Result<Record> result = this.receiveFetch(ctx, consumerGroup, maxOffset.get(), forUpdate);
Result<Record> result = this.receiveFetch(ctx, consumerGroup, maxOffset.get(), forUpdate);
if (!result.isEmpty()) {
List<Integer> offsets = result.map(record -> record.get("offset", Integer.class));
if (!result.isEmpty()) {
List<Integer> offsets = result.map(record -> record.get("offset", Integer.class));
maxOffset.set(offsets.getLast());
}
maxOffset.set(offsets.getLast());
}
return result;
});
return result;
});
} catch (Exception e) {
log.error("Error while receiving messages from JDBC queue. Thread stacktrace: {}", parentStackTrace, e);
throw e;
}
this.send(fetch, consumer);
@@ -426,13 +434,14 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
@SuppressWarnings("BusyWait")
protected Runnable poll(Supplier<Integer> runnable) {
AtomicBoolean queriedToStop = new AtomicBoolean(false);
AtomicBoolean running = new AtomicBoolean(true);
poolExecutor.execute(() -> {
List<Configuration.Step> steps = configuration.computeSteps();
Duration sleep = configuration.minPollInterval;
ZonedDateTime lastPoll = ZonedDateTime.now();
while (running.get() && !this.isClosed.get()) {
while (!queriedToStop.get() && !this.isClosed.get()) {
if (!this.isPaused.get()) {
try {
Integer count = runnable.get();
@@ -471,9 +480,21 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
throw new RuntimeException(e);
}
}
running.set(false);
});
return () -> running.set(false);
return () -> {
queriedToStop.set(true);
try {
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofMillis(10))
.until(() -> !running.get());
} catch (Exception e) {
log.warn("Error while stopping polling", e);
}
};
}
protected List<Either<T, DeserializationException>> map(Result<Record> fetch) {
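The poll() change above splits shutdown into a stop request (queriedToStop) and a stopped acknowledgement (running), and the returned cancellation waits for the latter via Awaitility so a caller never returns while a fetch is still in flight. A self-contained sketch of the same two-phase stop, using a CountDownLatch in place of the Awaitility wait (names are illustrative):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicBoolean;

    class PollLoopSketch {
        static Runnable startPolling(Runnable pollOnce) {
            AtomicBoolean queriedToStop = new AtomicBoolean(false);  // set by the canceller
            CountDownLatch stopped = new CountDownLatch(1);          // counted down when the loop exits

            Thread worker = new Thread(() -> {
                try {
                    while (!queriedToStop.get()) {
                        pollOnce.run();          // one polling iteration
                        Thread.sleep(10);        // stands in for the queue's computed poll interval
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    stopped.countDown();         // acknowledge that polling has fully stopped
                }
            });
            worker.start();

            // Ask the loop to stop, then wait for it to actually exit before returning.
            return () -> {
                queriedToStop.set(true);
                try {
                    stopped.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };
        }
    }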

View File

@@ -19,6 +19,7 @@ import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
@@ -59,7 +60,7 @@ public class JdbcScheduler extends AbstractScheduler {
});
// No-op consumption of the trigger queue, so the events are purged from the queue
this.triggerQueue.receive(Scheduler.class, trigger -> { });
this.receiveCancellations.add(this.triggerQueue.receive(Scheduler.class, trigger -> { }));
}
@Override

View File

@@ -25,6 +25,7 @@ import io.kestra.core.runners.FlowListeners;
import io.kestra.core.utils.Await;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@@ -85,11 +86,18 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
.truncatedTo(ChronoUnit.HOURS);
}
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy
);
) {
@SneakyThrows
@Override
public void close() {
super.close();
flowListenersServiceSpy.close();
}
};
}
@BeforeEach
@@ -139,7 +147,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(trigger.toBuilder().triggerId("schedule-invalid").flowId(invalid.getId()).build());
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionStateSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
// wait for execution
Flux<Execution> receiveExecutions = TestsUtils.receive(executionQueue, throwConsumer(either -> {
Execution execution = either.getLeft();
@@ -200,7 +208,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(trigger);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
scheduler.run();
Await.until(() -> {
@@ -235,7 +243,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
@@ -281,7 +289,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
@@ -326,7 +334,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(lastTrigger);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
scheduler.run();
Await.until(() -> scheduler.isReady(), Duration.ofMillis(100), Duration.ofSeconds(5));
@@ -358,7 +366,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
.build();
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
scheduler.run();
Await.until(() -> {
@@ -422,7 +430,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(trigger);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
scheduler.run();
// Wait 3s to see if things happen
@@ -462,7 +470,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(2);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, throwConsumer(either -> {
Execution execution = either.getLeft();
@@ -522,7 +530,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
@@ -635,7 +643,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, throwConsumer(either -> {
Execution execution = either.getLeft();
@@ -708,7 +716,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, throwConsumer(either -> {
Execution execution = either.getLeft();
@@ -741,4 +749,4 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
Await.until(() -> this.triggerState.findLast(trigger).map(t -> t.getNextExecutionDate().isAfter(lastTrigger.getNextExecutionDate().plusSeconds(10))).orElse(false).booleanValue(), Duration.ofMillis(100), Duration.ofSeconds(20));
}
}
}
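The scheduler(FlowListeners) helper above now returns an anonymous JdbcScheduler subclass whose close() also closes the injected FlowListeners spy, so every try-with-resources block in these tests tears both down together. A small sketch of that close-composition idea, with illustrative types only:

    class Dependency implements AutoCloseable {
        @Override
        public void close() {
            System.out.println("dependency closed");
        }
    }

    class CloseCompositionSketch {
        static AutoCloseable build(Dependency dependency) {
            // Anonymous implementation that chains the dependency's close() onto its own.
            return new AutoCloseable() {
                @Override
                public void close() throws Exception {
                    System.out.println("owner closed");
                    dependency.close();
                }
            };
        }

        public static void main(String[] args) throws Exception {
            try (AutoCloseable owner = build(new Dependency())) {
                // test body would run here
            }  // closing the owner also closes the dependency
        }
    }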
}