diff --git a/core/src/main/java/io/kestra/core/models/triggers/TriggerService.java b/core/src/main/java/io/kestra/core/models/triggers/TriggerService.java index 3a133e4ad9..760795a59f 100644 --- a/core/src/main/java/io/kestra/core/models/triggers/TriggerService.java +++ b/core/src/main/java/io/kestra/core/models/triggers/TriggerService.java @@ -6,8 +6,6 @@ import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.ExecutionTrigger; import io.kestra.core.models.tasks.Output; import io.kestra.core.models.flows.State; -import io.kestra.core.runners.DefaultRunContext; -import io.kestra.core.runners.FlowInputOutput; import io.kestra.core.runners.RunContext; import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.ListUtils; @@ -88,8 +86,7 @@ public abstract class TriggerService { } // add inputs and inject defaults (FlowInputOutput handles defaults internally) - FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); - execution = execution.withInputs(flowInputOutput.readExecutionInputs(conditionContext.getFlow(), execution, allInputs)); + execution = execution.withInputs(runContext.inputAndOutput().readInputs(conditionContext.getFlow(), execution, allInputs)); return execution; } diff --git a/core/src/main/java/io/kestra/core/runners/DefaultRunContext.java b/core/src/main/java/io/kestra/core/runners/DefaultRunContext.java index 59bfc96f2b..41693481f8 100644 --- a/core/src/main/java/io/kestra/core/runners/DefaultRunContext.java +++ b/core/src/main/java/io/kestra/core/runners/DefaultRunContext.java @@ -599,6 +599,11 @@ public class DefaultRunContext extends RunContext { return localPath; } + @Override + public InputAndOutput inputAndOutput() { + return new InputAndOutputImpl(this.applicationContext, this); + } + /** * Builder class for constructing new {@link DefaultRunContext} objects. */ diff --git a/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java b/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java index 1f89e5c96d..5ed0f335cd 100644 --- a/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java @@ -189,12 +189,11 @@ public final class ExecutableUtils { variables.put("taskRunIteration", currentTaskRun.getIteration()); } - FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); Instant scheduleOnDate = runContext.render(scheduleDate).as(ZonedDateTime.class).map(date -> date.toInstant()).orElse(null); Execution execution = Execution .newExecution( flow, - (f, e) -> flowInputOutput.readExecutionInputs(f, e, inputs), + (f, e) -> runContext.inputAndOutput().readInputs(f, e, inputs), newLabels, Optional.empty()) .withTrigger(ExecutionTrigger.builder() diff --git a/core/src/main/java/io/kestra/core/runners/FlowInputOutput.java b/core/src/main/java/io/kestra/core/runners/FlowInputOutput.java index 2330a713b6..51a9ed9af3 100644 --- a/core/src/main/java/io/kestra/core/runners/FlowInputOutput.java +++ b/core/src/main/java/io/kestra/core/runners/FlowInputOutput.java @@ -3,13 +3,11 @@ package io.kestra.core.runners; import com.fasterxml.jackson.databind.ObjectMapper; import io.kestra.core.encryption.EncryptionService; import io.kestra.core.exceptions.IllegalVariableEvaluationException; -import io.kestra.core.exceptions.KestraRuntimeException; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.flows.Data; import io.kestra.core.models.flows.DependsOn; import io.kestra.core.models.flows.FlowInterface; import io.kestra.core.models.flows.Input; -import io.kestra.core.models.flows.Output; import io.kestra.core.models.flows.RenderableInput; import io.kestra.core.models.flows.Type; import io.kestra.core.models.flows.input.FileInput; @@ -539,30 +537,6 @@ public class FlowInputOutput { } } - public static Map renderFlowOutputs(List outputs, RunContext runContext) throws IllegalVariableEvaluationException { - if (outputs == null) return Map.of(); - - // render required outputs - Map outputsById = outputs - .stream() - .filter(output -> output.getRequired() == null || output.getRequired()) - .collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll); - outputsById = runContext.render(outputsById); - - // render optional outputs one by one to catch, log, and skip any error. - for (io.kestra.core.models.flows.Output output : outputs) { - if (Boolean.FALSE.equals(output.getRequired())) { - try { - outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue()))); - } catch (Exception e) { - runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e); - outputsById.put(output.getId(), null); - } - } - } - return outputsById; - } - /** * Mutable wrapper to hold a flow's input, and it's resolved value. */ diff --git a/core/src/main/java/io/kestra/core/runners/InputAndOutput.java b/core/src/main/java/io/kestra/core/runners/InputAndOutput.java new file mode 100644 index 0000000000..8a6268824b --- /dev/null +++ b/core/src/main/java/io/kestra/core/runners/InputAndOutput.java @@ -0,0 +1,29 @@ +package io.kestra.core.runners; + +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.flows.FlowInterface; +import io.kestra.core.models.flows.Output; + +import java.util.List; +import java.util.Map; + +/** + * InputAndOutput could be used to work with flow execution inputs and outputs. + */ +public interface InputAndOutput { + /** + * Reads the inputs of a flow execution. + */ + Map readInputs(FlowInterface flow, Execution execution, Map inputs); + + /** + * Processes the outputs of a flow execution (parse them based on their types). + */ + Map typedOutputs(FlowInterface flow, Execution execution, Map rOutputs); + + /** + * Render flow execution outputs. + */ + Map renderOutputs(List outputs) throws IllegalVariableEvaluationException; +} diff --git a/core/src/main/java/io/kestra/core/runners/InputAndOutputImpl.java b/core/src/main/java/io/kestra/core/runners/InputAndOutputImpl.java new file mode 100644 index 0000000000..4f802f1dcb --- /dev/null +++ b/core/src/main/java/io/kestra/core/runners/InputAndOutputImpl.java @@ -0,0 +1,56 @@ +package io.kestra.core.runners; + +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.flows.FlowInterface; +import io.kestra.core.models.flows.Output; +import io.micronaut.context.ApplicationContext; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +class InputAndOutputImpl implements InputAndOutput { + private final FlowInputOutput flowInputOutput; + private final RunContext runContext; + + InputAndOutputImpl(ApplicationContext applicationContext, RunContext runContext) { + this.flowInputOutput = applicationContext.getBean(FlowInputOutput.class); + this.runContext = runContext; + } + + @Override + public Map readInputs(FlowInterface flow, Execution execution, Map inputs) { + return flowInputOutput.readExecutionInputs(flow, execution, inputs); + } + + @Override + public Map typedOutputs(FlowInterface flow, Execution execution, Map rOutputs) { + return flowInputOutput.typedOutputs(flow, execution, rOutputs); + } + + @Override + public Map renderOutputs(List outputs) throws IllegalVariableEvaluationException { + if (outputs == null) return Map.of(); + + // render required outputs + Map outputsById = outputs + .stream() + .filter(output -> output.getRequired() == null || output.getRequired()) + .collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll); + outputsById = runContext.render(outputsById); + + // render optional outputs one by one to catch, log, and skip any error. + for (io.kestra.core.models.flows.Output output : outputs) { + if (Boolean.FALSE.equals(output.getRequired())) { + try { + outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue()))); + } catch (Exception e) { + runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e); + outputsById.put(output.getId(), null); + } + } + } + return outputsById; + } +} diff --git a/core/src/main/java/io/kestra/core/runners/RunContext.java b/core/src/main/java/io/kestra/core/runners/RunContext.java index 458ecc6235..21091aea80 100644 --- a/core/src/main/java/io/kestra/core/runners/RunContext.java +++ b/core/src/main/java/io/kestra/core/runners/RunContext.java @@ -211,4 +211,9 @@ public abstract class RunContext implements PropertyContext { * @return a new run context with the plugin configuration of the given plugin. */ public abstract RunContext cloneForPlugin(Plugin plugin); + + /** + * @return an InputAndOutput that can be used to work with inputs and outputs. + */ + public abstract InputAndOutput inputAndOutput(); } diff --git a/core/src/main/java/io/kestra/plugin/core/flow/ForEachItem.java b/core/src/main/java/io/kestra/plugin/core/flow/ForEachItem.java index 08234e7176..4779bdee35 100644 --- a/core/src/main/java/io/kestra/plugin/core/flow/ForEachItem.java +++ b/core/src/main/java/io/kestra/plugin/core/flow/ForEachItem.java @@ -23,7 +23,6 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer; import io.kestra.core.services.StorageService; import io.kestra.core.storages.FileAttributes; import io.kestra.core.storages.StorageContext; -import io.kestra.core.storages.StorageInterface; import io.kestra.core.storages.StorageSplitInterface; import io.kestra.core.utils.GraphUtils; import io.kestra.core.validations.NoSystemLabelValidation; @@ -540,7 +539,7 @@ public class ForEachItem extends Task implements FlowableTask, Child .numberOfBatches((Integer) taskRun.getOutputs().get(ExecutableUtils.TASK_VARIABLE_NUMBER_OF_BATCHES)); try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { - FileSerde.write(bos, FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext)); + FileSerde.write(bos, runContext.inputAndOutput().renderOutputs(flow.getOutputs())); URI uri = runContext.storage().putFile( new ByteArrayInputStream(bos.toByteArray()), URI.create((String) taskRun.getOutputs().get("uri")) diff --git a/core/src/main/java/io/kestra/plugin/core/flow/Subflow.java b/core/src/main/java/io/kestra/plugin/core/flow/Subflow.java index 6ebd585cc0..07794a96dd 100644 --- a/core/src/main/java/io/kestra/plugin/core/flow/Subflow.java +++ b/core/src/main/java/io/kestra/plugin/core/flow/Subflow.java @@ -18,7 +18,6 @@ import io.kestra.core.models.tasks.ExecutableTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.runners.DefaultRunContext; import io.kestra.core.runners.ExecutableUtils; -import io.kestra.core.runners.FlowInputOutput; import io.kestra.core.runners.FlowMetaStoreInterface; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.SubflowExecution; @@ -38,7 +37,6 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.ToString; import lombok.experimental.SuperBuilder; -import org.slf4j.event.Level; import java.time.ZonedDateTime; import java.util.Collections; @@ -246,11 +244,11 @@ public class Subflow extends Task implements ExecutableTask, Chi if (subflowOutputs != null && !subflowOutputs.isEmpty()) { try { - Map rOutputs = FlowInputOutput.renderFlowOutputs(subflowOutputs, runContext); + var inputAndOutput = runContext.inputAndOutput(); + Map rOutputs = inputAndOutput.renderOutputs(subflowOutputs); - FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking - if (flow.getOutputs() != null && flowInputOutput != null) { - rOutputs = flowInputOutput.typedOutputs(flow, execution, rOutputs); + if (flow.getOutputs() != null) { + rOutputs = inputAndOutput.typedOutputs(flow, execution, rOutputs); } builder.outputs(rOutputs); } catch (Exception e) { diff --git a/core/src/test/java/io/kestra/plugin/core/flow/SubflowTest.java b/core/src/test/java/io/kestra/plugin/core/flow/SubflowTest.java index e47ab50813..65a1abcc31 100644 --- a/core/src/test/java/io/kestra/plugin/core/flow/SubflowTest.java +++ b/core/src/test/java/io/kestra/plugin/core/flow/SubflowTest.java @@ -8,6 +8,7 @@ import io.kestra.core.models.flows.Output; import io.kestra.core.models.flows.State; import io.kestra.core.models.flows.State.History; import io.kestra.core.runners.DefaultRunContext; +import io.kestra.core.runners.InputAndOutput; import io.kestra.core.runners.SubflowExecutionResult; import io.kestra.core.services.VariablesService; import io.micronaut.context.ApplicationContext; @@ -46,11 +47,15 @@ class SubflowTest { @Mock private ApplicationContext applicationContext; + @Mock + private InputAndOutput inputAndOutput; + @BeforeEach void beforeEach() { Mockito.when(applicationContext.getBean(VariablesService.class)).thenReturn(new VariablesService()); Mockito.when(runContext.logger()).thenReturn(LOG); Mockito.when(runContext.getApplicationContext()).thenReturn(applicationContext); + Mockito.when(runContext.inputAndOutput()).thenReturn(inputAndOutput); } @Test @@ -118,7 +123,7 @@ class SubflowTest { Map outputs = Map.of("key", "value"); Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(outputs); - + Mockito.when(inputAndOutput.renderOutputs(Mockito.anyList())).thenReturn(Map.of("key", "value")); Subflow subflow = Subflow.builder() .outputs(outputs) @@ -159,6 +164,7 @@ class SubflowTest { Output output = Output.builder().id("key").value("value").build(); Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(Map.of(output.getId(), output.getValue())); + Mockito.when(inputAndOutput.typedOutputs(Mockito.any(), Mockito.any(), Mockito.anyMap())).thenReturn(Map.of("key", "value")); Flow flow = Flow.builder() .outputs(List.of(output)) .build(); diff --git a/executor/src/main/java/io/kestra/executor/ExecutorService.java b/executor/src/main/java/io/kestra/executor/ExecutorService.java index 712e2a6844..65d2993630 100644 --- a/executor/src/main/java/io/kestra/executor/ExecutorService.java +++ b/executor/src/main/java/io/kestra/executor/ExecutorService.java @@ -402,10 +402,11 @@ public class ExecutorService { if (flow.getOutputs() != null) { RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution()); + var inputAndOutput = runContext.inputAndOutput(); try { - Map outputs = FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext); - outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs); + Map outputs = inputAndOutput.renderOutputs(flow.getOutputs()); + outputs = inputAndOutput.typedOutputs(flow, executor.getExecution(), outputs); newExecution = newExecution.withOutputs(outputs); } catch (Exception e) { Logs.logExecution(