feat: ForEachItem as a flowable

Splitting the file can take a lot of time and need access to the internal storage.
That's why it should be done on the Worker.
A clever way to do this is to transform it as a Flowable task that will generates tasks to split the items, generates the execution , then merge the outputs.
This commit is contained in:
Loïc Mathieu
2024-02-22 16:27:45 +01:00
parent bcbe0a3e63
commit 17a2541486
11 changed files with 346 additions and 199 deletions

View File

@@ -20,13 +20,10 @@ import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.URI;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Vector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -34,9 +31,9 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
@Slf4j
public final class ExecutableUtils {
private static final String TASK_VARIABLE_ITERATIONS = "iterations";
private static final String TASK_VARIABLE_NUMBER_OF_BATCHES = "numberOfBatches";
private static final String TASK_VARIABLE_URI = "uri";
public static final String TASK_VARIABLE_ITERATIONS = "iterations";
public static final String TASK_VARIABLE_NUMBER_OF_BATCHES = "numberOfBatches";
public static final String TASK_VARIABLE_SUBFLOW_OUTPUTS_BASE_URI = "subflowOutputsBaseUri";
private ExecutableUtils() {
// prevent initialization
@@ -159,37 +156,7 @@ public final class ExecutableUtils {
final Map<String, Object> outputs = new HashMap<>();
outputs.put(TASK_VARIABLE_ITERATIONS, iterations);
outputs.put(TASK_VARIABLE_NUMBER_OF_BATCHES, numberOfBatches);
try {
// Build URIs for each sub-flow outputs.
List<URI> outputsURIs = IntStream.rangeClosed(1, terminatedIterations)
.mapToObj(it -> "kestra://" + storage.getContextBaseURI().getPath() + "/" + it + "/outputs.ion")
.map(throwFunction(URI::create))
.filter(storage::isFileExist)
.toList();
if (!outputsURIs.isEmpty()) {
// Merge outputs from each sub-flow into a single stored in the internal storage.
Enumeration<InputStream> streams = outputsURIs.stream()
.map(throwFunction(storage::getFile))
.collect(Collectors.toCollection(Vector::new))
.elements();
try (InputStream is = new SequenceInputStream(streams)) {
outputs.put(TASK_VARIABLE_URI, storage.putFile(is, "outputs.ion"));
}
}
} catch (Exception e) {
log.error("[namespace: {}] [flow: {}] [execution: {}] Failed to collect and merge outputs from each sub-flow with error: {}",
execution.getNamespace(),
execution.getFlowId(),
execution.getId(),
e.getLocalizedMessage(),
e
);
if (transmitFailed) {
state = State.Type.FAILED;
}
}
outputs.put(TASK_VARIABLE_SUBFLOW_OUTPUTS_BASE_URI, storage.getContextBaseURI().getPath());
return previousTaskRun
.withIteration(taskRun.getIteration())

View File

@@ -1,26 +1,28 @@
package io.kestra.core.tasks.flows;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.FlowExecutorInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.tasks.*;
import io.kestra.core.runners.*;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.services.StorageService;
import io.kestra.core.storages.StorageSplitInterface;
import io.kestra.core.utils.GraphUtils;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
@@ -30,9 +32,7 @@ import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.*;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
@@ -42,6 +42,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -114,7 +116,7 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
@Example(
title = """
Execute a subflow for each JSON item fetched from a REST API. The subflow `mysubflow` is called from the parent flow `iterate_over_json` using the `ForEachItem` task; this creates one subflow execution for each JSON object.
Note how we first need to convert the JSON array to JSON-L format using the `JsonWriter` task. This is because the `items` attribute of the `ForEachItem` task expects a file where each line represents a single item. Suitable file types include Amazon ION (commonly produced by Query tasks), newline-separated JSON files, or CSV files formatted with one row per line and without a header. For other formats, you can use the conversion tasks available in the `io.kestra.plugin.serdes` module.
In this example, the subflow `mysubflow` expects a JSON object as input. The `JsonReader` task first reads the JSON array from the REST API and converts it to ION. Then, the `JsonWriter` task converts that ION file to JSON-L format, suitable for the `ForEachItem` task.
@@ -164,14 +166,14 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
rows: 1
namespace: dev
flowId: mysubflow
wait: true
transmitFailed: true
wait: true
transmitFailed: true
inputs:
json: '{{ json(read(taskrun.items)) }}'"""
)
)
}
)
public class ForEachItem extends Task implements ExecutableTask<ForEachItem.Output> {
public class ForEachItem extends Task implements FlowableTask<VoidOutput> {
@NotEmpty
@PluginProperty(dynamic = true)
@Schema(title = "The items to be split into batches and processed. Make sure to set it to Kestra's internal storage URI. This can be either the output from a previous task, formatted as `{{ outputs.task_id.uri }}`, or a FILE type input parameter, like `{{ inputs.myfile }}`. This task is optimized for files where each line represents a single item. Suitable file types include Amazon ION-type files (commonly produced by Query tasks), newline-separated JSON files, or CSV files formatted with one row per line and without a header. For files in other formats such as Excel, CSV, Avro, Parquet, XML, or JSON, it's recommended to first convert them to the ION format. This can be done using the conversion tasks available in the `io.kestra.plugin.serdes` module, which will transform files from their original format to ION.")
@@ -239,135 +241,300 @@ public class ForEachItem extends Task implements ExecutableTask<ForEachItem.Outp
@PluginProperty
private final Boolean inheritLabels = false;
@Valid
private List<Task> errors;
@Override
public List<SubflowExecution<?>> createSubflowExecutions(
RunContext runContext,
FlowExecutorInterface flowExecutorInterface,
Flow currentFlow,
Execution currentExecution,
TaskRun currentTaskRun
) throws InternalException {
var renderedUri = runContext.render(this.items);
if (!renderedUri.startsWith("kestra://")) {
var errorMessage = "Unable to split the items from " + renderedUri + ", this is not an internal storage URI!";
runContext.logger().error(errorMessage);
throw new IllegalArgumentException(errorMessage);
}
public GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.SEQUENTIAL);
try {
List<URI> splits = StorageService.split(runContext, this.batch, URI.create(renderedUri));
GraphUtils.sequential(
subGraph,
this.getTasks(),
this.errors,
taskRun,
execution
);
AtomicInteger currentIteration = new AtomicInteger(1);
return splits.stream()
.<SubflowExecution<?>>map(throwFunction(
split -> {
int iteration = currentIteration.getAndIncrement();
// these are special variable that can be passed to the subflow
Map<String, Object> itemsVariable = Map.of("taskrun",
Map.of("items", split.toString(), "iteration", iteration));
Map<String, Object> inputs = new HashMap<>();
if (this.inputs != null) {
inputs.putAll(runContext.render(this.inputs, itemsVariable));
}
List<Label> labels = new ArrayList<>();
if (this.inheritLabels && currentExecution.getLabels() != null && !currentExecution.getLabels().isEmpty()) {
labels.addAll(currentExecution.getLabels());
}
if (this.labels != null) {
for (Map.Entry<String, String> entry : this.labels.entrySet()) {
labels.add(new Label(entry.getKey(), runContext.render(entry.getValue())));
}
}
// these are special outputs to be able to compute iteration map of the parent taskrun
var outputs = Output.builder()
.numberOfBatches(splits.size())
// the passed URI may be used by the subflow to write execution outputs.
.uri(URI.create(runContext.getStorageOutputPrefix().toString() + "/" + iteration + "/outputs.ion"))
.build();
return ExecutableUtils.subflowExecution(
runContext,
flowExecutorInterface,
currentExecution,
currentFlow,
this,
currentTaskRun
.withOutputs(outputs.toMap())
.withIteration(iteration),
inputs,
labels
);
}
))
.toList();
} catch (IOException e) {
runContext.logger().error(e.getMessage(), e);
throw new InternalException(e);
}
return subGraph;
}
@Override
public Optional<SubflowExecutionResult> createSubflowExecutionResult(
RunContext runContext,
TaskRun taskRun,
Flow flow,
Execution execution
) {
public List<Task> allChildTasks() {
return Stream
.concat(
this.getTasks() != null ? this.getTasks().stream() : Stream.empty(),
this.errors != null ? this.errors.stream() : Stream.empty()
)
.collect(Collectors.toList());
}
// We only resolve subflow outputs for an execution result when the execution is terminated.
if (taskRun.getState().isTerminated() && flow.getOutputs() != null && waitForExecution()) {
final Map<String, Object> outputs = flow.getOutputs()
.stream()
.collect(Collectors.toMap(
io.kestra.core.models.flows.Output::getId,
io.kestra.core.models.flows.Output::getValue)
);
final ForEachItem.Output.OutputBuilder builder = Output
.builder()
.iterations((Map<State.Type, Integer>) taskRun.getOutputs().get("iterations"))
.numberOfBatches((Integer) taskRun.getOutputs().get("numberOfBatches"));
@Override
public List<ResolvedTask> childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
return FlowableUtils.resolveTasks(this.getTasks(), parentTaskRun);
}
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
FileSerde.write(bos, runContext.render(outputs));
URI uri = runContext.storage().putFile(
new ByteArrayInputStream(bos.toByteArray()),
URI.create((String) taskRun.getOutputs().get("uri"))
);
builder.uri(uri);
} catch (Exception e) {
runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
var state = this.isAllowFailure() ? State.Type.WARNING : State.Type.FAILED;
taskRun = taskRun
.withState(state)
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
.withOutputs(builder.build().toMap());
@Override
public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
return FlowableUtils.resolveSequentialNexts(
execution,
this.childTasks(runContext, parentTaskRun),
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
parentTaskRun
);
}
return Optional.of(SubflowExecutionResult.builder()
.executionId(execution.getId())
.state(State.Type.FAILED)
.parentTaskRun(taskRun)
.build());
public List<Task> getTasks() {
return List.of(
new ForEachItemSplit(this.getId(), this.items, this.batch),
new ForEachItemExecutable(this.getId(), this.inputs, this.inheritLabels, this.labels, this.wait, this.transmitFailed,
new ExecutableTask.SubflowId(this.namespace, this.flowId, Optional.ofNullable(this.revision))
),
new ForEachItemMergeOutputs(this.getId())
);
}
@SuppressWarnings("unused")
public void setTasks(List<Task> tasks) {
// This setter is needed for the serialization framework, but the list is hardcoded in the getter anyway.
}
@Hidden
@Getter
@NoArgsConstructor
public static class ForEachItemSplit extends Task implements RunnableTask<ForEachItemSplit.Output> {
private String items;
private Batch batch;
private ForEachItemSplit(String parentId, String items, Batch batch) {
this.items = items;
this.batch = batch;
this.id = parentId + "_split";
this.type = ForEachItemSplit.class.getName();
}
@Override
public ForEachItemSplit.Output run(RunContext runContext) throws Exception {
var renderedUri = runContext.render(this.items);
if (!renderedUri.startsWith("kestra://")) {
var errorMessage = "Unable to split the items from " + renderedUri + ", this is not an internal storage URI!";
runContext.logger().error(errorMessage);
throw new IllegalArgumentException(errorMessage);
}
List<URI> splits = StorageService.split(runContext, this.batch, URI.create(renderedUri));
String fileContent = splits.stream().map(uri -> uri.toString()).collect(Collectors.joining(System.lineSeparator()));
try (ByteArrayInputStream bis = new ByteArrayInputStream(fileContent.getBytes())){
URI splitsFile = runContext.storage().putFile(bis, "splits.txt");
return Output.builder().splits(splitsFile).build();
}
taskRun = taskRun.withOutputs(builder.build().toMap());
}
// ForEachItem is an iterative task, the terminal state will be computed in the executor while counting on the task run execution list
return Optional.of(ExecutableUtils.subflowExecutionResult(taskRun, execution));
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
private URI splits;
}
}
@Override
public boolean waitForExecution() {
return this.wait;
@Hidden
@Getter
@NoArgsConstructor
public static class ForEachItemExecutable extends Task implements ExecutableTask<Output> {
private Map<String, Object> inputs;
private Boolean inheritLabels;
private Map<String, String> labels;
private Boolean wait;
private Boolean transmitFailed;
private SubflowId subflowId;
private ForEachItemExecutable(String parentId, Map<String, Object> inputs, Boolean inheritLabels, Map<String, String> labels, Boolean wait, Boolean transmitFailed, SubflowId subflowId) {
this.inputs = inputs;
this.inheritLabels = inheritLabels;
this.labels = labels;
this.wait = wait;
this.transmitFailed = transmitFailed;
this.subflowId = subflowId;
this.id = parentId + "_executable";
this.type = ForEachItemExecutable.class.getName();
}
@Override
public List<SubflowExecution<?>> createSubflowExecutions(
RunContext runContext,
FlowExecutorInterface flowExecutorInterface,
Flow currentFlow,
Execution currentExecution,
TaskRun currentTaskRun
) throws InternalException {
// get the list of splits from the outputs of the split task
String taskId = this.id.replace("_executable", "_split");
var taskOutput = extractOutput(runContext, taskId);
URI splitsURI = URI.create((String) taskOutput.get("splits"));
try (InputStream is = runContext.storage().getFile(splitsURI)){
String fileContent = new String(is.readAllBytes());
List<URI> splits = fileContent.lines().map(line -> URI.create(line)).toList();
AtomicInteger currentIteration = new AtomicInteger(1);
return splits
.stream()
.<SubflowExecution<?>>map(throwFunction(
split -> {
int iteration = currentIteration.getAndIncrement();
// these are special variable that can be passed to the subflow
Map<String, Object> itemsVariable = Map.of("taskrun",
Map.of("items", split, "iteration", iteration));
Map<String, Object> inputs = new HashMap<>();
if (this.inputs != null) {
inputs.putAll(runContext.render(this.inputs, itemsVariable));
}
List<Label> labels = new ArrayList<>();
if (this.inheritLabels && currentExecution.getLabels() != null && !currentExecution.getLabels().isEmpty()) {
labels.addAll(currentExecution.getLabels());
}
if (this.labels != null) {
for (Map.Entry<String, String> entry : this.labels.entrySet()) {
labels.add(new Label(entry.getKey(), runContext.render(entry.getValue())));
}
}
// these are special outputs to be able to compute the iteration map of the parent taskrun
var outputs = Output.builder()
.numberOfBatches(splits.size())
// the passed URI may be used by the subflow to write execution outputs.
.uri(URI.create(runContext.getStorageOutputPrefix().toString() + "/" + iteration + "/outputs.ion"))
.build();
return ExecutableUtils.subflowExecution(
runContext,
flowExecutorInterface,
currentExecution,
currentFlow,
this,
currentTaskRun
.withOutputs(outputs.toMap())
.withIteration(iteration),
inputs,
labels
);
}
))
.toList();
} catch (IOException e) {
throw new InternalException(e);
}
}
@Override
public Optional<SubflowExecutionResult> createSubflowExecutionResult(
RunContext runContext,
TaskRun taskRun,
Flow flow,
Execution execution
) {
// We only resolve subflow outputs for an execution result when the execution is terminated.
if (taskRun.getState().isTerminated() && flow.getOutputs() != null && waitForExecution()) {
final Map<String, Object> outputs = flow.getOutputs()
.stream()
.collect(Collectors.toMap(
io.kestra.core.models.flows.Output::getId,
io.kestra.core.models.flows.Output::getValue)
);
final ForEachItem.Output.OutputBuilder builder = Output
.builder()
.iterations((Map<State.Type, Integer>) taskRun.getOutputs().get(ExecutableUtils.TASK_VARIABLE_ITERATIONS))
.numberOfBatches((Integer) taskRun.getOutputs().get(ExecutableUtils.TASK_VARIABLE_NUMBER_OF_BATCHES));
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
FileSerde.write(bos, runContext.render(outputs));
URI uri = runContext.storage().putFile(
new ByteArrayInputStream(bos.toByteArray()),
URI.create((String) taskRun.getOutputs().get("uri"))
);
builder.uri(uri);
} catch (Exception e) {
runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
var state = this.isAllowFailure() ? State.Type.WARNING : State.Type.FAILED;
taskRun = taskRun
.withState(state)
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
.withOutputs(builder.build().toMap());
return Optional.of(SubflowExecutionResult.builder()
.executionId(execution.getId())
.state(State.Type.FAILED)
.parentTaskRun(taskRun)
.build());
}
taskRun = taskRun.withOutputs(builder.build().toMap());
}
// ForEachItem is an iterative task, the terminal state will be computed in the executor while counting on the task run execution list
return Optional.of(ExecutableUtils.subflowExecutionResult(taskRun, execution));
}
@Override
public boolean waitForExecution() {
return this.wait;
}
@Override
public SubflowId subflowId() {
return this.subflowId;
}
}
@Override
public SubflowId subflowId() {
return new SubflowId(namespace, flowId, Optional.ofNullable(revision));
@Hidden
@Getter
@NoArgsConstructor
public static class ForEachItemMergeOutputs extends Task implements RunnableTask<ForEachItemMergeOutputs.Output> {
private ForEachItemMergeOutputs(String parentId) {
this.id = parentId + "_merge";
this.type = ForEachItemMergeOutputs.class.getName();
}
@Override
public ForEachItemMergeOutputs.Output run(RunContext runContext) throws Exception {
// get the list of splits from the outputs of the split task
String taskId = this.id.replace("_merge", "_executable");
var taskOutput = extractOutput(runContext, taskId);
Integer iterations = (Integer) taskOutput.get(ExecutableUtils.TASK_VARIABLE_NUMBER_OF_BATCHES);
String subflowOutputsBaseUri = (String) taskOutput.get(ExecutableUtils.TASK_VARIABLE_SUBFLOW_OUTPUTS_BASE_URI);
List<URI> outputsURIs = IntStream.rangeClosed(1, iterations)
.mapToObj(it -> "kestra://" + subflowOutputsBaseUri + "/" + it + "/outputs.ion")
.map(throwFunction(URI::create))
.filter(runContext.storage()::isFileExist)
.toList();
if (!outputsURIs.isEmpty()) {
// Merge outputs from each sub-flow into a single stored in the internal storage.
List<InputStream> streams = outputsURIs.stream()
.map(throwFunction(runContext.storage()::getFile))
.toList();
try (InputStream is = new SequenceInputStream(Collections.enumeration(streams))) {
URI uri = runContext.storage().putFile(is, "outputs.ion");
return ForEachItemMergeOutputs.Output.builder().subflowOutputs(uri).build();
}
}
return null;
}
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
private URI subflowOutputs;
}
}
@SuperBuilder
@ToString
@EqualsAndHashCode
@@ -404,4 +571,17 @@ public class ForEachItem extends Task implements ExecutableTask<ForEachItem.Outp
)
private final URI uri;
}
@SuppressWarnings("unchecked")
private static Map<String, Object> extractOutput(RunContext runContext, String taskId) {
var outputVariables = (Map<String, Map<String, Object>>) runContext.getVariables().get("outputs");
var splitTaskOutput = outputVariables.get(taskId);
if (runContext.getVariables().containsKey("parent")) {
// get the parent taskrun value as the value is in the ForEachItem not in one of its subtasks
var parent = (Map<String, Map<String, Object>>) runContext.getVariables().get("parent");
String value = (String) parent.get("taskrun").get("value");
splitTaskOutput = (Map<String, Object>) splitTaskOutput.get(value);
}
return splitTaskOutput;
}
}

View File

@@ -1,6 +1,5 @@
package io.kestra.core.runners;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.storages.StorageInterface;
@@ -29,7 +28,7 @@ public class TaskWithAllowFailureTest extends AbstractMemoryRunnerTest {
private StorageInterface storageInterface;
@Test
void runnableTask() throws TimeoutException, InternalException {
void runnableTask() throws TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "task-allow-failure-runnable");
assertThat(execution.getState().getCurrent(), is(State.Type.WARNING));
@@ -52,7 +51,7 @@ public class TaskWithAllowFailureTest extends AbstractMemoryRunnerTest {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "task-allow-failure-executable-foreachitem", null, (flow, execution1) -> runnerUtils.typedInputs(flow, execution1, inputs));
assertThat(execution.getState().getCurrent(), is(State.Type.WARNING));
assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.getTaskRunList(), hasSize(4));
}
@Test

View File

@@ -76,11 +76,11 @@ public class ForEachItemCaseTest {
assertThat(countDownLatch.await(1, TimeUnit.MINUTES), is(true));
// assert on the main flow execution
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(1));
assertThat(execution.getTaskRunList().get(0).getAttempts().get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(4));
assertThat(execution.getTaskRunList().get(2).getAttempts(), hasSize(1));
assertThat(execution.getTaskRunList().get(2).getAttempts().get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
Map<String, Object> outputs = execution.getTaskRunList().get(0).getOutputs();
Map<String, Object> outputs = execution.getTaskRunList().get(2).getOutputs();
assertThat(outputs.get("numberOfBatches"), is(3));
assertThat(outputs.get("iterations"), notNullValue());
Map<String, Integer> iterations = (Map<String, Integer>) outputs.get("iterations");
@@ -91,7 +91,7 @@ public class ForEachItemCaseTest {
// assert on the last subflow execution
assertThat(triggered.get().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(triggered.get().getFlowId(), is("for-each-item-subflow"));
assertThat((String) triggered.get().getInputs().get("items"), matchesRegex("kestra:///io/kestra/tests/for-each-item/executions/.*/tasks/each/.*\\.txt"));
assertThat((String) triggered.get().getInputs().get("items"), matchesRegex("kestra:///io/kestra/tests/for-each-item/executions/.*/tasks/each-split/.*\\.txt"));
assertThat(triggered.get().getTaskRunList(), hasSize(1));
}
@@ -116,12 +116,16 @@ public class ForEachItemCaseTest {
(flow, execution1) -> runnerUtils.typedInputs(flow, execution1, inputs),
Duration.ofSeconds(30));
// assert that not all subflows ran (depending on the speed of execution, there can be some)
// be careful that it's racy.
assertThat(countDownLatch.getCount(), greaterThan(0L));
// assert on the main flow execution
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(1));
assertThat(execution.getTaskRunList().get(0).getAttempts().get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(4));
assertThat(execution.getTaskRunList().get(2).getAttempts(), hasSize(1));
assertThat(execution.getTaskRunList().get(2).getAttempts().get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
Map<String, Object> outputs = execution.getTaskRunList().get(0).getOutputs();
Map<String, Object> outputs = execution.getTaskRunList().get(2).getOutputs();
assertThat(outputs.get("numberOfBatches"), is(3));
assertThat(outputs.get("iterations"), notNullValue());
Map<String, Integer> iterations = (Map<String, Integer>) outputs.get("iterations");
@@ -129,17 +133,13 @@ public class ForEachItemCaseTest {
assertThat(iterations.get("RUNNING"), is(0));
assertThat(iterations.get("SUCCESS"), is(3));
// assert that not all subflows ran (depending on the speed of execution, there can be some)
// be careful that it's racy.
assertThat(countDownLatch.getCount(), greaterThan(0L));
// wait for the 3 flows to ends
assertThat("Remaining count was " + countDownLatch.getCount(), countDownLatch.await(1, TimeUnit.MINUTES), is(true));
// assert on the last subflow execution
assertThat(triggered.get().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(triggered.get().getFlowId(), is("for-each-item-subflow"));
assertThat((String) triggered.get().getInputs().get("items"), matchesRegex("kestra:///io/kestra/tests/for-each-item-no-wait/executions/.*/tasks/each/.*\\.txt"));
assertThat((String) triggered.get().getInputs().get("items"), matchesRegex("kestra:///io/kestra/tests/for-each-item-no-wait/executions/.*/tasks/each-split/.*\\.txt"));
assertThat(triggered.get().getTaskRunList(), hasSize(1));
}
@@ -165,11 +165,11 @@ public class ForEachItemCaseTest {
assertThat(countDownLatch.await(1, TimeUnit.MINUTES), is(true));
// assert on the main flow execution
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(1));
assertThat(execution.getTaskRunList().get(0).getAttempts().get(0).getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getTaskRunList(), hasSize(3));
assertThat(execution.getTaskRunList().get(2).getAttempts(), hasSize(1));
assertThat(execution.getTaskRunList().get(2).getAttempts().get(0).getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
Map<String, Object> outputs = execution.getTaskRunList().get(0).getOutputs();
Map<String, Object> outputs = execution.getTaskRunList().get(2).getOutputs();
assertThat(outputs.get("numberOfBatches"), is(3));
assertThat(outputs.get("iterations"), notNullValue());
Map<String, Integer> iterations = (Map<String, Integer>) outputs.get("iterations");
@@ -180,7 +180,7 @@ public class ForEachItemCaseTest {
// assert on the last subflow execution
assertThat(triggered.get().getState().getCurrent(), is(State.Type.FAILED));
assertThat(triggered.get().getFlowId(), is("for-each-item-subflow-failed"));
assertThat((String) triggered.get().getInputs().get("items"), matchesRegex("kestra:///io/kestra/tests/for-each-item-failed/executions/.*/tasks/each/.*\\.txt"));
assertThat((String) triggered.get().getInputs().get("items"), matchesRegex("kestra:///io/kestra/tests/for-each-item-failed/executions/.*/tasks/each-split/.*\\.txt"));
assertThat(triggered.get().getTaskRunList(), hasSize(1));
}
@@ -206,11 +206,11 @@ public class ForEachItemCaseTest {
assertThat(countDownLatch.await(1, TimeUnit.MINUTES), is(true));
// assert on the main flow execution
assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(1));
assertThat(execution.getTaskRunList().get(0).getAttempts().get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(5));
assertThat(execution.getTaskRunList().get(2).getAttempts(), hasSize(1));
assertThat(execution.getTaskRunList().get(2).getAttempts().get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
Map<String, Object> outputs = execution.getTaskRunList().get(0).getOutputs();
Map<String, Object> outputs = execution.getTaskRunList().get(2).getOutputs();
assertThat(outputs.get("numberOfBatches"), is(3));
assertThat(outputs.get("iterations"), notNullValue());
@@ -222,12 +222,13 @@ public class ForEachItemCaseTest {
// assert on the last subflow execution
assertThat(triggered.get().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(triggered.get().getFlowId(), is("for-each-item-outputs-subflow"));
assertThat((String) triggered.get().getInputs().get("items"), matchesRegex("kestra:///io/kestra/tests/for-each-item-outputs/executions/.*/tasks/each/.*\\.txt"));
assertThat((String) triggered.get().getInputs().get("items"), matchesRegex("kestra:///io/kestra/tests/for-each-item-outputs/executions/.*/tasks/each-split/.*\\.txt"));
assertThat(triggered.get().getTaskRunList(), hasSize(1));
// asserts for subflow merged outputs
assertThat(outputs.get("uri"), notNullValue());
InputStream stream = storageInterface.get(null, URI.create((String) outputs.get("uri")));
Map<String, Object> mergeTaskOutputs = execution.getTaskRunList().get(3).getOutputs();
assertThat(mergeTaskOutputs.get("subflowOutputs"), notNullValue());
InputStream stream = storageInterface.get(null, URI.create((String) mergeTaskOutputs.get("subflowOutputs")));
try (var br = new BufferedReader(new InputStreamReader(stream))) {
// one line per sub-flows

View File

@@ -20,4 +20,4 @@ tasks:
- id: return
type: io.kestra.core.tasks.debugs.Return
format: "{{ outputs.each.uri }}"
format: "{{ outputs.each_merge.subflowOutputs }}"

View File

@@ -633,7 +633,7 @@ public class JdbcExecutor implements ExecutorInterface {
// iterative tasks
Task task = flow.findTaskByTaskId(message.getParentTaskRun().getTaskId());
TaskRun taskRun;
if (task instanceof ForEachItem forEachItem) {
if (task instanceof ForEachItem.ForEachItemExecutable forEachItem) {
RunContext runContext = runContextFactory.of(flow, task, current.getExecution(), message.getParentTaskRun());
taskRun = ExecutableUtils.manageIterations(
runContext.storage(),

View File

@@ -273,7 +273,7 @@ public abstract class JdbcRunnerTest {
forEachItemCaseTest.forEachItem();
}
@Test
@RetryingTest(5)
void forEachItemNoWait() throws URISyntaxException, IOException, InterruptedException, TimeoutException {
forEachItemCaseTest.forEachItemNoWait();
}

View File

@@ -617,7 +617,7 @@ public class MemoryExecutor implements ExecutorInterface {
// iterative tasks
Task task = flow.findTaskByTaskId(subflowExecutionResult.getParentTaskRun().getTaskId());
TaskRun taskRun;
if (task instanceof ForEachItem forEachItem) {
if (task instanceof ForEachItem.ForEachItemExecutable forEachItem) {
RunContext runContext = runContextFactory.of(
flow,
task,

View File

@@ -58,7 +58,7 @@
@follow="forwardEvent('follow', $event)"
:target-execution="execution"
:target-flow="flow"
:show-logs="taskType(currentTaskRun) !== 'io.kestra.core.tasks.flows.ForEachItem'"
:show-logs="taskType(currentTaskRun) !== 'io.kestra.core.tasks.flows.ForEachItem$ForEachItemExecutable' && taskType(currentTaskRun) !== undefined"
/>
</td>
</tr>

View File

@@ -291,7 +291,7 @@
return this.shouldDisplayProgressBar(taskRun) || this.shouldDisplayLogs(taskRun.id)
},
shouldDisplayProgressBar(taskRun) {
return this.taskType(taskRun) === "io.kestra.core.tasks.flows.ForEachItem"
return this.taskType(taskRun) === "io.kestra.core.tasks.flows.ForEachItem$ForEachItemExecutable"
},
shouldDisplayLogs(taskRunId) {
return this.logsWithIndexByAttemptUid[this.attemptUid(taskRunId, this.selectedAttemptNumberByTaskRunId[taskRunId])]

View File

@@ -313,7 +313,7 @@
},
shouldDisplayProgressBar(taskRun) {
return this.showProgressBar &&
this.taskType(taskRun) === "io.kestra.core.tasks.flows.ForEachItem"
this.taskType(taskRun) === "io.kestra.core.tasks.flows.ForEachItem$ForEachItemExecutable"
},
shouldDisplayLogs(taskRun) {
return (this.taskRunId ||