feat(core): allow listeners to continue even if a listeners failed (#217)

This commit is contained in:
Ludovic DEHON
2020-11-18 09:48:51 +01:00
committed by GitHub
parent a9e569b8df
commit d2684ca644
7 changed files with 122 additions and 4 deletions

View File

@@ -23,6 +23,7 @@ import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.CRC32;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
@Value
@@ -177,7 +178,7 @@ public class Execution implements DeletedInterface {
* @param parentTaskRun the parent task
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolvedTasks, List<ResolvedTask> resolvedErrors, TaskRun parentTaskRun) {
public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolvedTasks, @Nullable List<ResolvedTask> resolvedErrors, TaskRun parentTaskRun) {
resolvedTasks = removeDisabled(resolvedTasks);
resolvedErrors = removeDisabled(resolvedErrors);
@@ -187,6 +188,13 @@ public class Execution implements DeletedInterface {
return resolvedErrors == null ? new ArrayList<>() : resolvedErrors;
}
return resolvedTasks;
}
public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolvedTasks) {
resolvedTasks = removeDisabled(resolvedTasks);
return resolvedTasks;
}

View File

@@ -367,8 +367,7 @@ public abstract class AbstractExecutor implements Runnable {
return this.saveFlowableOutput(
FlowableUtils.resolveSequentialNexts(
execution,
currentTasks,
new ArrayList<>()
currentTasks
),
flow,
execution,

View File

@@ -16,12 +16,23 @@ import java.util.stream.Collectors;
@Slf4j
public class FlowableUtils {
public static List<NextTaskRun> resolveSequentialNexts(
Execution execution,
List<ResolvedTask> tasks
) {
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks);
return FlowableUtils.innerResolveSequentialNexts(execution, currentTasks, null);
}
public static List<NextTaskRun> resolveSequentialNexts(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors
) {
return FlowableUtils.resolveSequentialNexts(execution, tasks, errors, null);
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, null);
return FlowableUtils.innerResolveSequentialNexts(execution, currentTasks, null);
}
public static List<NextTaskRun> resolveSequentialNexts(
@@ -32,6 +43,14 @@ public class FlowableUtils {
) {
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, parentTaskRun);
return FlowableUtils.innerResolveSequentialNexts(execution, currentTasks, parentTaskRun);
}
private static List<NextTaskRun> innerResolveSequentialNexts(
Execution execution,
List<ResolvedTask> currentTasks,
TaskRun parentTaskRun
) {
// nothing
if (currentTasks == null || currentTasks.size() == 0) {
return new ArrayList<>();

View File

@@ -4,6 +4,7 @@ import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kestra.core.models.executions.Execution;
import org.kestra.core.models.flows.State;
import java.io.IOException;
import java.net.URISyntaxException;
@@ -19,6 +20,9 @@ public class ListenersTest extends AbstractMemoryRunnerTest {
private void initListeners() throws IOException, URISyntaxException {
repositoryLoader.load(Objects.requireNonNull(ListenersTest.class.getClassLoader().getResource("flows/tests/listeners.yaml")));
repositoryLoader.load(Objects.requireNonNull(ListenersTest.class.getClassLoader().getResource("flows/tests/listeners-flowable.yaml")));
repositoryLoader.load(Objects.requireNonNull(ListenersTest.class.getClassLoader().getResource("flows/tests/listeners-multiple.yaml")));
repositoryLoader.load(Objects.requireNonNull(ListenersTest.class.getClassLoader().getResource("flows/tests/listeners-multiple-failed.yaml")));
repositoryLoader.load(Objects.requireNonNull(ListenersTest.class.getClassLoader().getResource("flows/tests/listeners-failed.yaml")));
}
@Test
@@ -78,4 +82,44 @@ public class ListenersTest extends AbstractMemoryRunnerTest {
assertThat(execution.getTaskRunList().get(2).getTaskId(), is("execution"));
assertThat(execution.getTaskRunList().get(2).getOutputs().get("value"), is(execution.getTaskRunList().get(1).getId()));
}
@Test
void multipleListeners() throws TimeoutException {
Execution execution = runnerUtils.runOne(
"org.kestra.tests",
"listeners-multiple"
);
assertThat(execution.getTaskRunList().size(), is(3));
assertThat(execution.getTaskRunList().get(1).getTaskId(), is("l1"));
assertThat(execution.getTaskRunList().get(2).getTaskId(), is("l2"));
}
@Test
void failedListeners() throws TimeoutException {
Execution execution = runnerUtils.runOne(
"org.kestra.tests",
"listeners-failed"
);
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList().size(), is(2));
assertThat(execution.getTaskRunList().get(1).getTaskId(), is("ko"));
assertThat(execution.getTaskRunList().get(1).getState().getCurrent(), is(State.Type.FAILED));
}
@Test
void failedMultipleListeners() throws TimeoutException {
Execution execution = runnerUtils.runOne(
"org.kestra.tests",
"listeners-multiple-failed"
);
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList().size(), is(3));
assertThat(execution.getTaskRunList().get(1).getTaskId(), is("ko"));
assertThat(execution.getTaskRunList().get(1).getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getTaskRunList().get(2).getTaskId(), is("l2"));
assertThat(execution.getTaskRunList().get(2).getState().getCurrent(), is(State.Type.SUCCESS));
}
}

View File

@@ -0,0 +1,13 @@
id: listeners-failed
namespace: org.kestra.tests
listeners:
- tasks:
- id: ko
type: org.kestra.core.tasks.scripts.Bash
commands:
- 'exit 1'
tasks:
- id: first
type: org.kestra.core.tasks.debugs.Return
format: "{{task.id}} > {{taskrun.startDate}}"

View File

@@ -0,0 +1,18 @@
id: listeners-multiple-failed
namespace: org.kestra.tests
listeners:
- tasks:
- id: ko
type: org.kestra.core.tasks.scripts.Bash
commands:
- 'exit 1'
- tasks:
- id: l2
type: org.kestra.core.tasks.debugs.Return
format: "{{task.id}} > {{taskrun.startDate}}"
tasks:
- id: first
type: org.kestra.core.tasks.debugs.Return
format: "{{task.id}} > {{taskrun.startDate}}"

View File

@@ -0,0 +1,17 @@
id: listeners-multiple
namespace: org.kestra.tests
listeners:
- tasks:
- id: l1
type: org.kestra.core.tasks.debugs.Return
format: "{{task.id}} > {{taskrun.startDate}}"
- tasks:
- id: l2
type: org.kestra.core.tasks.debugs.Return
format: "{{task.id}} > {{taskrun.startDate}}"
tasks:
- id: first
type: org.kestra.core.tasks.debugs.Return
format: "{{task.id}} > {{taskrun.startDate}}"