mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 05:00:31 -05:00
Compare commits
4 Commits
feat/agent
...
fix/remove
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
811af5c5a1 | ||
|
|
1b73ddd097 | ||
|
|
9803ecc6d0 | ||
|
|
b4871fcb15 |
@@ -24,6 +24,9 @@ dependencies {
|
||||
// reactor
|
||||
api "io.projectreactor:reactor-core"
|
||||
|
||||
// awaitility
|
||||
api 'org.awaitility:awaitility'
|
||||
|
||||
// micronaut
|
||||
api "io.micronaut.data:micronaut-data-model"
|
||||
implementation "io.micronaut:micronaut-http-server-netty"
|
||||
|
||||
@@ -5,6 +5,8 @@ import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.FlowWithException;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.services.PluginDefaultService;
|
||||
import io.micronaut.context.annotation.Bean;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
@@ -15,7 +17,6 @@ import io.kestra.core.services.FlowListenersInterface;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@@ -26,15 +27,14 @@ import jakarta.inject.Singleton;
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class FlowListeners implements FlowListenersInterface {
|
||||
|
||||
private final AtomicBoolean isStarted = new AtomicBoolean(false);
|
||||
private final QueueInterface<FlowInterface> flowQueue;
|
||||
private final List<FlowWithSource> flows;
|
||||
private final List<Consumer<List<FlowWithSource>>> consumers = new ArrayList<>();
|
||||
private final List<BiConsumer<FlowWithSource, FlowWithSource>> consumersEach = new ArrayList<>();
|
||||
|
||||
private final PluginDefaultService pluginDefaultService;
|
||||
|
||||
private Runnable queueListenerCancellation;
|
||||
|
||||
@Inject
|
||||
public FlowListeners(
|
||||
FlowRepositoryInterface flowRepository,
|
||||
@@ -49,8 +49,9 @@ public class FlowListeners implements FlowListenersInterface {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (this) {
|
||||
if (this.isStarted.compareAndSet(false, true)) {
|
||||
this.flowQueue.receive(either -> {
|
||||
if (queueListenerCancellation == null) {
|
||||
log.info("STARTING FLOW LISTENER: {}", this);
|
||||
queueListenerCancellation = this.flowQueue.receive(either -> {
|
||||
FlowWithSource flow;
|
||||
if (either.isRight()) {
|
||||
flow = FlowWithException.from(either.getRight().getRecord(), either.getRight(), log).orElse(null);
|
||||
@@ -154,4 +155,18 @@ public class FlowListeners implements FlowListenersInterface {
|
||||
// we forced a deep clone to avoid concurrency where instance are changed during iteration (especially scheduler).
|
||||
return new ArrayList<>(this.flows);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
synchronized (this) {
|
||||
boolean b = queueListenerCancellation != null;
|
||||
log.info("THREAD STACKTRACE: {}", (Object) Thread.currentThread().getStackTrace());
|
||||
log.info("LISTENER NOT NULL : {}", b);
|
||||
log.info("LISTENER THIS : {}", this);
|
||||
if (b) {
|
||||
queueListenerCancellation.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import java.util.List;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public interface FlowListenersInterface {
|
||||
public interface FlowListenersInterface extends AutoCloseable {
|
||||
void run();
|
||||
|
||||
void listen(Consumer<List<FlowWithSource>> consumer);
|
||||
|
||||
@@ -11,21 +11,17 @@ public final class ThreadUncaughtExceptionHandler implements UncaughtExceptionHa
|
||||
|
||||
@Override
|
||||
public void uncaughtException(Thread t, Throwable e) {
|
||||
boolean isTest = KestraContext.getContext().getEnvironments().contains("test");
|
||||
|
||||
try {
|
||||
// cannot use FormattingLogger due to a dependency loop
|
||||
log.error("Caught an exception in {}. {}", t, isTest ? "Keeping it running for test." : "Shutting down.", e);
|
||||
log.error("Caught an exception in {}. Shutting down.", t, e);
|
||||
} catch (Throwable errorInLogging) {
|
||||
// If logging fails, e.g. due to missing memory, at least try to log the
|
||||
// message and the cause for the failed logging.
|
||||
System.err.println(e.getMessage());
|
||||
System.err.println(errorInLogging.getMessage());
|
||||
} finally {
|
||||
if (!isTest) {
|
||||
KestraContext.getContext().shutdown();
|
||||
Runtime.getRuntime().exit(1);
|
||||
}
|
||||
KestraContext.getContext().shutdown();
|
||||
Runtime.getRuntime().exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,25 +1,25 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.services.FlowListenersInterface;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@@ -27,6 +27,9 @@ abstract public class FlowListenersTest {
|
||||
@Inject
|
||||
protected FlowRepositoryInterface flowRepository;
|
||||
|
||||
@Inject
|
||||
protected FlowListenersInterface flowListenersService;
|
||||
|
||||
protected static FlowWithSource create(String tenantId, String flowId, String taskId) {
|
||||
FlowWithSource flow = FlowWithSource.builder()
|
||||
.id(flowId)
|
||||
@@ -44,60 +47,56 @@ abstract public class FlowListenersTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(FlowListenersTest.class);
|
||||
|
||||
public void suite(FlowListenersInterface flowListenersService) throws TimeoutException {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
flowListenersService.run();
|
||||
@Test
|
||||
public void all() throws Exception {
|
||||
FlowListenersInterface finalFlowListenersService = flowListenersService;
|
||||
try (finalFlowListenersService) {
|
||||
finalFlowListenersService.run();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
|
||||
AtomicInteger count = new AtomicInteger();
|
||||
AtomicInteger count = new AtomicInteger();
|
||||
|
||||
flowListenersService.listen(flows -> count.set(getFlowsForTenant(flowListenersService, tenant).size()));
|
||||
flowListenersService.listen(flows -> count.set(getFlowsForTenant(flowListenersService, tenant).size()));
|
||||
|
||||
// initial state
|
||||
LOG.info("-----------> wait for zero");
|
||||
Await.until(() -> count.get() == 0, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isZero();
|
||||
|
||||
// resend on startup done for kafka
|
||||
LOG.info("-----------> wait for zero kafka");
|
||||
if (flowListenersService.getClass().getName().equals("io.kestra.ee.runner.kafka.KafkaFlowListeners")) {
|
||||
// initial state
|
||||
LOG.info("-----------> wait for zero");
|
||||
Await.until(() -> count.get() == 0, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isZero();
|
||||
|
||||
// create first
|
||||
LOG.info("-----------> create first flow");
|
||||
FlowWithSource first = create(tenant, "first_" + IdUtils.create(), "test");
|
||||
FlowWithSource firstUpdated = create(tenant, first.getId(), "test2");
|
||||
|
||||
|
||||
flowRepository.create(GenericFlow.of(first));
|
||||
Await.until(() -> "Expected to have 1 flow but got " + count.get(), () -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
|
||||
|
||||
// create the same id than first, no additional flows
|
||||
first = flowRepository.update(GenericFlow.of(firstUpdated), first);
|
||||
Await.until(() -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
|
||||
|
||||
FlowWithSource second = create(tenant, "second_" + IdUtils.create(), "test");
|
||||
// create a new one
|
||||
flowRepository.create(GenericFlow.of(second));
|
||||
Await.until(() -> count.get() == 2, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(2);
|
||||
|
||||
// delete first
|
||||
FlowWithSource deleted = flowRepository.delete(first);
|
||||
Await.until(() -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
|
||||
|
||||
// restore must works
|
||||
flowRepository.create(GenericFlow.of(first));
|
||||
Await.until(() -> count.get() == 2, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(2);
|
||||
}
|
||||
|
||||
// create first
|
||||
LOG.info("-----------> create fist flow");
|
||||
FlowWithSource first = create(tenant, "first_" + IdUtils.create(), "test");
|
||||
FlowWithSource firstUpdated = create(tenant, first.getId(), "test2");
|
||||
|
||||
|
||||
flowRepository.create(GenericFlow.of(first));
|
||||
Await.until(() -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
|
||||
|
||||
// create the same id than first, no additional flows
|
||||
first = flowRepository.update(GenericFlow.of(firstUpdated), first);
|
||||
Await.until(() -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
|
||||
|
||||
FlowWithSource second = create(tenant, "second_" + IdUtils.create(), "test");
|
||||
// create a new one
|
||||
flowRepository.create(GenericFlow.of(second));
|
||||
Await.until(() -> count.get() == 2, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(2);
|
||||
|
||||
// delete first
|
||||
FlowWithSource deleted = flowRepository.delete(first);
|
||||
Await.until(() -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
|
||||
|
||||
// restore must works
|
||||
flowRepository.create(GenericFlow.of(first));
|
||||
Await.until(() -> count.get() == 2, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(2);
|
||||
|
||||
}
|
||||
|
||||
public List<FlowWithSource> getFlowsForTenant(FlowListenersInterface flowListenersService, String tenantId){
|
||||
public List<FlowWithSource> getFlowsForTenant(FlowListenersInterface flowListenersService, String tenantId) {
|
||||
return flowListenersService.flows().stream()
|
||||
.filter(f -> tenantId.equals(f.getTenantId()))
|
||||
.toList();
|
||||
|
||||
@@ -44,6 +44,6 @@ public class SleepTrigger extends AbstractTrigger implements PollingTriggerInter
|
||||
|
||||
@Override
|
||||
public Duration getInterval() {
|
||||
return null;
|
||||
return Duration.ofSeconds(1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,32 +1,6 @@
|
||||
package io.kestra.runner.h2;
|
||||
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.core.runners.FlowListenersTest;
|
||||
import io.kestra.core.services.PluginDefaultService;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class H2FlowListenersTest extends FlowListenersTest {
|
||||
|
||||
@Inject
|
||||
FlowRepositoryInterface flowRepository;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.FLOW_NAMED)
|
||||
QueueInterface<FlowInterface> flowQueue;
|
||||
|
||||
@Inject
|
||||
PluginDefaultService pluginDefaultService;
|
||||
|
||||
@Test
|
||||
public void all() throws TimeoutException {
|
||||
// we don't inject FlowListeners to remove a flaky test
|
||||
this.suite(new FlowListeners(flowRepository, flowQueue, pluginDefaultService));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,17 +1,6 @@
|
||||
package io.kestra.runner.mysql;
|
||||
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.core.runners.FlowListenersTest;
|
||||
import jakarta.inject.Inject;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class MysqlFlowListenersTest extends FlowListenersTest {
|
||||
@Inject
|
||||
FlowListeners flowListenersService;
|
||||
|
||||
@Test
|
||||
public void all() throws TimeoutException {
|
||||
this.suite(flowListenersService);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,17 +1,6 @@
|
||||
package io.kestra.schedulers.mysql;
|
||||
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||
import io.kestra.scheduler.AbstractScheduler;
|
||||
import io.kestra.scheduler.SchedulerExecutionStateInterface;
|
||||
import io.kestra.scheduler.SchedulerScheduleTest;
|
||||
|
||||
class MysqlSchedulerScheduleTest extends SchedulerScheduleTest {
|
||||
@Override
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
|
||||
return new JdbcScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,17 +1,6 @@
|
||||
package io.kestra.runner.postgres;
|
||||
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.core.runners.FlowListenersTest;
|
||||
import jakarta.inject.Inject;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class PostgresFlowListenersTest extends FlowListenersTest {
|
||||
@Inject
|
||||
FlowListeners flowListenersService;
|
||||
|
||||
@Test
|
||||
public void all() throws TimeoutException {
|
||||
this.suite(flowListenersService);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,17 +1,6 @@
|
||||
package io.kestra.schedulers.postgres;
|
||||
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||
import io.kestra.scheduler.AbstractScheduler;
|
||||
import io.kestra.scheduler.SchedulerExecutionStateInterface;
|
||||
import io.kestra.scheduler.SchedulerScheduleTest;
|
||||
|
||||
class PostgresSchedulerScheduleTest extends SchedulerScheduleTest {
|
||||
@Override
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
|
||||
return new JdbcScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1056,7 +1056,9 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
log.error("Unable to kill the execution {}", killedExecution.getExecutionId(), e);
|
||||
}
|
||||
|
||||
log.error("KILLING FROM EXECUTOR");
|
||||
Executor executor = killingOrAfterKillState(killedExecution.getExecutionId(), Optional.ofNullable(killedExecution.getExecutionState()));
|
||||
log.error("KILLED FROM EXECUTOR");
|
||||
|
||||
// Check whether kill event should be propagated to downstream executions.
|
||||
// By default, always propagate the ExecutionKill to sub-flows (for backward compatibility).
|
||||
@@ -1083,20 +1085,24 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
}
|
||||
|
||||
private Executor killingOrAfterKillState(final String executionId, Optional<State.Type> afterKillState) {
|
||||
return executionRepository.lock(executionId, pair -> {
|
||||
Execution currentExecution = pair.getLeft();
|
||||
FlowInterface flow = flowMetaStore.findByExecution(currentExecution).orElseThrow();
|
||||
try {
|
||||
return executionRepository.lock(executionId, pair -> {
|
||||
Execution currentExecution = pair.getLeft();
|
||||
FlowInterface flow = flowMetaStore.findByExecution(currentExecution).orElseThrow();
|
||||
|
||||
// remove it from the queued store if it was queued so it would not be restarted
|
||||
if (currentExecution.getState().isQueued()) {
|
||||
executionQueuedStorage.remove(currentExecution);
|
||||
}
|
||||
// remove it from the queued store if it was queued so it would not be restarted
|
||||
if (currentExecution.getState().isQueued()) {
|
||||
executionQueuedStorage.remove(currentExecution);
|
||||
}
|
||||
|
||||
Execution killing = executionService.kill(currentExecution, flow, afterKillState);
|
||||
Executor current = new Executor(currentExecution, null)
|
||||
.withExecution(killing, "joinKillingExecution");
|
||||
return Pair.of(current, pair.getRight());
|
||||
});
|
||||
Execution killing = executionService.kill(currentExecution, flow, afterKillState);
|
||||
Executor current = new Executor(currentExecution, null)
|
||||
.withExecution(killing, "joinKillingExecution");
|
||||
return Pair.of(current, pair.getRight());
|
||||
});
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("ISSUE WHILE KILLING EXECUTION " + executionId, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void toExecution(Executor executor) {
|
||||
|
||||
@@ -23,6 +23,7 @@ import io.micronaut.context.annotation.Value;
|
||||
import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.awaitility.Awaitility;
|
||||
import org.jooq.*;
|
||||
import org.jooq.Record;
|
||||
import org.jooq.exception.DataException;
|
||||
@@ -301,20 +302,27 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
||||
|
||||
Timer timer = this.metricRegistry
|
||||
.timer(MetricRegistry.METRIC_QUEUE_RECEIVE_DURATION, MetricRegistry.METRIC_QUEUE_RECEIVE_DURATION_DESCRIPTION, tags);
|
||||
StackTraceElement[] parentStackTrace = Thread.currentThread().getStackTrace();
|
||||
return this.poll(() -> timer.record(() -> {
|
||||
Result<Record> fetch = dslContextWrapper.transactionResult(configuration -> {
|
||||
DSLContext ctx = DSL.using(configuration);
|
||||
Result<Record> fetch;
|
||||
try {
|
||||
fetch = dslContextWrapper.transactionResult(configuration -> {
|
||||
DSLContext ctx = DSL.using(configuration);
|
||||
|
||||
Result<Record> result = this.receiveFetch(ctx, consumerGroup, maxOffset.get(), forUpdate);
|
||||
Result<Record> result = this.receiveFetch(ctx, consumerGroup, maxOffset.get(), forUpdate);
|
||||
|
||||
if (!result.isEmpty()) {
|
||||
List<Integer> offsets = result.map(record -> record.get("offset", Integer.class));
|
||||
if (!result.isEmpty()) {
|
||||
List<Integer> offsets = result.map(record -> record.get("offset", Integer.class));
|
||||
|
||||
maxOffset.set(offsets.getLast());
|
||||
}
|
||||
maxOffset.set(offsets.getLast());
|
||||
}
|
||||
|
||||
return result;
|
||||
});
|
||||
return result;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("Error while receiving messages from JDBC queue. Thread stacktrace: {}", parentStackTrace, e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
this.send(fetch, consumer);
|
||||
|
||||
@@ -426,13 +434,14 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
||||
|
||||
@SuppressWarnings("BusyWait")
|
||||
protected Runnable poll(Supplier<Integer> runnable) {
|
||||
AtomicBoolean queriedToStop = new AtomicBoolean(false);
|
||||
AtomicBoolean running = new AtomicBoolean(true);
|
||||
|
||||
poolExecutor.execute(() -> {
|
||||
List<Configuration.Step> steps = configuration.computeSteps();
|
||||
Duration sleep = configuration.minPollInterval;
|
||||
ZonedDateTime lastPoll = ZonedDateTime.now();
|
||||
while (running.get() && !this.isClosed.get()) {
|
||||
while (!queriedToStop.get() && !this.isClosed.get()) {
|
||||
if (!this.isPaused.get()) {
|
||||
try {
|
||||
Integer count = runnable.get();
|
||||
@@ -471,9 +480,21 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
running.set(false);
|
||||
});
|
||||
|
||||
return () -> running.set(false);
|
||||
return () -> {
|
||||
queriedToStop.set(true);
|
||||
try {
|
||||
Awaitility.await()
|
||||
.atMost(Duration.ofSeconds(30))
|
||||
.pollInterval(Duration.ofMillis(10))
|
||||
.until(() -> !running.get());
|
||||
} catch (Exception e) {
|
||||
log.warn("Error while stopping polling", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected List<Either<T, DeserializationException>> map(Result<Record> fetch) {
|
||||
|
||||
@@ -19,6 +19,7 @@ import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
@@ -59,7 +60,7 @@ public class JdbcScheduler extends AbstractScheduler {
|
||||
});
|
||||
|
||||
// No-op consumption of the trigger queue, so the events are purged from the queue
|
||||
this.triggerQueue.receive(Scheduler.class, trigger -> { });
|
||||
this.receiveCancellations.add(this.triggerQueue.receive(Scheduler.class, trigger -> { }));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -25,6 +25,7 @@ import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.core.utils.Await;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import lombok.SneakyThrows;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
@@ -85,11 +86,18 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
.truncatedTo(ChronoUnit.HOURS);
|
||||
}
|
||||
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
|
||||
return new JdbcScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy
|
||||
);
|
||||
) {
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public void close() {
|
||||
super.close();
|
||||
flowListenersServiceSpy.close();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
@@ -139,7 +147,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
triggerState.create(trigger.toBuilder().triggerId("schedule-invalid").flowId(invalid.getId()).build());
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionStateSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
// wait for execution
|
||||
Flux<Execution> receiveExecutions = TestsUtils.receive(executionQueue, throwConsumer(either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -200,7 +208,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
triggerState.create(trigger);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
scheduler.run();
|
||||
|
||||
Await.until(() -> {
|
||||
@@ -235,7 +243,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
// wait for execution
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -281,7 +289,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
// wait for execution
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -326,7 +334,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
triggerState.create(lastTrigger);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
scheduler.run();
|
||||
|
||||
Await.until(() -> scheduler.isReady(), Duration.ofMillis(100), Duration.ofSeconds(5));
|
||||
@@ -358,7 +366,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
.build();
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
scheduler.run();
|
||||
|
||||
Await.until(() -> {
|
||||
@@ -422,7 +430,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
triggerState.create(trigger);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
scheduler.run();
|
||||
|
||||
// Wait 3s to see if things happen
|
||||
@@ -462,7 +470,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(2);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
// wait for execution
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, throwConsumer(either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -522,7 +530,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
// wait for execution
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -635,7 +643,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
// wait for execution
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, throwConsumer(either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -708,7 +716,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
// wait for execution
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, throwConsumer(either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -741,4 +749,4 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
Await.until(() -> this.triggerState.findLast(trigger).map(t -> t.getNextExecutionDate().isAfter(lastTrigger.getNextExecutionDate().plusSeconds(10))).orElse(false).booleanValue(), Duration.ofMillis(100), Duration.ofSeconds(20));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user