feat(flows): add new check conditions

Adds new property 'checks' on flow in order to allow
pre-conditions to be evaluated before execution

Fixes: kestra-io/kestra-ee#5759
This commit is contained in:
Florian Hussonnois
2025-11-19 13:44:18 +01:00
committed by Florian Hussonnois
parent 90ee720d49
commit eb51c5be37
9 changed files with 451 additions and 17 deletions

View File

@@ -10,6 +10,7 @@ import io.kestra.core.models.Label;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.flows.check.Check;
import io.kestra.core.models.flows.input.InputAndValue;
import io.kestra.core.models.hierarchies.FlowGraph;
import io.kestra.core.models.storage.FileMetas;
@@ -91,6 +92,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.reactivestreams.Publisher;
import org.slf4j.event.Level;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
@@ -190,14 +192,14 @@ public class ExecutionController {
@Inject
private Optional<OpenTelemetry> openTelemetry;
@Inject
private ExecutionStreamingService executionStreamingService;
@Inject
private LocalPathFactory localPathFactory;
@Inject
private SecureVariableRendererFactory secureVariableRendererFactory;
@Inject
private LogService logService;
@Value("${" + LocalPath.ENABLE_PREVIEW_CONFIG + ":true}")
private boolean enableLocalFilePreview;
@@ -691,7 +693,11 @@ public class ExecutionController {
Execution execution = Execution.newExecution(flow, parsedLabels);
return flowInputOutput
.validateExecutionInputs(flow.getInputs(), flow, execution, inputs)
.map(values -> ApiValidateExecutionInputsResponse.of(id, namespace, values));
.map(values -> {
Map<String, Object> inputsAsMap = values.stream().collect(HashMap::new, (m,v)->m.put(v.input().getId(), v.value()), HashMap::putAll);
List<Check> checks = flowService.getFailedChecks(flow, inputsAsMap);
return ApiValidateExecutionInputsResponse.of(id, namespace, checks, values);
});
}
@ExecuteOn(TaskExecutors.IO)
@@ -729,7 +735,24 @@ public class ExecutionController {
return flowInputOutput.readExecutionInputs(flow, current, inputs)
.flatMap(executionInputs -> {
Execution executionWithInputs = current.withInputs(executionInputs);
Check.Behavior behavior = Check.resolveBehavior(flowService.getFailedChecks(flow, executionInputs));
if (Check.Behavior.BLOCK_EXECUTION.equals(behavior)) {
return Mono.error(new IllegalArgumentException(
"Flow execution blocked: one or more condition checks evaluated to false."
));
}
final Execution executionWithInputs = Optional.of(current.withInputs(executionInputs))
.map(exec -> {
if (Check.Behavior.FAIL_EXECUTION.equals(behavior)) {
this.logService.logExecution(current, log, Level.WARN, "Flow execution failed because one or more condition checks evaluated to false.");
return exec.withState(State.Type.FAILED);
} else {
return exec;
}
})
.get();
try {
// inject the traceparent into the execution
openTelemetry
@@ -740,7 +763,7 @@ public class ExecutionController {
executionQueue.emit(executionWithInputs);
eventPublisher.publishEvent(new CrudEvent<>(executionWithInputs, CrudEventType.CREATE));
if (!wait) {
if (!wait || executionWithInputs.getState().isFailed()) {
return Mono.just(ExecutionResponse.fromExecution(
executionWithInputs,
executionUrl(executionWithInputs)
@@ -1457,7 +1480,7 @@ public class ExecutionController {
Flow flow = flowRepository.findByExecutionWithoutAcl(execution);
return executionService.validateForResume(execution, flow, inputs)
.map(values -> ApiValidateExecutionInputsResponse.of(execution.getFlowId(), execution.getNamespace(), values))
.map(values -> ApiValidateExecutionInputsResponse.of(execution.getFlowId(), execution.getNamespace(), List.of(), values))
// need to consume the inputs in case of error
.doOnError(t -> Flux.from(inputs).subscribeOn(Schedulers.boundedElastic()).blockLast());
}
@@ -2574,7 +2597,8 @@ public class ExecutionController {
@Parameter(description = "The namespace")
String namespace,
@Parameter(description = "The flow's inputs")
List<ApiInputAndValue> inputs
List<ApiInputAndValue> inputs,
List<ApiCheckFailure> checks
) {
@Introspected
@@ -2597,10 +2621,20 @@ public class ExecutionController {
@Parameter(description = "The error message")
String message
) {
}
@Introspected
public record ApiCheckFailure(
@Parameter(description = "The message")
String message,
@Parameter(description = "The message style")
Check.Style style,
@Parameter(description = "The behavior")
Check.Behavior behavior
) {
}
public static ApiValidateExecutionInputsResponse of(String id, String namespace, List<InputAndValue> inputs) {
public static ApiValidateExecutionInputsResponse of(String id, String namespace, List<Check> checks, List<InputAndValue> inputs) {
return new ApiValidateExecutionInputsResponse(
id,
namespace,
@@ -2615,7 +2649,8 @@ public class ExecutionController {
.map(cv -> new ApiInputError(cv.getMessage()))
.toList()
).orElse(List.of())
)).toList()
)).toList(),
checks.stream().map(check -> new ApiCheckFailure(check.getMessage(), check.getStyle(), check.getBehavior())).toList()
);
}
}