feat(core): ForEachItem task (#2359)

close kestra-io/kestra#2131
This commit is contained in:
Loïc Mathieu
2023-11-03 09:33:04 +01:00
committed by GitHub
parent 382a5df797
commit a570cc675b
25 changed files with 850 additions and 206 deletions

View File

@@ -41,6 +41,7 @@ public class TaskRun {
String parentTaskRunId;
@With
String value;
@With
@@ -52,6 +53,9 @@ public class TaskRun {
@NotNull
State state;
@With
String items;
public void destroyOutputs() {
// DANGER ZONE: this method is only used to deals with issues with messages too big that must be stripped down
// to avoid crashing the platform. Don't use it for anything else.
@@ -71,7 +75,8 @@ public class TaskRun {
this.value,
this.attempts,
this.outputs,
this.state.withState(state)
this.state.withState(state),
this.items
);
}
@@ -88,6 +93,7 @@ public class TaskRun {
.attempts(this.getAttempts())
.outputs(this.getOutputs())
.state(state == null ? this.getState() : state)
.items(this.getItems())
.build();
}
@@ -128,7 +134,7 @@ public class TaskRun {
public TaskRun onRunningResend() {
TaskRunBuilder taskRunBuilder = this.toBuilder();
if (taskRunBuilder.attempts == null || taskRunBuilder.attempts.size() == 0) {
if (taskRunBuilder.attempts == null || taskRunBuilder.attempts.isEmpty()) {
taskRunBuilder.attempts = new ArrayList<>();
taskRunBuilder.attempts.add(TaskRunAttempt.builder()
@@ -172,7 +178,7 @@ public class TaskRun {
", parentTaskRunId=" + this.getParentTaskRunId() +
", state=" + this.getState().getCurrent().toString() +
", outputs=" + this.getOutputs() +
", attemps=" + this.getAttempts() +
", attempts=" + this.getAttempts() +
")";
}

View File

@@ -1,19 +1,19 @@
package io.kestra.core.models.hierarchies;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.tasks.flows.Flow;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import lombok.Getter;
import java.util.List;
@Getter
public class SubflowGraphTask extends AbstractGraphTask {
public SubflowGraphTask(Flow task, TaskRun taskRun, List<String> values, RelationType relationType) {
super(task, taskRun, values, relationType);
public SubflowGraphTask(ExecutableTask task, TaskRun taskRun, List<String> values, RelationType relationType) {
super((Task) task, taskRun, values, relationType);
}
@Override
public Flow getTask() {
return (Flow) super.getTask();
public ExecutableTask getExecutableTask() {
return (ExecutableTask) super.getTask();
}
}

View File

@@ -0,0 +1,57 @@
package io.kestra.core.models.tasks;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.runners.FlowExecutorInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.WorkerTaskExecution;
import io.kestra.core.runners.WorkerTaskResult;
import java.util.List;
import java.util.Optional;
/**
* Interface for tasks that generates subflow execution(s). Those tasks are handled in the Executor.
*/
public interface ExecutableTask {
/**
* Creates a list of WorkerTaskExecution for this task definition.
* Each WorkerTaskExecution will generate a subflow execution.
*/
List<WorkerTaskExecution<?>> createWorkerTaskExecutions(RunContext runContext,
FlowExecutorInterface flowExecutorInterface,
Flow currentFlow, Execution currentExecution,
TaskRun currentTaskRun) throws InternalException;
/**
* Creates a WorkerTaskResult for a given WorkerTaskExecution
*/
Optional<WorkerTaskResult> createWorkerTaskResult(RunContext runContext,
WorkerTaskExecution<?> workerTaskExecution,
Flow flow,
Execution execution);
/**
* Whether to wait for the execution(s) of the subflow before terminating this tasks
*/
boolean waitForExecution();
/**
* @return the subflow identifier, used by the flow topology and related dependency code.
*/
SubflowId subflowId();
record SubflowId(String namespace, String flowId, Optional<Integer> revision) {
public String flowUid() {
// as the Flow task can only be used in the same tenant we can hardcode null here
return Flow.uid(null, this.namespace, this.flowId, this.revision);
}
public String flowUidWithoutRevision() {
// as the Flow task can only be used in the same tenant we can hardcode null here
return Flow.uidWithoutRevision(null, this.namespace, this.flowId);
}
}
}

View File

@@ -14,6 +14,9 @@ import io.kestra.core.runners.RunContext;
import java.util.List;
import java.util.Optional;
/**
* Interface for tasks that orchestrate other tasks. Those tasks are handled by the Executor.
*/
public interface FlowableTask <T extends Output> {
@Schema(
title = "List of tasks to run if any tasks failed on this FlowableTask"
@@ -21,14 +24,37 @@ public interface FlowableTask <T extends Output> {
@PluginProperty
List<Task> getErrors();
/**
* Create the topology representation of a flowable task.
* <p>
* A flowable task always contains subtask to it returns a cluster that displays the subtasks.
*/
GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException;
/**
* @return all child tasks including errors
*/
List<Task> allChildTasks();
/**
* Resolve child tasks of a flowable task.
* <p>
* For a normal flowable, it should be the list of its tasks, for an iterative flowable (such as EachSequential, ForEachItem, ...),
* it should be the list of its tasks for all iterations.
*/
List<ResolvedTask> childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException;
/**
* Resolve next tasks to run for an execution.
* <p>
* For a normal flowable, it should be <b>the</b> subsequent task, for a parallel flowable (such as Parallel, ForEachItem, ...),
* it should be a list of the next subsequent tasks of the size of the concurrency of the task.
*/
List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException;
/**
* Resolve the state of a flowable task.
*/
default Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
return FlowableUtils.resolveState(
execution,

View File

@@ -2,6 +2,12 @@ package io.kestra.core.models.tasks;
import io.kestra.core.runners.RunContext;
/**
* Interface for tasks that are run in the Worker.
*/
public interface RunnableTask <T extends Output> {
/**
* Thsis method is called inside the Worker to run (execute) the task.
*/
T run(RunContext runContext) throws Exception;
}

View File

@@ -0,0 +1,105 @@
package io.kestra.core.runners;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionTrigger;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public final class ExecutableUtils {
private ExecutableUtils() {
// prevent initialization
}
public static State.Type guessState(Execution execution, boolean transmitFailed) {
if (transmitFailed &&
(execution.getState().isFailed() || execution.getState().isPaused() || execution.getState().getCurrent() == State.Type.KILLED || execution.getState().getCurrent() == State.Type.WARNING)
) {
return execution.getState().getCurrent();
} else {
return State.Type.SUCCESS;
}
}
public static WorkerTaskResult workerTaskResult(TaskRun taskRun) {
return WorkerTaskResult.builder()
.taskRun(taskRun.withAttempts(
Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(taskRun.getState().getCurrent())).build())
))
.build();
}
public static <T extends Task & ExecutableTask> WorkerTaskExecution<?> workerTaskExecution(
RunContext runContext,
FlowExecutorInterface flowExecutorInterface,
Execution currentExecution,
Flow currentFlow,
T currentTask,
TaskRun currentTaskRun,
Map<String, Object> inputs,
List<Label> labels
) throws IllegalVariableEvaluationException {
String subflowNamespace = runContext.render(currentTask.subflowId().namespace());
String subflowId = runContext.render(currentTask.subflowId().flowId());
Optional<Integer> subflowRevision = currentTask.subflowId().revision();
io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromFlowTask(
currentExecution.getTenantId(),
subflowNamespace,
subflowId,
subflowRevision,
currentExecution.getTenantId(),
currentFlow.getNamespace(),
currentFlow.getId()
)
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision + '" + subflowRevision.orElse(0) + "'"));
if (flow.isDisabled()) {
throw new IllegalStateException("Cannot execute a flow which is disabled");
}
if (flow instanceof FlowWithException fwe) {
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
}
Map<String, Object> variables = ImmutableMap.of(
"executionId", currentExecution.getId(),
"namespace", currentFlow.getNamespace(),
"flowId", currentFlow.getId(),
"flowRevision", currentFlow.getRevision()
);
RunnerUtils runnerUtils = runContext.getApplicationContext().getBean(RunnerUtils.class);
Execution execution = runnerUtils
.newExecution(
flow,
(f, e) -> runnerUtils.typedInputs(f, e, inputs),
labels)
.withTrigger(ExecutionTrigger.builder()
.id(currentTask.getId())
.type(currentTask.getType())
.variables(variables)
.build()
);
return WorkerTaskExecution.builder()
.task(currentTask)
.taskRun(currentTaskRun)
.execution(execution)
.build();
}
}

View File

@@ -91,7 +91,7 @@ public class Executor {
return this;
}
public Executor withWorkerTaskExecutions(List<WorkerTaskExecution> newExecutions, String from) {
public Executor withWorkerTaskExecutions(List<WorkerTaskExecution<?>> newExecutions, String from) {
this.workerTaskExecutions.addAll(newExecutions);
this.from.add(from);

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.ResolvedTask;
@@ -87,7 +88,7 @@ public class ExecutorService {
executor = this.handleChildWorkerTaskResult(executor);
// search for flow task
executor = this.handleFlowTask(executor);
executor = this.handleExecutableTask(executor);
} catch (Exception e) {
return executor.withException(e, "process");
}
@@ -314,20 +315,20 @@ public class ExecutorService {
return executor.withExecution(newExecution, "onEnd");
}
private Executor handleNext(Executor Executor) {
private Executor handleNext(Executor executor) {
List<NextTaskRun> nextTaskRuns = FlowableUtils
.resolveSequentialNexts(
Executor.getExecution(),
ResolvedTask.of(Executor.getFlow().getTasks()),
ResolvedTask.of(Executor.getFlow().getErrors())
executor.getExecution(),
ResolvedTask.of(executor.getFlow().getTasks()),
ResolvedTask.of(executor.getFlow().getErrors())
);
if (nextTaskRuns.size() == 0) {
return Executor;
if (nextTaskRuns.isEmpty()) {
return executor;
}
return Executor.withTaskRun(
this.saveFlowableOutput(nextTaskRuns, Executor, null),
return executor.withTaskRun(
this.saveFlowableOutput(nextTaskRuns, executor, null),
"handleNext"
);
}
@@ -542,72 +543,76 @@ public class ExecutorService {
.filter(taskRun -> taskRun.getState().getCurrent().isCreated())
.map(throwFunction(taskRun -> {
Task task = executor.getFlow().findTaskByTaskId(taskRun.getTaskId());
RunContext runContext = runContextFactory.of(executor.getFlow(), task, executor.getExecution(), taskRun);
return WorkerTask.builder()
.runContext(runContextFactory.of(executor.getFlow(), task, executor.getExecution(), taskRun))
.runContext(runContext)
.taskRun(taskRun)
.task(task)
.build();
}))
.collect(Collectors.toList());
if (workerTasks.size() == 0) {
if (workerTasks.isEmpty()) {
return executor;
}
return executor.withWorkerTasks(workerTasks, "handleWorkerTask");
}
private Executor handleFlowTask(final Executor executor) {
List<WorkerTaskExecution> executions = new ArrayList<>();
private Executor handleExecutableTask(final Executor executor) {
List<WorkerTaskExecution<?>> executions = new ArrayList<>();
List<WorkerTaskResult> workerTaskResults = new ArrayList<>();
boolean haveFlows = executor.getWorkerTasks()
.removeIf(workerTask -> {
if (!(workerTask.getTask() instanceof io.kestra.core.tasks.flows.Flow)) {
if (!(workerTask.getTask() instanceof ExecutableTask)) {
return false;
}
io.kestra.core.tasks.flows.Flow flowTask = (io.kestra.core.tasks.flows.Flow) workerTask.getTask();
RunContext runContext = runContextFactory.of(
executor.getFlow(),
flowTask,
executor.getExecution(),
workerTask.getTaskRun()
);
var executableTask = (Task & ExecutableTask) workerTask.getTask();
try {
// mark taskrun as running to avoid multiple try for failed
TaskRun taskRunByTaskRunId = executor.getExecution()
TaskRun executableTaskRun = executor.getExecution()
.findTaskRunByTaskRunId(workerTask.getTaskRun().getId());
executor.withExecution(
executor
.getExecution()
.withTaskRun(taskRunByTaskRunId.withState(State.Type.RUNNING)),
"handleFlowTaskRunning"
.withTaskRun(executableTaskRun.withState(State.Type.RUNNING)),
"handleExecutableTaskRunning"
);
// create the execution
Execution execution = flowTask.createExecution(runContext, flowExecutorInterface(), executor.getExecution());
WorkerTaskExecution workerTaskExecution = WorkerTaskExecution.builder()
.task(flowTask)
.taskRun(workerTask.getTaskRun())
.execution(execution)
.build();
executions.add(workerTaskExecution);
if (!flowTask.getWait()) {
workerTaskResults.add(flowTask.createWorkerTaskResult(
null,
workerTaskExecution,
null,
execution
));
RunContext runContext = runContextFactory.of(
executor.getFlow(),
executableTask,
executor.getExecution(),
executableTaskRun
);
List<WorkerTaskExecution<?>> workerTaskExecutions = executableTask.createWorkerTaskExecutions(runContext, flowExecutorInterface(), executor.getFlow(), executor.getExecution(), executableTaskRun);
if (workerTaskExecutions.isEmpty()) {
// if no executions we move the task to SUCCESS immediately
executor.withExecution(
executor
.getExecution()
.withTaskRun(executableTaskRun.withState(State.Type.SUCCESS)),
"handleExecutableTaskRunning.noExecution"
);
}
} catch (Exception e) {
else {
executions.addAll(workerTaskExecutions);
if (!executableTask.waitForExecution()) {
for (WorkerTaskExecution<?> workerTaskExecution : workerTaskExecutions) {
Optional<WorkerTaskResult> workerTaskResult = executableTask.createWorkerTaskResult(
runContext,
workerTaskExecution,
executor.getFlow(),
workerTaskExecution.getExecution()
);
workerTaskResult.ifPresent(result -> workerTaskResults.add(result));
}
}
}
}
catch (Exception e) {
workerTaskResults.add(WorkerTaskResult.builder()
.taskRun(workerTask.getTaskRun().withState(State.Type.FAILED)
.withAttempts(Collections.singletonList(
@@ -616,9 +621,8 @@ public class ExecutorService {
)
.build()
);
executor.withException(e, "handleFlowTask");
executor.withException(e, "handleExecutableTask");
}
return true;
});
@@ -626,10 +630,10 @@ public class ExecutorService {
return executor;
}
Executor resultExecutor = executor.withWorkerTaskExecutions(executions, "handleFlowTask");
Executor resultExecutor = executor.withWorkerTaskExecutions(executions, "handleExecutableTask");
if (workerTaskResults.size() > 0) {
resultExecutor = executor.withWorkerTaskResults(workerTaskResults, "handleFlowTaskWorkerTaskResults");
if (!workerTaskResults.isEmpty()) {
resultExecutor = executor.withWorkerTaskResults(workerTaskResults, "handleExecutableTaskWorkerTaskResults");
}
return resultExecutor;

View File

@@ -325,6 +325,10 @@ public class RunContext {
builder.put("value", taskRun.getValue());
}
if(taskRun.getItems() != null) {
builder.put("items", taskRun.getItems());
}
return builder.build();
}

View File

@@ -2,7 +2,8 @@ package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.tasks.flows.Flow;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import lombok.Builder;
import lombok.Data;
@@ -10,12 +11,12 @@ import javax.validation.constraints.NotNull;
@Data
@Builder
public class WorkerTaskExecution {
public class WorkerTaskExecution<T extends Task & ExecutableTask> {
@NotNull
private TaskRun taskRun;
@NotNull
private Flow task;
private T task;
@NotNull
private Execution execution;

View File

@@ -52,15 +52,15 @@ public class GraphService {
subflowToReplaceByParent.map(Rethrow.throwFunction(parentWithSubflowGraphTask -> {
SubflowGraphTask subflowGraphTask = parentWithSubflowGraphTask.getValue();
Flow subflow = flowByUid.computeIfAbsent(
subflowGraphTask.getTask().flowUid(),
subflowGraphTask.getExecutableTask().subflowId().flowUid(),
uid -> flowRepository.findById(
tenantId,
subflowGraphTask.getTask().getNamespace(),
subflowGraphTask.getTask().getFlowId(),
Optional.ofNullable(subflowGraphTask.getTask().getRevision())
subflowGraphTask.getExecutableTask().subflowId().namespace(),
subflowGraphTask.getExecutableTask().subflowId().flowId(),
subflowGraphTask.getExecutableTask().subflowId().revision()
).orElseThrow(() -> new NoSuchElementException(
"Unable to find subflow " +
(subflowGraphTask.getTask().getRevision() == null ? subflowGraphTask.getTask().flowUidWithoutRevision() : subflowGraphTask.getTask().flowUid())
(subflowGraphTask.getExecutableTask().subflowId().revision().isEmpty() ? subflowGraphTask.getExecutableTask().subflowId().flowUidWithoutRevision() : subflowGraphTask.getExecutableTask().subflowId().flowUid())
+ " for task " + subflowGraphTask.getTask().getId()
))
);

View File

@@ -1,24 +1,21 @@
package io.kestra.core.tasks.flows;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionTrigger;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.*;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.*;
@@ -46,44 +43,43 @@ import java.util.*;
)
}
)
public class Flow extends Task implements RunnableTask<Flow.Output> {
public class Flow extends Task implements ExecutableTask {
@NotNull
@Schema(
title = "The namespace of the flow that should be executed as a subflow"
title = "The namespace of the subflow to be executed"
)
@PluginProperty(dynamic = true)
private String namespace;
@NotNull
@Schema(
title = "The identifier of the flow that should be executed as a subflow"
title = "The identifier of the subflow to be executed"
)
@PluginProperty(dynamic = true)
private String flowId;
@Schema(
title = "The revision of the flow that should be executed as a subflow",
description = "By default, the last i.e. the most recent revision of the flow is triggered."
title = "The revision of the subflow to be executed",
description = "By default, the last, i.e. the most recent, revision of the subflow is executed."
)
@PluginProperty(dynamic = true)
private Integer revision;
@Schema(
title = "The inputs to pass to the subflow"
title = "The inputs to pass to the subflow to be executed"
)
@PluginProperty(dynamic = true)
private Map<String, Object> inputs;
@Schema(
title = "The labels to pass to the subflow execution"
title = "The labels to pass to the subflow to be executed"
)
@PluginProperty(dynamic = true)
private Map<String, String> labels;
@Builder.Default
@Schema(
title = "Whether to wait for the subflow execution to finish before continuing the current execution",
description = "By default, the subflow will be executed in a fire-and-forget manner without waiting for the subflow execution to finish. If you set this option to `true`, the current execution will wait for the subflow execution to finish before continuing with the next task."
title = "Whether to wait for the subflow execution to finish before continuing the current execution."
)
@PluginProperty
private final Boolean wait = false;
@@ -91,7 +87,7 @@ public class Flow extends Task implements RunnableTask<Flow.Output> {
@Builder.Default
@Schema(
title = "Whether to fail the current execution if the subflow execution fails or is killed",
description = "Note that this option only has an effect if `wait` is set to `true`."
description = "Note that this option works only if `wait` is set to `true`."
)
@PluginProperty
private final Boolean transmitFailed = false;
@@ -106,31 +102,18 @@ public class Flow extends Task implements RunnableTask<Flow.Output> {
@Schema(
title = "Outputs from the subflow executions.",
description = "Allows to specify key-value pairs (with the value rendered) in order to extract any outputs from the " +
"subflow execution."
description = "Allows to specify outputs as key-value pairs to extract any outputs from the subflow execution into output of this task execution."
)
@PluginProperty(dynamic = true)
private Map<String, Object> outputs;
@Override
public Flow.Output run(RunContext runContext) throws Exception {
throw new IllegalStateException("This task should not be executed by a worker and must run on executor side.");
}
public String flowUid() {
// as the Flow task can only be used in the same tenant we can hardcode null here
return io.kestra.core.models.flows.Flow.uid(null, this.getNamespace(), this.getFlowId(), Optional.ofNullable(this.revision));
}
public String flowUidWithoutRevision() {
// as the Flow task can only be used in the same tenant we can hardcode null here
return io.kestra.core.models.flows.Flow.uidWithoutRevision(null, this.getNamespace(), this.getFlowId());
}
@SuppressWarnings("unchecked")
public Execution createExecution(RunContext runContext, FlowExecutorInterface flowExecutorInterface, Execution currentExecution) throws Exception {
RunnerUtils runnerUtils = runContext.getApplicationContext().getBean(RunnerUtils.class);
@Override
public List<WorkerTaskExecution<?>> createWorkerTaskExecutions(RunContext runContext,
FlowExecutorInterface flowExecutorInterface,
io.kestra.core.models.flows.Flow currentFlow,
Execution currentExecution,
TaskRun currentTaskRun) throws InternalException {
Map<String, Object> inputs = new HashMap<>();
if (this.inputs != null) {
inputs.putAll(runContext.render(this.inputs));
@@ -146,70 +129,33 @@ public class Flow extends Task implements RunnableTask<Flow.Output> {
}
}
Map<String, String> flowVars = (Map<String, String>) runContext.getVariables().get("flow");
String namespace = runContext.render(this.namespace);
String flowId = runContext.render(this.flowId);
Optional<Integer> revision = this.revision != null ? Optional.of(this.revision) : Optional.empty();
io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromFlowTask(
flowVars.get("tenantId"),
namespace,
flowId,
revision,
flowVars.get("tenantId"),
flowVars.get("namespace"),
flowVars.get("id")
)
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + namespace + "'.'" + flowId + "' with revision + '" + revision + "'"));
if (flow.isDisabled()) {
throw new IllegalStateException("Cannot execute a flow which is disabled");
}
if (flow instanceof FlowWithException fwe) {
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
}
return runnerUtils
.newExecution(
flow,
(f, e) -> runnerUtils.typedInputs(f, e, inputs),
labels)
.withTrigger(ExecutionTrigger.builder()
.id(this.getId())
.type(this.getType())
.variables(ImmutableMap.of(
"executionId", ((Map<String, Object>) runContext.getVariables().get("execution")).get("id"),
"namespace", flowVars.get("namespace"),
"flowId", flowVars.get("id"),
"flowRevision", flowVars.get("revision")
))
.build()
);
return List.of(ExecutableUtils.workerTaskExecution(
runContext,
flowExecutorInterface,
currentExecution,
currentFlow,
this,
currentTaskRun,
inputs,
labels
));
}
public WorkerTaskResult createWorkerTaskResult(
@Nullable RunContextFactory runContextFactory,
WorkerTaskExecution workerTaskExecution,
@Nullable io.kestra.core.models.flows.Flow flow,
@Override
public Optional<WorkerTaskResult> createWorkerTaskResult(
RunContext runContext,
WorkerTaskExecution<?> workerTaskExecution,
io.kestra.core.models.flows.Flow flow,
Execution execution
) {
TaskRun taskRun = workerTaskExecution.getTaskRun();
Output.OutputBuilder builder = Output.builder()
.executionId(execution.getId());
if (workerTaskExecution.getTask().getOutputs() != null && runContextFactory != null) {
RunContext runContext = runContextFactory.of(
flow,
workerTaskExecution.getTask(),
execution,
workerTaskExecution.getTaskRun()
);
.executionId(execution.getId())
.state(execution.getState().getCurrent());
if (this.getOutputs() != null) {
try {
builder.outputs(runContext.render(workerTaskExecution.getTask().getOutputs()));
builder.outputs(runContext.render(this.getOutputs()));
} catch (Exception e) {
runContext.logger().warn("Failed to extract outputs with the error: '" + e.getMessage() + "'", e);
taskRun = taskRun
@@ -217,29 +163,27 @@ public class Flow extends Task implements RunnableTask<Flow.Output> {
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(State.Type.FAILED)).build()))
.withOutputs(builder.build().toMap());
return WorkerTaskResult.builder()
return Optional.of(WorkerTaskResult.builder()
.taskRun(taskRun)
.build();
.build());
}
}
builder.state(execution.getState().getCurrent());
taskRun = taskRun.withOutputs(builder.build().toMap());
if (transmitFailed &&
(execution.getState().isFailed() || execution.getState().isPaused() || execution.getState().getCurrent() == State.Type.KILLED || execution.getState().getCurrent() == State.Type.WARNING)
) {
taskRun = taskRun.withState(execution.getState().getCurrent());
} else {
taskRun = taskRun.withState(State.Type.SUCCESS);
}
taskRun = taskRun.withState(ExecutableUtils.guessState(execution, this.transmitFailed));
return WorkerTaskResult.builder()
.taskRun(taskRun.withAttempts(
Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(taskRun.getState().getCurrent())).build())
))
.build();
return Optional.of(ExecutableUtils.workerTaskResult(taskRun));
}
@Override
public boolean waitForExecution() {
return this.wait;
}
@Override
public SubflowId subflowId() {
return new SubflowId(this.namespace, this.flowId, Optional.ofNullable(this.revision));
}
@Builder

View File

@@ -0,0 +1,273 @@
package io.kestra.core.tasks.flows;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.FlowExecutorInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.WorkerTaskExecution;
import io.kestra.core.runners.WorkerTaskResult;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Positive;
import static io.kestra.core.utils.Rethrow.throwFunction;
@SuperBuilder(toBuilder = true)
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Execute a subflow for each batch of items",
description = "Execute a subflow for each batch of items. The `items` value must be internal storage URI e.g. an output file from a previous task, or a file from inputs of FILE type."
)
@Plugin(
examples = {
@Example(
title = "Execute a subflow for each batch of items",
code = {
"""
id: each
type: io.kestra.core.tasks.flows.ForEachItem
items: "{{ outputs.extract.uri }}" # works with API payloads too. Kestra can detect if this output is not a file,\s
# and will make it to a file, split into (batches of) items
maxItemsPerBatch: 10
subflow:
flowId: file
namespace: dev
inputs:
file: "{{ taskrun.items }}" # special variable that contains the items of the batch
wait: true # wait for the subflow execution
transmitFailed: true # fail the task run if the subflow execution fails"""
}
)
}
)
public class ForEachItem extends Task implements ExecutableTask {
@NotEmpty
@PluginProperty(dynamic = true)
@Schema(title = "The items to be split into batches and processed. Make sure to set it to Kestra's internal storage URI, e.g. output from a previous task in the format `{{ outputs.task_id.uri }}` or an input parameter of FILE type e.g. `{{ inputs.myfile }}`.")
private String items;
@Positive
@NotNull
@PluginProperty
@Builder.Default
@Schema(title = "Maximum number of items per batch")
private Integer maxItemsPerBatch = 10;
@NotNull
@PluginProperty
@Schema(title = "The subflow that will be executed for each batch of items")
private ForEachItem.Subflow subflow;
@Override
public List<WorkerTaskExecution<?>> createWorkerTaskExecutions(RunContext runContext,
FlowExecutorInterface flowExecutorInterface,
Flow currentFlow,
Execution currentExecution,
TaskRun currentTaskRun) throws InternalException {
List<URI> splits = readSplits(runContext);
AtomicInteger currentIteration = new AtomicInteger(1);
return splits.stream()
.<WorkerTaskExecution<?>>map(throwFunction(
split -> {
Map<String, Object> intemsVariable = Map.of("taskrun", Map.of("items", split.toString()));
Map<String, Object> inputs = new HashMap<>();
if (this.subflow.inputs != null) {
inputs.putAll(runContext.render(this.subflow.inputs, intemsVariable));
}
List<Label> labels = new ArrayList<>();
if (this.subflow.inheritLabels) {
labels.addAll(currentExecution.getLabels());
}
if (this.subflow.labels != null) {
for (Map.Entry<String, String> entry: this.subflow.labels.entrySet()) {
labels.add(new Label(entry.getKey(), runContext.render(entry.getValue())));
}
}
int interation = currentIteration.getAndIncrement();
return ExecutableUtils.workerTaskExecution(
runContext,
flowExecutorInterface,
currentExecution,
currentFlow,
this,
currentTaskRun
.withValue(String.valueOf(interation))
.withOutputs(ImmutableMap.of(
"currentIteration", interation,
"maxIterations", splits.size()
))
.withItems(split.toString()),
inputs,
labels
);
}
))
.toList();
}
@Override
public Optional<WorkerTaskResult> createWorkerTaskResult(
RunContext runContext,
WorkerTaskExecution<?> workerTaskExecution,
Flow flow,
Execution execution
) {
TaskRun taskRun = workerTaskExecution.getTaskRun();
taskRun = taskRun.withState(ExecutableUtils.guessState(execution, this.subflow.transmitFailed));
int currentIteration = (Integer) taskRun.getOutputs().get("currentIteration");
int maxIterations = (Integer) taskRun.getOutputs().get("maxIterations");
return currentIteration == maxIterations ? Optional.of(ExecutableUtils.workerTaskResult(taskRun)) : Optional.empty();
}
@Override
public boolean waitForExecution() {
return this.subflow.wait;
}
@Override
public SubflowId subflowId() {
return new SubflowId(subflow.namespace, subflow.flowId, Optional.ofNullable(subflow.revision));
}
private List<URI> readSplits(RunContext runContext) throws IllegalVariableEvaluationException {
URI data = URI.create(runContext.render(this.items));
try (var reader = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(data)))) {
int batches = 0;
int lineNb = 0;
String row;
List<String> rows = new ArrayList<>(maxItemsPerBatch);
List<URI> uris = new ArrayList<>();
while ((row = reader.readLine()) != null) {
rows.add(row);
lineNb++;
if (lineNb == maxItemsPerBatch) {
uris.add(createBatchFile(runContext, rows, batches));
batches++;
lineNb = 0;
rows.clear();
}
}
if (!rows.isEmpty()) {
uris.add(createBatchFile(runContext, rows, batches));
}
return uris;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private URI createBatchFile(RunContext runContext, List<String> rows, int batch) throws IOException {
byte[] bytes = rows.stream().collect(Collectors.joining(System.lineSeparator())).getBytes();
File batchFile = runContext.tempFile(bytes, ".ion").toFile();
return runContext.putTempFile(batchFile, "batch-" + (batch + 1) + ".ion");
}
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public static class Subflow {
@NotEmpty
@Schema(
title = "The namespace of the subflow to be executed"
)
@PluginProperty(dynamic = true)
private String namespace;
@NotEmpty
@Schema(
title = "The identifier of the subflow to be executed"
)
@PluginProperty(dynamic = true)
private String flowId;
@Schema(
title = "The revision of the subflow to be executed",
description = "By default, the last, i.e. the most recent, revision of the subflow is executed."
)
@PluginProperty
private Integer revision;
@Schema(
title = "The inputs to pass to the subflow to be executed"
)
@PluginProperty(dynamic = true)
private Map<String, Object> inputs;
@Schema(
title = "The labels to pass to the subflow to be executed"
)
@PluginProperty(dynamic = true)
private Map<String, String> labels;
@Builder.Default
@Schema(
title = "Whether to wait for the subflows execution to finish before continuing the current execution."
)
@PluginProperty
private final Boolean wait = true;
@Builder.Default
@Schema(
title = "Whether to fail the current execution if the subflow execution fails or is killed",
description = "Note that this option works only if `wait` is set to `true`."
)
@PluginProperty
private final Boolean transmitFailed = true;
@Builder.Default
@Schema(
title = "Whether the subflow should inherit labels from this execution that triggered it.",
description = "By default, labels are not passed to the subflow execution. If you set this option to `true`, the child flow execution will inherit all labels from the parent execution."
)
@PluginProperty
private final Boolean inheritLabels = false;
}
}

View File

@@ -5,6 +5,7 @@ import io.kestra.core.models.conditions.types.*;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.hierarchies.Graph;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.topologies.FlowNode;
import io.kestra.core.models.topologies.FlowRelation;
import io.kestra.core.models.topologies.FlowTopology;
@@ -161,10 +162,10 @@ public class FlowTopologyService {
return parent
.allTasksWithChilds()
.stream()
.filter(t -> t instanceof io.kestra.core.tasks.flows.Flow)
.map(t -> (io.kestra.core.tasks.flows.Flow) t)
.filter(t -> t instanceof ExecutableTask)
.map(t -> (ExecutableTask) t)
.anyMatch(t ->
t.getNamespace().equals(child.getNamespace()) && t.getFlowId().equals(child.getId())
t.subflowId().namespace().equals(child.getNamespace()) && t.subflowId().flowId().equals(child.getId())
);
} catch (Exception e) {
log.warn("Failed to detect flow task on namespace:'" + parent.getNamespace() + "', flowId:'" + parent.getId() + "'", e);

View File

@@ -5,6 +5,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.hierarchies.*;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
@@ -270,7 +271,7 @@ public class GraphUtils {
// detect kids
if (currentTask instanceof FlowableTask<?> flowableTask) {
currentGraph = flowableTask.tasksTree(execution, currentTaskRun, parentValues);
} else if (currentTask instanceof io.kestra.core.tasks.flows.Flow subflowTask) {
} else if (currentTask instanceof ExecutableTask subflowTask) {
currentGraph = new SubflowGraphTask(subflowTask, currentTaskRun, parentValues, relationType);
} else {
currentGraph = new GraphTask(currentTask, currentTaskRun, parentValues, relationType);

View File

@@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.conditions.types.MultipleCondition;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.WorkerGroup;
@@ -216,10 +217,10 @@ public class ValidationFactory {
.stream()
.forEach(
task -> {
if (task instanceof io.kestra.core.tasks.flows.Flow taskFlow
&& value.getId().equals(taskFlow.getFlowId())
&& value.getNamespace().equals(taskFlow.getNamespace())) {
violations.add("Recursive call to flow [" + value.getId() + "]");
if (task instanceof ExecutableTask executableTask
&& value.getId().equals(executableTask.subflowId().flowId())
&& value.getNamespace().equals(executableTask.subflowId().namespace())) {
violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]");
}
}
);

View File

@@ -236,7 +236,7 @@ class FlowGraphTest extends AbstractMemoryRunnerTest {
assertThat(flowGraph.getEdges().size(), is(20));
assertThat(flowGraph.getClusters().size(), is(3));
assertThat(((SubflowGraphTask) ((SubflowGraphCluster) cluster(flowGraph, "root\\.launch").getCluster()).getTaskNode()).getTask().getFlowId(), is("switch"));
assertThat(((SubflowGraphTask) ((SubflowGraphCluster) cluster(flowGraph, "root\\.launch").getCluster()).getTaskNode()).getExecutableTask().subflowId().flowId(), is("switch"));
SubflowGraphTask subflowGraphTask = (SubflowGraphTask) nodeByUid(flowGraph, "root.launch");
assertThat(subflowGraphTask.getTask(), instanceOf(io.kestra.core.tasks.flows.Flow.class));
assertThat(subflowGraphTask.getRelationType(), is(RelationType.SEQUENTIAL));

View File

@@ -0,0 +1,125 @@
package io.kestra.core.tasks.flows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.storages.StorageInterface;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
@Singleton
public class ForEachItemCaseTest {
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;
@Inject
private StorageInterface storageInterface;
@Inject
protected RunnerUtils runnerUtils;
public void forEachItem() throws TimeoutException, InterruptedException, URISyntaxException, IOException {
CountDownLatch countDownLatch = new CountDownLatch(3);
AtomicReference<Execution> triggered = new AtomicReference<>();
executionQueue.receive(either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("for-each-item-subflow") && execution.getState().getCurrent().isTerminated()) {
countDownLatch.countDown();
triggered.set(execution);
}
});
URI file = storageUpload(10);
Map<String, Object> inputs = Map.of("file", file.toString());
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "for-each-item", null, (flow, execution1) -> runnerUtils.typedInputs(flow, execution1, inputs));
// we should have triggered 3 subflows
assertThat(countDownLatch.await(1, TimeUnit.MINUTES), is(true));
// assert on the main flow execution
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
// assert on the last subflow execution
assertThat(triggered.get().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(triggered.get().getFlowId(), is("for-each-item-subflow"));
assertThat((String) triggered.get().getInputs().get("items"), matchesRegex("kestra:///io/kestra/tests/for-each-item/executions/.*/tasks/.*/.*/batch-.*\\.ion"));
assertThat(triggered.get().getTaskRunList(), hasSize(1));
}
public void forEachItemNoWait() throws TimeoutException, InterruptedException, URISyntaxException, IOException {
CountDownLatch countDownLatch = new CountDownLatch(3);
AtomicReference<Execution> triggered = new AtomicReference<>();
executionQueue.receive(either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("for-each-item-subflow") && execution.getState().getCurrent().isTerminated()) {
countDownLatch.countDown();
triggered.set(execution);
}
});
URI file = storageUpload(10);
Map<String, Object> inputs = Map.of("file", file.toString());
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "for-each-item-no-wait", null, (flow, execution1) -> runnerUtils.typedInputs(flow, execution1, inputs));
// assert on the main flow execution
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
// assert that not all subflows ran (depending on the speed of execution there can be some)
// be careful that it's racy.
assertThat(countDownLatch.getCount(), greaterThan(0L));
// wait for the 3 flows to ends
assertThat(countDownLatch.await(1, TimeUnit.MINUTES), is(true));
// assert on the last subflow execution
assertThat(triggered.get().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(triggered.get().getFlowId(), is("for-each-item-subflow"));
assertThat((String) triggered.get().getInputs().get("items"), matchesRegex("kestra:///io/kestra/tests/for-each-item-no-wait/executions/.*/tasks/.*/.*/batch-.*\\.ion"));
assertThat(triggered.get().getTaskRunList(), hasSize(1));
}
private URI storageUpload(int count) throws URISyntaxException, IOException {
File tempFile = File.createTempFile("file", ".txt");
Files.write(tempFile.toPath(), content(count));
return storageInterface.put(
null,
new URI("/file/storage/file.txt"),
new FileInputStream(tempFile)
);
}
private List<String> content(int count) {
return IntStream
.range(0, count)
.mapToObj(value -> StringUtils.leftPad(value + "", 20))
.collect(Collectors.toList());
}
}

View File

@@ -30,7 +30,7 @@ class FlowValidationTest {
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
assertThat(validate.isPresent(), is(true));
assertThat(validate.get().getMessage(), containsString(": Invalid Flow: Recursive call to flow [recursive-flow]"));
assertThat(validate.get().getMessage(), containsString(": Invalid Flow: Recursive call to flow [io.kestra.tests.recursive-flow]"));
}
private Flow parse(String path) {

View File

@@ -0,0 +1,19 @@
id: for-each-item-no-wait
namespace: io.kestra.tests
inputs:
- name: file
type: FILE
tasks:
- id: each
type: io.kestra.core.tasks.flows.ForEachItem
items: "{{ inputs.file }}"
maxItemsPerBatch: 4
subflow:
namespace: io.kestra.tests
flowId: for-each-item-subflow
wait: false
transmitFailed: true
inputs:
items: "{{ taskrun.items }}"

View File

@@ -0,0 +1,11 @@
id: for-each-item-subflow
namespace: io.kestra.tests
inputs:
- name: items
type: STRING
tasks:
- id: per-item
type: io.kestra.core.tasks.log.Log
message: "{{ inputs.items }}"

View File

@@ -0,0 +1,19 @@
id: for-each-item
namespace: io.kestra.tests
inputs:
- name: file
type: FILE
tasks:
- id: each
type: io.kestra.core.tasks.flows.ForEachItem
items: "{{ inputs.file }}"
maxItemsPerBatch: 4
subflow:
namespace: io.kestra.tests
flowId: for-each-item-subflow
wait: true
transmitFailed: true
inputs:
items: "{{ taskrun.items }}"

View File

@@ -9,6 +9,8 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueFactoryInterface;
@@ -317,7 +319,7 @@ public class JdbcExecutor implements ExecutorInterface {
.filter(workerTask -> workerTask.getTask().isSendToWorkerTask())
.forEach(workerTask -> workerTaskQueue.emit(workerGroupService.resolveGroupFromJob(workerTask), workerTask));
// WorkerTask not flowable to workerTaskResult as Running
// WorkerTask flowable to workerTaskResult as Running
workerTasksDedup
.stream()
.filter(workerTask -> workerTask.getTask().isFlowable())
@@ -353,7 +355,7 @@ public class JdbcExecutor implements ExecutorInterface {
workerTaskExecution.getExecution()
.getNamespace() + "'.'" + workerTaskExecution.getExecution().getFlowId() +
"' with id '" + workerTaskExecution.getExecution()
.getId() + "' from task '" + workerTaskExecution.getTask().getId() +
.getId() + "' from task '" + ((Task) workerTaskExecution.getTask()).getId() +
"' and taskrun '" + workerTaskExecution.getTaskRun().getId() +
(workerTaskExecution.getTaskRun()
.getValue() != null ? " (" + workerTaskExecution.getTaskRun()
@@ -387,14 +389,26 @@ public class JdbcExecutor implements ExecutorInterface {
workerTaskExecutionStorage.get(execution.getId())
.ifPresent(workerTaskExecution -> {
// If we didn't wait for the flow execution, the worker task execution has already been created by the Executor service.
if (workerTaskExecution.getTask().getWait()) {
if (((ExecutableTask)workerTaskExecution.getTask()).waitForExecution()) {
Flow workerTaskFlow = this.flowRepository.findByExecution(execution);
WorkerTaskResult workerTaskResult = workerTaskExecution
.getTask()
.createWorkerTaskResult(runContextFactory, workerTaskExecution, workerTaskFlow, execution);
ExecutableTask executableTask = (ExecutableTask) workerTaskExecution.getTask();
this.workerTaskResultQueue.emit(workerTaskResult);
RunContext runContext = runContextFactory.of(
workerTaskFlow,
workerTaskExecution.getTask(),
execution,
workerTaskExecution.getTaskRun()
);
try {
Optional<WorkerTaskResult> maybeWorkerTaskResult = executableTask
.createWorkerTaskResult(runContext, workerTaskExecution, workerTaskFlow, execution);
maybeWorkerTaskResult.ifPresent(workerTaskResult -> this.workerTaskResultQueue.emit(workerTaskResult));
} catch (Exception e) {
// TODO maybe create a FAILED Worker Task Result instead
log.error("Unable to create the Worker Task Result", e);
}
}
workerTaskExecutionStorage.delete(workerTaskExecution);
@@ -620,7 +634,8 @@ public class JdbcExecutor implements ExecutorInterface {
}
private boolean deduplicateWorkerTaskExecution(Execution execution, ExecutorState executorState, TaskRun taskRun) {
String deduplicationKey = taskRun.getId();
// There can be multiple executions for the same task, so we need to deduplicated with the taskrun.value
String deduplicationKey = taskRun.getId() + "-" + taskRun.getValue();
State.Type current = executorState.getWorkerTaskExecutionDeduplication().get(deduplicationKey);
if (current == taskRun.getState().getCurrent()) {

View File

@@ -12,6 +12,7 @@ import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.*;
import io.kestra.core.tasks.flows.EachSequentialTest;
import io.kestra.core.tasks.flows.FlowCaseTest;
import io.kestra.core.tasks.flows.ForEachItemCaseTest;
import io.kestra.core.tasks.flows.PauseTest;
import io.kestra.core.tasks.flows.WorkingDirectoryTest;
import io.kestra.core.utils.TestsUtils;
@@ -77,6 +78,9 @@ public abstract class JdbcRunnerTest {
@Inject
private SkipExecutionCaseTest skipExecutionCaseTest;
@Inject
private ForEachItemCaseTest forEachItemCaseTest;
@BeforeAll
void init() throws IOException, URISyntaxException {
jdbcTestUtils.drop();
@@ -262,4 +266,14 @@ public abstract class JdbcRunnerTest {
void skipExecution() throws TimeoutException, InterruptedException {
skipExecutionCaseTest.skipExecution();
}
@Test
void forEachItem() throws URISyntaxException, IOException, InterruptedException, TimeoutException {
forEachItemCaseTest.forEachItem();
}
@Test
void forEachItemNoWait() throws URISyntaxException, IOException, InterruptedException, TimeoutException {
forEachItemCaseTest.forEachItemNoWait();
}
}

View File

@@ -8,6 +8,8 @@ import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
@@ -39,7 +41,7 @@ import java.util.stream.Collectors;
@Slf4j
public class MemoryExecutor implements ExecutorInterface {
private static final ConcurrentHashMap<String, ExecutionState> EXECUTIONS = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, WorkerTaskExecution> WORKERTASKEXECUTIONS_WATCHER = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, WorkerTaskExecution<?>> WORKERTASKEXECUTIONS_WATCHER = new ConcurrentHashMap<>();
private List<Flow> allFlows;
private final ScheduledExecutorService schedulerDelay = Executors.newSingleThreadScheduledExecutor();
@@ -65,9 +67,6 @@ public class MemoryExecutor implements ExecutorInterface {
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
private QueueInterface<LogEntry> logQueue;
@Inject
private FlowService flowService;
@Inject
private TaskDefaultService taskDefaultService;
@@ -123,7 +122,7 @@ public class MemoryExecutor implements ExecutorInterface {
return;
}
if (message.getTaskRunList() == null || message.getTaskRunList().size() == 0 || message.getState().isCreated()) {
if (message.getTaskRunList() == null || message.getTaskRunList().isEmpty() || message.getState().isCreated()) {
this.handleExecution(saveExecution(message));
}
}
@@ -228,7 +227,7 @@ public class MemoryExecutor implements ExecutorInterface {
}
if (executor.getWorkerTaskExecutions().size() > 0) {
if (!executor.getWorkerTaskExecutions().isEmpty()) {
executor.getWorkerTaskExecutions()
.forEach(workerTaskExecution -> {
WORKERTASKEXECUTIONS_WATCHER.put(workerTaskExecution.getExecution().getId(), workerTaskExecution);
@@ -250,17 +249,30 @@ public class MemoryExecutor implements ExecutorInterface {
// worker task execution
if (conditionService.isTerminatedWithListeners(flow, execution) && WORKERTASKEXECUTIONS_WATCHER.containsKey(execution.getId())) {
WorkerTaskExecution workerTaskExecution = WORKERTASKEXECUTIONS_WATCHER.get(execution.getId());
WorkerTaskExecution<?> workerTaskExecution = WORKERTASKEXECUTIONS_WATCHER.get(execution.getId());
// If we didn't wait for the flow execution, the worker task execution has already been created by the Executor service.
if (workerTaskExecution.getTask().getWait()) {
if (workerTaskExecution.getTask().waitForExecution()) {
Flow workerTaskFlow = this.flowRepository.findByExecution(execution);
WorkerTaskResult workerTaskResult = workerTaskExecution
.getTask()
.createWorkerTaskResult(runContextFactory, workerTaskExecution, workerTaskFlow, execution);
ExecutableTask executableTask = workerTaskExecution.getTask();
this.workerTaskResultQueue.emit(workerTaskResult);
RunContext runContext = runContextFactory.of(
workerTaskFlow,
workerTaskExecution.getTask(),
execution,
workerTaskExecution.getTaskRun()
);
try {
Optional<WorkerTaskResult> maybeWorkerTaskResult = executableTask
.createWorkerTaskResult(runContext, workerTaskExecution, workerTaskFlow, execution);
maybeWorkerTaskResult.ifPresent(workerTaskResult -> this.workerTaskResultQueue.emit(workerTaskResult));
} catch (Exception e) {
// TODO maybe create a FAILED Worker Task Result instead<>
log.error("Unable to create the Worker Task Result", e);
}
}
WORKERTASKEXECUTIONS_WATCHER.remove(execution.getId());