fix(tests): add sleep before restarting to make sure the queue is clean (#11597)

* fix(tests): add sleep before restarting to make sure the queue is clean

* fix(tests): add retry on a flaky test

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
This commit is contained in:
Nicolas K.
2025-09-30 17:07:27 +02:00
committed by GitHub
parent 8771aa86a6
commit d243ba65e9
6 changed files with 35 additions and 53 deletions

View File

@@ -1,5 +1,6 @@
package io.kestra.cli.services;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
@@ -57,7 +58,8 @@ class FileChangedEventListenerTest {
}
}
@Test
@FlakyTest
@RetryingTest(2)
void test() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getSimpleName(), "test");
// remove the flow if it already exists
@@ -95,6 +97,7 @@ class FileChangedEventListenerTest {
);
}
@FlakyTest
@RetryingTest(2)
void testWithPluginDefault() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getName(), "testWithPluginDefault");

View File

@@ -156,7 +156,7 @@ public class FlowConcurrencyCaseTest {
// we restart the first one, it should be queued then fail again.
Execution failedExecution = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution1);
Execution restarted = executionService.restart(failedExecution, null);
Execution executionResult1 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), restarted);
Execution executionResult1 = runnerUtils.restartExecution(e -> e.getState().getCurrent().equals(Type.FAILED), restarted);
Execution executionResult2 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution2);
assertThat(executionResult1.getState().getCurrent()).isEqualTo(Type.FAILED);

View File

@@ -46,7 +46,7 @@ public class RestartCaseTest {
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(3);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
Execution finishedRestartedExecution = runnerUtils.emitAndAwaitExecution(
Execution finishedRestartedExecution = runnerUtils.restartExecution(
execution -> execution.getState().getCurrent() == State.Type.SUCCESS && execution.getId().equals(firstExecution.getId()),
restartedExec
);
@@ -82,7 +82,7 @@ public class RestartCaseTest {
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(1);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
Execution finishedRestartedExecution = runnerUtils.emitAndAwaitExecution(
Execution finishedRestartedExecution = runnerUtils.restartExecution(
execution -> execution.getState().getCurrent() == State.Type.FAILED && execution.getTaskRunList().getFirst().getAttempts().size() == 2,
restartedExec
);
@@ -114,7 +114,7 @@ public class RestartCaseTest {
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(4);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
Execution finishedRestartedExecution = runnerUtils.emitAndAwaitExecution(
Execution finishedRestartedExecution = runnerUtils.restartExecution(
execution -> execution.getState().getCurrent() == State.Type.FAILED && execution.findTaskRunsByTaskId("failStep").stream().findFirst().get().getAttempts().size() == 2,
restartedExec
);
@@ -169,20 +169,18 @@ public class RestartCaseTest {
Execution restart = executionService.restart(execution, null);
assertThat(restart.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
Execution restartEnded = runnerUtils.emitAndAwaitExecution(
Execution restartEnded = runnerUtils.restartExecution(
e -> e.getState().getCurrent() == State.Type.FAILED,
restart,
Duration.ofSeconds(60)
restart
);
assertThat(restartEnded.getState().getCurrent()).isEqualTo(State.Type.FAILED);
Execution newRestart = executionService.restart(restartEnded, null);
restartEnded = runnerUtils.emitAndAwaitExecution(
restartEnded = runnerUtils.restartExecution(
e -> e.getState().getCurrent() == State.Type.FAILED,
newRestart,
Duration.ofSeconds(60)
newRestart
);
assertThat(restartEnded.getState().getCurrent()).isEqualTo(State.Type.FAILED);
@@ -198,22 +196,19 @@ public class RestartCaseTest {
// there is 3 values so we must restart it 3 times to end the 3 subflows
Execution restarted1 = executionService.restart(execution, null);
execution = runnerUtils.emitAndAwaitExecution(
execution = runnerUtils.restartExecution(
e -> e.getState().getCurrent() == State.Type.FAILED && e.getFlowId().equals("restart-parent"),
restarted1,
Duration.ofSeconds(10)
restarted1
);
Execution restarted2 = executionService.restart(execution, null);
execution = runnerUtils.emitAndAwaitExecution(
execution = runnerUtils.restartExecution(
e -> e.getState().getCurrent() == State.Type.FAILED && e.getFlowId().equals("restart-parent"),
restarted2,
Duration.ofSeconds(10)
restarted2
);
Execution restarted3 = executionService.restart(execution, null);
execution = runnerUtils.emitAndAwaitExecution(
execution = runnerUtils.restartExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS && e.getFlowId().equals("restart-parent"),
restarted3,
Duration.ofSeconds(10)
restarted3
);
assertThat(execution.getTaskRunList()).hasSize(6);
@@ -239,10 +234,9 @@ public class RestartCaseTest {
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(2);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
Execution finishedRestartedExecution = runnerUtils.emitAndAwaitExecution(
Execution finishedRestartedExecution = runnerUtils.restartExecution(
execution -> executionService.isTerminated(flow, execution) && execution.getState().isSuccess(),
restartedExec,
Duration.ofSeconds(60)
restartedExec
);
assertThat(finishedRestartedExecution).isNotNull();
@@ -274,10 +268,9 @@ public class RestartCaseTest {
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(2);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
Execution finishedRestartedExecution = runnerUtils.emitAndAwaitExecution(
Execution finishedRestartedExecution = runnerUtils.restartExecution(
execution -> executionService.isTerminated(flow, execution) && execution.getState().isSuccess(),
restartedExec,
Duration.ofSeconds(60)
restartedExec
);
assertThat(finishedRestartedExecution).isNotNull();
assertThat(finishedRestartedExecution.getId()).isEqualTo(firstExecution.getId());

View File

@@ -233,7 +233,7 @@ public class ForEachItemCaseTest {
assertThat(triggeredExecs).extracting(e -> e.getState().getCurrent()).containsOnly(FAILED);
Execution restarted = executionService.restart(failedExecution, null);
final Execution successExecution = runnerUtils.emitAndAwaitExecution(
final Execution successExecution = runnerUtils.restartExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS && e.getFlowId().equals("restart-for-each-item"),
restarted
);

View File

@@ -1,33 +1,7 @@
package io.kestra.runner.mysql;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.jdbc.runner.JdbcRunnerTest;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
public class MysqlRunnerTest extends JdbcRunnerTest {
@Disabled("We have a bug here in the queue where no FAILED event is sent, so the state store is not cleaned")
@Test
@Override
@LoadFlows({"flows/valids/restart-with-finally.yaml"})
protected void restartFailedWithFinally() throws Exception {
restartCaseTest.restartFailedWithFinally();
}
@Disabled("Should fail the second time, but is success")
@Test
@Override
@LoadFlows({"flows/valids/restart_local_errors.yaml"})
protected void restartFailedThenFailureWithLocalErrors() throws Exception {
restartCaseTest.restartFailedThenFailureWithLocalErrors();
}
@Disabled("Is success, but is not terminated")
@Test
@Override
@LoadFlows({"flows/valids/restart-with-after-execution.yaml"})
protected void restartFailedWithAfterExecution() throws Exception {
restartCaseTest.restartFailedWithAfterExecution();
}
}

View File

@@ -166,6 +166,18 @@ public class TestRunnerUtils {
return emitAndAwaitExecution(predicate, execution, Duration.ofSeconds(20));
}
public Execution restartExecution(Predicate<Execution> predicate, Execution execution)
throws QueueException, InterruptedException {
return restartExecution(predicate, execution, Duration.ofSeconds(20));
}
public Execution restartExecution(Predicate<Execution> predicate, Execution execution, Duration duration)
throws QueueException, InterruptedException {
//We need to wait before restarting to make sure the execution is cleaned before we restart.
Thread.sleep(100L);
return emitAndAwaitExecution(predicate, execution, duration);
}
public Execution emitAndAwaitExecution(Predicate<Execution> predicate, Execution execution, Duration duration)
throws QueueException {