feat(): retry-flow (#3392)

* feat(): retry-flow

* fix: rename behavior

* feat: created a metadata props for executions

* fix(ui): translate + new metadata prop
This commit is contained in:
YannC
2024-04-03 14:33:54 +02:00
committed by YannC
parent 90f28cc8e8
commit 81f281e92b
31 changed files with 604 additions and 96 deletions

View File

@@ -93,6 +93,9 @@ public class Execution implements DeletedInterface, TenantInterface {
@Builder.Default
boolean deleted = false;
@With
ExecutionMetadata metadata;
/**
* Factory method for constructing a new {@link Execution} object for the given {@link Flow} and inputs.
*
@@ -134,6 +137,9 @@ public class Execution implements DeletedInterface, TenantInterface {
public static class ExecutionBuilder {
void prebuild() {
this.originalId = this.id;
this.metadata = ExecutionMetadata.builder()
.originalCreatedDate(Instant.now())
.build();
}
}
@@ -165,7 +171,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.parentId,
this.originalId,
this.trigger,
this.deleted
this.deleted,
this.metadata
);
}
@@ -197,7 +204,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.parentId,
this.originalId,
this.trigger,
this.deleted
this.deleted,
this.metadata
);
}
@@ -217,7 +225,8 @@ public class Execution implements DeletedInterface, TenantInterface {
childExecutionId != null ? this.getId() : null,
this.originalId,
this.trigger,
this.deleted
this.deleted,
this.metadata
);
}
@@ -449,7 +458,7 @@ public class Execution implements DeletedInterface, TenantInterface {
log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'", taskRun.getId(), parentTaskRun.getId());
return false;
}
return !taskRun.shouldBeRetried(resolvedTask.getTask()) && taskRun.getState().isFailed();
return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry()) && taskRun.getState().isFailed();
});
}

View File

@@ -0,0 +1,25 @@
package io.kestra.core.models.executions;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.With;
import java.time.Instant;
@Builder(toBuilder = true)
@Setter
@Getter
public class ExecutionMetadata {
@Builder.Default
@With
Integer attemptNumber = 1;
Instant originalCreatedDate;
public ExecutionMetadata nextAttempt() {
return this.toBuilder()
.attemptNumber(this.attemptNumber + 1)
.build();
}
}

View File

@@ -3,7 +3,7 @@ package io.kestra.core.models.executions;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.validation.constraints.NotNull;
@@ -219,19 +219,21 @@ public class TaskRun implements TenantInterface {
}
/**
* This method is used when the retry is apply on a task
* but the retry type is NEW_EXECUTION
*
* @param task Contains the retry configuration
* @return The next retry date, null if maxAttempt is reached
* @param retry Contains the retry configuration
* @param execution Contains the attempt number and original creation date
* @return The next retry date, null if maxAttempt || maxDuration is reached
*/
public Instant nextRetryDate(Task task) {
if (this.attempts == null || this.attempts.isEmpty() || task.getRetry() == null || this.getAttempts().size() >= task.getRetry().getMaxAttempt()) {
public Instant nextRetryDate(AbstractRetry retry, Execution execution) {
if (retry.getMaxAttempt() != null && execution.getMetadata().getAttemptNumber() >= retry.getMaxAttempt()) {
return null;
}
Instant base = this.lastAttempt().getState().maxDate();
Instant nextDate = task.getRetry().nextRetryDate(this.attempts.size(), base);
if (task.getRetry().getMaxDuration() != null && nextDate.isAfter(this.lastAttempt().getState().minDate().plus(task.getRetry().getMaxDuration()))) {
Instant nextDate = retry.nextRetryDate(execution.getMetadata().getAttemptNumber(), base);
if (retry.getMaxDuration() != null && nextDate.isAfter(execution.getMetadata().getOriginalCreatedDate().plus(retry.getMaxDuration()))) {
return null;
}
@@ -239,8 +241,31 @@ public class TaskRun implements TenantInterface {
return nextDate;
}
public boolean shouldBeRetried(Task task) {
return this.nextRetryDate(task) != null;
/**
* This method is used when the Retry definition comes from the flow
* @param retry The retry configuration
* @return The next retry date, null if maxAttempt || maxDuration is reached
*/
public Instant nextRetryDate(AbstractRetry retry) {
if (this.attempts == null || this.attempts.isEmpty() || (retry.getMaxAttempt() != null && this.attemptNumber() >= retry.getMaxAttempt())) {
return null;
}
Instant base = this.lastAttempt().getState().maxDate();
Instant nextDate = retry.nextRetryDate(this.attempts.size(), base);
if (retry.getMaxDuration() != null && nextDate.isAfter(this.attempts.get(0).getState().minDate().plus(retry.getMaxDuration()))) {
return null;
}
return nextDate;
}
public boolean shouldBeRetried(AbstractRetry retry) {
if (retry == null) {
return false;
}
return this.nextRetryDate(retry) != null;
}

View File

@@ -16,6 +16,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.serializers.JacksonMapper;
@@ -27,6 +28,10 @@ import io.kestra.core.validations.FlowValidation;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Valid;
import jakarta.validation.constraints.*;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
@@ -35,10 +40,6 @@ import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Valid;
import jakarta.validation.constraints.*;
@SuperBuilder(toBuilder = true)
@Getter
@@ -121,6 +122,9 @@ public class Flow implements DeletedInterface, TenantInterface {
@Valid
List<Output> outputs;
@Valid
protected AbstractRetry retry;
public Logger logger() {
return LoggerFactory.getLogger("flow." + this.id);
}

View File

@@ -158,6 +158,11 @@ public class State {
return this.current.isFailed() || this.isPaused();
}
@JsonIgnore
public boolean isResumable() {
return this.current.isPaused() || this.current.isRetrying();
}
@Introspected
public enum Type {
@@ -172,10 +177,11 @@ public class State {
KILLED,
CANCELLED,
QUEUED,
RETRYING;
RETRYING,
RETRIED;
public boolean isTerminated() {
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED;
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED;
}
public boolean isCreated() {
@@ -195,8 +201,9 @@ public class State {
}
public boolean isRetrying() {
return this == Type.RETRYING;
return this == Type.RETRYING || this == Type.RETRIED;
}
}
@Value

View File

@@ -34,6 +34,9 @@ public abstract class AbstractRetry {
@Builder.Default
private Boolean warningOnRetry = false;
@Builder.Default
private Behavior behavior = Behavior.RETRY_FAILED_TASK;
public abstract Instant nextRetryDate(Integer attemptCount, Instant lastAttempt);
public <T> RetryPolicy<T> toPolicy() {
@@ -58,4 +61,9 @@ public abstract class AbstractRetry {
return new RetryPolicy<T>()
.withMaxAttempts(1);
}
public enum Behavior {
RETRY_FAILED_TASK,
CREATE_NEW_EXECUTION
}
}

View File

@@ -2,14 +2,13 @@ package io.kestra.core.runners;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.flows.State;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;
import java.time.Instant;
import jakarta.validation.constraints.NotNull;
@Value
@AllArgsConstructor
@Builder
@@ -25,8 +24,24 @@ public class ExecutionDelay {
@NotNull State.Type state;
@NotNull DelayType delayType;
@JsonIgnore
public String uid() {
return String.join("_", executionId, taskRunId);
}
/**
* For previous version, return RESUME_FLOW by default as it was the only case
* @return DelayType representing the action to do when
*/
public DelayType getDelayType() {
return delayType == null ? DelayType.RESUME_FLOW : delayType;
}
public enum DelayType {
RESUME_FLOW,
RESTART_FAILED_TASK,
RESTART_FAILED_FLOW
}
}

View File

@@ -7,7 +7,9 @@ import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.*;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.LogService;
import io.kestra.core.tasks.flows.Pause;
import io.kestra.core.tasks.flows.WorkingDirectory;
@@ -47,6 +49,9 @@ public class ExecutorService {
protected FlowExecutorInterface flowExecutorInterface;
@Inject
private ExecutionService executionService;
protected FlowExecutorInterface flowExecutorInterface() {
// bean is injected late, so we need to wait
if (this.flowExecutorInterface == null) {
@@ -429,7 +434,7 @@ public class ExecutorService {
return executor.withTaskRun(result, "handleChildNext");
}
private Executor handleChildWorkerTaskResult(Executor executor) throws InternalException {
private Executor handleChildWorkerTaskResult(Executor executor) throws Exception {
if (executor.getExecution().getTaskRunList() == null) {
return executor;
}
@@ -448,22 +453,55 @@ public class ExecutorService {
workerTaskResult.ifPresent(list::add);
}
/**
/*
* Check if the task is failed and if it has a retry policy
*/
if (!executor.getExecution().getState().isRetrying() && taskRun.getState().isFailed() && executor.getFlow().findTaskByTaskId(taskRun.getTaskId()).getRetry() != null) {
Instant nextRetryDate = taskRun.nextRetryDate(executor.getFlow().findTaskByTaskId(taskRun.getTaskId()));
if (nextRetryDate != null) {
executionDelays.add(
ExecutionDelay.builder()
.taskRunId(taskRun.getId())
.executionId(executor.getExecution().getId())
.date(nextRetryDate)
.state(State.Type.RUNNING)
.build()
);
executor.withExecution(executor.getExecution().withState(State.Type.RETRYING), "handleRetryTask");
if (!executor.getExecution().getState().isRetrying() &&
taskRun.getState().isFailed() &&
(executor.getFlow().findTaskByTaskId(taskRun.getTaskId()).getRetry() != null || executor.getFlow().getRetry() != null)
) {
Instant nextRetryDate;
AbstractRetry retry;
ExecutionDelay.ExecutionDelayBuilder executionDelayBuilder = ExecutionDelay.builder()
.taskRunId(taskRun.getId())
.executionId(executor.getExecution().getId());
// Case task has a retry
if (executor.getFlow().findTaskByTaskId(taskRun.getTaskId()).getRetry() != null) {
retry = executor.getFlow().findTaskByTaskId(taskRun.getTaskId()).getRetry();
AbstractRetry.Behavior behavior = retry.getBehavior();
nextRetryDate = behavior.equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
taskRun.nextRetryDate(retry, executor.getExecution()) :
taskRun.nextRetryDate(retry);
executionDelayBuilder
.date(nextRetryDate)
.state(State.Type.RUNNING)
.delayType(behavior.equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
ExecutionDelay.DelayType.RESTART_FAILED_FLOW :
ExecutionDelay.DelayType.RESTART_FAILED_TASK);
}
// Case flow has a retry
else {
retry = executor.getFlow().getRetry();
AbstractRetry.Behavior behavior = retry.getBehavior();
nextRetryDate = behavior.equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
executionService.nextRetryDate(retry, executor.getExecution()) :
taskRun.nextRetryDate(retry);
executionDelayBuilder
.date(nextRetryDate)
.state(State.Type.RUNNING)
.delayType(retry.getBehavior().equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
ExecutionDelay.DelayType.RESTART_FAILED_FLOW :
ExecutionDelay.DelayType.RESTART_FAILED_TASK);
}
if (nextRetryDate != null) {
executionDelays.add(executionDelayBuilder.build());
executor.withExecution(executor.getExecution()
.withState(retry.getBehavior().equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
State.Type.RETRIED :
State.Type.RETRYING
),
"handleRetryTask");
}
}
}
@@ -499,6 +537,7 @@ public class ExecutorService {
.executionId(executor.getExecution().getId())
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(pauseTask.getDelay() != null ? pauseTask.getDelay() : pauseTask.getTimeout()))
.state(pauseTask.getDelay() != null ? State.Type.RUNNING : State.Type.FAILED)
.delayType(ExecutionDelay.DelayType.RESUME_FLOW)
.build();
}
}
@@ -537,7 +576,6 @@ public class ExecutorService {
return executor.withWorkerTaskResults(workerTaskResults, "handleChildWorkerCreatedKilling");
}
private Executor handleListeners(Executor executor) {
if (!executor.getExecution().getState().isTerminated()) {
return executor;
@@ -578,7 +616,6 @@ public class ExecutorService {
return this.onEnd(executor);
}
private Executor handleRestart(Executor executor) {
if (executor.getExecution().getState().getCurrent() != State.Type.RESTARTED) {
return executor;
@@ -608,7 +645,6 @@ public class ExecutorService {
return executor.withExecution(newExecution, "handleKilling");
}
private Executor handleWorkerTask(Executor executor) throws InternalException {
if (executor.getExecution().getTaskRunList() == null || executor.getExecution().getState().getCurrent() == State.Type.KILLING) {
return executor;

View File

@@ -306,6 +306,7 @@ public class RunContext {
if (execution.getOriginalId() != null) {
executionMap.put("originalId", execution.getOriginalId());
}
builder
.put("execution", executionMap.build());

View File

@@ -10,11 +10,7 @@ import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.TimeoutExceededException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
@@ -54,14 +50,7 @@ import java.io.IOException;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -69,12 +58,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static io.kestra.core.models.flows.State.Type.CREATED;
import static io.kestra.core.models.flows.State.Type.FAILED;
import static io.kestra.core.models.flows.State.Type.KILLED;
import static io.kestra.core.models.flows.State.Type.RUNNING;
import static io.kestra.core.models.flows.State.Type.SUCCESS;
import static io.kestra.core.models.flows.State.Type.WARNING;
import static io.kestra.core.models.flows.State.Type.*;
import static io.kestra.core.server.Service.ServiceState.TERMINATED_FORCED;
import static io.kestra.core.server.Service.ServiceState.TERMINATED_GRACEFULLY;
@@ -480,7 +464,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
state = WARNING;
}
if (workerTask.getTask().isAllowFailure() && !finalWorkerTask.getTaskRun().shouldBeRetried(workerTask.getTask()) && state.isFailed()) {
if (workerTask.getTask().isAllowFailure() && !finalWorkerTask.getTaskRun().shouldBeRetried(workerTask.getTask().getRetry()) && state.isFailed()) {
state = WARNING;
}

View File

@@ -12,6 +12,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.AbstractGraphTask;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
@@ -37,6 +38,7 @@ import reactor.core.publisher.Flux;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.function.Predicate;
@@ -76,7 +78,7 @@ public class ExecutionService {
* Retry set the given taskRun in created state
* and return the execution in running state
**/
public Execution retry(Execution execution, String taskRunId) {
public Execution retryTask(Execution execution, String taskRunId) {
List<TaskRun> newTaskRuns = execution
.getTaskRunList()
.stream()
@@ -140,6 +142,8 @@ public class ExecutionService {
execution.withState(State.Type.RESTARTED).getState()
);
newExecution = newExecution.withMetadata(execution.getMetadata().nextAttempt());
return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
}
@@ -217,6 +221,8 @@ public class ExecutionService {
taskRunId == null ? new State() : execution.withState(State.Type.RESTARTED).getState()
);
newExecution = newExecution.withMetadata(execution.getMetadata().nextAttempt());
return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
}
@@ -542,4 +548,28 @@ public class ExecutionService {
.flatMap(throwFunction(taskRun -> this.getAncestors(execution, taskRun).stream()))
.collect(Collectors.toSet());
}
/**
* This method is used to retrieve previous existing execution
* @param retry The retry define in the flow of the failed execution
* @param execution The failed execution
* @return The next retry date, null if maxAttempt || maxDuration is reached
*/
public Instant nextRetryDate(AbstractRetry retry, Execution execution) {
if (retry.getMaxAttempt() != null && execution.getMetadata().getAttemptNumber() >= retry.getMaxAttempt()) {
return null;
}
Instant base = execution.getState().maxDate();
Instant originalCreatedDate = execution.getMetadata().getOriginalCreatedDate();
Instant nextDate = retry.nextRetryDate(execution.getMetadata().getAttemptNumber(), base);
if (retry.getMaxDuration() != null && nextDate.isAfter(originalCreatedDate.plus(retry.getMaxDuration()))) {
return null;
}
return nextDate;
}
}

View File

@@ -0,0 +1,193 @@
package io.kestra.core.tasks.flows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.utils.Await;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
@Slf4j
@Singleton
public class RetryCaseTest {
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;
@Inject
protected RunnerUtils runnerUtils;
public void retryNewExecutionTaskDuration() throws TimeoutException {
CountDownLatch countDownLatch = new CountDownLatch(3);
AtomicReference<List<State.Type>> stateHistory = new AtomicReference<>(new ArrayList<>());
executionQueue.receive(either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("retry-new-execution-task-duration") && execution.getState().getCurrent().isTerminated()) {
countDownLatch.countDown();
List<State.Type> stateHistoryList = stateHistory.get();
stateHistoryList.add(execution.getState().getCurrent());
stateHistory.set(stateHistoryList);
}
});
Execution execution = runnerUtils.runOne(
null,
"io.kestra.tests",
"retry-new-execution-task-duration",
null,
null
);
Await.until(() -> countDownLatch.getCount() == 0, Duration.ofSeconds(2), Duration.ofMinutes(1));
assertThat(stateHistory.get(), containsInAnyOrder(State.Type.RETRIED, State.Type.RETRIED, State.Type.FAILED));
}
public void retryNewExecutionTaskAttempts() throws TimeoutException {
CountDownLatch countDownLatch = new CountDownLatch(3);
AtomicReference<List<State.Type>> stateHistory = new AtomicReference<>(new ArrayList<>());
executionQueue.receive(either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("retry-new-execution-task-attempts") && execution.getState().getCurrent().isTerminated()) {
countDownLatch.countDown();
List<State.Type> stateHistoryList = stateHistory.get();
stateHistoryList.add(execution.getState().getCurrent());
stateHistory.set(stateHistoryList);
}
});
Execution execution = runnerUtils.runOne(
null,
"io.kestra.tests",
"retry-new-execution-task-attempts",
null,
null
);
Await.until(() -> countDownLatch.getCount() == 0, Duration.ofSeconds(2), Duration.ofMinutes(1));
assertThat(stateHistory.get(), containsInAnyOrder(State.Type.RETRIED, State.Type.RETRIED, State.Type.FAILED));
}
public void retryNewExecutionFlowDuration() throws TimeoutException {
CountDownLatch countDownLatch = new CountDownLatch(3);
AtomicReference<List<State.Type>> stateHistory = new AtomicReference<>(new ArrayList<>());
executionQueue.receive(either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("retry-new-execution-flow-duration") && execution.getState().getCurrent().isTerminated()) {
countDownLatch.countDown();
List<State.Type> stateHistoryList = stateHistory.get();
stateHistoryList.add(execution.getState().getCurrent());
stateHistory.set(stateHistoryList);
}
});
Execution execution = runnerUtils.runOne(
null,
"io.kestra.tests",
"retry-new-execution-flow-duration",
null,
null
);
Await.until(() -> countDownLatch.getCount() == 0, Duration.ofSeconds(2), Duration.ofMinutes(1));
assertThat(stateHistory.get(), containsInAnyOrder(State.Type.RETRIED, State.Type.RETRIED, State.Type.FAILED));
}
public void retryNewExecutionFlowAttempts() throws TimeoutException {
CountDownLatch countDownLatch = new CountDownLatch(3);
AtomicReference<List<State.Type>> stateHistory = new AtomicReference<>(new ArrayList<>());
executionQueue.receive(either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("retry-new-execution-flow-attempts") && execution.getState().getCurrent().isTerminated()) {
countDownLatch.countDown();
List<State.Type> stateHistoryList = stateHistory.get();
stateHistoryList.add(execution.getState().getCurrent());
stateHistory.set(stateHistoryList);
}
});
Execution execution = runnerUtils.runOne(
null,
"io.kestra.tests",
"retry-new-execution-flow-attempts",
null,
null
);
Await.until(() -> countDownLatch.getCount() == 0, Duration.ofSeconds(2), Duration.ofMinutes(1));
assertThat(stateHistory.get(), containsInAnyOrder(State.Type.RETRIED, State.Type.RETRIED, State.Type.FAILED));
}
public void retryFailedTaskDuration() throws TimeoutException {
Execution execution = runnerUtils.runOne(
null,
"io.kestra.tests",
"retry-failed-task-duration",
null,
null
);
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getTaskRunList().get(0).attemptNumber(), is(3));
}
public void retryFailedTaskAttempts() throws TimeoutException {
Execution execution = runnerUtils.runOne(
null,
"io.kestra.tests",
"retry-failed-task-attempts",
null,
null,
Duration.ofSeconds(20)
);
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getTaskRunList().get(0).attemptNumber(), is(4));
}
public void retryFailedFlowDuration() throws TimeoutException {
Execution execution = runnerUtils.runOne(
null,
"io.kestra.tests",
"retry-failed-flow-duration",
null,
null
);
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getTaskRunList().get(0).attemptNumber(), is(3));
}
public void retryFailedFlowAttempts() throws TimeoutException {
Execution execution = runnerUtils.runOne(
null,
"io.kestra.tests",
"retry-failed-flow-attempts",
null,
null
);
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getTaskRunList().get(0).attemptNumber(), is(4));
}
}

View File

@@ -0,0 +1,12 @@
id: retry-failed-flow-attempts
namespace: io.kestra.tests
retry:
behavior: RETRY_FAILED_TASK
type: constant
maxAttempt: 4
interval: PT2S
tasks:
- id: fail
type: io.kestra.core.tasks.executions.Fail

View File

@@ -0,0 +1,12 @@
id: retry-failed-flow-duration
namespace: io.kestra.tests
retry:
behavior: RETRY_FAILED_TASK
type: constant
maxDuration: PT15S
interval: PT5S
tasks:
- id: fail
type: io.kestra.core.tasks.executions.Fail

View File

@@ -0,0 +1,12 @@
id: retry-failed-task-attempts
namespace: io.kestra.tests
tasks:
- id: fail
type: io.kestra.core.tasks.executions.Fail
retry:
behavior: RETRY_FAILED_TASK
type: constant
maxAttempt: 4
interval: PT2S

View File

@@ -0,0 +1,12 @@
id: retry-failed-task-duration
namespace: io.kestra.tests
tasks:
- id: fail
type: io.kestra.core.tasks.executions.Fail
retry:
behavior: RETRY_FAILED_TASK
type: constant
maxDuration: PT15S
interval: PT5S

View File

@@ -0,0 +1,12 @@
id: retry-new-execution-flow-attempts
namespace: io.kestra.tests
retry:
behavior: CREATE_NEW_EXECUTION
type: constant
maxAttempt: 3
interval: PT2S
tasks:
- id: fail
type: io.kestra.core.tasks.executions.Fail

View File

@@ -0,0 +1,12 @@
id: retry-new-execution-flow-duration
namespace: io.kestra.tests
retry:
behavior: CREATE_NEW_EXECUTION
type: constant
interval: PT5S
maxDuration: PT15S
tasks:
- id: fail
type: io.kestra.core.tasks.executions.Fail

View File

@@ -0,0 +1,12 @@
id: retry-new-execution-task-attempts
namespace: io.kestra.tests
tasks:
- id: fail
type: io.kestra.core.tasks.executions.Fail
retry:
behavior: CREATE_NEW_EXECUTION
type: constant
maxAttempt: 3
interval: PT2S

View File

@@ -0,0 +1,12 @@
id: retry-new-execution-task-duration
namespace: io.kestra.tests
tasks:
- id: fail
type: io.kestra.core.tasks.executions.Fail
retry:
behavior: CREATE_NEW_EXECUTION
type: constant
maxDuration: PT15S
interval: PT5S

View File

@@ -0,0 +1,15 @@
ALTER TABLE executions ALTER COLUMN "state_current" ENUM (
'CREATED',
'RUNNING',
'PAUSED',
'RESTARTED',
'KILLING',
'SUCCESS',
'WARNING',
'FAILED',
'KILLED',
'CANCELLED',
'QUEUED',
'RETRYING',
'RETRIED'
) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.state.current'));

View File

@@ -11,4 +11,4 @@ ALTER TABLE executions MODIFY COLUMN `state_current` ENUM (
'CANCELLED',
'QUEUED',
'RETRYING'
) GENERATED ALWAYS AS (value ->> '$.state.current') STORED NOT NULL;
) GENERATED ALWAYS AS (value ->> '$.state.current') STORED NOT NULL;

View File

@@ -0,0 +1,15 @@
ALTER TABLE executions MODIFY COLUMN `state_current` ENUM (
'CREATED',
'RUNNING',
'PAUSED',
'RESTARTED',
'KILLING',
'SUCCESS',
'WARNING',
'FAILED',
'KILLED',
'CANCELLED',
'QUEUED',
'RETRYING',
'RETRIED'
) GENERATED ALWAYS AS (value ->> '$.state.current') STORED NOT NULL;

View File

@@ -0,0 +1 @@
ALTER TYPE state_type ADD VALUE IF NOT EXISTS 'RETRIED';

View File

@@ -22,23 +22,7 @@ import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInte
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.runners.ExecutorService;
import io.kestra.core.runners.ExecutorState;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.runners.WorkerTaskRunning;
import io.kestra.core.runners.WorkerTrigger;
import io.kestra.core.runners.WorkerTriggerRunning;
import io.kestra.core.runners.*;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.services.AbstractFlowTriggerService;
@@ -855,7 +839,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
try {
// Handle paused tasks
if (executor.getExecution().findTaskRunByTaskRunId(executionDelay.getTaskRunId()).getState().getCurrent() == State.Type.PAUSED) {
if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.RESUME_FLOW)) {
Execution markAsExecution = executionService.markAs(
pair.getKey(),
@@ -864,14 +848,19 @@ public class JdbcExecutor implements ExecutorInterface, Service {
);
executor = executor.withExecution(markAsExecution, "pausedRestart");
// Handle failed tasks
} else if (executor.getExecution().findTaskRunByTaskRunId(executionDelay.getTaskRunId()).getState().getCurrent().equals(State.Type.FAILED)) {
Execution newAttempt = executionService.retry(
}
// Handle failed tasks
else if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.RESTART_FAILED_TASK)) {
Execution newAttempt = executionService.retryTask(
pair.getKey(),
executionDelay.getTaskRunId()
);
executor = executor.withExecution(newAttempt, "failedRetry");
executor = executor.withExecution(newAttempt, "retryFailedTask");
}
// Handle failed flow
else if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.RESTART_FAILED_FLOW)) {
Execution newExecution = executionService.replay(executor.getExecution(), null, null);
executor = executor.withExecution(newExecution, "retryFailedFlow");
}
} catch (Exception e) {
executor = handleFailedExecutionFromExecutor(executor, e);

View File

@@ -9,11 +9,7 @@ import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.*;
import io.kestra.core.tasks.flows.EachSequentialTest;
import io.kestra.core.tasks.flows.FlowCaseTest;
import io.kestra.core.tasks.flows.ForEachItemCaseTest;
import io.kestra.core.tasks.flows.PauseTest;
import io.kestra.core.tasks.flows.WorkingDirectoryTest;
import io.kestra.core.tasks.flows.*;
import io.kestra.core.utils.TestsUtils;
import io.kestra.jdbc.JdbcTestUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
@@ -83,6 +79,9 @@ public abstract class JdbcRunnerTest {
@Inject
private FlowConcurrencyCaseTest flowConcurrencyCaseTest;
@Inject
private RetryCaseTest retryCaseTest;
@BeforeAll
void init() throws IOException, URISyntaxException {
jdbcTestUtils.drop();
@@ -321,4 +320,44 @@ public abstract class JdbcRunnerTest {
assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}
@Test
void retryNewExecutionTaskDuration() throws TimeoutException {
retryCaseTest.retryNewExecutionTaskDuration();
}
@Test
void retryNewExecutionTaskAttempts() throws TimeoutException {
retryCaseTest.retryNewExecutionTaskAttempts();
}
@Test
void retryNewExecutionFlowDuration() throws TimeoutException {
retryCaseTest.retryNewExecutionFlowDuration();
}
@Test
void retryNewExecutionFlowAttempts() throws TimeoutException {
retryCaseTest.retryNewExecutionFlowAttempts();
}
@Test
void retryFailedTaskDuration() throws TimeoutException {
retryCaseTest.retryFailedTaskDuration();
}
@Test
void retryFailedTaskAttempts() throws TimeoutException {
retryCaseTest.retryFailedTaskAttempts();
}
@Test
void retryFailedFlowDuration() throws TimeoutException {
retryCaseTest.retryFailedFlowDuration();
}
@Test
void retryFailedFlowAttempts() throws TimeoutException {
retryCaseTest.retryFailedFlowAttempts();
}
}

View File

@@ -232,7 +232,7 @@ public class MemoryExecutor implements ExecutorInterface {
EXECUTIONS.put(workerTaskResultDelay.getExecutionId(), executionState.from(markAsExecution));
executionQueue.emit(markAsExecution);
} else if (executionState.execution.findTaskRunByTaskRunId(workerTaskResultDelay.getTaskRunId()).getState().getCurrent().equals(State.Type.FAILED)) {
Execution newAttempt = executionService.retry(
Execution newAttempt = executionService.retryTask(
executionState.execution,
workerTaskResultDelay.getTaskRunId()
);

View File

@@ -140,7 +140,9 @@
{key: this.$t("created date"), value: this.execution.state.histories[0].date, date: true},
{key: this.$t("updated date"), value: this.stop(), date: true},
{key: this.$t("duration"), value: this.execution.state.histories, duration: true},
{key: this.$t("steps"), value: stepCount}
{key: this.$t("steps"), value: stepCount},
{key: this.$t("attempt"), value: this.execution.metadata.attemptNumber},
{key: this.$t("originalCreatedDate"), value: this.execution.metadata.originalCreatedDate, date: true},
];
if (this.execution.parentId) {

View File

@@ -606,7 +606,7 @@
},
"export": "Export as a ZIP file"
},
"continue backfill":"Continue the backfill",
"continue backfill": "Continue the backfill",
"delete backfill": "Delete the backfill",
"pause backfill": "Pause the backfill",
"backfill executions": "Backfill executions",
@@ -634,6 +634,7 @@
"false": "False",
"undefined": "Undefined",
"execution-include-non-terminated": "Include non-terminated executions?",
"execution-warn-deleting-still-running": "Executions still running will not be stopped and must be killed before being deleting."
"execution-warn-deleting-still-running": "Executions still running will not be stopped and must be killed before being deleting.",
"originalCreatedDate": "Original creation date"
}
}

View File

@@ -101,7 +101,7 @@
"add flow": "Ajouter un flow",
"from": "De",
"to": "à",
"steps": "étapes",
"steps": "Étapes",
"state": "État",
"search term in message": "Chercher dans le message",
"search": "Chercher",
@@ -610,6 +610,7 @@
"false": "Faux",
"undefined": "Non défini",
"execution-include-non-terminated": "Inclure les exécutions non terminées ?",
"execution-warn-deleting-still-running": "Les exécutions en cours ne seront pas arrêtées et doivent être stoppées avant d'être supprimées."
"execution-warn-deleting-still-running": "Les exécutions en cours ne seront pas arrêtées et doivent être stoppées avant d'être supprimées.",
"originalCreatedDate": "Date de création originale"
}
}

View File

@@ -109,6 +109,14 @@ const STATE = Object.freeze({
isRunning: false,
isKillable: true,
isFailed: false
},
RETRIED: {
name: "RETRIED",
colorClass: "gray",
icon: Refresh,
isRunning: false,
isKillable: false,
isFailed: false
}
});
@@ -161,6 +169,10 @@ export default class State {
return STATE.RETRYING.name;
}
static get RETRIED() {
return STATE.RETRIED.name;
}
static isRunning(state) {
return STATE[state] && STATE[state].isRunning;
}