feat(core): retry flowable (#3473)

* feat(core): retry flowable

* feat(core): migrate retry memory runner test to jdbc/kafka runner test

* feat(core): use delayType in memory executor
This commit is contained in:
YannC
2024-04-08 16:43:21 +02:00
committed by GitHub
parent cbcd9b07a7
commit e58ff958a0
13 changed files with 425 additions and 126 deletions

View File

@@ -453,59 +453,85 @@ public class ExecutorService {
workerTaskResult.ifPresent(list::add);
}
Task task = executor.getFlow().findTaskByTaskId(taskRun.getTaskId());
TaskRun parentTaskRun = null;
Task parentTask = null;
if (taskRun.getParentTaskRunId() != null) {
parentTaskRun = executor.getExecution().findTaskRunByTaskRunId(taskRun.getParentTaskRunId());
parentTask = executor.getFlow().findTaskByTaskId(parentTaskRun.getTaskId());
}
/*
* Check if the task is failed and if it has a retry policy
*/
if (!executor.getExecution().getState().isRetrying() &&
taskRun.getState().isFailed() &&
(executor.getFlow().findTaskByTaskId(taskRun.getTaskId()).getRetry() != null || executor.getFlow().getRetry() != null)
!(task instanceof FlowableTask<?>) &&
(executor.getFlow().findTaskByTaskId(taskRun.getTaskId()).getRetry() != null ||
executor.getFlow().getRetry() != null ||
(parentTask != null && parentTask.getRetry() != null))
) {
Instant nextRetryDate;
AbstractRetry retry;
AbstractRetry.Behavior behavior;
ExecutionDelay.ExecutionDelayBuilder executionDelayBuilder = ExecutionDelay.builder()
.taskRunId(taskRun.getId())
.executionId(executor.getExecution().getId());
// Case task has a retry
if (executor.getFlow().findTaskByTaskId(taskRun.getTaskId()).getRetry() != null) {
retry = executor.getFlow().findTaskByTaskId(taskRun.getTaskId()).getRetry();
AbstractRetry.Behavior behavior = retry.getBehavior();
if (task.getRetry() != null) {
retry = task.getRetry();
behavior = retry.getBehavior();
nextRetryDate = behavior.equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
taskRun.nextRetryDate(retry, executor.getExecution()) :
taskRun.nextRetryDate(retry);
}
// Case parent task has a retry
else if (parentTask != null && parentTask.getRetry() != null) {
retry = parentTask.getRetry();
behavior = retry.getBehavior();
nextRetryDate = behavior.equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
taskRun.nextRetryDate(retry, executor.getExecution()) :
taskRun.nextRetryDate(retry);
}
// Case flow has a retry
else {
retry = executor.getFlow().getRetry();
behavior = retry.getBehavior();
nextRetryDate = behavior.equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
executionService.nextRetryDate(retry, executor.getExecution()) :
taskRun.nextRetryDate(retry);
}
if (nextRetryDate != null) {
executionDelayBuilder
.date(nextRetryDate)
.state(State.Type.RUNNING)
.delayType(behavior.equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
ExecutionDelay.DelayType.RESTART_FAILED_FLOW :
ExecutionDelay.DelayType.RESTART_FAILED_TASK);
}
// Case flow has a retry
else {
retry = executor.getFlow().getRetry();
AbstractRetry.Behavior behavior = retry.getBehavior();
nextRetryDate = behavior.equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
executionService.nextRetryDate(retry, executor.getExecution()) :
taskRun.nextRetryDate(retry);
executionDelayBuilder
.date(nextRetryDate)
.state(State.Type.RUNNING)
.delayType(retry.getBehavior().equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
ExecutionDelay.DelayType.RESTART_FAILED_FLOW :
ExecutionDelay.DelayType.RESTART_FAILED_TASK);
}
if (nextRetryDate != null) {
executionDelays.add(executionDelayBuilder.build());
executor.withExecution(executor.getExecution()
.withTaskRun(taskRun.withState(State.Type.RETRYING))
.withState(retry.getBehavior().equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
State.Type.RETRIED :
State.Type.RETRYING
),
"handleRetryTask");
// Prevent workerTaskResult of flowable to be sent
// because one of its children is retrying
if (taskRun.getParentTaskRunId() != null) {
list = list.stream().filter(workerTaskResult -> !workerTaskResult.getTaskRun().getId().equals(taskRun.getParentTaskRunId())).toList();
}
}
}
// If the task is retrying
// make sure that the workerTaskResult of the parent task is not sent
if (taskRun.getState().isRetrying() && taskRun.getParentTaskRunId() != null) {
list = list.stream().filter(workerTaskResult -> !workerTaskResult.getTaskRun().getId().equals(taskRun.getParentTaskRunId())).toList();
}
}
executor.withWorkerTaskDelays(executionDelays, "handleChildWorkerTaskResult");
if (list.isEmpty()) {

View File

@@ -1,75 +1,71 @@
package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
/*
* Disabled in favor of JDBC/Kafka runner tests
*/
public class RetryTest extends AbstractMemoryRunnerTest {
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Test
void retrySuccess() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "retry-success");
assertThat(execution.getState().getCurrent(), is(State.Type.WARNING));
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(5));
}
@Test
void retrySuccessAtFirstAttempt() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "retry-success-first-attempt");
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(1));
}
@Test
void retryFailed() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "retry-failed");
assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(5));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}
@Test
void retryRandom() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "retry-random");
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(3));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}
@Test
void retryExpo() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "retry-expo");
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(3));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}
@Test
void retryFail() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "retry-and-fail");
assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(5));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}
// @Test
// void retrySuccess() throws TimeoutException {
// Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "retry-success");
//
// assertThat(execution.getState().getCurrent(), is(State.Type.WARNING));
// assertThat(execution.getTaskRunList(), hasSize(1));
// assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(5));
// }
//
// @Test
// void retrySuccessAtFirstAttempt() throws TimeoutException {
// Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "retry-success-first-attempt");
//
// assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
// assertThat(execution.getTaskRunList(), hasSize(1));
// assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(1));
// }
//
// @Test
// void retryFailed() throws TimeoutException {
// Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "retry-failed");
//
// assertThat(execution.getTaskRunList(), hasSize(2));
// assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(5));
// assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
// }
//
// @Test
// void retryRandom() throws TimeoutException {
// Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "retry-random");
//
// assertThat(execution.getTaskRunList(), hasSize(1));
// assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(3));
// assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
// }
//
// @Test
// void retryExpo() throws TimeoutException {
// Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "retry-expo");
//
// assertThat(execution.getTaskRunList(), hasSize(1));
// assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(3));
// assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
// }
//
// @Test
// void retryFail() throws TimeoutException {
// Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "retry-and-fail");
//
// assertThat(execution.getTaskRunList(), hasSize(2));
// assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(5));
// assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
//
// }
}

View File

@@ -19,8 +19,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;
@Slf4j
@Singleton
@@ -33,6 +32,55 @@ public class RetryCaseTest {
@Inject
protected RunnerUtils runnerUtils;
/**
 * Runs the {@code retry-success} flow and verifies that the execution ends in
 * {@code WARNING} with exactly one task run that recorded five attempts.
 *
 * @throws TimeoutException if raised while running the flow
 */
public void retrySuccess() throws TimeoutException {
    final String flowId = "retry-success";
    final Execution finished = runnerUtils.runOne(null, "io.kestra.tests", flowId);

    final State.Type finalState = finished.getState().getCurrent();
    assertThat(finalState, is(State.Type.WARNING));

    assertThat(finished.getTaskRunList(), hasSize(1));
    assertThat(finished.getTaskRunList().get(0).getAttempts(), hasSize(5));
}
/**
 * Runs the {@code retry-success-first-attempt} flow and verifies that the
 * execution ends in {@code SUCCESS} with one task run and a single attempt.
 *
 * @throws TimeoutException if raised while running the flow
 */
public void retrySuccessAtFirstAttempt() throws TimeoutException {
    final String flowId = "retry-success-first-attempt";
    final Execution finished = runnerUtils.runOne(null, "io.kestra.tests", flowId);

    final State.Type finalState = finished.getState().getCurrent();
    assertThat(finalState, is(State.Type.SUCCESS));

    assertThat(finished.getTaskRunList(), hasSize(1));
    assertThat(finished.getTaskRunList().get(0).getAttempts(), hasSize(1));
}
/**
 * Runs the {@code retry-failed} flow and verifies that two task runs exist,
 * that the first one recorded five attempts, and that the execution is
 * {@code FAILED}.
 *
 * @throws TimeoutException if raised while running the flow
 */
public void retryFailed() throws TimeoutException {
    final String flowId = "retry-failed";
    final Execution finished = runnerUtils.runOne(null, "io.kestra.tests", flowId);

    assertThat(finished.getTaskRunList(), hasSize(2));
    assertThat(finished.getTaskRunList().get(0).getAttempts(), hasSize(5));

    final State.Type finalState = finished.getState().getCurrent();
    assertThat(finalState, is(State.Type.FAILED));
}
/**
 * Runs the {@code retry-random} flow and verifies that the single task run
 * recorded three attempts and that the execution is {@code FAILED}.
 *
 * @throws TimeoutException if raised while running the flow
 */
public void retryRandom() throws TimeoutException {
    final String flowId = "retry-random";
    final Execution finished = runnerUtils.runOne(null, "io.kestra.tests", flowId);

    assertThat(finished.getTaskRunList(), hasSize(1));
    assertThat(finished.getTaskRunList().get(0).getAttempts(), hasSize(3));

    final State.Type finalState = finished.getState().getCurrent();
    assertThat(finalState, is(State.Type.FAILED));
}
/**
 * Runs the {@code retry-expo} flow and verifies that the single task run
 * recorded three attempts and that the execution is {@code FAILED}.
 *
 * @throws TimeoutException if raised while running the flow
 */
public void retryExpo() throws TimeoutException {
    final String flowId = "retry-expo";
    final Execution finished = runnerUtils.runOne(null, "io.kestra.tests", flowId);

    assertThat(finished.getTaskRunList(), hasSize(1));
    assertThat(finished.getTaskRunList().get(0).getAttempts(), hasSize(3));

    final State.Type finalState = finished.getState().getCurrent();
    assertThat(finalState, is(State.Type.FAILED));
}
/**
 * Runs the {@code retry-and-fail} flow and verifies that two task runs exist,
 * that the first one recorded five attempts, and that the execution is
 * {@code FAILED}.
 *
 * @throws TimeoutException if raised while running the flow
 */
public void retryFail() throws TimeoutException {
    final String flowId = "retry-and-fail";
    final Execution finished = runnerUtils.runOne(null, "io.kestra.tests", flowId);

    assertThat(finished.getTaskRunList(), hasSize(2));
    assertThat(finished.getTaskRunList().get(0).getAttempts(), hasSize(5));

    final State.Type finalState = finished.getState().getCurrent();
    assertThat(finalState, is(State.Type.FAILED));
}
public void retryNewExecutionTaskDuration() throws TimeoutException {
CountDownLatch countDownLatch = new CountDownLatch(3);
AtomicReference<List<State.Type>> stateHistory = new AtomicReference<>(new ArrayList<>());
@@ -190,4 +238,60 @@ public class RetryCaseTest {
assertThat(execution.getTaskRunList().get(0).attemptNumber(), is(4));
}
/**
 * Runs the {@code retry-flowable} flow and verifies that the execution is
 * {@code FAILED} and that the task run at index 1 reports attempt number 3.
 *
 * @throws TimeoutException if raised while running the flow
 */
public void retryFlowable() throws TimeoutException {
    final String flowId = "retry-flowable";
    final Execution finished = runnerUtils.runOne(null, "io.kestra.tests", flowId, null, null);

    final State.Type finalState = finished.getState().getCurrent();
    assertThat(finalState, is(State.Type.FAILED));

    assertThat(finished.getTaskRunList().get(1).attemptNumber(), is(3));
}
/**
 * Runs the {@code retry-flowable-child} flow and verifies that the execution
 * is {@code FAILED} and that the task run at index 1 reports attempt number 3.
 *
 * @throws TimeoutException if raised while running the flow
 */
public void retryFlowableChild() throws TimeoutException {
    final String flowId = "retry-flowable-child";
    final Execution finished = runnerUtils.runOne(null, "io.kestra.tests", flowId, null, null);

    final State.Type finalState = finished.getState().getCurrent();
    assertThat(finalState, is(State.Type.FAILED));

    assertThat(finished.getTaskRunList().get(1).attemptNumber(), is(3));
}
/**
 * Runs the {@code retry-flowable-nested-child} flow and verifies that the
 * execution is {@code FAILED} and that the task run at index 2 reports
 * attempt number 3.
 *
 * @throws TimeoutException if raised while running the flow
 */
public void retryFlowableNestedChild() throws TimeoutException {
    final String flowId = "retry-flowable-nested-child";
    final Execution finished = runnerUtils.runOne(null, "io.kestra.tests", flowId, null, null);

    final State.Type finalState = finished.getState().getCurrent();
    assertThat(finalState, is(State.Type.FAILED));

    assertThat(finished.getTaskRunList().get(2).attemptNumber(), is(3));
}
/**
 * Runs the {@code retry-flowable-parallel} flow and verifies that the
 * execution is {@code FAILED} and that the task runs at indexes 1 and 2 each
 * report an attempt number of at least 2.
 *
 * @throws TimeoutException if raised while running the flow
 */
public void retryFlowableParallel() throws TimeoutException {
    final String flowId = "retry-flowable-parallel";
    final Execution finished = runnerUtils.runOne(null, "io.kestra.tests", flowId, null, null);

    final State.Type finalState = finished.getState().getCurrent();
    assertThat(finalState, is(State.Type.FAILED));

    // Only a lower bound is asserted for the two task runs checked here.
    assertThat(finished.getTaskRunList().get(1).attemptNumber(), greaterThanOrEqualTo(2));
    assertThat(finished.getTaskRunList().get(2).attemptNumber(), greaterThanOrEqualTo(2));
}
}

View File

@@ -0,0 +1,13 @@
# Fixture for RetryCaseTest.retryFlowableChild.
# A Sequential flowable wraps a single Fail task; the retry policy
# (constant, every 2 seconds, at most 3 attempts) is declared on the child.
id: retry-flowable-child
namespace: io.kestra.tests

tasks:
  - id: flowable
    type: io.kestra.core.tasks.flows.Sequential
    tasks:
      - id: child
        type: io.kestra.core.tasks.executions.Fail
        retry:
          type: constant
          interval: PT2S
          maxAttempt: 3

View File

@@ -0,0 +1,16 @@
# Fixture for RetryCaseTest.retryFlowableNestedChild.
# Two nested Sequential flowables wrap a single Fail task; the retry policy
# (constant, every 2 seconds, at most 3 attempts) is declared on the innermost child.
id: retry-flowable-nested-child
namespace: io.kestra.tests

tasks:
  - id: flowable
    type: io.kestra.core.tasks.flows.Sequential
    tasks:
      - id: flowable2
        type: io.kestra.core.tasks.flows.Sequential
        tasks:
          - id: child
            type: io.kestra.core.tasks.executions.Fail
            retry:
              type: constant
              interval: PT2S
              maxAttempt: 3

View File

@@ -0,0 +1,14 @@
# Fixture for RetryCaseTest.retryFlowableParallel.
# An EachParallel flowable iterates over two values and runs a Fail task for each;
# the retry policy (constant, every 2 seconds, at most 3 attempts) is declared
# on the flowable, not on the child.
id: retry-flowable-parallel
namespace: io.kestra.tests

tasks:
  - id: flowable
    type: io.kestra.core.tasks.flows.EachParallel
    value: [0, 1]
    retry:
      type: constant
      interval: PT2S
      maxAttempt: 3
    tasks:
      - id: child
        type: io.kestra.core.tasks.executions.Fail

View File

@@ -0,0 +1,13 @@
# Fixture for RetryCaseTest.retryFlowable.
# A Sequential flowable wraps a single Fail task; the retry policy
# (constant, every 2 seconds, at most 3 attempts) is declared on the flowable,
# not on the child.
id: retry-flowable
namespace: io.kestra.tests

tasks:
  - id: flowable
    type: io.kestra.core.tasks.flows.Sequential
    retry:
      type: constant
      interval: PT2S
      maxAttempt: 3
    tasks:
      - id: child
        type: io.kestra.core.tasks.executions.Fail

View File

@@ -0,0 +1,7 @@
package io.kestra.runner.h2;
import io.kestra.jdbc.runner.JdbcRunnerRetryTest;
/**
 * Concrete entry point for the inherited {@link JdbcRunnerRetryTest} suite in
 * the {@code io.kestra.runner.h2} package. Declares no tests of its own.
 */
public class H2RunnerRetryTest extends JdbcRunnerRetryTest {
    // Intentionally empty: every test method is inherited from JdbcRunnerRetryTest.
}

View File

@@ -0,0 +1,7 @@
package io.kestra.runner.mysql;
import io.kestra.jdbc.runner.JdbcRunnerRetryTest;
/**
 * Concrete entry point for the inherited {@link JdbcRunnerRetryTest} suite in
 * the {@code io.kestra.runner.mysql} package. Declares no tests of its own.
 */
public class MysqlRunnerRetryTest extends JdbcRunnerRetryTest {
    // Intentionally empty: every test method is inherited from JdbcRunnerRetryTest.
}

View File

@@ -0,0 +1,7 @@
package io.kestra.runner.postgres;
import io.kestra.jdbc.runner.JdbcRunnerRetryTest;
/**
 * Concrete entry point for the inherited {@link JdbcRunnerRetryTest} suite in
 * the {@code io.kestra.runner.postgres} package. Declares no tests of its own.
 */
public class PostgresRunnerRetryTest extends JdbcRunnerRetryTest {
    // Intentionally empty: every test method is inherited from JdbcRunnerRetryTest.
}

View File

@@ -0,0 +1,136 @@
package io.kestra.jdbc.runner;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.StandAloneRunner;
import io.kestra.core.tasks.flows.RetryCaseTest;
import io.kestra.core.utils.TestsUtils;
import io.kestra.jdbc.JdbcTestUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.TimeoutException;
/**
 * Abstract retry test suite shared by the runner test modules.
 *
 * <p>Each test method is a one-line delegation to the matching scenario on the
 * injected {@link RetryCaseTest}; the scenarios and their assertions live there.
 * Concrete subclasses only need to extend this class.
 */
@MicronautTest(transactional = false)
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // per-class lifecycle so the slow init() runs only once for the whole suite
public abstract class JdbcRunnerRetryTest {

    // ---- injected collaborators -------------------------------------------

    @Inject
    JdbcTestUtils jdbcTestUtils;

    @Inject
    protected LocalFlowRepositoryLoader repositoryLoader;

    @Inject
    protected RunnerUtils runnerUtils;

    @Inject
    private StandAloneRunner runner;

    @Inject
    private RetryCaseTest retryCaseTest;

    // ---- one-time setup ---------------------------------------------------

    /**
     * Prepares the environment once for all tests: calls {@code drop()} then
     * {@code migrate()} on {@link JdbcTestUtils}, loads flows through
     * {@link TestsUtils#loads}, disables the scheduler on the runner and starts it.
     *
     * @throws IOException        declared by the setup calls
     * @throws URISyntaxException declared by the setup calls
     */
    @BeforeAll
    void init() throws IOException, URISyntaxException {
        jdbcTestUtils.drop();
        jdbcTestUtils.migrate();

        TestsUtils.loads(repositoryLoader);

        runner.setSchedulerEnabled(false);
        runner.run();
    }

    // ---- basic retry scenarios --------------------------------------------

    @Test
    void retrySuccess() throws TimeoutException {
        retryCaseTest.retrySuccess();
    }

    @Test
    void retrySuccessAtFirstAttempt() throws TimeoutException {
        retryCaseTest.retrySuccessAtFirstAttempt();
    }

    @Test
    void retryFailed() throws TimeoutException {
        retryCaseTest.retryFailed();
    }

    @Test
    void retryRandom() throws TimeoutException {
        retryCaseTest.retryRandom();
    }

    @Test
    void retryExpo() throws TimeoutException {
        retryCaseTest.retryExpo();
    }

    @Test
    void retryFail() throws TimeoutException {
        retryCaseTest.retryFail();
    }

    // ---- "new execution" retry scenarios ----------------------------------

    @Test
    void retryNewExecutionTaskDuration() throws TimeoutException {
        retryCaseTest.retryNewExecutionTaskDuration();
    }

    @Test
    void retryNewExecutionTaskAttempts() throws TimeoutException {
        retryCaseTest.retryNewExecutionTaskAttempts();
    }

    @Test
    void retryNewExecutionFlowDuration() throws TimeoutException {
        retryCaseTest.retryNewExecutionFlowDuration();
    }

    @Test
    void retryNewExecutionFlowAttempts() throws TimeoutException {
        retryCaseTest.retryNewExecutionFlowAttempts();
    }

    // ---- "retry failed" scenarios -----------------------------------------

    @Test
    void retryFailedTaskDuration() throws TimeoutException {
        retryCaseTest.retryFailedTaskDuration();
    }

    @Test
    void retryFailedTaskAttempts() throws TimeoutException {
        retryCaseTest.retryFailedTaskAttempts();
    }

    @Test
    void retryFailedFlowDuration() throws TimeoutException {
        retryCaseTest.retryFailedFlowDuration();
    }

    @Test
    void retryFailedFlowAttempts() throws TimeoutException {
        retryCaseTest.retryFailedFlowAttempts();
    }

    // ---- flowable retry scenarios -----------------------------------------

    @Test
    void retryFlowable() throws TimeoutException {
        retryCaseTest.retryFlowable();
    }

    @Test
    void retryFlowableChild() throws TimeoutException {
        retryCaseTest.retryFlowableChild();
    }

    @Test
    void retryFlowableNestedChild() throws TimeoutException {
        retryCaseTest.retryFlowableNestedChild();
    }

    @Test
    void retryFlowableParallel() throws TimeoutException {
        retryCaseTest.retryFlowableParallel();
    }
}

View File

@@ -320,44 +320,4 @@ public abstract class JdbcRunnerTest {
assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}
@Test
void retryNewExecutionTaskDuration() throws TimeoutException {
retryCaseTest.retryNewExecutionTaskDuration();
}
@Test
void retryNewExecutionTaskAttempts() throws TimeoutException {
retryCaseTest.retryNewExecutionTaskAttempts();
}
@Test
void retryNewExecutionFlowDuration() throws TimeoutException {
retryCaseTest.retryNewExecutionFlowDuration();
}
@Test
void retryNewExecutionFlowAttempts() throws TimeoutException {
retryCaseTest.retryNewExecutionFlowAttempts();
}
@Test
void retryFailedTaskDuration() throws TimeoutException {
retryCaseTest.retryFailedTaskDuration();
}
@Test
void retryFailedTaskAttempts() throws TimeoutException {
retryCaseTest.retryFailedTaskAttempts();
}
@Test
void retryFailedFlowDuration() throws TimeoutException {
retryCaseTest.retryFailedFlowDuration();
}
@Test
void retryFailedFlowAttempts() throws TimeoutException {
retryCaseTest.retryFailedFlowAttempts();
}
}

View File

@@ -223,7 +223,7 @@ public class MemoryExecutor implements ExecutorInterface {
try {
ExecutionState executionState = EXECUTIONS.get(workerTaskResultDelay.getExecutionId());
if (executionState.execution.findTaskRunByTaskRunId(workerTaskResultDelay.getTaskRunId()).getState().getCurrent() == State.Type.PAUSED) {
if (workerTaskResultDelay.getDelayType().equals(ExecutionDelay.DelayType.RESUME_FLOW)) {
Execution markAsExecution = executionService.markAs(
executionState.execution,
workerTaskResultDelay.getTaskRunId(),
@@ -231,7 +231,7 @@ public class MemoryExecutor implements ExecutorInterface {
);
EXECUTIONS.put(workerTaskResultDelay.getExecutionId(), executionState.from(markAsExecution));
executionQueue.emit(markAsExecution);
} else if (executionState.execution.findTaskRunByTaskRunId(workerTaskResultDelay.getTaskRunId()).getState().getCurrent().equals(State.Type.FAILED)) {
} else if (workerTaskResultDelay.getDelayType().equals(ExecutionDelay.DelayType.RESTART_FAILED_TASK)) {
Execution newAttempt = executionService.retryTask(
executionState.execution,
workerTaskResultDelay.getTaskRunId()