Compare commits

...

1 Commits

Author SHA1 Message Date
Loïc Mathieu
911cd47fe6 feat(flow): EmbeddedFlow task
Adds an EmbeddedFlow that allow to embed subflow tasks into a parent tasks.

Fixes #6518
2025-04-14 17:02:21 +02:00
9 changed files with 404 additions and 31 deletions

View File

@@ -140,28 +140,11 @@ public final class ExecutableUtils {
}
}
String tenantId = currentExecution.getTenantId();
String subflowNamespace = runContext.render(currentTask.subflowId().namespace());
String subflowId = runContext.render(currentTask.subflowId().flowId());
Optional<Integer> subflowRevision = currentTask.subflowId().revision();
FlowInterface flow = flowExecutorInterface.findByIdFromTask(
currentExecution.getTenantId(),
subflowNamespace,
subflowId,
subflowRevision,
currentExecution.getTenantId(),
currentFlow.getNamespace(),
currentFlow.getId()
)
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision '" + subflowRevision.orElse(0) + "'"));
if (flow.isDisabled()) {
throw new IllegalStateException("Cannot execute a flow which is disabled");
}
if (flow instanceof FlowWithException fwe) {
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
}
FlowInterface flow = getSubflow(tenantId, subflowNamespace, subflowId, subflowRevision, flowExecutorInterface, currentFlow);
List<Label> newLabels = inheritLabels ? new ArrayList<>(filterLabels(currentExecution.getLabels(), flow)) : new ArrayList<>(systemLabels(currentExecution));
if (labels != null) {
@@ -223,6 +206,35 @@ public final class ExecutableUtils {
.toList();
}
public static FlowInterface getSubflow(String tenantId,
String subflowNamespace,
String subflowId,
Optional<Integer> subflowRevision,
FlowExecutorInterface flowExecutorInterface,
FlowInterface currentFlow) {
FlowInterface flow = flowExecutorInterface.findByIdFromTask(
tenantId,
subflowNamespace,
subflowId,
subflowRevision,
tenantId,
currentFlow.getNamespace(),
currentFlow.getId()
)
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision '" + subflowRevision.orElse(0) + "'"));
if (flow.isDisabled()) {
throw new IllegalStateException("Cannot execute a flow which is disabled");
}
if (flow instanceof FlowWithException fwe) {
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
}
return flow;
}
private static List<Label> systemLabels(Execution execution) {
return Streams.of(execution.getLabels())
.filter(label -> label.key().startsWith(Label.SYSTEM_PREFIX))

View File

@@ -24,6 +24,7 @@ import io.kestra.core.runners.RunContextLogger;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.utils.MapUtils;
import io.kestra.plugin.core.flow.EmbeddedSubflow;
import io.kestra.plugin.core.flow.Template;
import io.micronaut.core.annotation.Nullable;
import jakarta.annotation.PostConstruct;
@@ -381,11 +382,16 @@ public class PluginDefaultService {
.build();
if (tenant != null) {
// This is a hack to set the tenant in template tasks.
// When using the Template task, we need the tenant to fetch the Template from the database.
// This is a hack to set the tenant in Template and EmbeddedSubflow tasks.
// When using the Template or EmbeddedSubflow task, we need the tenant to fetch the them from the database.
// However, as the task is executed on the Executor we cannot retrieve it from the tenant service and have no other options.
// So we save it at flow creation/updating time.
full.allTasksWithChilds().stream().filter(task -> task instanceof Template).forEach(task -> ((Template) task).setTenantId(tenant));
full.allTasksWithChilds().stream()
.filter(task -> task instanceof Template)
.forEach(task -> ((Template) task).setTenantId(tenant));
full.allTasksWithChilds().stream()
.filter(task -> task instanceof EmbeddedSubflow)
.forEach(task -> ((EmbeddedSubflow) task).setTenantId(tenant));
}
return full;

View File

@@ -19,6 +19,7 @@ import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.core.services.ConditionService;
import io.kestra.core.utils.ListUtils;
import io.kestra.plugin.core.condition.*;
import io.kestra.plugin.core.flow.ChildFlowInterface;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@@ -161,11 +162,9 @@ public class FlowTopologyService {
return parent
.allTasksWithChilds()
.stream()
.filter(t -> t instanceof ExecutableTask)
.map(t -> (ExecutableTask<?>) t)
.anyMatch(t ->
t.subflowId() != null && t.subflowId().namespace().equals(child.getNamespace()) && t.subflowId().flowId().equals(child.getId())
);
.filter(t -> t instanceof ChildFlowInterface)
.map(t -> (ChildFlowInterface) t)
.anyMatch(t -> Objects.equals(t.getFlowId(), child.getId()) && Objects.equals(t.getNamespace(), child.getNamespace()));
} catch (Exception e) {
log.warn("Failed to detect flow task on namespace:'{}', flowId:'{}'", parent.getNamespace(), parent.getId(), e);
return false;

View File

@@ -8,6 +8,7 @@ import io.kestra.core.models.tasks.Task;
import io.kestra.core.services.FlowService;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.validations.FlowValidation;
import io.kestra.plugin.core.flow.ChildFlowInterface;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.NonNull;
@@ -69,9 +70,9 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
}
value.allTasksWithChilds()
.stream().filter(task -> task instanceof ExecutableTask<?> executableTask
&& value.getId().equals(executableTask.subflowId().flowId())
&& value.getNamespace().equals(executableTask.subflowId().namespace()))
.stream().filter(task -> task instanceof ChildFlowInterface childFlow
&& value.getId().equals(childFlow.getFlowId())
&& value.getNamespace().equals(childFlow.getNamespace()))
.forEach(task -> violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]"));
// input unique name

View File

@@ -0,0 +1,304 @@
package io.kestra.plugin.core.flow;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.hierarchies.AbstractGraph;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.*;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.ListUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.runtime.event.annotation.EventListener;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Embeds subflow tasks into this flow."
)
@Plugin(
examples = {
@Example(
title = "Embeds subflow tasks.",
full = true,
code = """
id: parent_flow
namespace: company.team
tasks:
- id: embed_subflow
type: io.kestra.plugin.core.flow.EmbeddedSubflow
namespace: company.team
flowId: subflow
"""
)
}
)
public class EmbeddedSubflow extends Task implements FlowableTask<EmbeddedSubflow.Output>, ChildFlowInterface {
static final String PLUGIN_FLOW_OUTPUTS_ENABLED = "outputs.enabled";
@Hidden
@Setter // we have no other option here as we need to update the task inside the flow when creating it
private String tenantId;;
@NotEmpty
@Schema(
title = "The namespace of the subflow to be embedded."
)
@PluginProperty
private String namespace;
@NotNull
@Schema(
title = "The identifier of the subflow to be embedded."
)
@PluginProperty
private String flowId;
@Schema(
title = "The revision of the subflow to be embedded.",
description = "By default, the last, i.e. the most recent, revision of the subflow is embedded."
)
@PluginProperty
@Min(value = 1)
private Integer revision;
@Schema(
title = "The inputs to pass to the subflow to be embedded."
)
@PluginProperty(dynamic = true)
private Map<String, Object> inputs;
@Override
@JsonIgnore
public List<Task> getErrors() {
Optional<Flow> maybeSubflow = fetchSubflow();
if (maybeSubflow.isEmpty()) {
return Collections.emptyList();
}
Flow subflow = maybeSubflow.get();
return subflow.getErrors();
}
@Override
@JsonIgnore
public List<Task> getFinally() {
Optional<Flow> maybeSubflow = fetchSubflow();
if (maybeSubflow.isEmpty()) {
return Collections.emptyList();
}
Flow subflow = maybeSubflow.get();
return subflow.getFinally();
}
@Override
public AbstractGraph tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
Optional<Flow> maybeSubflow = fetchSubflow();
if (maybeSubflow.isEmpty()) {
return null;
}
Flow subflow = maybeSubflow.get();
GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.SEQUENTIAL);
GraphUtils.sequential(
subGraph,
subflow.getTasks(),
subflow.getErrors(),
subflow.getFinally(),
taskRun,
execution
);
return subGraph;
}
@Override
public List<Task> allChildTasks() {
Optional<Flow> maybeSubflow = fetchSubflow();
if (maybeSubflow.isEmpty()) {
return Collections.emptyList();
}
Flow subflow = maybeSubflow.get();
return Stream
.concat(
subflow.getTasks() != null ? subflow.getTasks().stream() : Stream.empty(),
subflow.getErrors() != null ? subflow.getErrors().stream() : Stream.empty()
)
.toList();
}
@Override
public List<ResolvedTask> childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
Flow subflow = fetchSubflow(runContext);
return FlowableUtils.resolveTasks(subflow.getTasks(), parentTaskRun);
}
@Override
public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
return FlowableUtils.resolveSequentialNexts(
execution,
this.childTasks(runContext, parentTaskRun),
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
parentTaskRun
);
}
@Override
public Output outputs(RunContext runContext) throws Exception {
final Output.OutputBuilder builder = Output.builder();
Flow subflow = fetchSubflow(runContext);
final Optional<Map<String, Object>> subflowOutputs = Optional
.ofNullable(subflow.getOutputs())
.map(outputs -> outputs
.stream()
.collect(Collectors.toMap(
io.kestra.core.models.flows.Output::getId,
io.kestra.core.models.flows.Output::getValue)
)
);
if (subflowOutputs.isPresent() && runContext.getVariables().get("outputs") != null) {
Map<String, Object> outputs = runContext.render(subflowOutputs.get());
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
if (subflow.getOutputs() != null && flowInputOutput != null) {
// to be able to use FILE Input, we need the execution info, so we create a fake execution with what's needed here
RunContext.FlowInfo flowInfo = runContext.flowInfo();
String executionId = (String) ((Map<String, Object>) runContext.getVariables().get("execution")).get("id");
Execution fake = Execution.builder()
.id(executionId)
.tenantId(flowInfo.tenantId())
.namespace(flowInfo.namespace())
.flowId(flowInfo.id())
.build();
outputs = flowInputOutput.typedOutputs(subflow, fake, outputs);
}
builder.outputs(outputs);
}
return builder.build();
}
// This method should only be used when getSubflow(RunContext) cannot be used.
private Optional<Flow> fetchSubflow() {
// at validation time, namespace and flowId may not yet be set, in this case let's return an optional to not fail
if (namespace == null || flowId == null) {
return Optional.empty();
}
ApplicationContext applicationContext = ContextHelper.context();
FlowExecutorInterface flowExecutor = applicationContext.getBean(FlowExecutorInterface.class);
FlowInterface subflow = flowExecutor.findById(tenantId, namespace, flowId, Optional.ofNullable(revision)).orElseThrow(() -> new IllegalArgumentException("Unable to find flow " + namespace + "." + flowId));
if (subflow.isDisabled()) {
throw new IllegalStateException("Cannot execute a flow which is disabled");
}
if (subflow instanceof FlowWithException fwe) {
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
}
PluginDefaultService pluginDefaultService = applicationContext.getBean(PluginDefaultService.class);
try {
return Optional.of(pluginDefaultService.injectAllDefaults(subflow, true));
} catch (FlowProcessingException e) {
throw new RuntimeException(e);
}
}
// This method is preferred as getSubflow() as it checks current flow and subflow and allowed namespaces
private Flow fetchSubflow(RunContext runContext) {
// we check that the task tenant is the current tenant to avoid accessing flows from another tenant
if (!Objects.equals(tenantId, runContext.flowInfo().tenantId())) {
throw new IllegalArgumentException("Cannot embeds a flow from a different tenant");
}
ApplicationContext applicationContext = ContextHelper.context();
FlowExecutorInterface flowExecutor = applicationContext.getBean(FlowExecutorInterface.class);
RunContext.FlowInfo flowInfo = runContext.flowInfo();
FlowInterface flow = flowExecutor.findById(flowInfo.tenantId(), flowInfo.namespace(), flowInfo.id(), Optional.of(flowInfo.revision()))
.orElseThrow(() -> new IllegalArgumentException("Unable to find flow " + flowInfo.namespace() + "." + flowInfo.id()));
FlowInterface subflow = ExecutableUtils.getSubflow(tenantId, namespace, flowId, Optional.ofNullable(revision), flowExecutor, flow);
// check inputs
if (!ListUtils.isEmpty(subflow.getInputs())) {
Optional<Input<?>> missing = subflow.getInputs().stream()
.filter(input -> input.getRequired() && !inputs.containsKey(input.getId()))
.findFirst();
if (missing.isPresent()) {
throw new IllegalArgumentException("Missing required input " + missing.get().getId());
}
}
PluginDefaultService pluginDefaultService = applicationContext.getBean(PluginDefaultService.class);
return pluginDefaultService.injectAllDefaults(subflow, runContext.logger());
}
/**
* Ugly hack to provide the ApplicationContext on {{@link #allChildTasks }} &amp; {{@link #tasksTree }}
* We need to inject a way to fetch embedded subflows ...
*/
@Singleton
static class ContextHelper {
@Inject
private ApplicationContext applicationContext;
private static ApplicationContext context;
static ApplicationContext context() {
return ContextHelper.context;
}
@EventListener
void onStartup(final StartupEvent event) {
ContextHelper.context = this.applicationContext;
}
}
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "The extracted outputs from the embedded subflow."
)
private final Map<String, Object> outputs;
}
}

View File

@@ -0,0 +1,35 @@
package io.kestra.plugin.core.flow;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
@KestraTest(startRunner = true)
class EmbeddedSubflowTest {
@Test
@LoadFlows("flows/valids/minimal.yaml")
@ExecuteFlow("flows/valids/embedded-flow.yaml")
void shouldEmbedTasks(Execution execution) throws Exception {
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("embeddedFlow"), notNullValue());
assertThat(execution.findTaskRunsByTaskId("date"), notNullValue());
}
@Test
@LoadFlows({"flows/valids/minimal.yaml", "flows/valids/embedded-flow.yaml"})
@ExecuteFlow("flows/valids/embedded-parent.yaml")
void shouldEmbedTasksRecursively(Execution execution) throws Exception {
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(3));
assertThat(execution.findTaskRunsByTaskId("embeddedParent"), notNullValue());
assertThat(execution.findTaskRunsByTaskId("embeddedFlow"), notNullValue());
assertThat(execution.findTaskRunsByTaskId("date"), notNullValue());
}
}

View File

@@ -0,0 +1,8 @@
id: embedded-flow
namespace: io.kestra.tests
tasks:
- id: embeddedFlow
type: io.kestra.plugin.core.flow.EmbeddedSubflow
namespace: io.kestra.tests
flowId: minimal

View File

@@ -0,0 +1,8 @@
id: embedded-parent
namespace: io.kestra.tests
tasks:
- id: embeddedParent
type: io.kestra.plugin.core.flow.EmbeddedSubflow
namespace: io.kestra.tests
flowId: embedded-flow

View File

@@ -105,7 +105,7 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
Flow deserialize = MAPPER.convertValue(map, Flow.class);
// raise exception for invalid flow, ex: Templates disabled
deserialize.allTasksWithChilds();
deserialize.allTasks().forEach((task) -> {});
return deserialize;
} catch (DeserializationException | IOException | IllegalArgumentException | FlowProcessingException e) {