mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
feat(executor): handle errors tasks
This commit is contained in:
@@ -3,12 +3,14 @@ package org.floworc.core.executions;
|
||||
import lombok.*;
|
||||
import lombok.experimental.Wither;
|
||||
import org.floworc.core.flows.State;
|
||||
import org.floworc.core.tasks.Task;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Value
|
||||
@Builder
|
||||
@@ -20,7 +22,6 @@ public class Execution {
|
||||
private String flowId;
|
||||
|
||||
@Wither
|
||||
@Builder.Default
|
||||
private List<TaskRun> taskRunList;
|
||||
|
||||
@Wither
|
||||
@@ -52,6 +53,30 @@ public class Execution {
|
||||
return find.get();
|
||||
}
|
||||
|
||||
public List<TaskRun> findTaskRunByTask(List<Task> tasks) {
|
||||
List<String> taskIds = tasks
|
||||
.stream()
|
||||
.map(Task::getId)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
return this.taskRunList
|
||||
.stream()
|
||||
.filter(taskRun -> taskIds.contains(taskRun.getTaskId()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public boolean hasFailed() {
|
||||
return this.taskRunList
|
||||
.stream()
|
||||
.anyMatch(taskRun -> taskRun.getState().isFailed());
|
||||
}
|
||||
|
||||
public boolean hasFailed(List<Task> tasks) {
|
||||
return this.findTaskRunByTask(tasks)
|
||||
.stream()
|
||||
.anyMatch(taskRun -> taskRun.getState().isFailed());
|
||||
}
|
||||
|
||||
public Execution withTaskRun(TaskRun taskRun) {
|
||||
ArrayList<TaskRun> newTaskRunList = new ArrayList<>(this.taskRunList);
|
||||
|
||||
|
||||
@@ -48,16 +48,6 @@ public class ExecutionService {
|
||||
return Optional.of(tasks.get(0).toTaskRun(execution));
|
||||
}
|
||||
|
||||
// all done
|
||||
long terminatedCount = taskRunList
|
||||
.stream()
|
||||
.filter(taskRun -> taskRun.getState().isTerninated())
|
||||
.count();
|
||||
|
||||
if (terminatedCount == tasks.size()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
// find first running to handle child tasks
|
||||
Optional<TaskRun> firstRunning = taskRunList
|
||||
.stream()
|
||||
@@ -93,16 +83,23 @@ public class ExecutionService {
|
||||
.filter(taskRun -> taskRun.getState().isTerninated())
|
||||
.findFirst();
|
||||
|
||||
if (lastTerminated.isPresent()) {
|
||||
if (lastTerminated.get().getState().getCurrent() == State.Type.FAILED) {
|
||||
log.warn("Must find errors path");
|
||||
return Optional.of(new ArrayList<>());
|
||||
} else {
|
||||
int index = taskRunList.indexOf(lastTerminated.get());
|
||||
// all done
|
||||
long terminatedCount = taskRunList
|
||||
.stream()
|
||||
.filter(taskRun -> taskRun.getState().isTerninated())
|
||||
.count();
|
||||
|
||||
if (tasks.size() > index - 1) {
|
||||
if (lastTerminated.isPresent()) {
|
||||
int lastIndex = taskRunList.indexOf(lastTerminated.get());
|
||||
|
||||
if (lastTerminated.get().getState().isFailed()) {
|
||||
return this.getNexts(execution, tasks.get(lastIndex).getErrors());
|
||||
} else if (terminatedCount == tasks.size()) {
|
||||
return Optional.empty();
|
||||
} else {
|
||||
if (tasks.size() > lastIndex - 1) {
|
||||
return Optional.of(tasks
|
||||
.get(index + 1)
|
||||
.get(lastIndex + 1)
|
||||
.toTaskRun(execution)
|
||||
);
|
||||
}
|
||||
@@ -129,7 +126,10 @@ public class ExecutionService {
|
||||
// all childs are done, continue the main flow
|
||||
if (nexts.isEmpty()) {
|
||||
WorkerTask workerTask = WorkerTask.builder()
|
||||
.taskRun(execution.findTaskRunById(parentTaskRun.getId()).withState(State.Type.SUCCESS))
|
||||
.taskRun(execution
|
||||
.findTaskRunById(parentTaskRun.getId())
|
||||
.withState(execution.hasFailed(childs.get()) ? State.Type.FAILED : State.Type.SUCCESS)
|
||||
)
|
||||
.task(parent)
|
||||
.build();
|
||||
|
||||
@@ -142,6 +142,7 @@ public class ExecutionService {
|
||||
return Optional.of(new ArrayList<>());
|
||||
}
|
||||
|
||||
// give more works
|
||||
return nexts;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,10 +42,21 @@ public class Flow {
|
||||
.flatMap(task -> task.findById(id).stream())
|
||||
.findFirst();
|
||||
|
||||
if (find.isEmpty()) {
|
||||
throw new IllegalArgumentException("Can't find task with id '" + id + "' on flow '" + this.id + "'");
|
||||
if (find.isPresent()) {
|
||||
return find.get();
|
||||
}
|
||||
|
||||
return find.get();
|
||||
if (this.errors != null) {
|
||||
Optional<Task> errors = this.errors
|
||||
.stream()
|
||||
.flatMap(task -> task.findById(id).stream())
|
||||
.findFirst();
|
||||
|
||||
if (errors.isPresent()) {
|
||||
return errors.get();
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("Can't find task with id '" + id + "' on flow '" + this.id + "'");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,6 +58,11 @@ public class State {
|
||||
return this.current == Type.RUNNING;
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public boolean isFailed() {
|
||||
return this.current == Type.FAILED;
|
||||
}
|
||||
|
||||
public enum Type {
|
||||
CREATED,
|
||||
RUNNING,
|
||||
|
||||
@@ -11,6 +11,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Slf4j
|
||||
public class ExecutionState implements Runnable {
|
||||
private final Object lock = new Object();
|
||||
private final QueueInterface<Execution> executionQueue;
|
||||
private final QueueInterface<WorkerTask> workerTaskResultQueue;
|
||||
private static ConcurrentHashMap<String, Execution> executions = new ConcurrentHashMap<>();
|
||||
@@ -26,34 +27,35 @@ public class ExecutionState implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
this.executionQueue.receive(message -> {
|
||||
Execution execution = message.getBody();
|
||||
synchronized (lock) {
|
||||
Execution execution = message.getBody();
|
||||
|
||||
if (execution.getState().isTerninated()) {
|
||||
executions.remove(message.getKey());
|
||||
} else {
|
||||
synchronized (executionQueue) {
|
||||
if (execution.getState().isTerninated()) {
|
||||
executions.remove(message.getKey());
|
||||
} else {
|
||||
executions.put(message.getKey(), execution);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
this.workerTaskResultQueue.receive(message -> {
|
||||
TaskRun taskRun = message.getBody().getTaskRun();
|
||||
synchronized (lock) {
|
||||
TaskRun taskRun = message.getBody().getTaskRun();
|
||||
|
||||
if (!executions.containsKey(taskRun.getExecutionId())) {
|
||||
throw new RuntimeException("Unable to find execution '" + taskRun.getExecutionId() + "' on ExecutionState");
|
||||
if (!executions.containsKey(taskRun.getExecutionId())) {
|
||||
throw new RuntimeException("Unable to find execution '" + taskRun.getExecutionId() + "' on ExecutionState");
|
||||
}
|
||||
|
||||
Execution execution = executions.get(taskRun.getExecutionId());
|
||||
Execution newExecution = execution.withTaskRun(taskRun);
|
||||
|
||||
this.executionQueue.emit(
|
||||
QueueMessage.<Execution>builder()
|
||||
.key(newExecution.getId())
|
||||
.body(newExecution)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
Execution execution = executions.get(taskRun.getExecutionId());
|
||||
Execution newExecution = execution.withTaskRun(taskRun);
|
||||
|
||||
this.executionQueue.emit(
|
||||
QueueMessage.<Execution>builder()
|
||||
.key(newExecution.getId())
|
||||
.body(newExecution)
|
||||
.build()
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -114,7 +114,9 @@ public class Executor implements Runnable {
|
||||
}
|
||||
|
||||
private void onEnd(Flow flow, Execution execution) {
|
||||
Execution newExecution = execution.withState(State.Type.SUCCESS);
|
||||
Execution newExecution = execution.withState(
|
||||
execution.hasFailed() ? State.Type.FAILED : State.Type.SUCCESS
|
||||
);
|
||||
|
||||
flow.logger().info(
|
||||
"[execution: {}] Flow completed with state {} in {}",
|
||||
|
||||
@@ -90,8 +90,7 @@ public class StandAloneRunner implements RunnerInterface {
|
||||
.build()
|
||||
);
|
||||
|
||||
|
||||
this.poolExecutor.awaitTermination(5, TimeUnit.MINUTES);
|
||||
this.poolExecutor.awaitTermination(1, TimeUnit.MINUTES);
|
||||
|
||||
return receive.get();
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@ import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
public interface FlowableTask {
|
||||
|
||||
default boolean hasChildTasks() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -36,6 +36,13 @@ abstract public class Task {
|
||||
return Optional.of(this);
|
||||
}
|
||||
|
||||
if (this.errors != null) {
|
||||
return this.errors
|
||||
.stream()
|
||||
.flatMap(task -> task.findById(id).stream())
|
||||
.findFirst();
|
||||
}
|
||||
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,11 +24,19 @@ class StandAloneRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void run() throws IOException, InterruptedException {
|
||||
void full() throws IOException, InterruptedException {
|
||||
Flow flow = Utils.parse("flows/full.yaml");
|
||||
Execution execution = runner.run(flow);
|
||||
|
||||
assertThat(execution.getTaskRunList(), hasSize(13));
|
||||
}
|
||||
|
||||
@Test
|
||||
void errors() throws IOException, InterruptedException {
|
||||
Flow flow = Utils.parse("flows/errors.yaml");
|
||||
Execution execution = runner.run(flow);
|
||||
|
||||
assertThat(execution.getTaskRunList(), hasSize(7));
|
||||
}
|
||||
|
||||
}
|
||||
29
core/src/test/resources/flows/errors.yaml
Normal file
29
core/src/test/resources/flows/errors.yaml
Normal file
@@ -0,0 +1,29 @@
|
||||
id: errors
|
||||
namespace: org.floworc.tests
|
||||
|
||||
tasks:
|
||||
- id: failed
|
||||
type: org.floworc.core.tasks.scripts.Bash
|
||||
commands:
|
||||
- 'exit 1'
|
||||
errors:
|
||||
- id: 2nd
|
||||
type: org.floworc.core.tasks.debugs.Echo
|
||||
format: second {{todo}}
|
||||
|
||||
- id: 3rd
|
||||
type: org.floworc.core.tasks.flows.Parallel
|
||||
tasks:
|
||||
- id: 3rd - 1st
|
||||
type: org.floworc.core.tasks.flows.Parallel
|
||||
tasks:
|
||||
- id: 3rd - 1st - 1st
|
||||
type: org.floworc.core.tasks.flows.Parallel
|
||||
tasks:
|
||||
- id: 3rd - 1st - 1st - 1st
|
||||
type: org.floworc.core.tasks.flows.Parallel
|
||||
tasks:
|
||||
- id: 3rd - 1st - 1st - 1st - last
|
||||
type: org.floworc.core.tasks.scripts.Bash
|
||||
commands:
|
||||
- echo "3rd - 1st - 1st - 1st - last{{id}}"
|
||||
Reference in New Issue
Block a user