mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
1 Commits
docs/retur
...
feat/embed
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
911cd47fe6 |
@@ -140,28 +140,11 @@ public final class ExecutableUtils {
|
||||
}
|
||||
}
|
||||
|
||||
String tenantId = currentExecution.getTenantId();
|
||||
String subflowNamespace = runContext.render(currentTask.subflowId().namespace());
|
||||
String subflowId = runContext.render(currentTask.subflowId().flowId());
|
||||
Optional<Integer> subflowRevision = currentTask.subflowId().revision();
|
||||
|
||||
FlowInterface flow = flowExecutorInterface.findByIdFromTask(
|
||||
currentExecution.getTenantId(),
|
||||
subflowNamespace,
|
||||
subflowId,
|
||||
subflowRevision,
|
||||
currentExecution.getTenantId(),
|
||||
currentFlow.getNamespace(),
|
||||
currentFlow.getId()
|
||||
)
|
||||
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision '" + subflowRevision.orElse(0) + "'"));
|
||||
|
||||
if (flow.isDisabled()) {
|
||||
throw new IllegalStateException("Cannot execute a flow which is disabled");
|
||||
}
|
||||
|
||||
if (flow instanceof FlowWithException fwe) {
|
||||
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
|
||||
}
|
||||
FlowInterface flow = getSubflow(tenantId, subflowNamespace, subflowId, subflowRevision, flowExecutorInterface, currentFlow);
|
||||
|
||||
List<Label> newLabels = inheritLabels ? new ArrayList<>(filterLabels(currentExecution.getLabels(), flow)) : new ArrayList<>(systemLabels(currentExecution));
|
||||
if (labels != null) {
|
||||
@@ -223,6 +206,35 @@ public final class ExecutableUtils {
|
||||
.toList();
|
||||
}
|
||||
|
||||
public static FlowInterface getSubflow(String tenantId,
|
||||
String subflowNamespace,
|
||||
String subflowId,
|
||||
Optional<Integer> subflowRevision,
|
||||
FlowExecutorInterface flowExecutorInterface,
|
||||
FlowInterface currentFlow) {
|
||||
|
||||
FlowInterface flow = flowExecutorInterface.findByIdFromTask(
|
||||
tenantId,
|
||||
subflowNamespace,
|
||||
subflowId,
|
||||
subflowRevision,
|
||||
tenantId,
|
||||
currentFlow.getNamespace(),
|
||||
currentFlow.getId()
|
||||
)
|
||||
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision '" + subflowRevision.orElse(0) + "'"));
|
||||
|
||||
if (flow.isDisabled()) {
|
||||
throw new IllegalStateException("Cannot execute a flow which is disabled");
|
||||
}
|
||||
|
||||
if (flow instanceof FlowWithException fwe) {
|
||||
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
|
||||
}
|
||||
|
||||
return flow;
|
||||
}
|
||||
|
||||
private static List<Label> systemLabels(Execution execution) {
|
||||
return Streams.of(execution.getLabels())
|
||||
.filter(label -> label.key().startsWith(Label.SYSTEM_PREFIX))
|
||||
|
||||
@@ -24,6 +24,7 @@ import io.kestra.core.runners.RunContextLogger;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
import io.kestra.core.utils.MapUtils;
|
||||
import io.kestra.plugin.core.flow.EmbeddedSubflow;
|
||||
import io.kestra.plugin.core.flow.Template;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
@@ -381,11 +382,16 @@ public class PluginDefaultService {
|
||||
.build();
|
||||
|
||||
if (tenant != null) {
|
||||
// This is a hack to set the tenant in template tasks.
|
||||
// When using the Template task, we need the tenant to fetch the Template from the database.
|
||||
// This is a hack to set the tenant in Template and EmbeddedSubflow tasks.
|
||||
// When using the Template or EmbeddedSubflow task, we need the tenant to fetch the them from the database.
|
||||
// However, as the task is executed on the Executor we cannot retrieve it from the tenant service and have no other options.
|
||||
// So we save it at flow creation/updating time.
|
||||
full.allTasksWithChilds().stream().filter(task -> task instanceof Template).forEach(task -> ((Template) task).setTenantId(tenant));
|
||||
full.allTasksWithChilds().stream()
|
||||
.filter(task -> task instanceof Template)
|
||||
.forEach(task -> ((Template) task).setTenantId(tenant));
|
||||
full.allTasksWithChilds().stream()
|
||||
.filter(task -> task instanceof EmbeddedSubflow)
|
||||
.forEach(task -> ((EmbeddedSubflow) task).setTenantId(tenant));
|
||||
}
|
||||
|
||||
return full;
|
||||
|
||||
@@ -19,6 +19,7 @@ import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
|
||||
import io.kestra.core.services.ConditionService;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.plugin.core.condition.*;
|
||||
import io.kestra.plugin.core.flow.ChildFlowInterface;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
@@ -161,11 +162,9 @@ public class FlowTopologyService {
|
||||
return parent
|
||||
.allTasksWithChilds()
|
||||
.stream()
|
||||
.filter(t -> t instanceof ExecutableTask)
|
||||
.map(t -> (ExecutableTask<?>) t)
|
||||
.anyMatch(t ->
|
||||
t.subflowId() != null && t.subflowId().namespace().equals(child.getNamespace()) && t.subflowId().flowId().equals(child.getId())
|
||||
);
|
||||
.filter(t -> t instanceof ChildFlowInterface)
|
||||
.map(t -> (ChildFlowInterface) t)
|
||||
.anyMatch(t -> Objects.equals(t.getFlowId(), child.getId()) && Objects.equals(t.getNamespace(), child.getNamespace()));
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to detect flow task on namespace:'{}', flowId:'{}'", parent.getNamespace(), parent.getId(), e);
|
||||
return false;
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.validations.FlowValidation;
|
||||
import io.kestra.plugin.core.flow.ChildFlowInterface;
|
||||
import io.micronaut.core.annotation.AnnotationValue;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import io.micronaut.core.annotation.NonNull;
|
||||
@@ -69,9 +70,9 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
|
||||
}
|
||||
|
||||
value.allTasksWithChilds()
|
||||
.stream().filter(task -> task instanceof ExecutableTask<?> executableTask
|
||||
&& value.getId().equals(executableTask.subflowId().flowId())
|
||||
&& value.getNamespace().equals(executableTask.subflowId().namespace()))
|
||||
.stream().filter(task -> task instanceof ChildFlowInterface childFlow
|
||||
&& value.getId().equals(childFlow.getFlowId())
|
||||
&& value.getNamespace().equals(childFlow.getNamespace()))
|
||||
.forEach(task -> violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]"));
|
||||
|
||||
// input unique name
|
||||
|
||||
@@ -0,0 +1,304 @@
|
||||
package io.kestra.plugin.core.flow;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import io.kestra.core.exceptions.FlowProcessingException;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.NextTaskRun;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.*;
|
||||
import io.kestra.core.models.hierarchies.AbstractGraph;
|
||||
import io.kestra.core.models.hierarchies.GraphCluster;
|
||||
import io.kestra.core.models.hierarchies.RelationType;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.kestra.core.services.PluginDefaultService;
|
||||
import io.kestra.core.utils.GraphUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.event.StartupEvent;
|
||||
import io.micronaut.runtime.event.annotation.EventListener;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import jakarta.validation.constraints.Min;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Embeds subflow tasks into this flow."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
title = "Embeds subflow tasks.",
|
||||
full = true,
|
||||
code = """
|
||||
id: parent_flow
|
||||
namespace: company.team
|
||||
|
||||
tasks:
|
||||
- id: embed_subflow
|
||||
type: io.kestra.plugin.core.flow.EmbeddedSubflow
|
||||
namespace: company.team
|
||||
flowId: subflow
|
||||
"""
|
||||
)
|
||||
}
|
||||
)
|
||||
public class EmbeddedSubflow extends Task implements FlowableTask<EmbeddedSubflow.Output>, ChildFlowInterface {
|
||||
static final String PLUGIN_FLOW_OUTPUTS_ENABLED = "outputs.enabled";
|
||||
|
||||
@Hidden
|
||||
@Setter // we have no other option here as we need to update the task inside the flow when creating it
|
||||
private String tenantId;;
|
||||
|
||||
@NotEmpty
|
||||
@Schema(
|
||||
title = "The namespace of the subflow to be embedded."
|
||||
)
|
||||
@PluginProperty
|
||||
private String namespace;
|
||||
|
||||
@NotNull
|
||||
@Schema(
|
||||
title = "The identifier of the subflow to be embedded."
|
||||
)
|
||||
@PluginProperty
|
||||
private String flowId;
|
||||
|
||||
@Schema(
|
||||
title = "The revision of the subflow to be embedded.",
|
||||
description = "By default, the last, i.e. the most recent, revision of the subflow is embedded."
|
||||
)
|
||||
@PluginProperty
|
||||
@Min(value = 1)
|
||||
private Integer revision;
|
||||
|
||||
@Schema(
|
||||
title = "The inputs to pass to the subflow to be embedded."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private Map<String, Object> inputs;
|
||||
|
||||
@Override
|
||||
@JsonIgnore
|
||||
public List<Task> getErrors() {
|
||||
Optional<Flow> maybeSubflow = fetchSubflow();
|
||||
if (maybeSubflow.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
Flow subflow = maybeSubflow.get();
|
||||
|
||||
return subflow.getErrors();
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonIgnore
|
||||
public List<Task> getFinally() {
|
||||
Optional<Flow> maybeSubflow = fetchSubflow();
|
||||
if (maybeSubflow.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
Flow subflow = maybeSubflow.get();
|
||||
|
||||
return subflow.getFinally();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractGraph tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
|
||||
Optional<Flow> maybeSubflow = fetchSubflow();
|
||||
if (maybeSubflow.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Flow subflow = maybeSubflow.get();
|
||||
GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.SEQUENTIAL);
|
||||
|
||||
GraphUtils.sequential(
|
||||
subGraph,
|
||||
subflow.getTasks(),
|
||||
subflow.getErrors(),
|
||||
subflow.getFinally(),
|
||||
taskRun,
|
||||
execution
|
||||
);
|
||||
|
||||
return subGraph;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Task> allChildTasks() {
|
||||
Optional<Flow> maybeSubflow = fetchSubflow();
|
||||
if (maybeSubflow.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
Flow subflow = maybeSubflow.get();
|
||||
return Stream
|
||||
.concat(
|
||||
subflow.getTasks() != null ? subflow.getTasks().stream() : Stream.empty(),
|
||||
subflow.getErrors() != null ? subflow.getErrors().stream() : Stream.empty()
|
||||
)
|
||||
.toList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ResolvedTask> childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
Flow subflow = fetchSubflow(runContext);
|
||||
|
||||
return FlowableUtils.resolveTasks(subflow.getTasks(), parentTaskRun);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
return FlowableUtils.resolveSequentialNexts(
|
||||
execution,
|
||||
this.childTasks(runContext, parentTaskRun),
|
||||
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
|
||||
FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
|
||||
parentTaskRun
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Output outputs(RunContext runContext) throws Exception {
|
||||
final Output.OutputBuilder builder = Output.builder();
|
||||
Flow subflow = fetchSubflow(runContext);
|
||||
|
||||
final Optional<Map<String, Object>> subflowOutputs = Optional
|
||||
.ofNullable(subflow.getOutputs())
|
||||
.map(outputs -> outputs
|
||||
.stream()
|
||||
.collect(Collectors.toMap(
|
||||
io.kestra.core.models.flows.Output::getId,
|
||||
io.kestra.core.models.flows.Output::getValue)
|
||||
)
|
||||
);
|
||||
|
||||
if (subflowOutputs.isPresent() && runContext.getVariables().get("outputs") != null) {
|
||||
Map<String, Object> outputs = runContext.render(subflowOutputs.get());
|
||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
|
||||
if (subflow.getOutputs() != null && flowInputOutput != null) {
|
||||
// to be able to use FILE Input, we need the execution info, so we create a fake execution with what's needed here
|
||||
RunContext.FlowInfo flowInfo = runContext.flowInfo();
|
||||
String executionId = (String) ((Map<String, Object>) runContext.getVariables().get("execution")).get("id");
|
||||
Execution fake = Execution.builder()
|
||||
.id(executionId)
|
||||
.tenantId(flowInfo.tenantId())
|
||||
.namespace(flowInfo.namespace())
|
||||
.flowId(flowInfo.id())
|
||||
.build();
|
||||
outputs = flowInputOutput.typedOutputs(subflow, fake, outputs);
|
||||
}
|
||||
builder.outputs(outputs);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
// This method should only be used when getSubflow(RunContext) cannot be used.
|
||||
private Optional<Flow> fetchSubflow() {
|
||||
// at validation time, namespace and flowId may not yet be set, in this case let's return an optional to not fail
|
||||
if (namespace == null || flowId == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
ApplicationContext applicationContext = ContextHelper.context();
|
||||
FlowExecutorInterface flowExecutor = applicationContext.getBean(FlowExecutorInterface.class);
|
||||
FlowInterface subflow = flowExecutor.findById(tenantId, namespace, flowId, Optional.ofNullable(revision)).orElseThrow(() -> new IllegalArgumentException("Unable to find flow " + namespace + "." + flowId));
|
||||
|
||||
if (subflow.isDisabled()) {
|
||||
throw new IllegalStateException("Cannot execute a flow which is disabled");
|
||||
}
|
||||
|
||||
if (subflow instanceof FlowWithException fwe) {
|
||||
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
|
||||
}
|
||||
|
||||
|
||||
PluginDefaultService pluginDefaultService = applicationContext.getBean(PluginDefaultService.class);
|
||||
try {
|
||||
return Optional.of(pluginDefaultService.injectAllDefaults(subflow, true));
|
||||
} catch (FlowProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
// This method is preferred as getSubflow() as it checks current flow and subflow and allowed namespaces
|
||||
private Flow fetchSubflow(RunContext runContext) {
|
||||
// we check that the task tenant is the current tenant to avoid accessing flows from another tenant
|
||||
if (!Objects.equals(tenantId, runContext.flowInfo().tenantId())) {
|
||||
throw new IllegalArgumentException("Cannot embeds a flow from a different tenant");
|
||||
}
|
||||
|
||||
ApplicationContext applicationContext = ContextHelper.context();
|
||||
FlowExecutorInterface flowExecutor = applicationContext.getBean(FlowExecutorInterface.class);
|
||||
RunContext.FlowInfo flowInfo = runContext.flowInfo();
|
||||
|
||||
FlowInterface flow = flowExecutor.findById(flowInfo.tenantId(), flowInfo.namespace(), flowInfo.id(), Optional.of(flowInfo.revision()))
|
||||
.orElseThrow(() -> new IllegalArgumentException("Unable to find flow " + flowInfo.namespace() + "." + flowInfo.id()));
|
||||
FlowInterface subflow = ExecutableUtils.getSubflow(tenantId, namespace, flowId, Optional.ofNullable(revision), flowExecutor, flow);
|
||||
|
||||
// check inputs
|
||||
if (!ListUtils.isEmpty(subflow.getInputs())) {
|
||||
Optional<Input<?>> missing = subflow.getInputs().stream()
|
||||
.filter(input -> input.getRequired() && !inputs.containsKey(input.getId()))
|
||||
.findFirst();
|
||||
if (missing.isPresent()) {
|
||||
throw new IllegalArgumentException("Missing required input " + missing.get().getId());
|
||||
}
|
||||
}
|
||||
|
||||
PluginDefaultService pluginDefaultService = applicationContext.getBean(PluginDefaultService.class);
|
||||
return pluginDefaultService.injectAllDefaults(subflow, runContext.logger());
|
||||
}
|
||||
|
||||
/**
|
||||
* Ugly hack to provide the ApplicationContext on {{@link #allChildTasks }} & {{@link #tasksTree }}
|
||||
* We need to inject a way to fetch embedded subflows ...
|
||||
*/
|
||||
@Singleton
|
||||
static class ContextHelper {
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
private static ApplicationContext context;
|
||||
|
||||
static ApplicationContext context() {
|
||||
return ContextHelper.context;
|
||||
}
|
||||
|
||||
@EventListener
|
||||
void onStartup(final StartupEvent event) {
|
||||
ContextHelper.context = this.applicationContext;
|
||||
}
|
||||
}
|
||||
|
||||
@Builder
|
||||
@Getter
|
||||
public static class Output implements io.kestra.core.models.tasks.Output {
|
||||
@Schema(
|
||||
title = "The extracted outputs from the embedded subflow."
|
||||
)
|
||||
private final Map<String, Object> outputs;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package io.kestra.plugin.core.flow;
|
||||
|
||||
import io.kestra.core.junit.annotations.ExecuteFlow;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
@KestraTest(startRunner = true)
|
||||
class EmbeddedSubflowTest {
|
||||
@Test
|
||||
@LoadFlows("flows/valids/minimal.yaml")
|
||||
@ExecuteFlow("flows/valids/embedded-flow.yaml")
|
||||
void shouldEmbedTasks(Execution execution) throws Exception {
|
||||
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
|
||||
assertThat(execution.getTaskRunList(), hasSize(2));
|
||||
assertThat(execution.findTaskRunsByTaskId("embeddedFlow"), notNullValue());
|
||||
assertThat(execution.findTaskRunsByTaskId("date"), notNullValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/minimal.yaml", "flows/valids/embedded-flow.yaml"})
|
||||
@ExecuteFlow("flows/valids/embedded-parent.yaml")
|
||||
void shouldEmbedTasksRecursively(Execution execution) throws Exception {
|
||||
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
|
||||
assertThat(execution.getTaskRunList(), hasSize(3));
|
||||
assertThat(execution.findTaskRunsByTaskId("embeddedParent"), notNullValue());
|
||||
assertThat(execution.findTaskRunsByTaskId("embeddedFlow"), notNullValue());
|
||||
assertThat(execution.findTaskRunsByTaskId("date"), notNullValue());
|
||||
}
|
||||
}
|
||||
8
core/src/test/resources/flows/valids/embedded-flow.yaml
Normal file
8
core/src/test/resources/flows/valids/embedded-flow.yaml
Normal file
@@ -0,0 +1,8 @@
|
||||
id: embedded-flow
|
||||
namespace: io.kestra.tests
|
||||
|
||||
tasks:
|
||||
- id: embeddedFlow
|
||||
type: io.kestra.plugin.core.flow.EmbeddedSubflow
|
||||
namespace: io.kestra.tests
|
||||
flowId: minimal
|
||||
@@ -0,0 +1,8 @@
|
||||
id: embedded-parent
|
||||
namespace: io.kestra.tests
|
||||
|
||||
tasks:
|
||||
- id: embeddedParent
|
||||
type: io.kestra.plugin.core.flow.EmbeddedSubflow
|
||||
namespace: io.kestra.tests
|
||||
flowId: embedded-flow
|
||||
@@ -105,7 +105,7 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
Flow deserialize = MAPPER.convertValue(map, Flow.class);
|
||||
|
||||
// raise exception for invalid flow, ex: Templates disabled
|
||||
deserialize.allTasksWithChilds();
|
||||
deserialize.allTasks().forEach((task) -> {});
|
||||
|
||||
return deserialize;
|
||||
} catch (DeserializationException | IOException | IllegalArgumentException | FlowProcessingException e) {
|
||||
|
||||
Reference in New Issue
Block a user