feat(): retry revamp (#3322)

* feat(): init revamp

* feat(): first working iteration

* feat(): first working iteration

* feat(): cleanup code

* feat(): correct implem of new retrying state

* feat(): memoryExecutor + handle errors

* fix(core): handle allow failure

* fix(): better coverage + change random

* fix: review changes

* fix: prevent flow from continuing while retrying
This commit is contained in:
YannC
2024-03-25 10:35:45 +01:00
committed by GitHub
parent ec06079f6b
commit 96f1eef502
21 changed files with 316 additions and 117 deletions

View File

@@ -23,21 +23,15 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.utils.MapUtils;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.Builder;
import lombok.Value;
import lombok.With;
import lombok.extern.slf4j.Slf4j;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.CRC32;
@@ -253,8 +247,16 @@ public class Execution implements DeletedInterface, TenantInterface {
List<TaskRun> errorsFlow = this.findTaskRunByTasks(resolvedErrors, parentTaskRun);
if (errorsFlow.size() > 0 || this.hasFailed(resolvedTasks, parentTaskRun)) {
// Check if flow has failed task
if (!errorsFlow.isEmpty() || this.hasFailed(resolvedTasks, parentTaskRun)) {
// Check whether at least one failed task will not be retried; if none, return no next task for now
if (!this.hasFailedNoRetry(resolvedTasks, parentTaskRun)) {
return new ArrayList<>();
}
return resolvedErrors == null ? new ArrayList<>() : resolvedErrors;
}
return resolvedTasks;
@@ -399,6 +401,19 @@ public class Execution implements DeletedInterface, TenantInterface {
.anyMatch(taskRun -> taskRun.getState().isFailed());
}
/**
 * Tells whether at least one task run of the given tasks has failed and will not be retried.
 *
 * @param resolvedTasks the tasks whose task runs are inspected
 * @param parentTaskRun the parent task run used to look up the task runs, may be {@code null}
 * @return {@code true} if a failed task run exists for which {@link TaskRun#shouldBeRetried} is {@code false}
 */
public boolean hasFailedNoRetry(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun) {
    return this.findTaskRunByTasks(resolvedTasks, parentTaskRun)
        .stream()
        // only failed task runs can match, so skip the task lookup for the others
        .filter(taskRun -> taskRun.getState().isFailed())
        .anyMatch(taskRun -> {
            ResolvedTask resolvedTask = resolvedTasks
                .stream()
                .filter(t -> t.getTask().getId().equals(taskRun.getTaskId()))
                .findFirst()
                .orElse(null);

            if (resolvedTask == null) {
                // parentTaskRun may be null, so its id must not be dereferenced blindly
                log.warn(
                    "Can't find task for taskRun '{}' in parentTaskRun '{}'",
                    taskRun.getId(),
                    parentTaskRun == null ? null : parentTaskRun.getId()
                );
                return false;
            }

            return !taskRun.shouldBeRetried(resolvedTask.getTask());
        });
}
public boolean hasCreated() {
return this.taskRunList != null && this.taskRunList
.stream()

View File

@@ -1,23 +1,20 @@
package io.kestra.core.models.executions;
import io.kestra.core.models.TenantInterface;
import io.swagger.v3.oas.annotations.Hidden;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import lombok.With;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@ToString
@EqualsAndHashCode
@@ -220,4 +217,31 @@ public class TaskRun implements TenantInterface {
", state=" + this.getState().getCurrent().toString() +
")";
}
/**
 * Computes the date at which this task run should be retried.
 *
 * @param task Contains the retry configuration
 * @return The next retry date, or {@code null} when there is no attempt yet, the task has no retry
 * configuration, maxAttempt is reached, or the next date would fall after the start of the last
 * attempt plus maxDuration
 */
public Instant nextRetryDate(Task task) {
    if (this.attempts == null || this.attempts.isEmpty() || task.getRetry() == null) {
        return null;
    }

    var retry = task.getRetry();

    // maxAttempt may be unset: only enforce the cap when it is configured
    Integer maxAttempt = retry.getMaxAttempt();
    if (maxAttempt != null && this.attempts.size() >= maxAttempt) {
        return null;
    }

    Instant base = this.lastAttempt().getState().maxDate();
    Instant nextDate = retry.nextRetryDate(this.attempts.size(), base);

    if (retry.getMaxDuration() != null
        && nextDate.isAfter(this.lastAttempt().getState().minDate().plus(retry.getMaxDuration()))) {
        return null;
    }

    return nextDate;
}
/**
 * Tells whether a retry can still be scheduled for this task run.
 *
 * @param task Contains the retry configuration
 * @return {@code true} when {@link #nextRetryDate(Task)} yields a date
 */
public boolean shouldBeRetried(Task task) {
    Instant upcomingRetry = this.nextRetryDate(task);

    return upcomingRetry != null;
}
}

View File

@@ -108,6 +108,14 @@ public class State {
return this.histories.get(this.histories.size() - 1).getDate();
}
/**
 * @return the date of the first history entry, or the current instant when there is no history
 */
public Instant minDate() {
    return this.histories.isEmpty()
        ? Instant.now()
        : this.histories.get(0).getDate();
}
@JsonIgnore
public boolean isTerminated() {
return this.current.isTerminated();
@@ -140,6 +148,11 @@ public class State {
return this.current.isPaused();
}
/**
 * @return whether the current state type is RETRYING (delegates to the current type)
 */
@JsonIgnore
public boolean isRetrying() {
return this.current.isRetrying();
}
@JsonIgnore
public boolean isRestartable() {
return this.current.isFailed() || this.isPaused();
@@ -158,7 +171,8 @@ public class State {
FAILED,
KILLED,
CANCELLED,
QUEUED;
QUEUED,
RETRYING;
public boolean isTerminated() {
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED;
@@ -179,6 +193,10 @@ public class State {
public boolean isPaused() {
return this == Type.PAUSED;
}
/**
 * @return {@code true} only for the {@code RETRYING} type
 */
public boolean isRetrying() {
    return Type.RETRYING == this;
}
}
@Value

View File

@@ -3,6 +3,7 @@ package io.kestra.core.models.tasks.retrys;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.micronaut.core.annotation.Introspected;
import jakarta.validation.constraints.Min;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -10,7 +11,7 @@ import lombok.experimental.SuperBuilder;
import net.jodah.failsafe.RetryPolicy;
import java.time.Duration;
import jakarta.validation.constraints.Min;
import java.time.Instant;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
@JsonSubTypes({
@@ -33,6 +34,8 @@ public abstract class AbstractRetry {
@Builder.Default
private Boolean warningOnRetry = false;
/**
 * Computes the date of the next retry for this retry strategy.
 *
 * @param attemptCount the number of attempts already made
 * @param lastAttempt the reference date of the last attempt, used as the base of the computation
 * @return the date at which the next attempt should start
 */
public abstract Instant nextRetryDate(Integer attemptCount, Instant lastAttempt);
public <T> RetryPolicy<T> toPolicy() {
RetryPolicy<T> policy = new RetryPolicy<>();

View File

@@ -1,6 +1,7 @@
package io.kestra.core.models.tasks.retrys;
import com.fasterxml.jackson.annotation.JsonInclude;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -8,7 +9,7 @@ import lombok.experimental.SuperBuilder;
import net.jodah.failsafe.RetryPolicy;
import java.time.Duration;
import jakarta.validation.constraints.NotNull;
import java.time.Instant;
@SuperBuilder
@Getter
@@ -29,4 +30,9 @@ public class Constant extends AbstractRetry {
return policy
.withDelay(interval);
}
/**
 * Schedules the next retry a fixed {@code interval} after the last attempt,
 * regardless of how many attempts were already made.
 */
@Override
public Instant nextRetryDate(Integer attemptCount, Instant lastAttempt) {
    final Instant scheduled = lastAttempt.plus(this.interval);

    return scheduled;
}
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.models.tasks.retrys;
import com.fasterxml.jackson.annotation.JsonInclude;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -8,8 +9,8 @@ import lombok.experimental.SuperBuilder;
import net.jodah.failsafe.RetryPolicy;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import jakarta.validation.constraints.NotNull;
@SuperBuilder
@Getter
@@ -40,4 +41,18 @@ public class Exponential extends AbstractRetry {
return policy;
}
/**
 * Computes the next retry date with an exponential backoff:
 * {@code interval * delayFactor^(attemptCount - 1)}, capped at {@code maxInterval}.
 * <p>
 * The first retry therefore waits {@code interval}, the second {@code interval * delayFactor},
 * and so on. When {@code delayFactor} is not set, a factor of 2 is used.
 *
 * @param attemptCount the number of attempts already made
 * @param lastAttempt the date the delay is added to
 * @return {@code lastAttempt} plus the computed delay, never earlier than {@code lastAttempt}
 */
@Override
public Instant nextRetryDate(Integer attemptCount, Instant lastAttempt) {
    // keep fractional factors (e.g. 1.5) instead of truncating them to an int
    double factor = this.delayFactor == null ? 2.0 : this.delayFactor.doubleValue();

    // guard against a zero/negative exponent that would schedule the retry in the past
    int exponent = Math.max(0, attemptCount - 1);

    // computed as a double so that a large exponent saturates instead of overflowing a long
    double rawDelayMillis = interval.toMillis() * Math.pow(factor, exponent);
    long delayMillis = (long) Math.min(rawDelayMillis, (double) maxInterval.toMillis());

    return lastAttempt.plusMillis(Math.max(0L, delayMillis));
}
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.models.tasks.retrys;
import com.fasterxml.jackson.annotation.JsonInclude;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -8,8 +9,8 @@ import lombok.experimental.SuperBuilder;
import net.jodah.failsafe.RetryPolicy;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import jakarta.validation.constraints.NotNull;
@SuperBuilder
@Getter
@@ -33,4 +34,11 @@ public class Random extends AbstractRetry {
return policy
.withDelay(minInterval.toMillis(), maxInterval.toMillis(), ChronoUnit.MILLIS);
}
/**
 * Computes the next retry date by adding a random delay, picked between {@code minInterval}
 * (inclusive) and {@code maxInterval} (exclusive), to the last attempt date.
 * <p>
 * When {@code maxInterval} is not strictly greater than {@code minInterval}, no random range
 * exists and {@code minInterval} is used as the delay instead of failing.
 *
 * @param attemptCount the number of attempts already made (unused by this strategy)
 * @param lastAttempt the date the delay is added to
 * @return {@code lastAttempt} plus the chosen delay
 */
@Override
public Instant nextRetryDate(Integer attemptCount, Instant lastAttempt) {
    long minMillis = minInterval.toMillis();
    long maxMillis = maxInterval.toMillis();

    // nextLong(origin, bound) rejects origin >= bound, so handle the degenerate range explicitly
    if (maxMillis <= minMillis) {
        return lastAttempt.plusMillis(minMillis);
    }

    // fully qualified: this class is itself named Random
    long randomMillis = java.util.concurrent.ThreadLocalRandom.current().nextLong(minMillis, maxMillis);

    return lastAttempt.plusMillis(randomMillis);
}
}

View File

@@ -1,22 +1,12 @@
package io.kestra.core.runners;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.ExecutionUpdatableTask;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.*;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.LogService;
import io.kestra.core.tasks.flows.Pause;
@@ -28,13 +18,8 @@ import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.event.Level;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -450,6 +435,7 @@ public class ExecutorService {
}
List<WorkerTaskResult> list = new ArrayList<>();
List<ExecutionDelay> executionDelays = new ArrayList<>();
for (TaskRun taskRun : executor.getExecution().getTaskRunList()) {
if (taskRun.getState().isRunning()) {
@@ -461,8 +447,29 @@ public class ExecutorService {
workerTaskResult.ifPresent(list::add);
}
/**
* Check if the task is failed and if it has a retry policy
*/
if (!executor.getExecution().getState().isRetrying() && taskRun.getState().isFailed() && executor.getFlow().findTaskByTaskId(taskRun.getTaskId()).getRetry() != null) {
Instant nextRetryDate = taskRun.nextRetryDate(executor.getFlow().findTaskByTaskId(taskRun.getTaskId()));
if (nextRetryDate != null) {
executionDelays.add(
ExecutionDelay.builder()
.taskRunId(taskRun.getId())
.executionId(executor.getExecution().getId())
.date(nextRetryDate)
.state(State.Type.RUNNING)
.build()
);
executor.withExecution(executor.getExecution().withState(State.Type.RETRYING), "handleRetryTask");
}
}
}
executor.withWorkerTaskDelays(executionDelays, "handleChildWorkerTaskResult");
if (list.isEmpty()) {
return executor;
}
@@ -555,7 +562,7 @@ public class ExecutorService {
}
private Executor handleEnd(Executor executor) {
if (executor.getExecution().getState().isTerminated() || executor.getExecution().getState().isPaused()) {
if (executor.getExecution().getState().isTerminated() || executor.getExecution().getState().isPaused() || executor.getExecution().getState().isRetrying()) {
return executor;
}

View File

@@ -18,7 +18,6 @@ import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.queues.QueueException;
@@ -44,7 +43,6 @@ import lombok.Getter;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import net.jodah.failsafe.Timeout;
import org.slf4j.Logger;
import org.slf4j.event.Level;
@@ -433,56 +431,8 @@ public class Worker implements Service, Runnable, AutoCloseable {
AtomicReference<WorkerTask> current = new AtomicReference<>(workerTask);
// creating the retry can fail
RetryPolicy<WorkerTask> workerTaskRetryPolicy;
try {
workerTaskRetryPolicy = AbstractRetry.retryPolicy(workerTask.getTask().getRetry());
} catch(IllegalStateException e) {
WorkerTask finalWorkerTask = workerTask.fail();
WorkerTaskResult workerTaskResult = new WorkerTaskResult(finalWorkerTask);
RunContext runContext = workerTask
.getRunContext()
.forWorker(this.applicationContext, workerTask);
runContext.logger().error("Exception while trying to build the retry policy", e);
this.workerTaskResultQueue.emit(workerTaskResult);
return workerTaskResult;
}
// run
WorkerTask finalWorkerTask = Failsafe
.with(workerTaskRetryPolicy
.handleResultIf(result -> result.getTaskRun().lastAttempt() != null &&
result.getTaskRun().lastAttempt().getState().getCurrent() == FAILED &&
!killedExecution.contains(result.getTaskRun().getExecutionId())
)
.onRetry(e -> {
WorkerTask lastResult = e.getLastResult();
if (cleanUp) {
lastResult.getRunContext().cleanup();
}
lastResult = this.cleanUpTransient(lastResult);
current.set(lastResult);
metricRegistry
.counter(
MetricRegistry.METRIC_WORKER_RETRYED_COUNT,
metricRegistry.tags(
current.get(),
MetricRegistry.TAG_ATTEMPT_COUNT, String.valueOf(e.getAttemptCount())
)
)
.increment();
this.workerTaskResultQueue.emit(
new WorkerTaskResult(lastResult)
);
})
)
.get(() -> this.runAttempt(current.get()));
WorkerTask finalWorkerTask = this.runAttempt(current.get());
// save dynamic WorkerResults since cleanUpTransient will remove them
List<WorkerTaskResult> dynamicWorkerResults = finalWorkerTask.getRunContext().dynamicWorkerResults();
@@ -511,7 +461,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
state = WARNING;
}
if (workerTask.getTask().isAllowFailure() && state.isFailed()) {
if (workerTask.getTask().isAllowFailure() && !finalWorkerTask.getTaskRun().shouldBeRetried(workerTask.getTask()) && state.isFailed()) {
state = WARNING;
}

View File

@@ -32,7 +32,6 @@ import lombok.Builder;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import reactor.core.publisher.Flux;
import java.io.IOException;
@@ -43,7 +42,8 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.*;
import static io.kestra.core.utils.Rethrow.throwFunction;
import static io.kestra.core.utils.Rethrow.throwPredicate;
@Singleton
@Slf4j
@@ -71,6 +71,27 @@ public class ExecutionService {
@Inject
private ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
/**
 * Prepares an execution for a new attempt of one of its task runs.
 * <p>
 * The task run matching {@code taskRunId} is moved to the CREATED state, every other task run
 * is kept as is, and the returned execution is moved to the RUNNING state.
 *
 * @param execution the execution holding the task run to retry
 * @param taskRunId the id of the task run to retry
 * @return a copy of the execution with the updated task run list and state
 **/
public Execution retry(Execution execution, String taskRunId) {
    List<TaskRun> resetTaskRuns = execution
        .getTaskRunList()
        .stream()
        .map(candidate -> candidate.getId().equals(taskRunId)
            ? candidate.withState(State.Type.CREATED)
            : candidate
        )
        .toList();

    Execution withResetTaskRun = execution.withTaskRunList(resetTaskRuns);

    return withResetTaskRun.withState(State.Type.RUNNING);
}
public Execution restart(final Execution execution, @Nullable Integer revision) throws Exception {
if (!(execution.getState().isTerminated() || execution.getState().isPaused())) {
throw new IllegalStateException("Execution must be terminated to be restarted, " +

View File

@@ -8,8 +8,6 @@ import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -48,11 +46,30 @@ public class RetryTest extends AbstractMemoryRunnerTest {
}
@Test
void retryInvalid() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "retry-invalid");
void retryRandom() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "retry-random");
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getTaskRunList().get(0).getAttempts(), nullValue());
assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(3));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}
// Runs the "retry-expo" flow and expects a single task run with 3 attempts, ending FAILED.
@Test
void retryExpo() throws TimeoutException {
    Execution finished = runnerUtils.runOne(null, "io.kestra.tests", "retry-expo");
    var taskRuns = finished.getTaskRunList();

    assertThat(taskRuns, hasSize(1));
    assertThat(taskRuns.get(0).getAttempts(), hasSize(3));
    assertThat(finished.getState().getCurrent(), is(State.Type.FAILED));
}
// Runs the "retry-and-fail" flow and expects 2 task runs, 5 attempts on the first one, ending FAILED.
@Test
void retryFail() throws TimeoutException {
    Execution finished = runnerUtils.runOne(null, "io.kestra.tests", "retry-and-fail");
    var taskRuns = finished.getTaskRunList();

    assertThat(taskRuns, hasSize(2));
    assertThat(taskRuns.get(0).getAttempts(), hasSize(5));
    assertThat(finished.getState().getCurrent(), is(State.Type.FAILED));
}
}

View File

@@ -144,7 +144,7 @@ public class PauseTest extends AbstractMemoryRunnerTest {
execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
() -> executionQueue.emit(restarted),
Duration.ofSeconds(5)
Duration.ofSeconds(10)
);
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

View File

@@ -0,0 +1,12 @@
# Test flow: a single task configured with an exponential retry
# (3 attempts max, 15s max duration, 0.25s base interval, 5s max interval).
id: retry-expo
namespace: io.kestra.tests
tasks:
- id: failed
type: io.kestra.core.tasks.executions.Fail
retry:
type: exponential
maxAttempt: 3
maxDuration: PT15S
interval: PT0.25s
maxInterval: PT5S

View File

@@ -0,0 +1,22 @@
# Test flow: the first task uses a constant retry (5 attempts max, 2s interval),
# followed by a second task and an `errors` branch.
id: retry-and-fail
namespace: io.kestra.tests
description: >
Retrying step should prevent following steps from executing.
tasks:
- id: keepFailingWithRetry
type: io.kestra.core.tasks.executions.Fail
description: Keep retrying with a simple retry defined.
retry:
type: constant
maxAttempt: 5
interval: PT2S
- id: thisTaskShouldNotBeExecuted
type: io.kestra.core.tasks.log.Log
message: "The previous task should fail"
errors:
- id: error
type: io.kestra.core.tasks.log.Log
message: once

View File

@@ -0,0 +1,12 @@
# Test flow: a single task configured with a random retry
# (3 attempts max, 15s max duration, delay between 0.20s and 0.50s).
id: retry-random
namespace: io.kestra.tests
tasks:
- id: failed
type: io.kestra.core.tasks.executions.Fail
retry:
type: random
maxAttempt: 3
maxDuration: PT15S
minInterval: PT0.20S
maxInterval: PT0.50S

View File

@@ -0,0 +1,14 @@
-- Redefine the generated column "state_current" of "executions" so that its ENUM includes 'RETRYING'.
-- The column stays NOT NULL and is still computed from '.state.current' of the "value" column.
ALTER TABLE executions ALTER COLUMN "state_current" ENUM (
'CREATED',
'RUNNING',
'PAUSED',
'RESTARTED',
'KILLING',
'SUCCESS',
'WARNING',
'FAILED',
'KILLED',
'CANCELLED',
'QUEUED',
'RETRYING'
) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.state.current'));

View File

@@ -0,0 +1,14 @@
-- Redefine the stored generated column `state_current` of `executions` so that its ENUM includes 'RETRYING'.
-- The column stays NOT NULL and is still extracted from '$.state.current' of the `value` column.
ALTER TABLE executions MODIFY COLUMN `state_current` ENUM (
'CREATED',
'RUNNING',
'PAUSED',
'RESTARTED',
'KILLING',
'SUCCESS',
'WARNING',
'FAILED',
'KILLED',
'CANCELLED',
'QUEUED',
'RETRYING'
) GENERATED ALWAYS AS (value ->> '$.state.current') STORED NOT NULL;

View File

@@ -0,0 +1 @@
-- Add the 'RETRYING' value to the state_type enum; IF NOT EXISTS avoids an error when it is already present.
ALTER TYPE state_type ADD VALUE IF NOT EXISTS 'RETRYING';

View File

@@ -818,7 +818,13 @@ public class JdbcExecutor implements ExecutorInterface, Service {
return taskDefaultService.injectDefaults(flow, execution);
}
private void executionDelaySend() {
/** An ExecutionDelay currently covers two kinds of executions:
* <br/>
* - Paused flows that will be restarted after an interval/timeout
* <br/>
* - Failed flows that will be retried after an interval
**/
private void executionDelaySend() {
if (isShutdown) {
return;
}
@@ -828,6 +834,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
Executor executor = new Executor(pair.getLeft(), null);
try {
// Handle paused tasks
if (executor.getExecution().findTaskRunByTaskRunId(executionDelay.getTaskRunId()).getState().getCurrent() == State.Type.PAUSED) {
Execution markAsExecution = executionService.markAs(
@@ -837,6 +844,14 @@ public class JdbcExecutor implements ExecutorInterface, Service {
);
executor = executor.withExecution(markAsExecution, "pausedRestart");
// Handle failed tasks
} else if (executor.getExecution().findTaskRunByTaskRunId(executionDelay.getTaskRunId()).getState().getCurrent().equals(State.Type.FAILED)) {
Execution newAttempt = executionService.retry(
pair.getKey(),
executionDelay.getTaskRunId()
);
executor = executor.withExecution(newAttempt, "failedRetry");
}
} catch (Exception e) {
executor = handleFailedExecutionFromExecutor(executor, e);
@@ -858,7 +873,12 @@ public class JdbcExecutor implements ExecutorInterface, Service {
return taskRuns
.stream()
.anyMatch(taskRun -> {
String deduplicationKey = taskRun.getParentTaskRunId() + "-" + taskRun.getTaskId() + "-" + taskRun.getValue();
// As retry is now handled outside the worker,
// we now add the attempt size to the deduplication key
String deduplicationKey = taskRun.getParentTaskRunId() + "-" +
taskRun.getTaskId() + "-" +
taskRun.getValue() + "-" +
(taskRun.getAttempts() != null ? taskRun.getAttempts().size() : 0);
if (executorState.getChildDeduplication().containsKey(deduplicationKey)) {
log.trace("Duplicate Nexts on execution '{}' with key '{}'", execution.getId(), deduplicationKey);
@@ -871,7 +891,8 @@ public class JdbcExecutor implements ExecutorInterface, Service {
}
private boolean deduplicateWorkerTask(Execution execution, ExecutorState executorState, TaskRun taskRun) {
String deduplicationKey = taskRun.getId();
String deduplicationKey = taskRun.getId() +
(taskRun.getAttempts() != null ? taskRun.getAttempts().size() : 0);
State.Type current = executorState.getWorkerTaskDeduplication().get(deduplicationKey);
if (current == taskRun.getState().getCurrent()) {
@@ -886,7 +907,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
private boolean deduplicateSubflowExecution(Execution execution, ExecutorState executorState, TaskRun taskRun) {
// There can be multiple executions for the same task, so we need to deduplicate using the worker task execution iteration
String deduplicationKey = taskRun.getId() + (taskRun.getIteration() == null ? "" : "-" + taskRun.getIteration());
State.Type current = executorState.getSubflowExecutionDeduplication().get(deduplicationKey);
State.Type current = executorState.getSubflowExecutionDeduplication().get(deduplicationKey);
if (current == taskRun.getState().getCurrent()) {
log.trace("Duplicate SubflowExecution on execution '{}' for taskRun '{}', value '{}, taskId '{}'", execution.getId(), taskRun.getId(), taskRun.getValue(), taskRun.getTaskId());

View File

@@ -3,11 +3,7 @@ package io.kestra.runner.memory;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
@@ -30,7 +26,6 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -136,7 +131,10 @@ public class MemoryExecutor implements ExecutorInterface {
return;
}
if (message.getTaskRunList() == null || message.getTaskRunList().isEmpty() || message.getState().isCreated()) {
if (message.getTaskRunList() == null ||
message.getTaskRunList().isEmpty()||
message.getState().isCreated() ||
message.getState().getHistories().get(message.getState().getHistories().size()-2).getState().equals(State.Type.RETRYING)) {
this.handleExecution(saveExecution(message));
}
}
@@ -235,6 +233,14 @@ public class MemoryExecutor implements ExecutorInterface {
);
EXECUTIONS.put(workerTaskResultDelay.getExecutionId(), executionState.from(markAsExecution));
executionQueue.emit(markAsExecution);
} else if (executionState.execution.findTaskRunByTaskRunId(workerTaskResultDelay.getTaskRunId()).getState().getCurrent().equals(State.Type.FAILED)) {
Execution newAttempt = executionService.retry(
executionState.execution,
workerTaskResultDelay.getTaskRunId()
);
EXECUTIONS.put(workerTaskResultDelay.getExecutionId(), executionState.from(newAttempt));
executionQueue.emit(newAttempt);
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -470,7 +476,7 @@ public class MemoryExecutor implements ExecutorInterface {
private boolean deduplicateWorkerTask(Execution execution, TaskRun taskRun) {
ExecutionState executionState = EXECUTIONS.get(execution.getId());
String deduplicationKey = taskRun.getExecutionId() + "-" + taskRun.getId();
String deduplicationKey = taskRun.getExecutionId() + "-" + taskRun.getId() + "-" + taskRun.attemptNumber();
State.Type current = executionState.workerTaskDeduplication.get(deduplicationKey);
if (current == taskRun.getState().getCurrent()) {
@@ -488,7 +494,7 @@ public class MemoryExecutor implements ExecutorInterface {
return taskRuns
.stream()
.anyMatch(taskRun -> {
String deduplicationKey = taskRun.getParentTaskRunId() + "-" + taskRun.getTaskId() + "-" + taskRun.getValue();
String deduplicationKey = taskRun.getParentTaskRunId() + "-" + taskRun.getTaskId() + "-" + taskRun.getValue() + taskRun.attemptNumber();
if (executionState.childDeduplication.containsKey(deduplicationKey)) {
log.trace("Duplicate Nexts on execution '{}' with key '{}'", execution.getId(), deduplicationKey);

View File

@@ -8,6 +8,7 @@ import SkipPreviousCircle from "vue-material-design-icons/SkipPreviousCircle.vue
import AlertCircle from "vue-material-design-icons/AlertCircle.vue";
import DotsVerticalCircle from "vue-material-design-icons/DotsVerticalCircle.vue";
import MotionPauseOutline from "vue-material-design-icons/MotionPauseOutline.vue";
import Refresh from "vue-material-design-icons/Refresh.vue";
import Cancel from "vue-material-design-icons/Cancel.vue";
import {cssVariable} from "./global"
@@ -100,6 +101,14 @@ const STATE = Object.freeze({
isRunning: false,
isKillable: false,
isFailed: false,
},
RETRYING: {
name: "RETRYING",
colorClass: "gray",
icon: Refresh,
isRunning: false,
isKillable: true,
isFailed: false
}
});
@@ -148,6 +157,10 @@ export default class State {
return STATE.QUEUED.name;
}
// Name of the RETRYING state, as declared in the STATE table.
static get RETRYING() {
return STATE.RETRYING.name;
}
static isRunning(state) {
return STATE[state] && STATE[state].isRunning;
}