feat(execution): bring support for input and output processing in the run context

Part-of: https://github.com/kestra-io/kestra-ee/issues/4228

Encapsulate access the FlowInputOutput from the RunContext in a new InputAndOutput component with a currated list of supported methods used by plugins.
This commit is contained in:
Loïc Mathieu
2025-12-16 10:51:31 +01:00
parent 01b5441d16
commit 2d2cb00cab
11 changed files with 112 additions and 43 deletions

View File

@@ -6,8 +6,6 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionTrigger;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
@@ -88,8 +86,7 @@ public abstract class TriggerService {
}
// add inputs and inject defaults (FlowInputOutput handles defaults internally)
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
execution = execution.withInputs(flowInputOutput.readExecutionInputs(conditionContext.getFlow(), execution, allInputs));
execution = execution.withInputs(runContext.inputAndOutput().readInputs(conditionContext.getFlow(), execution, allInputs));
return execution;
}

View File

@@ -599,6 +599,11 @@ public class DefaultRunContext extends RunContext {
return localPath;
}
@Override
public InputAndOutput inputAndOutput() {
return new InputAndOutputImpl(this.applicationContext, this);
}
/**
* Builder class for constructing new {@link DefaultRunContext} objects.
*/

View File

@@ -189,12 +189,11 @@ public final class ExecutableUtils {
variables.put("taskRunIteration", currentTaskRun.getIteration());
}
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
Instant scheduleOnDate = runContext.render(scheduleDate).as(ZonedDateTime.class).map(date -> date.toInstant()).orElse(null);
Execution execution = Execution
.newExecution(
flow,
(f, e) -> flowInputOutput.readExecutionInputs(f, e, inputs),
(f, e) -> runContext.inputAndOutput().readInputs(f, e, inputs),
newLabels,
Optional.empty())
.withTrigger(ExecutionTrigger.builder()

View File

@@ -3,13 +3,11 @@ package io.kestra.core.runners;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Data;
import io.kestra.core.models.flows.DependsOn;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.Output;
import io.kestra.core.models.flows.RenderableInput;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.FileInput;
@@ -539,30 +537,6 @@ public class FlowInputOutput {
}
}
public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
if (outputs == null) return Map.of();
// render required outputs
Map<String, Object> outputsById = outputs
.stream()
.filter(output -> output.getRequired() == null || output.getRequired())
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
outputsById = runContext.render(outputsById);
// render optional outputs one by one to catch, log, and skip any error.
for (io.kestra.core.models.flows.Output output : outputs) {
if (Boolean.FALSE.equals(output.getRequired())) {
try {
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
} catch (Exception e) {
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
outputsById.put(output.getId(), null);
}
}
}
return outputsById;
}
/**
* Mutable wrapper to hold a flow's input, and it's resolved value.
*/

View File

@@ -0,0 +1,29 @@
package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Output;
import java.util.List;
import java.util.Map;
/**
* InputAndOutput could be used to work with flow execution inputs and outputs.
*/
public interface InputAndOutput {
/**
* Reads the inputs of a flow execution.
*/
Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs);
/**
* Processes the outputs of a flow execution (parse them based on their types).
*/
Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs);
/**
* Render flow execution outputs.
*/
Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException;
}

View File

@@ -0,0 +1,56 @@
package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Output;
import io.micronaut.context.ApplicationContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
class InputAndOutputImpl implements InputAndOutput {
private final FlowInputOutput flowInputOutput;
private final RunContext runContext;
InputAndOutputImpl(ApplicationContext applicationContext, RunContext runContext) {
this.flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
this.runContext = runContext;
}
@Override
public Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs) {
return flowInputOutput.readExecutionInputs(flow, execution, inputs);
}
@Override
public Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs) {
return flowInputOutput.typedOutputs(flow, execution, rOutputs);
}
@Override
public Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException {
if (outputs == null) return Map.of();
// render required outputs
Map<String, Object> outputsById = outputs
.stream()
.filter(output -> output.getRequired() == null || output.getRequired())
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
outputsById = runContext.render(outputsById);
// render optional outputs one by one to catch, log, and skip any error.
for (io.kestra.core.models.flows.Output output : outputs) {
if (Boolean.FALSE.equals(output.getRequired())) {
try {
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
} catch (Exception e) {
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
outputsById.put(output.getId(), null);
}
}
}
return outputsById;
}
}

View File

@@ -211,4 +211,9 @@ public abstract class RunContext implements PropertyContext {
* @return a new run context with the plugin configuration of the given plugin.
*/
public abstract RunContext cloneForPlugin(Plugin plugin);
/**
* @return an InputAndOutput that can be used to work with inputs and outputs.
*/
public abstract InputAndOutput inputAndOutput();
}

View File

@@ -23,7 +23,6 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.StorageService;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageSplitInterface;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.validations.NoSystemLabelValidation;
@@ -540,7 +539,7 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
.numberOfBatches((Integer) taskRun.getOutputs().get(ExecutableUtils.TASK_VARIABLE_NUMBER_OF_BATCHES));
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
FileSerde.write(bos, FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext));
FileSerde.write(bos, runContext.inputAndOutput().renderOutputs(flow.getOutputs()));
URI uri = runContext.storage().putFile(
new ByteArrayInputStream(bos.toByteArray()),
URI.create((String) taskRun.getOutputs().get("uri"))

View File

@@ -18,7 +18,6 @@ import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.FlowMetaStoreInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.SubflowExecution;
@@ -38,7 +37,6 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.slf4j.event.Level;
import java.time.ZonedDateTime;
import java.util.Collections;
@@ -246,11 +244,11 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
if (subflowOutputs != null && !subflowOutputs.isEmpty()) {
try {
Map<String, Object> rOutputs = FlowInputOutput.renderFlowOutputs(subflowOutputs, runContext);
var inputAndOutput = runContext.inputAndOutput();
Map<String, Object> rOutputs = inputAndOutput.renderOutputs(subflowOutputs);
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
if (flow.getOutputs() != null && flowInputOutput != null) {
rOutputs = flowInputOutput.typedOutputs(flow, execution, rOutputs);
if (flow.getOutputs() != null) {
rOutputs = inputAndOutput.typedOutputs(flow, execution, rOutputs);
}
builder.outputs(rOutputs);
} catch (Exception e) {

View File

@@ -8,6 +8,7 @@ import io.kestra.core.models.flows.Output;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.State.History;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.InputAndOutput;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.services.VariablesService;
import io.micronaut.context.ApplicationContext;
@@ -46,11 +47,15 @@ class SubflowTest {
@Mock
private ApplicationContext applicationContext;
@Mock
private InputAndOutput inputAndOutput;
@BeforeEach
void beforeEach() {
Mockito.when(applicationContext.getBean(VariablesService.class)).thenReturn(new VariablesService());
Mockito.when(runContext.logger()).thenReturn(LOG);
Mockito.when(runContext.getApplicationContext()).thenReturn(applicationContext);
Mockito.when(runContext.inputAndOutput()).thenReturn(inputAndOutput);
}
@Test
@@ -118,7 +123,7 @@ class SubflowTest {
Map<String, Object> outputs = Map.of("key", "value");
Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(outputs);
Mockito.when(inputAndOutput.renderOutputs(Mockito.anyList())).thenReturn(Map.of("key", "value"));
Subflow subflow = Subflow.builder()
.outputs(outputs)
@@ -159,6 +164,7 @@ class SubflowTest {
Output output = Output.builder().id("key").value("value").build();
Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(Map.of(output.getId(), output.getValue()));
Mockito.when(inputAndOutput.typedOutputs(Mockito.any(), Mockito.any(), Mockito.anyMap())).thenReturn(Map.of("key", "value"));
Flow flow = Flow.builder()
.outputs(List.of(output))
.build();

View File

@@ -402,10 +402,11 @@ public class ExecutorService {
if (flow.getOutputs() != null) {
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
var inputAndOutput = runContext.inputAndOutput();
try {
Map<String, Object> outputs = FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext);
outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
Map<String, Object> outputs = inputAndOutput.renderOutputs(flow.getOutputs());
outputs = inputAndOutput.typedOutputs(flow, executor.getExecution(), outputs);
newExecution = newExecution.withOutputs(outputs);
} catch (Exception e) {
Logs.logExecution(