fix(core): there can be multiple ExecutionDelay in an execution (#2258)

there can be multiple ExecutionDelay in an execution, so we must have an UID that contains both the executionId and the taskRunId as key.

close #1223
This commit is contained in:
Loïc Mathieu
2023-10-26 08:43:53 +02:00
committed by GitHub
parent 04aaa69298
commit f0023e2e84
8 changed files with 59 additions and 8 deletions

View File

@@ -38,7 +38,7 @@ public class QueueService {
} else if (object.getClass() == WorkerTaskExecution.class) {
return ((WorkerTaskExecution) object).getExecution().getId();
} else if (object.getClass() == ExecutionDelay.class) {
return ((ExecutionDelay) object).getExecutionId();
return ((ExecutionDelay) object).uid();
} else if (object.getClass() == ExecutorState.class) {
return ((ExecutorState) object).getExecutionId();
} else if (object.getClass() == Setting.class) {

View File

@@ -1,11 +1,14 @@
package io.kestra.core.runners;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;
import java.time.Instant;
import java.util.Optional;
import javax.validation.constraints.NotNull;
@Value
@@ -22,4 +25,9 @@ public class ExecutionDelay {
Instant date;
@NotNull State.Type state;
@JsonIgnore
public String uid() {
return String.join("_", executionId, taskRunId);
}
}

View File

@@ -215,7 +215,7 @@ public class ExecutionService {
if (!isFlowable || s.equals(taskRunId)) {
TaskRun newTaskRun = originalTaskRun.withState(newState);
if (originalTaskRun.getAttempts() != null && originalTaskRun.getAttempts().size() > 0) {
if (originalTaskRun.getAttempts() != null && !originalTaskRun.getAttempts().isEmpty()) {
ArrayList<TaskRunAttempt> attempts = new ArrayList<>(originalTaskRun.getAttempts());
attempts.set(attempts.size() - 1, attempts.get(attempts.size() - 1).withState(newState));
newTaskRun = newTaskRun.withAttempts(attempts);
@@ -227,6 +227,11 @@ public class ExecutionService {
}
}
if (newExecution.getTaskRunList().stream().anyMatch(t -> t.getState().getCurrent() == State.Type.PAUSED)) {
// there is still some tasks paused, this can occur with parallel pause
return newExecution;
}
return newExecution
.withState(State.Type.RESTARTED);
}

View File

@@ -1,19 +1,23 @@
package io.kestra.core.tasks.flows;
import io.kestra.core.models.flows.State;
import org.junit.jupiter.api.Test;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
public class EachParallelTest extends AbstractMemoryRunnerTest {
@Test
void parallel() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "each-parallel");
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(8));
}
@@ -21,6 +25,7 @@ public class EachParallelTest extends AbstractMemoryRunnerTest {
void parallelNested() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "each-parallel-nested");
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(11));
}
}

View File

@@ -16,6 +16,7 @@ import org.junitpioneer.jupiter.RetryingTest;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
@@ -35,6 +36,11 @@ public class PauseTest extends AbstractMemoryRunnerTest {
suite.runDelay(runnerUtils);
}
@Test
void parallelDelay() throws Exception {
suite.runParallelDelay(runnerUtils);
}
@Test
void timeout() throws Exception {
suite.runTimeout(runnerUtils);
@@ -91,6 +97,13 @@ public class PauseTest extends AbstractMemoryRunnerTest {
assertThat(execution.getTaskRunList(), hasSize(3));
}
public void runParallelDelay(RunnerUtils runnerUtils) throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "each-parallel-pause", Duration.ofMinutes(5));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(7));
}
public void runTimeout(RunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(null, "io.kestra.tests", "pause-timeout");

View File

@@ -0,0 +1,15 @@
id: each-parallel-pause
namespace: io.kestra.tests
tasks:
- id: each
type: io.kestra.core.tasks.flows.EachParallel
value: ["toto", "tata", "titi"]
tasks:
- id: pause
type: io.kestra.core.tasks.flows.Pause
delay: PT1S
tasks:
- id: log
type: io.kestra.core.tasks.log.Log
message: Hello World

View File

@@ -241,6 +241,11 @@ public abstract class JdbcRunnerTest {
pauseTest.runDelay(runnerUtils);
}
@Test
public void pauseRunParallelDelay() throws Exception {
pauseTest.runParallelDelay(runnerUtils);
}
@Test
public void pauseRunTimeout() throws Exception {
pauseTest.runTimeout(runnerUtils);

View File

@@ -157,7 +157,7 @@ public class MemoryExecutor implements ExecutorInterface {
executor = executorService.process(executor);
if (executor.getNexts().size() > 0 && deduplicateNexts(execution, executor.getNexts())) {
if (!executor.getNexts().isEmpty() && deduplicateNexts(execution, executor.getNexts())) {
executor.withExecution(
executorService.onNexts(executor.getFlow(), executor.getExecution(), executor.getNexts()),
"onNexts"
@@ -170,10 +170,10 @@ public class MemoryExecutor implements ExecutorInterface {
toExecution(executor);
}
if (executor.getWorkerTasks().size() > 0) {
if (!executor.getWorkerTasks().isEmpty()) {
List<WorkerTask> workerTasksDedup = executor.getWorkerTasks().stream()
.filter(workerTask -> this.deduplicateWorkerTask(execution, workerTask.getTaskRun()))
.collect(Collectors.toList());
.toList();
// Send WorkerTask not flowable to the worker
workerTasksDedup
@@ -189,12 +189,12 @@ public class MemoryExecutor implements ExecutorInterface {
.forEach(workerTaskResultQueue::emit);
}
if (executor.getWorkerTaskResults().size() > 0) {
if (!executor.getWorkerTaskResults().isEmpty()) {
executor.getWorkerTaskResults()
.forEach(workerTaskResultQueue::emit);
}
if (executor.getExecutionDelays().size() > 0) {
if (!executor.getExecutionDelays().isEmpty()) {
executor.getExecutionDelays()
.forEach(workerTaskResultDelay -> {
long between = ChronoUnit.MICROS.between(Instant.now(), workerTaskResultDelay.getDate());
@@ -214,7 +214,7 @@ public class MemoryExecutor implements ExecutorInterface {
workerTaskResultDelay.getTaskRunId(),
workerTaskResultDelay.getState()
);
EXECUTIONS.put(workerTaskResultDelay.getExecutionId(), executionState.from(markAsExecution));
executionQueue.emit(markAsExecution);
}
} catch (Exception e) {