mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
wip: showcase JDBC executor being stuck on any exception in queue consumption
This commit is contained in:
@@ -557,6 +557,10 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
final FlowWithSource flow = findFlowOrThrow(execution);
|
||||
executor = executor.withFlow(flow);
|
||||
|
||||
if (execution.getId().startsWith("shouldFail")) {
|
||||
throw new RuntimeException("Simulated failure");
|
||||
}
|
||||
|
||||
// schedule it for later if needed
|
||||
if (execution.getState().getCurrent() == State.Type.CREATED && execution.getScheduleDate() != null && execution.getScheduleDate().isAfter(Instant.now())) {
|
||||
ExecutionDelay executionDelay = ExecutionDelay.builder()
|
||||
|
||||
@@ -3,14 +3,18 @@ package io.kestra.jdbc.runner;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.queues.MessageTooBigException;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.runners.AbstractRunnerTest;
|
||||
import io.kestra.core.runners.InputsTest;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.log.Log;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -19,9 +23,14 @@ import org.slf4j.event.Level;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static io.kestra.core.repositories.AbstractFlowRepositoryTest.TEST_NAMESPACE;
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
@@ -31,6 +40,9 @@ public abstract class JdbcRunnerTest extends AbstractRunnerTest {
|
||||
@Named(QueueFactoryInterface.EXECUTION_NAMED)
|
||||
protected QueueInterface<Execution> executionQueue;
|
||||
|
||||
@Inject
|
||||
protected FlowRepositoryInterface flowRepository;
|
||||
|
||||
public static final String NAMESPACE = "io.kestra.tests";
|
||||
|
||||
@Test
|
||||
@@ -50,6 +62,14 @@ public abstract class JdbcRunnerTest extends AbstractRunnerTest {
|
||||
).hasSize(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows("flows/valids/return.yaml")
|
||||
void stuckExecutor() throws QueueException, InterruptedException, TimeoutException {
|
||||
Execution wrongExecution = Execution.newExecution(flowRepository.findById(MAIN_TENANT, "io.kestra.tests", "return").get(), Collections.emptyList()).withScheduleDate(Instant.now().minus(Instant.now().toEpochMilli() - 100000, ChronoUnit.MILLIS));
|
||||
executionQueue.emit(wrongExecution.toBuilder().id("shouldFail" + IdUtils.create()).build());
|
||||
runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "return");
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/waitfor-child-task-warning.yaml"})
|
||||
void waitForChildTaskWarning() throws Exception {
|
||||
|
||||
Reference in New Issue
Block a user