feat(executor): handle parallel tasks

This commit is contained in:
tchiotludo
2019-08-30 21:40:45 +02:00
parent d57e30c0c0
commit 469632944f
21 changed files with 281 additions and 103 deletions

View File

@@ -20,6 +20,7 @@ public class Execution {
private String flowId;
@Wither
@Builder.Default
private List<TaskRun> taskRunList;
@Wither
@@ -41,10 +42,10 @@ public class Execution {
public TaskRun findTaskRunById(String id) {
Optional<TaskRun> find = this.taskRunList
.stream()
.filter(task -> task.getId().equals(id))
.filter(taskRun -> taskRun.getId().equals(id))
.findFirst();
if (!find.isPresent()) {
if (find.isEmpty()) {
throw new IllegalArgumentException("Can't find taskrun with id '" + id + "' on execution '" + this.id + "'");
}

View File

@@ -2,16 +2,26 @@ package org.floworc.core.executions;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.flows.State;
import org.floworc.core.queues.QueueInterface;
import org.floworc.core.queues.QueueMessage;
import org.floworc.core.tasks.FlowableTask;
import org.floworc.core.tasks.Task;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
public class ExecutionService {
public static Optional<List<TaskRun>> getNexts(Execution execution, List<Task> tasks) {
private QueueInterface<WorkerTask> workerTaskResultQueue;
public ExecutionService(QueueInterface<WorkerTask> workerTaskResultQueue) {
this.workerTaskResultQueue = workerTaskResultQueue;
}
public Optional<List<TaskRun>> getNexts(Execution execution, List<Task> tasks) {
if (tasks.size() == 0) {
throw new IllegalStateException("Invalid execution " + execution.getId() + " on flow " +
execution.getFlowId() + " with 0 task"
@@ -23,9 +33,23 @@ public class ExecutionService {
return Optional.of(tasks.get(0).toTaskRun(execution));
}
// all done
long terminatedCount = execution
// find tasks related to current tasks
List<TaskRun> taskRunList = execution
.getTaskRunList()
.stream()
.filter(taskRun -> tasks
.stream()
.anyMatch(task -> task.getId().equals(taskRun.getTaskId()))
)
.collect(Collectors.toList());
// first one on current list
if (taskRunList.size() == 0) {
return Optional.of(tasks.get(0).toTaskRun(execution));
}
// all done
long terminatedCount = taskRunList
.stream()
.filter(taskRun -> taskRun.getState().isTerninated())
.count();
@@ -34,25 +58,26 @@ public class ExecutionService {
return Optional.empty();
}
// find first running
Optional<TaskRun> firstRunning = execution
.getTaskRunList()
// find first running to handle child tasks
Optional<TaskRun> firstRunning = taskRunList
.stream()
.filter(taskRun -> taskRun.getState().isRunning())
.findFirst();
if (firstRunning.isPresent()) {
return tasks
.get(execution.getTaskRunList().indexOf(firstRunning.get()))
.getChildTaskRun(execution);
Task parent = tasks.get(taskRunList.indexOf(firstRunning.get()));
if (!(parent instanceof FlowableTask) || !((FlowableTask) parent).hasChildTasks()) {
return Optional.of(new ArrayList<>());
} else {
return this.handleChilds(parent, firstRunning.get(), execution);
}
}
// reverse
ArrayList<TaskRun> reverse = new ArrayList<>(execution.getTaskRunList());
ArrayList<TaskRun> reverse = new ArrayList<>(taskRunList);
Collections.reverse(reverse);
// find last created
Optional<TaskRun> lastCreated = reverse
.stream()
.filter(taskRun -> taskRun.getState().getCurrent() == State.Type.CREATED)
@@ -73,7 +98,7 @@ public class ExecutionService {
log.warn("Must find errors path");
return Optional.of(new ArrayList<>());
} else {
int index = execution.getTaskRunList().indexOf(lastTerminated.get());
int index = taskRunList.indexOf(lastTerminated.get());
if (tasks.size() > index - 1) {
return Optional.of(tasks
@@ -86,4 +111,37 @@ public class ExecutionService {
return Optional.of(new ArrayList<>());
}
private Optional<List<TaskRun>> handleChilds(Task parent, TaskRun parentTaskRun, Execution execution) {
if (!(parent instanceof FlowableTask) || !((FlowableTask) parent).hasChildTasks()) {
throw new IllegalArgumentException("Invalid parent tasks with no childs");
}
Optional<List<Task>> childs = ((FlowableTask) parent).getChildTasks(execution);
// no childs, just continue
if (childs.isEmpty()) {
return Optional.of(new ArrayList<>());
}
Optional<List<TaskRun>> nexts = this.getNexts(execution, childs.get());
// all childs are done, continue the main flow
if (nexts.isEmpty()) {
WorkerTask workerTask = WorkerTask.builder()
.taskRun(execution.findTaskRunById(parentTaskRun.getId()).withState(State.Type.SUCCESS))
.task(parent)
.build();
this.workerTaskResultQueue.emit(QueueMessage.<WorkerTask>builder()
.key(workerTask.getTaskRun().getExecutionId())
.body(workerTask)
.build()
);
return Optional.of(new ArrayList<>());
}
return nexts;
}
}

View File

@@ -57,5 +57,4 @@ public class TaskRun {
.state(new State())
.build();
}
}

View File

@@ -39,10 +39,10 @@ public class Flow {
public Task findTaskById(String id) {
Optional<Task> find = this.tasks
.stream()
.filter(task -> task.getId().equals(id))
.flatMap(task -> task.findById(id).stream())
.findFirst();
if (!find.isPresent()) {
if (find.isEmpty()) {
throw new IllegalArgumentException("Can't find task with id '" + id + "' on flow '" + this.id + "'");
}

View File

@@ -1,5 +1,6 @@
package org.floworc.core.flows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Value;
import org.floworc.core.executions.TaskRun;
@@ -47,10 +48,12 @@ public class State {
return duration.substring(0, duration.length() - 4) + "s";
}
@JsonIgnore
public boolean isTerninated() {
return this.current == Type.SKIPPED || this.current == Type.FAILED || this.current == Type.SUCCESS;
}
@JsonIgnore
public boolean isRunning() {
return this.current == Type.RUNNING;
}

View File

@@ -11,8 +11,8 @@ import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class ExecutionState implements Runnable {
private QueueInterface<Execution> executionQueue;
private QueueInterface<WorkerTask> workerTaskResultQueue;
private final QueueInterface<Execution> executionQueue;
private final QueueInterface<WorkerTask> workerTaskResultQueue;
private static ConcurrentHashMap<String, Execution> executions = new ConcurrentHashMap<>();
public ExecutionState(
@@ -31,7 +31,9 @@ public class ExecutionState implements Runnable {
if (execution.getState().isTerninated()) {
executions.remove(message.getKey());
} else {
executions.put(message.getKey(), execution);
synchronized (executionQueue) {
executions.put(message.getKey(), execution);
}
}
});
@@ -43,8 +45,8 @@ public class ExecutionState implements Runnable {
}
Execution execution = executions.get(taskRun.getExecutionId());
Execution newExecution = execution.withTaskRun(taskRun);
this.executionQueue.emit(
QueueMessage.<Execution>builder()
.key(newExecution.getId())

View File

@@ -19,15 +19,18 @@ public class Executor implements Runnable {
private QueueInterface<Execution> executionQueue;
private QueueInterface<WorkerTask> workerTaskQueue;
private RepositoryInterface repository;
private ExecutionService executionService;
public Executor(
QueueInterface<Execution> executionQueue,
QueueInterface<WorkerTask> workerTaskQueue,
RepositoryInterface repositoryInterface
RepositoryInterface repositoryInterface,
ExecutionService executionService
) {
this.executionQueue = executionQueue;
this.workerTaskQueue = workerTaskQueue;
this.repository = repositoryInterface;
this.executionService = executionService;
}
@Override
@@ -35,17 +38,19 @@ public class Executor implements Runnable {
this.executionQueue.receive(message -> {
Execution execution = message.getBody();
if (!execution.getState().isTerninated()) {
Flow flow = this.repository
.getFlowById(execution.getFlowId())
.orElseThrow(() -> new IllegalArgumentException("Invalid flow id '" + execution.getFlowId() + "'"));
ExecutionService.getNexts(execution, flow.getTasks())
.ifPresentOrElse(
nexts -> this.onNexts(flow, execution, nexts),
() -> this.onEnd(flow, execution)
);
if (execution.getState().isTerninated()) {
return;
}
Flow flow = this.repository
.getFlowById(execution.getFlowId())
.orElseThrow(() -> new IllegalArgumentException("Invalid flow id '" + execution.getFlowId() + "'"));
this.executionService.getNexts(execution, flow.getTasks())
.ifPresentOrElse(
nexts -> this.onNexts(flow, execution, nexts),
() -> this.onEnd(flow, execution)
);
});
}
@@ -59,8 +64,9 @@ public class Executor implements Runnable {
return;
} else {
flow.logger().trace(
"[execution: {}] Found nexts {}",
"[execution: {}] Found {} next(s) {}",
execution.getId(),
nexts.size(),
nexts
);
}
@@ -75,6 +81,7 @@ public class Executor implements Runnable {
executionTasksRun.addAll(nexts);
}
// update Execution
newExecution = execution.withTaskRunList(executionTasksRun);
if (execution.getState().getCurrent() == State.Type.CREATED) {
@@ -93,6 +100,7 @@ public class Executor implements Runnable {
.build()
);
// submit TaskRun
final Execution finalNewExecution = newExecution;
nexts.forEach(taskRun -> this.workerTaskQueue.emit(QueueMessage.<WorkerTask>builder()
.key(finalNewExecution.getId())

View File

@@ -4,6 +4,7 @@ import org.floworc.core.executions.WorkerTask;
import org.floworc.core.flows.State;
import org.floworc.core.queues.QueueInterface;
import org.floworc.core.queues.QueueMessage;
import org.floworc.core.tasks.RunnableTask;
public class Worker implements Runnable {
private QueueInterface<WorkerTask> workerTaskQueue;
@@ -23,9 +24,10 @@ public class Worker implements Runnable {
public void run(WorkerTask workerTask) {
workerTask.logger().info(
"[execution: {}] [taskrun: {}] Task started",
"[execution: {}] [taskrun: {}] Task {} started",
workerTask.getTaskRun().getExecutionId(),
workerTask.getTaskRun().getId()
workerTask.getTaskRun().getId(),
workerTask.getTask().getClass().getSimpleName()
);
this.workerTaskResultQueue.emit(QueueMessage.<WorkerTask>builder()
@@ -34,29 +36,33 @@ public class Worker implements Runnable {
.build()
);
try {
workerTask.getTask().run();
if (workerTask.getTask() instanceof RunnableTask) {
RunnableTask task = (RunnableTask) workerTask.getTask();
try {
task.run();
this.workerTaskResultQueue.emit(QueueMessage.<WorkerTask>builder()
.key(workerTask.getTaskRun().getExecutionId())
.body(workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.SUCCESS)))
.build()
);
} catch (Exception e) {
workerTask.logger().error("Failed task", e);
this.workerTaskResultQueue.emit(QueueMessage.<WorkerTask>builder()
.key(workerTask.getTaskRun().getExecutionId())
.body(workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.SUCCESS)))
.build()
);
} catch (Exception e) {
workerTask.logger().error("Failed task", e);
this.workerTaskResultQueue.emit(QueueMessage.<WorkerTask>builder()
.key(workerTask.getTaskRun().getExecutionId())
.body(workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.FAILED)))
.build()
this.workerTaskResultQueue.emit(QueueMessage.<WorkerTask>builder()
.key(workerTask.getTaskRun().getExecutionId())
.body(workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.FAILED)))
.build()
);
}
workerTask.logger().info(
"[execution: {}] [taskrun: {}] Task {} completed in {}",
workerTask.getTaskRun().getExecutionId(),
workerTask.getTaskRun().getId(),
workerTask.getTask().getClass().getSimpleName(),
workerTask.getTaskRun().getState().humanDuration()
);
}
workerTask.logger().info(
"[execution: {}] [taskrun: {}] Task completed in {}",
workerTask.getTaskRun().getExecutionId(),
workerTask.getTaskRun().getId(),
workerTask.getTaskRun().getState().humanDuration()
);
}
}

View File

@@ -3,6 +3,7 @@ package org.floworc.core.runners.types;
import com.devskiller.friendly_id.FriendlyId;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.executions.Execution;
import org.floworc.core.executions.ExecutionService;
import org.floworc.core.executions.WorkerTask;
import org.floworc.core.flows.Flow;
import org.floworc.core.flows.State;
@@ -28,12 +29,15 @@ public class StandAloneRunner implements RunnerInterface {
private LocalQueue<WorkerTask> workerTaskResultQueue;
private LocalRepository localRepository;
private ThreadPoolExecutor poolExecutor;
private ExecutionService executionService;
public StandAloneRunner(File basePath) {
this.executionQueue = new LocalQueue<>(QueueName.EXECUTIONS);
this.workerTaskQueue = new LocalQueue<>(QueueName.WORKERS);
this.workerTaskResultQueue = new LocalQueue<>(QueueName.WORKERS_RESULT);
this.localRepository = new LocalRepository(basePath);
this.executionService = new ExecutionService(this.workerTaskResultQueue);
}
@Override
@@ -42,7 +46,8 @@ public class StandAloneRunner implements RunnerInterface {
poolExecutor.execute(new Executor(
this.executionQueue,
this.workerTaskQueue,
this.localRepository
this.localRepository,
this.executionService
));
poolExecutor.execute(new ExecutionState(

View File

@@ -0,0 +1,27 @@
package org.floworc.core.tasks;
import org.floworc.core.executions.Execution;
import java.util.List;
import java.util.Optional;
public interface FlowableTask {
default boolean hasChildTasks() {
return false;
}
/**
* Return list of childs tasks for current execution
*
* @param execution current execution to allow filtering of the task ready to be consumed
* @return list of task ready to be consumed: <ul>
* <li>{@link Optional#empty()}: no childs tasks or no more tasks available.</li>
* <li>{@link Optional#of(Object)} with empty list: no childs available for now, retry later.</li>
* <li>{@link Optional#of(Object)} with a non empty list: all childs that must be run now.</li>
* </ul>
*/
default Optional<List<Task>> getChildTasks(Execution execution) {
return Optional.empty();
}
}

View File

@@ -0,0 +1,5 @@
package org.floworc.core.tasks;
public interface RunnableTask {
Void run() throws Exception;
}

View File

@@ -27,16 +27,10 @@ abstract public class Task {
private List<Task> errors;
abstract public Void run() throws Exception;
public List<TaskRun> toTaskRun(Execution execution) {
return Collections.singletonList(TaskRun.of(execution, this));
}
public Optional<List<TaskRun>> getChildTaskRun(Execution execution) {
return Optional.of(new ArrayList<>());
}
public Optional<Task> findById(String id) {
if (this.getId().equals(id)) {
return Optional.of(this);

View File

@@ -1,16 +1,15 @@
package org.floworc.core.tasks.debugs;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.tasks.RunnableTask;
import org.floworc.core.tasks.Task;
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@Data
@Slf4j
public class Echo extends Task {
public class Echo extends Task implements RunnableTask {
private String format;
@Override

View File

@@ -5,22 +5,27 @@ import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.executions.Execution;
import org.floworc.core.executions.ExecutionService;
import org.floworc.core.executions.TaskRun;
import org.floworc.core.tasks.FlowableTask;
import org.floworc.core.tasks.Task;
import javax.validation.Valid;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@Data
@Slf4j
public class Parallel extends Task {
public class Parallel extends Task implements FlowableTask {
private Integer concurrent;
@Valid
private List<Task> tasks;
@Valid
private List<Task> errors;
@Override
public Optional<Task> findById(String id) {
Optional<Task> superFind = super.findById(id);
@@ -37,14 +42,25 @@ public class Parallel extends Task {
}
@Override
public Optional<List<TaskRun>> getChildTaskRun(Execution execution) {
return ExecutionService.getNexts(execution, this.tasks);
public boolean hasChildTasks() {
return true;
}
@Override
public Void run() {
log.info("Starting '{}'", this.tasks);
public Optional<List<Task>> getChildTasks(Execution execution) {
List<Task> notFind = this.tasks
.stream()
.filter(task -> execution
.getTaskRunList()
.stream()
.anyMatch(taskRun -> !taskRun.getTaskId().equals(task.getId()))
)
.collect(Collectors.toList());
return null;
if (notFind.size() == 0) {
return Optional.empty();
}
return Optional.of(notFind);
}
}

View File

@@ -1,9 +1,8 @@
package org.floworc.core.tasks.flows;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.tasks.FlowableTask;
import org.floworc.core.tasks.Task;
import java.util.List;
@@ -13,15 +12,8 @@ import java.util.Map;
@EqualsAndHashCode(callSuper = true)
@Data
@Slf4j
public class Switch extends Task {
public class Switch extends Task implements FlowableTask {
private Map<String, List<Task>> cases;
private List<Task> defaults;
@Override
public Void run() {
log.info("Starting '{}'", this);
return null;
}
}

View File

@@ -1,10 +1,10 @@
package org.floworc.core.tasks.scripts;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.tasks.RunnableTask;
import org.floworc.core.tasks.Task;
import java.io.BufferedReader;
@@ -14,12 +14,11 @@ import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@Value
@Slf4j
public class Bash extends Task {
public class Bash extends Task implements RunnableTask {
private String[] commands;
@Override

View File

@@ -0,0 +1,49 @@
package org.floworc.core.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Debug {
private static final String NAME = Thread.currentThread().getStackTrace()[2].getClassName();
private static final Logger LOGGER = LoggerFactory.getLogger(NAME);
private static ObjectMapper MAPPER = new ObjectMapper()
.registerModule(new JavaTimeModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
private static String caller() {
return Thread.currentThread().getStackTrace()[3].getClassName() + " -> " +
Thread.currentThread().getStackTrace()[3].getMethodName() + " # " +
Thread.currentThread().getStackTrace()[3].getLineNumber();
}
public static <T> String toJson(T arg) {
String output;
if (arg instanceof String) {
output = (String) arg;
} else if (arg instanceof byte[]) {
output = new String((byte[]) arg);
} else {
try {
output = MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(arg);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
return output;
}
@SafeVarargs
public static <T> void log(T... args) {
LOGGER.trace("\033[44;30m " + caller() + " \033[0m");
for (Object arg : args) {
LOGGER.trace("\033[46;30m " + arg.getClass().getName() + " \033[0m " + toJson(arg));
}
}
}

View File

@@ -28,7 +28,7 @@ class StandAloneRunnerTest {
Flow flow = Utils.parse("flows/full.yaml");
Execution execution = runner.run(flow);
assertThat(execution.getTaskRunList(), hasSize(4));
assertThat(execution.getTaskRunList(), hasSize(13));
}
}

View File

@@ -18,7 +18,7 @@ class YamlFlowParserTest {
Flow flow = Utils.parse("flows/full.yaml");
assertThat(flow.getId(), is("full"));
assertThat(flow.getTasks().size(), is(4));
assertThat(flow.getTasks().size(), is(5));
// third with all optionnals
Task optionnals = flow.getTasks().get(2);

View File

@@ -1,5 +1,5 @@
id: full
namespace: com.leroymerlin.data.supply
namespace: org.floworc.tests
#triggers:
#- type: schedule
@@ -34,23 +34,38 @@ tasks:
- id: 4th - 1st
type: org.floworc.core.tasks.scripts.Bash
commands:
- echo "{{id}}"
- echo "4th - 1st {{id}}"
- id: 4th - 2nd
type: org.floworc.core.tasks.scripts.Bash
commands:
- echo "{{id}}"
- echo "4th - 2nd {{id}}"
- id: 4th - 3rd
type: org.floworc.core.tasks.scripts.Bash
commands:
- echo "{{id}}"
- echo "4th- 3rd {{id}}"
- id: 4th - 4th
type: org.floworc.core.tasks.scripts.Bash
commands:
- echo "{{id}}"
- id: 4th - 5th
type: org.floworc.core.tasks.scripts.Bash
commands:
- echo "{{id}}"
type: org.floworc.core.tasks.flows.Parallel
tasks:
- id: 4th - 4th - 1st
type: org.floworc.core.tasks.flows.Parallel
tasks:
- id: 4th - 4th - 1st - 1st
type: org.floworc.core.tasks.flows.Parallel
tasks:
- id: 4th - 4th - 1st - 1st - 1st
type: org.floworc.core.tasks.flows.Parallel
tasks:
- id: 4th - 4th - 1st - 1st - 1st - last
type: org.floworc.core.tasks.scripts.Bash
commands:
- echo "4th - 4th - 1st - 1st - 1st - last{{id}}"
- id: lastte
type: org.floworc.core.tasks.scripts.Bash
commands:
- 'echo "last"'
- 'sleep 1'
#- type: python
# commands:

View File

@@ -18,6 +18,6 @@
<appender-ref ref="STDOUT" />
</root>
<logger name="org.floworc" level="DEBUG" />
<logger name="flow" level="DEBUG" />
<logger name="org.floworc" level="INFO" />
<logger name="flow" level="INFO" />
</configuration>