diff --git a/core/src/main/java/org/floworc/core/executions/Execution.java b/core/src/main/java/org/floworc/core/executions/Execution.java index 09553617a0..6a8f536d28 100644 --- a/core/src/main/java/org/floworc/core/executions/Execution.java +++ b/core/src/main/java/org/floworc/core/executions/Execution.java @@ -20,6 +20,7 @@ public class Execution { private String flowId; @Wither + @Builder.Default private List taskRunList; @Wither @@ -41,10 +42,10 @@ public class Execution { public TaskRun findTaskRunById(String id) { Optional find = this.taskRunList .stream() - .filter(task -> task.getId().equals(id)) + .filter(taskRun -> taskRun.getId().equals(id)) .findFirst(); - if (!find.isPresent()) { + if (find.isEmpty()) { throw new IllegalArgumentException("Can't find taskrun with id '" + id + "' on execution '" + this.id + "'"); } diff --git a/core/src/main/java/org/floworc/core/executions/ExecutionService.java b/core/src/main/java/org/floworc/core/executions/ExecutionService.java index a419f73ad3..45ba3b7b16 100644 --- a/core/src/main/java/org/floworc/core/executions/ExecutionService.java +++ b/core/src/main/java/org/floworc/core/executions/ExecutionService.java @@ -2,16 +2,26 @@ package org.floworc.core.executions; import lombok.extern.slf4j.Slf4j; import org.floworc.core.flows.State; +import org.floworc.core.queues.QueueInterface; +import org.floworc.core.queues.QueueMessage; +import org.floworc.core.tasks.FlowableTask; import org.floworc.core.tasks.Task; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; @Slf4j public class ExecutionService { - public static Optional> getNexts(Execution execution, List tasks) { + private QueueInterface workerTaskResultQueue; + + public ExecutionService(QueueInterface workerTaskResultQueue) { + this.workerTaskResultQueue = workerTaskResultQueue; + } + + public Optional> getNexts(Execution execution, List tasks) { if (tasks.size() == 0) { throw new IllegalStateException("Invalid execution " + execution.getId() + " on flow " + execution.getFlowId() + " with 0 task" @@ -23,9 +33,23 @@ public class ExecutionService { return Optional.of(tasks.get(0).toTaskRun(execution)); } - // all done - long terminatedCount = execution + // find tasks related to current tasks + List taskRunList = execution .getTaskRunList() + .stream() + .filter(taskRun -> tasks + .stream() + .anyMatch(task -> task.getId().equals(taskRun.getTaskId())) + ) + .collect(Collectors.toList()); + + // first one on current list + if (taskRunList.size() == 0) { + return Optional.of(tasks.get(0).toTaskRun(execution)); + } + + // all done + long terminatedCount = taskRunList .stream() .filter(taskRun -> taskRun.getState().isTerninated()) .count(); @@ -34,25 +58,26 @@ public class ExecutionService { return Optional.empty(); } - // find first running - Optional firstRunning = execution - .getTaskRunList() + // find first running to handle child tasks + Optional firstRunning = taskRunList .stream() .filter(taskRun -> taskRun.getState().isRunning()) .findFirst(); if (firstRunning.isPresent()) { - return tasks - .get(execution.getTaskRunList().indexOf(firstRunning.get())) - .getChildTaskRun(execution); + Task parent = tasks.get(taskRunList.indexOf(firstRunning.get())); + if (!(parent instanceof FlowableTask) || !((FlowableTask) parent).hasChildTasks()) { + return Optional.of(new ArrayList<>()); + } else { + return this.handleChilds(parent, firstRunning.get(), execution); + } } // reverse - ArrayList reverse = new ArrayList<>(execution.getTaskRunList()); + ArrayList reverse = new ArrayList<>(taskRunList); Collections.reverse(reverse); // find last created - Optional lastCreated = reverse .stream() .filter(taskRun -> taskRun.getState().getCurrent() == State.Type.CREATED) @@ -73,7 +98,7 @@ public class ExecutionService { log.warn("Must find errors path"); return Optional.of(new ArrayList<>()); } else { - int index = execution.getTaskRunList().indexOf(lastTerminated.get()); + int index = taskRunList.indexOf(lastTerminated.get()); if (tasks.size() > index - 1) { return Optional.of(tasks @@ -86,4 +111,37 @@ public class ExecutionService { return Optional.of(new ArrayList<>()); } + + private Optional> handleChilds(Task parent, TaskRun parentTaskRun, Execution execution) { + if (!(parent instanceof FlowableTask) || !((FlowableTask) parent).hasChildTasks()) { + throw new IllegalArgumentException("Invalid parent tasks with no childs"); + } + + Optional> childs = ((FlowableTask) parent).getChildTasks(execution); + + // no childs, just continue + if (childs.isEmpty()) { + return Optional.of(new ArrayList<>()); + } + + Optional> nexts = this.getNexts(execution, childs.get()); + + // all childs are done, continue the main flow + if (nexts.isEmpty()) { + WorkerTask workerTask = WorkerTask.builder() + .taskRun(execution.findTaskRunById(parentTaskRun.getId()).withState(State.Type.SUCCESS)) + .task(parent) + .build(); + + this.workerTaskResultQueue.emit(QueueMessage.builder() + .key(workerTask.getTaskRun().getExecutionId()) + .body(workerTask) + .build() + ); + + return Optional.of(new ArrayList<>()); + } + + return nexts; + } } diff --git a/core/src/main/java/org/floworc/core/executions/TaskRun.java b/core/src/main/java/org/floworc/core/executions/TaskRun.java index 3f69336399..30d193faed 100644 --- a/core/src/main/java/org/floworc/core/executions/TaskRun.java +++ b/core/src/main/java/org/floworc/core/executions/TaskRun.java @@ -57,5 +57,4 @@ public class TaskRun { .state(new State()) .build(); } - } diff --git a/core/src/main/java/org/floworc/core/flows/Flow.java b/core/src/main/java/org/floworc/core/flows/Flow.java index d793f928de..3b55872460 100644 --- a/core/src/main/java/org/floworc/core/flows/Flow.java +++ b/core/src/main/java/org/floworc/core/flows/Flow.java @@ -39,10 +39,10 @@ public class Flow { public Task findTaskById(String id) { Optional find = this.tasks .stream() - .filter(task -> task.getId().equals(id)) + .flatMap(task -> task.findById(id).stream()) .findFirst(); - if (!find.isPresent()) { + if (find.isEmpty()) { throw new IllegalArgumentException("Can't find task with id '" + id + "' on flow '" + this.id + "'"); } diff --git a/core/src/main/java/org/floworc/core/flows/State.java b/core/src/main/java/org/floworc/core/flows/State.java index 441c53f85c..5a1c432df8 100644 --- a/core/src/main/java/org/floworc/core/flows/State.java +++ b/core/src/main/java/org/floworc/core/flows/State.java @@ -1,5 +1,6 @@ package org.floworc.core.flows; +import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Value; import org.floworc.core.executions.TaskRun; @@ -47,10 +48,12 @@ public class State { return duration.substring(0, duration.length() - 4) + "s"; } + @JsonIgnore public boolean isTerninated() { return this.current == Type.SKIPPED || this.current == Type.FAILED || this.current == Type.SUCCESS; } + @JsonIgnore public boolean isRunning() { return this.current == Type.RUNNING; } diff --git a/core/src/main/java/org/floworc/core/runners/ExecutionState.java b/core/src/main/java/org/floworc/core/runners/ExecutionState.java index c19df5035c..3b11edde49 100644 --- a/core/src/main/java/org/floworc/core/runners/ExecutionState.java +++ b/core/src/main/java/org/floworc/core/runners/ExecutionState.java @@ -11,8 +11,8 @@ import java.util.concurrent.ConcurrentHashMap; @Slf4j public class ExecutionState implements Runnable { - private QueueInterface executionQueue; - private QueueInterface workerTaskResultQueue; + private final QueueInterface executionQueue; + private final QueueInterface workerTaskResultQueue; private static ConcurrentHashMap executions = new ConcurrentHashMap<>(); public ExecutionState( @@ -31,7 +31,9 @@ public class ExecutionState implements Runnable { if (execution.getState().isTerninated()) { executions.remove(message.getKey()); } else { - executions.put(message.getKey(), execution); + synchronized (executionQueue) { + executions.put(message.getKey(), execution); + } } }); @@ -43,8 +45,8 @@ public class ExecutionState implements Runnable { } Execution execution = executions.get(taskRun.getExecutionId()); - Execution newExecution = execution.withTaskRun(taskRun); + this.executionQueue.emit( QueueMessage.builder() .key(newExecution.getId()) diff --git a/core/src/main/java/org/floworc/core/runners/Executor.java b/core/src/main/java/org/floworc/core/runners/Executor.java index 52539f762f..2224d1f6af 100644 --- a/core/src/main/java/org/floworc/core/runners/Executor.java +++ b/core/src/main/java/org/floworc/core/runners/Executor.java @@ -19,15 +19,18 @@ public class Executor implements Runnable { private QueueInterface executionQueue; private QueueInterface workerTaskQueue; private RepositoryInterface repository; + private ExecutionService executionService; public Executor( QueueInterface executionQueue, QueueInterface workerTaskQueue, - RepositoryInterface repositoryInterface + RepositoryInterface repositoryInterface, + ExecutionService executionService ) { this.executionQueue = executionQueue; this.workerTaskQueue = workerTaskQueue; this.repository = repositoryInterface; + this.executionService = executionService; } @Override @@ -35,17 +38,19 @@ public class Executor implements Runnable { this.executionQueue.receive(message -> { Execution execution = message.getBody(); - if (!execution.getState().isTerninated()) { - Flow flow = this.repository - .getFlowById(execution.getFlowId()) - .orElseThrow(() -> new IllegalArgumentException("Invalid flow id '" + execution.getFlowId() + "'")); - - ExecutionService.getNexts(execution, flow.getTasks()) - .ifPresentOrElse( - nexts -> this.onNexts(flow, execution, nexts), - () -> this.onEnd(flow, execution) - ); + if (execution.getState().isTerninated()) { + return; } + + Flow flow = this.repository + .getFlowById(execution.getFlowId()) + .orElseThrow(() -> new IllegalArgumentException("Invalid flow id '" + execution.getFlowId() + "'")); + + this.executionService.getNexts(execution, flow.getTasks()) + .ifPresentOrElse( + nexts -> this.onNexts(flow, execution, nexts), + () -> this.onEnd(flow, execution) + ); }); } @@ -59,8 +64,9 @@ public class Executor implements Runnable { return; } else { flow.logger().trace( - "[execution: {}] Found nexts {}", + "[execution: {}] Found {} next(s) {}", execution.getId(), + nexts.size(), nexts ); } @@ -75,6 +81,7 @@ public class Executor implements Runnable { executionTasksRun.addAll(nexts); } + // update Execution newExecution = execution.withTaskRunList(executionTasksRun); if (execution.getState().getCurrent() == State.Type.CREATED) { @@ -93,6 +100,7 @@ public class Executor implements Runnable { .build() ); + // submit TaskRun final Execution finalNewExecution = newExecution; nexts.forEach(taskRun -> this.workerTaskQueue.emit(QueueMessage.builder() .key(finalNewExecution.getId()) diff --git a/core/src/main/java/org/floworc/core/runners/Worker.java b/core/src/main/java/org/floworc/core/runners/Worker.java index dbee28a4a4..cbc3ecbb7c 100644 --- a/core/src/main/java/org/floworc/core/runners/Worker.java +++ b/core/src/main/java/org/floworc/core/runners/Worker.java @@ -4,6 +4,7 @@ import org.floworc.core.executions.WorkerTask; import org.floworc.core.flows.State; import org.floworc.core.queues.QueueInterface; import org.floworc.core.queues.QueueMessage; +import org.floworc.core.tasks.RunnableTask; public class Worker implements Runnable { private QueueInterface workerTaskQueue; @@ -23,9 +24,10 @@ public class Worker implements Runnable { public void run(WorkerTask workerTask) { workerTask.logger().info( - "[execution: {}] [taskrun: {}] Task started", + "[execution: {}] [taskrun: {}] Task {} started", workerTask.getTaskRun().getExecutionId(), - workerTask.getTaskRun().getId() + workerTask.getTaskRun().getId(), + workerTask.getTask().getClass().getSimpleName() ); this.workerTaskResultQueue.emit(QueueMessage.builder() @@ -34,29 +36,33 @@ public class Worker implements Runnable { .build() ); - try { - workerTask.getTask().run(); + if (workerTask.getTask() instanceof RunnableTask) { + RunnableTask task = (RunnableTask) workerTask.getTask(); + try { + task.run(); - this.workerTaskResultQueue.emit(QueueMessage.builder() - .key(workerTask.getTaskRun().getExecutionId()) - .body(workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.SUCCESS))) - .build() - ); - } catch (Exception e) { - workerTask.logger().error("Failed task", e); + this.workerTaskResultQueue.emit(QueueMessage.builder() + .key(workerTask.getTaskRun().getExecutionId()) + .body(workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.SUCCESS))) + .build() + ); + } catch (Exception e) { + workerTask.logger().error("Failed task", e); - this.workerTaskResultQueue.emit(QueueMessage.builder() - .key(workerTask.getTaskRun().getExecutionId()) - .body(workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.FAILED))) - .build() + this.workerTaskResultQueue.emit(QueueMessage.builder() + .key(workerTask.getTaskRun().getExecutionId()) + .body(workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.FAILED))) + .build() + ); + } + + workerTask.logger().info( + "[execution: {}] [taskrun: {}] Task {} completed in {}", + workerTask.getTaskRun().getExecutionId(), + workerTask.getTaskRun().getId(), + workerTask.getTask().getClass().getSimpleName(), + workerTask.getTaskRun().getState().humanDuration() ); } - - workerTask.logger().info( - "[execution: {}] [taskrun: {}] Task completed in {}", - workerTask.getTaskRun().getExecutionId(), - workerTask.getTaskRun().getId(), - workerTask.getTaskRun().getState().humanDuration() - ); } } diff --git a/core/src/main/java/org/floworc/core/runners/types/StandAloneRunner.java b/core/src/main/java/org/floworc/core/runners/types/StandAloneRunner.java index 533a5eb178..34b916da96 100644 --- a/core/src/main/java/org/floworc/core/runners/types/StandAloneRunner.java +++ b/core/src/main/java/org/floworc/core/runners/types/StandAloneRunner.java @@ -3,6 +3,7 @@ package org.floworc.core.runners.types; import com.devskiller.friendly_id.FriendlyId; import lombok.extern.slf4j.Slf4j; import org.floworc.core.executions.Execution; +import org.floworc.core.executions.ExecutionService; import org.floworc.core.executions.WorkerTask; import org.floworc.core.flows.Flow; import org.floworc.core.flows.State; @@ -28,12 +29,15 @@ public class StandAloneRunner implements RunnerInterface { private LocalQueue workerTaskResultQueue; private LocalRepository localRepository; private ThreadPoolExecutor poolExecutor; + private ExecutionService executionService; public StandAloneRunner(File basePath) { this.executionQueue = new LocalQueue<>(QueueName.EXECUTIONS); this.workerTaskQueue = new LocalQueue<>(QueueName.WORKERS); this.workerTaskResultQueue = new LocalQueue<>(QueueName.WORKERS_RESULT); + this.localRepository = new LocalRepository(basePath); + this.executionService = new ExecutionService(this.workerTaskResultQueue); } @Override @@ -42,7 +46,8 @@ public class StandAloneRunner implements RunnerInterface { poolExecutor.execute(new Executor( this.executionQueue, this.workerTaskQueue, - this.localRepository + this.localRepository, + this.executionService )); poolExecutor.execute(new ExecutionState( diff --git a/core/src/main/java/org/floworc/core/tasks/FlowableTask.java b/core/src/main/java/org/floworc/core/tasks/FlowableTask.java new file mode 100644 index 0000000000..67adc7ffa3 --- /dev/null +++ b/core/src/main/java/org/floworc/core/tasks/FlowableTask.java @@ -0,0 +1,27 @@ +package org.floworc.core.tasks; + +import org.floworc.core.executions.Execution; + +import java.util.List; +import java.util.Optional; + +public interface FlowableTask { + + default boolean hasChildTasks() { + return false; + } + + /** + * Return list of childs tasks for current execution + * + * @param execution current execution to allow filtering of the task ready to be consumed + * @return list of task ready to be consumed:
    + *
  • {@link Optional#empty()}: no childs tasks or no more tasks available.
  • + *
  • {@link Optional#of(Object)} with empty list: no childs available for now, retry later.
  • + *
  • {@link Optional#of(Object)} with a non empty list: all childs that must be run now.
  • + *
+ */ + default Optional> getChildTasks(Execution execution) { + return Optional.empty(); + } +} diff --git a/core/src/main/java/org/floworc/core/tasks/RunnableTask.java b/core/src/main/java/org/floworc/core/tasks/RunnableTask.java new file mode 100644 index 0000000000..4e62db5229 --- /dev/null +++ b/core/src/main/java/org/floworc/core/tasks/RunnableTask.java @@ -0,0 +1,5 @@ +package org.floworc.core.tasks; + +public interface RunnableTask { + Void run() throws Exception; +} diff --git a/core/src/main/java/org/floworc/core/tasks/Task.java b/core/src/main/java/org/floworc/core/tasks/Task.java index 13d95145c4..0fa871111c 100644 --- a/core/src/main/java/org/floworc/core/tasks/Task.java +++ b/core/src/main/java/org/floworc/core/tasks/Task.java @@ -27,16 +27,10 @@ abstract public class Task { private List errors; - abstract public Void run() throws Exception; - public List toTaskRun(Execution execution) { return Collections.singletonList(TaskRun.of(execution, this)); } - public Optional> getChildTaskRun(Execution execution) { - return Optional.of(new ArrayList<>()); - } - public Optional findById(String id) { if (this.getId().equals(id)) { return Optional.of(this); diff --git a/core/src/main/java/org/floworc/core/tasks/debugs/Echo.java b/core/src/main/java/org/floworc/core/tasks/debugs/Echo.java index c4383addf5..fde3af3c42 100644 --- a/core/src/main/java/org/floworc/core/tasks/debugs/Echo.java +++ b/core/src/main/java/org/floworc/core/tasks/debugs/Echo.java @@ -1,16 +1,15 @@ package org.floworc.core.tasks.debugs; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.ToString; +import lombok.*; import lombok.extern.slf4j.Slf4j; +import org.floworc.core.tasks.RunnableTask; import org.floworc.core.tasks.Task; @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) @Data @Slf4j -public class Echo extends Task { +public class Echo extends Task implements RunnableTask { private String format; @Override diff --git a/core/src/main/java/org/floworc/core/tasks/flows/Parallel.java b/core/src/main/java/org/floworc/core/tasks/flows/Parallel.java index 46a40097a1..bea7060597 100644 --- a/core/src/main/java/org/floworc/core/tasks/flows/Parallel.java +++ b/core/src/main/java/org/floworc/core/tasks/flows/Parallel.java @@ -5,22 +5,27 @@ import lombok.EqualsAndHashCode; import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.floworc.core.executions.Execution; -import org.floworc.core.executions.ExecutionService; -import org.floworc.core.executions.TaskRun; +import org.floworc.core.tasks.FlowableTask; import org.floworc.core.tasks.Task; +import javax.validation.Valid; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) @Data @Slf4j -public class Parallel extends Task { +public class Parallel extends Task implements FlowableTask { private Integer concurrent; + @Valid private List tasks; + @Valid + private List errors; + @Override public Optional findById(String id) { Optional superFind = super.findById(id); @@ -37,14 +42,25 @@ public class Parallel extends Task { } @Override - public Optional> getChildTaskRun(Execution execution) { - return ExecutionService.getNexts(execution, this.tasks); + public boolean hasChildTasks() { + return true; } @Override - public Void run() { - log.info("Starting '{}'", this.tasks); + public Optional> getChildTasks(Execution execution) { + List notFind = this.tasks + .stream() + .filter(task -> execution + .getTaskRunList() + .stream() + .anyMatch(taskRun -> !taskRun.getTaskId().equals(task.getId())) + ) + .collect(Collectors.toList()); - return null; + if (notFind.size() == 0) { + return Optional.empty(); + } + + return Optional.of(notFind); } } diff --git a/core/src/main/java/org/floworc/core/tasks/flows/Switch.java b/core/src/main/java/org/floworc/core/tasks/flows/Switch.java index a71ad0d0b0..e7a914d5d5 100644 --- a/core/src/main/java/org/floworc/core/tasks/flows/Switch.java +++ b/core/src/main/java/org/floworc/core/tasks/flows/Switch.java @@ -1,9 +1,8 @@ package org.floworc.core.tasks.flows; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.ToString; +import lombok.*; import lombok.extern.slf4j.Slf4j; +import org.floworc.core.tasks.FlowableTask; import org.floworc.core.tasks.Task; import java.util.List; @@ -13,15 +12,8 @@ import java.util.Map; @EqualsAndHashCode(callSuper = true) @Data @Slf4j -public class Switch extends Task { +public class Switch extends Task implements FlowableTask { private Map> cases; private List defaults; - - @Override - public Void run() { - log.info("Starting '{}'", this); - - return null; - } } diff --git a/core/src/main/java/org/floworc/core/tasks/scripts/Bash.java b/core/src/main/java/org/floworc/core/tasks/scripts/Bash.java index ff1d812b48..3ec27c53c3 100644 --- a/core/src/main/java/org/floworc/core/tasks/scripts/Bash.java +++ b/core/src/main/java/org/floworc/core/tasks/scripts/Bash.java @@ -1,10 +1,10 @@ package org.floworc.core.tasks.scripts; -import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; import lombok.Value; import lombok.extern.slf4j.Slf4j; +import org.floworc.core.tasks.RunnableTask; import org.floworc.core.tasks.Task; import java.io.BufferedReader; @@ -14,12 +14,11 @@ import java.io.InputStreamReader; import java.util.Arrays; import java.util.List; - @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) @Value @Slf4j -public class Bash extends Task { +public class Bash extends Task implements RunnableTask { private String[] commands; @Override diff --git a/core/src/main/java/org/floworc/core/utils/Debug.java b/core/src/main/java/org/floworc/core/utils/Debug.java new file mode 100644 index 0000000000..da64fe9048 --- /dev/null +++ b/core/src/main/java/org/floworc/core/utils/Debug.java @@ -0,0 +1,49 @@ +package org.floworc.core.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Debug { + private static final String NAME = Thread.currentThread().getStackTrace()[2].getClassName(); + private static final Logger LOGGER = LoggerFactory.getLogger(NAME); + private static ObjectMapper MAPPER = new ObjectMapper() + .registerModule(new JavaTimeModule()) + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + + private static String caller() { + return Thread.currentThread().getStackTrace()[3].getClassName() + " -> " + + Thread.currentThread().getStackTrace()[3].getMethodName() + " # " + + Thread.currentThread().getStackTrace()[3].getLineNumber(); + } + + public static String toJson(T arg) { + String output; + + if (arg instanceof String) { + output = (String) arg; + } else if (arg instanceof byte[]) { + output = new String((byte[]) arg); + } else { + try { + output = MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(arg); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + return output; + } + + @SafeVarargs + public static void log(T... args) { + LOGGER.trace("\033[44;30m " + caller() + " \033[0m"); + + for (Object arg : args) { + LOGGER.trace("\033[46;30m " + arg.getClass().getName() + " \033[0m " + toJson(arg)); + } + } +} diff --git a/core/src/test/java/org/floworc/core/runners/StandAloneRunnerTest.java b/core/src/test/java/org/floworc/core/runners/StandAloneRunnerTest.java index 137ac6c3bb..88ba33f17a 100644 --- a/core/src/test/java/org/floworc/core/runners/StandAloneRunnerTest.java +++ b/core/src/test/java/org/floworc/core/runners/StandAloneRunnerTest.java @@ -28,7 +28,7 @@ class StandAloneRunnerTest { Flow flow = Utils.parse("flows/full.yaml"); Execution execution = runner.run(flow); - assertThat(execution.getTaskRunList(), hasSize(4)); + assertThat(execution.getTaskRunList(), hasSize(13)); } } \ No newline at end of file diff --git a/core/src/test/java/org/floworc/core/serializers/YamlFlowParserTest.java b/core/src/test/java/org/floworc/core/serializers/YamlFlowParserTest.java index 0f27f631b2..722704c9ec 100644 --- a/core/src/test/java/org/floworc/core/serializers/YamlFlowParserTest.java +++ b/core/src/test/java/org/floworc/core/serializers/YamlFlowParserTest.java @@ -18,7 +18,7 @@ class YamlFlowParserTest { Flow flow = Utils.parse("flows/full.yaml"); assertThat(flow.getId(), is("full")); - assertThat(flow.getTasks().size(), is(4)); + assertThat(flow.getTasks().size(), is(5)); // third with all optionnals Task optionnals = flow.getTasks().get(2); diff --git a/core/src/test/resources/flows/full.yaml b/core/src/test/resources/flows/full.yaml index 89d13fad2f..d042a57d2f 100644 --- a/core/src/test/resources/flows/full.yaml +++ b/core/src/test/resources/flows/full.yaml @@ -1,5 +1,5 @@ id: full -namespace: com.leroymerlin.data.supply +namespace: org.floworc.tests #triggers: #- type: schedule @@ -34,23 +34,38 @@ tasks: - id: 4th - 1st type: org.floworc.core.tasks.scripts.Bash commands: - - echo "{{id}}" + - echo "4th - 1st {{id}}" - id: 4th - 2nd type: org.floworc.core.tasks.scripts.Bash commands: - - echo "{{id}}" + - echo "4th - 2nd {{id}}" - id: 4th - 3rd type: org.floworc.core.tasks.scripts.Bash commands: - - echo "{{id}}" + - echo "4th- 3rd {{id}}" - id: 4th - 4th - type: org.floworc.core.tasks.scripts.Bash - commands: - - echo "{{id}}" - - id: 4th - 5th - type: org.floworc.core.tasks.scripts.Bash - commands: - - echo "{{id}}" + type: org.floworc.core.tasks.flows.Parallel + tasks: + - id: 4th - 4th - 1st + type: org.floworc.core.tasks.flows.Parallel + tasks: + - id: 4th - 4th - 1st - 1st + type: org.floworc.core.tasks.flows.Parallel + tasks: + - id: 4th - 4th - 1st - 1st - 1st + type: org.floworc.core.tasks.flows.Parallel + tasks: + - id: 4th - 4th - 1st - 1st - 1st - last + type: org.floworc.core.tasks.scripts.Bash + commands: + - echo "4th - 4th - 1st - 1st - 1st - last{{id}}" + + +- id: lastte + type: org.floworc.core.tasks.scripts.Bash + commands: + - 'echo "last"' + - 'sleep 1' #- type: python # commands: diff --git a/core/src/test/resources/logback.xml b/core/src/test/resources/logback.xml index 399cb4045e..43ce5d5118 100644 --- a/core/src/test/resources/logback.xml +++ b/core/src/test/resources/logback.xml @@ -18,6 +18,6 @@ - - + + \ No newline at end of file