mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
feat(core): add type to flow outputs (#3094)
This commit also add outputs validation for flow Fix: #3094
This commit is contained in:
committed by
Florian Hussonnois
parent
4b02fc16ed
commit
b2af7feb4b
2
Makefile
2
Makefile
@@ -129,6 +129,8 @@ kestra:
|
||||
server:
|
||||
basic-auth:
|
||||
enabled: false
|
||||
encryption:
|
||||
secret-key: 3ywuDa/Ec61VHkOX3RlI9gYq7CaD0mv0Pf3DHtAXA6U=
|
||||
repository:
|
||||
type: postgres
|
||||
storage:
|
||||
|
||||
@@ -2,7 +2,7 @@ package io.kestra.cli.commands.flows;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.core.exceptions.MissingRequiredInput;
|
||||
import io.kestra.core.exceptions.MissingRequiredArgument;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
|
||||
@@ -102,7 +102,7 @@ public class FlowTestCommand extends AbstractCommand {
|
||||
);
|
||||
|
||||
runner.close();
|
||||
} catch (MissingRequiredInput e) {
|
||||
} catch (MissingRequiredArgument e) {
|
||||
throw new CommandLine.ParameterException(this.spec.commandLine(), e.getMessage());
|
||||
} catch (IOException | TimeoutException e) {
|
||||
throw new IllegalStateException(e);
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
public class MissingRequiredArgument extends IllegalArgumentException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public MissingRequiredArgument(String message, Throwable e) {
|
||||
super(message, e);
|
||||
}
|
||||
|
||||
public MissingRequiredArgument(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
@@ -1,13 +0,0 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
public class MissingRequiredInput extends IllegalArgumentException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public MissingRequiredInput(String message, Throwable e) {
|
||||
super(message, e);
|
||||
}
|
||||
|
||||
public MissingRequiredInput(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
21
core/src/main/java/io/kestra/core/models/flows/Data.java
Normal file
21
core/src/main/java/io/kestra/core/models/flows/Data.java
Normal file
@@ -0,0 +1,21 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
/**
|
||||
* Interface for defining on identifiable and typed data.
|
||||
*/
|
||||
public interface Data {
|
||||
|
||||
/**
|
||||
* The ID for this data.
|
||||
*
|
||||
* @return a string id.
|
||||
*/
|
||||
String getId();
|
||||
|
||||
/**
|
||||
* The Type for this data.
|
||||
*
|
||||
* @return a type.
|
||||
*/
|
||||
Type getType();
|
||||
}
|
||||
@@ -118,6 +118,7 @@ public class Flow implements DeletedInterface, TenantInterface {
|
||||
description = "Output values make information about the execution of your Flow available and expose for other Kestra flows to use. Output values are similar to return values in programming languages."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
@Valid
|
||||
List<Output> outputs;
|
||||
|
||||
public Logger logger() {
|
||||
|
||||
@@ -37,13 +37,12 @@ import jakarta.validation.constraints.Pattern;
|
||||
@JsonSubTypes.Type(value = TimeInput.class, name = "TIME"),
|
||||
@JsonSubTypes.Type(value = URIInput.class, name = "URI")
|
||||
})
|
||||
public abstract class Input<T> {
|
||||
public abstract class Input<T> implements Data {
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp="^[a-zA-Z0-9][.a-zA-Z0-9_-]*")
|
||||
String id;
|
||||
|
||||
|
||||
@NotNull
|
||||
@Valid
|
||||
Type type;
|
||||
@@ -65,30 +64,4 @@ public abstract class Input<T> {
|
||||
}
|
||||
}
|
||||
|
||||
@Introspected
|
||||
public enum Type {
|
||||
STRING(StringInput.class.getName()),
|
||||
INT(IntInput.class.getName()),
|
||||
FLOAT(FloatInput.class.getName()),
|
||||
BOOLEAN(BooleanInput.class.getName()),
|
||||
DATETIME(DateTimeInput.class.getName()),
|
||||
DATE(DateInput.class.getName()),
|
||||
TIME(TimeInput.class.getName()),
|
||||
DURATION(DurationInput.class.getName()),
|
||||
FILE(FileInput.class.getName()),
|
||||
JSON(JsonInput.class.getName()),
|
||||
URI(URIInput.class.getName()),
|
||||
SECRET(SecretInput.class.getName());
|
||||
|
||||
private final String clsName;
|
||||
|
||||
Type(String clsName) {
|
||||
this.clsName = clsName;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public Class<? extends Input<?>> cls() throws ClassNotFoundException {
|
||||
return (Class<? extends Input<?>>) Class.forName(this.clsName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.validation.Valid;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
@@ -16,7 +17,7 @@ import jakarta.validation.constraints.Pattern;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Introspected
|
||||
public class Output {
|
||||
public class Output implements Data {
|
||||
/**
|
||||
* The output's unique id.
|
||||
*/
|
||||
@@ -33,4 +34,11 @@ public class Output {
|
||||
*/
|
||||
@NotNull
|
||||
Object value;
|
||||
|
||||
/**
|
||||
* The type of the output.
|
||||
*/
|
||||
@NotNull
|
||||
@Valid
|
||||
Type type;
|
||||
}
|
||||
|
||||
45
core/src/main/java/io/kestra/core/models/flows/Type.java
Normal file
45
core/src/main/java/io/kestra/core/models/flows/Type.java
Normal file
@@ -0,0 +1,45 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import io.kestra.core.models.flows.input.BooleanInput;
|
||||
import io.kestra.core.models.flows.input.DateInput;
|
||||
import io.kestra.core.models.flows.input.DateTimeInput;
|
||||
import io.kestra.core.models.flows.input.DurationInput;
|
||||
import io.kestra.core.models.flows.input.FileInput;
|
||||
import io.kestra.core.models.flows.input.FloatInput;
|
||||
import io.kestra.core.models.flows.input.IntInput;
|
||||
import io.kestra.core.models.flows.input.JsonInput;
|
||||
import io.kestra.core.models.flows.input.SecretInput;
|
||||
import io.kestra.core.models.flows.input.StringInput;
|
||||
import io.kestra.core.models.flows.input.TimeInput;
|
||||
import io.kestra.core.models.flows.input.URIInput;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
|
||||
/**
|
||||
* The supported data types.
|
||||
*/
|
||||
@Introspected
|
||||
public enum Type {
|
||||
STRING(StringInput.class.getName()),
|
||||
INT(IntInput.class.getName()),
|
||||
FLOAT(FloatInput.class.getName()),
|
||||
BOOLEAN(BooleanInput.class.getName()),
|
||||
DATETIME(DateTimeInput.class.getName()),
|
||||
DATE(DateInput.class.getName()),
|
||||
TIME(TimeInput.class.getName()),
|
||||
DURATION(DurationInput.class.getName()),
|
||||
FILE(FileInput.class.getName()),
|
||||
JSON(JsonInput.class.getName()),
|
||||
URI(URIInput.class.getName()),
|
||||
SECRET(SecretInput.class.getName());
|
||||
|
||||
private final String clsName;
|
||||
|
||||
Type(String clsName) {
|
||||
this.clsName = clsName;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public Class<? extends Input<?>> cls() throws ClassNotFoundException {
|
||||
return (Class<? extends Input<?>>) Class.forName(this.clsName);
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
package io.kestra.core.models.tasks.common;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Getter;
|
||||
@@ -8,15 +7,23 @@ import lombok.Getter;
|
||||
import java.security.GeneralSecurityException;
|
||||
|
||||
@Getter
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type", visible = true)
|
||||
@Schema(hidden = true)
|
||||
public class EncryptedString {
|
||||
|
||||
public static final String TYPE = "io.kestra.datatype:aes_encrypted";
|
||||
|
||||
private final String value;
|
||||
|
||||
private final String type = TYPE;
|
||||
|
||||
private EncryptedString(String value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public static EncryptedString from(String encrypted) {
|
||||
return new EncryptedString(encrypted);
|
||||
}
|
||||
|
||||
public static EncryptedString from(String plainText, RunContext runContext) throws GeneralSecurityException {
|
||||
String encrypted = runContext.encrypt(plainText);
|
||||
return new EncryptedString(encrypted);
|
||||
|
||||
@@ -30,6 +30,7 @@ import org.slf4j.event.Level;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
@@ -56,6 +57,9 @@ public class ExecutorService {
|
||||
@Inject
|
||||
private LogService logService;
|
||||
|
||||
@Inject
|
||||
private RunnerUtils runnerUtils;
|
||||
|
||||
protected FlowExecutorInterface flowExecutorInterface;
|
||||
|
||||
protected FlowExecutorInterface flowExecutorInterface() {
|
||||
@@ -355,16 +359,12 @@ public class ExecutorService {
|
||||
try {
|
||||
Map<String, Object> outputs = flow.getOutputs()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(
|
||||
io.kestra.core.models.flows.Output::getId,
|
||||
io.kestra.core.models.flows.Output::getValue)
|
||||
);
|
||||
|
||||
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
|
||||
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
|
||||
outputs = runContext.render(outputs);
|
||||
newExecution = newExecution
|
||||
.withOutputs(outputs);
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
outputs = runnerUtils.typedOutputs(flow, executor.getExecution(), outputs);
|
||||
newExecution = newExecution.withOutputs(outputs);
|
||||
} catch (Exception e) {
|
||||
logService.logExecution(
|
||||
executor.getExecution(),
|
||||
logger,
|
||||
|
||||
@@ -350,7 +350,7 @@ public class RunContext {
|
||||
for (var entry: outputs.entrySet()) {
|
||||
if (entry.getValue() instanceof Map map) {
|
||||
// if some outputs are of type EncryptedString we decode them and replace the object
|
||||
if (EncryptedString.class.getName().equals(map.get("type"))) {
|
||||
if (EncryptedString.TYPE.equalsIgnoreCase((String)map.get("type"))) {
|
||||
try {
|
||||
String decoded = decrypt((String) map.get("value"));
|
||||
outputs.put(entry.getKey(), decoded);
|
||||
|
||||
@@ -3,12 +3,15 @@ package io.kestra.core.runners;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.encryption.EncryptionService;
|
||||
import io.kestra.core.exceptions.MissingRequiredInput;
|
||||
import io.kestra.core.exceptions.MissingRequiredArgument;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Data;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.tasks.common.EncryptedString;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
@@ -36,7 +39,13 @@ import java.time.Instant;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalTime;
|
||||
import java.time.format.DateTimeParseException;
|
||||
import java.util.*;
|
||||
import java.util.AbstractMap;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiFunction;
|
||||
@@ -121,7 +130,7 @@ public class RunnerUtils {
|
||||
}
|
||||
|
||||
if (input.getRequired() && current == null) {
|
||||
throw new MissingRequiredInput("Missing required input value '" + input.getId() + "'");
|
||||
throw new MissingRequiredArgument("Missing required input value '" + input.getId() + "'");
|
||||
}
|
||||
|
||||
if (!input.getRequired() && current == null) {
|
||||
@@ -131,7 +140,7 @@ public class RunnerUtils {
|
||||
));
|
||||
}
|
||||
|
||||
var parsedInput = parseInput(flow, execution, input, current);
|
||||
var parsedInput = parseData(execution, input, current);
|
||||
parsedInput.ifPresent(parsed -> input.validate(parsed.getValue()));
|
||||
return parsedInput;
|
||||
})
|
||||
@@ -142,84 +151,116 @@ public class RunnerUtils {
|
||||
return handleNestedInputs(results);
|
||||
}
|
||||
|
||||
private Optional<AbstractMap.SimpleEntry<String, Object>> parseInput(Flow flow, Execution execution, Input<?> input, Object current) {
|
||||
switch (input.getType()) {
|
||||
public Map<String, Object> typedOutputs(Flow flow, Execution execution, Map<String, Object> in) {
|
||||
if (flow.getOutputs() == null) {
|
||||
return ImmutableMap.of();
|
||||
}
|
||||
Map<String, Object> results = flow
|
||||
.getOutputs()
|
||||
.stream()
|
||||
.map(output -> {
|
||||
Object current = in == null ? null : in.get(output.getId());
|
||||
return parseData(execution, output, current)
|
||||
.map(entry -> {
|
||||
if (output.getType().equals(Type.SECRET)) {
|
||||
return new AbstractMap.SimpleEntry<>(
|
||||
entry.getKey(),
|
||||
EncryptedString.from(entry.getValue().toString())
|
||||
);
|
||||
}
|
||||
return entry;
|
||||
});
|
||||
})
|
||||
.filter(Optional::isPresent)
|
||||
.map(Optional::get)
|
||||
.collect(HashMap::new, (map, entry) -> map.put(entry.getKey(), entry.getValue()), Map::putAll);
|
||||
|
||||
// Ensure outputs are compliant with tasks outputs.
|
||||
return JacksonMapper.toMap(results);
|
||||
}
|
||||
|
||||
private Optional<AbstractMap.SimpleEntry<String, Object>> parseData(Execution execution, Data data, Object current) {
|
||||
if (data.getType() == null) {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(data.getId(), current));
|
||||
}
|
||||
final String loggableType = data instanceof Input ? "input" : "output";
|
||||
switch (data.getType()) {
|
||||
case STRING -> {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(
|
||||
input.getId(),
|
||||
data.getId(),
|
||||
current
|
||||
));
|
||||
}
|
||||
case SECRET -> {
|
||||
try {
|
||||
if (secretKey.isEmpty()) {
|
||||
throw new MissingRequiredInput("Unable to use a SECRET input as encryption is not configured");
|
||||
throw new MissingRequiredArgument("Unable to use a SECRET " + loggableType + " as encryption is not configured");
|
||||
}
|
||||
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(
|
||||
input.getId(),
|
||||
data.getId(),
|
||||
EncryptionService.encrypt(secretKey.get(), (String) current)
|
||||
));
|
||||
} catch (GeneralSecurityException e) {
|
||||
throw new MissingRequiredInput("Invalid SECRET format for '" + input.getId() + "' for '" + current + "' with error " + e.getMessage(), e);
|
||||
throw new MissingRequiredArgument("Invalid SECRET format for '" + data.getId() + "' for '" + current + "' with error " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
case INT -> {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(
|
||||
input.getId(),
|
||||
data.getId(),
|
||||
current instanceof Integer ? current : Integer.valueOf((String) current)
|
||||
));
|
||||
}
|
||||
case FLOAT -> {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(
|
||||
input.getId(),
|
||||
data.getId(),
|
||||
current instanceof Float ? current : Float.valueOf((String) current)
|
||||
));
|
||||
}
|
||||
case BOOLEAN -> {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(
|
||||
input.getId(),
|
||||
data.getId(),
|
||||
current instanceof Boolean ? current : Boolean.valueOf((String) current)
|
||||
));
|
||||
}
|
||||
case DATETIME -> {
|
||||
try {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(
|
||||
input.getId(),
|
||||
data.getId(),
|
||||
Instant.parse(((String) current))
|
||||
));
|
||||
} catch (DateTimeParseException e) {
|
||||
throw new MissingRequiredInput("Invalid DATETIME format for '" + input.getId() + "' for '" + current + "' with error " + e.getMessage(), e);
|
||||
throw new MissingRequiredArgument("Invalid DATETIME format for '" + data.getId() + "' for '" + current + "' with error " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
case DATE -> {
|
||||
try {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(
|
||||
input.getId(),
|
||||
data.getId(),
|
||||
LocalDate.parse(((String) current))
|
||||
));
|
||||
} catch (DateTimeParseException e) {
|
||||
throw new MissingRequiredInput("Invalid DATE format for '" + input.getId() + "' for '" + current + "' with error " + e.getMessage(), e);
|
||||
throw new MissingRequiredArgument("Invalid DATE format for '" + data.getId() + "' for '" + current + "' with error " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
case TIME -> {
|
||||
try {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(
|
||||
input.getId(),
|
||||
data.getId(),
|
||||
LocalTime.parse(((String) current))
|
||||
));
|
||||
} catch (DateTimeParseException e) {
|
||||
throw new MissingRequiredInput("Invalid TIME format for '" + input.getId() + "' for '" + current + "' with error " + e.getMessage(), e);
|
||||
throw new MissingRequiredArgument("Invalid TIME format for '" + data.getId() + "' for '" + current + "' with error " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
case DURATION -> {
|
||||
try {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(
|
||||
input.getId(),
|
||||
data.getId(),
|
||||
Duration.parse(((String) current))
|
||||
));
|
||||
} catch (DateTimeParseException e) {
|
||||
throw new MissingRequiredInput("Invalid DURATION format for '" + input.getId() + "' for '" + current + "' with error " + e.getMessage(), e);
|
||||
throw new MissingRequiredArgument("Invalid DURATION format for '" + data.getId() + "' for '" + current + "' with error " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
case FILE -> {
|
||||
@@ -228,42 +269,42 @@ public class RunnerUtils {
|
||||
|
||||
if (uri.getScheme() != null && uri.getScheme().equals("kestra")) {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(
|
||||
input.getId(),
|
||||
data.getId(),
|
||||
uri
|
||||
));
|
||||
} else {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(
|
||||
input.getId(),
|
||||
storageInterface.from(execution, input.getId(), new File(((String) current)))
|
||||
data.getId(),
|
||||
storageInterface.from(execution, data.getId(), new File(((String) current)))
|
||||
));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new MissingRequiredInput("Invalid input arguments for file on input '" + input.getId() + "'", e);
|
||||
throw new MissingRequiredArgument("Invalid " + loggableType + " arguments for file on " + loggableType + " '" + data.getId() + "'", e);
|
||||
}
|
||||
}
|
||||
case JSON -> {
|
||||
try {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(
|
||||
input.getId(),
|
||||
data.getId(),
|
||||
JacksonMapper.toObject(((String) current))
|
||||
));
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new MissingRequiredInput("Invalid JSON format for '" + input.getId() + "' for '" + current + "' with error " + e.getMessage(), e);
|
||||
throw new MissingRequiredArgument("Invalid JSON format for '" + data.getId() + "' for '" + current + "' with error " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
case URI -> {
|
||||
Matcher matcher = URI_PATTERN.matcher((String) current);
|
||||
if (matcher.matches()) {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(
|
||||
input.getId(),
|
||||
data.getId(),
|
||||
current
|
||||
));
|
||||
} else {
|
||||
throw new MissingRequiredInput("Invalid URI format for '" + input.getId() + "' for '" + current + "'");
|
||||
throw new MissingRequiredArgument("Invalid URI format for '" + data.getId() + "' for '" + current + "'");
|
||||
}
|
||||
}
|
||||
default ->
|
||||
throw new MissingRequiredInput("Invalid input type '" + input.getType() + "' for '" + input.getId() + "'");
|
||||
throw new MissingRequiredArgument("Invalid data type '" + data.getType() + "' for '" + data.getId() + "'");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -460,5 +501,4 @@ public class RunnerUtils {
|
||||
|
||||
return execution;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.runners.ExecutableUtils;
|
||||
import io.kestra.core.runners.FlowExecutorInterface;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunnerUtils;
|
||||
import io.kestra.core.runners.SubflowExecution;
|
||||
import io.kestra.core.runners.SubflowExecutionResult;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
@@ -198,7 +199,12 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output> {
|
||||
|
||||
if (subflowOutputs != null) {
|
||||
try {
|
||||
builder.outputs(runContext.render(subflowOutputs));
|
||||
Map<String, Object> outputs = runContext.render(subflowOutputs);
|
||||
RunnerUtils runnerUtils = runContext.getApplicationContext().getBean(RunnerUtils.class); // this is hacking
|
||||
if (flow.getOutputs() != null && runnerUtils != null) {
|
||||
outputs = runnerUtils.typedOutputs(flow, execution, outputs);
|
||||
}
|
||||
builder.outputs(outputs);
|
||||
} catch (Exception e) {
|
||||
runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
|
||||
var state = this.isAllowFailure() ? State.Type.WARNING : State.Type.FAILED;
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package io.kestra.core.validations.validator;
|
||||
|
||||
import io.kestra.core.models.flows.Data;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
import io.kestra.core.models.tasks.ExecutableTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.validations.FlowValidation;
|
||||
@@ -63,17 +65,16 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
|
||||
|
||||
// input unique name
|
||||
if (value.getInputs() != null) {
|
||||
List<String> inputNames = value.getInputs()
|
||||
.stream()
|
||||
.map(Input::getId)
|
||||
.toList();
|
||||
List<String> inputDuplicates = inputNames
|
||||
.stream()
|
||||
.distinct()
|
||||
.filter(entry -> Collections.frequency(inputNames, entry) > 1)
|
||||
.toList();
|
||||
if (!inputDuplicates.isEmpty()) {
|
||||
violations.add("Duplicate input with name [" + String.join(", ", inputDuplicates) + "]");
|
||||
List<String> duplicates = getDuplicates(value.getInputs().stream().map(Data::getId).toList());
|
||||
if (!duplicates.isEmpty()) {
|
||||
violations.add("Duplicate input with name [" + String.join(", ", duplicates) + "]");
|
||||
}
|
||||
}
|
||||
// output unique name
|
||||
if (value.getOutputs() != null) {
|
||||
List<String> duplicates = getDuplicates(value.getOutputs().stream().map(Data::getId).toList());
|
||||
if (!duplicates.isEmpty()) {
|
||||
violations.add("Duplicate output with name [" + String.join(", ", duplicates) + "]");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ import io.kestra.core.models.conditions.types.DayWeekCondition;
|
||||
import io.kestra.core.models.conditions.types.DayWeekInMonthCondition;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.flows.input.StringInput;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
@@ -402,8 +402,8 @@ class ScheduleTest {
|
||||
)
|
||||
)
|
||||
.inputs(List.of(
|
||||
StringInput.builder().id("input1").type(Input.Type.STRING).required(false).build(),
|
||||
StringInput.builder().id("input2").type(Input.Type.STRING).defaults("default").build()
|
||||
StringInput.builder().id("input1").type(Type.STRING).required(false).build(),
|
||||
StringInput.builder().id("input2").type(Type.STRING).defaults("default").build()
|
||||
))
|
||||
.build();
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ import io.kestra.core.events.CrudEventType;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowWithException;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.flows.input.StringInput;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
@@ -120,7 +120,7 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
.id(flowId)
|
||||
.namespace("io.kestra.unittest")
|
||||
.tasks(Collections.singletonList(Return.builder().id("test").type(Return.class.getName()).format("test").build()))
|
||||
.inputs(ImmutableList.of(StringInput.builder().type(Input.Type.STRING).id("a").build()))
|
||||
.inputs(ImmutableList.of(StringInput.builder().type(Type.STRING).id("a").build()))
|
||||
.build();
|
||||
// create with repository
|
||||
FlowWithSource flow = flowRepository.create(first, first.generateSource(), taskDefaultService.injectDefaults(first));
|
||||
@@ -140,7 +140,7 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
.message("Hello World")
|
||||
.build()
|
||||
))
|
||||
.inputs(ImmutableList.of(StringInput.builder().type(Input.Type.STRING).id("b").build()))
|
||||
.inputs(ImmutableList.of(StringInput.builder().type(Type.STRING).id("b").build()))
|
||||
.build();
|
||||
|
||||
// revision is incremented
|
||||
@@ -305,7 +305,7 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
Flow flow = Flow.builder()
|
||||
.id(flowId)
|
||||
.namespace("io.kestra.unittest")
|
||||
.inputs(ImmutableList.of(StringInput.builder().type(Input.Type.STRING).id("a").build()))
|
||||
.inputs(ImmutableList.of(StringInput.builder().type(Type.STRING).id("a").build()))
|
||||
.tasks(Collections.singletonList(Return.builder().id("test").type(Return.class.getName()).format("test").build()))
|
||||
.build();
|
||||
|
||||
@@ -316,7 +316,7 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
Flow update = Flow.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest2")
|
||||
.inputs(ImmutableList.of(StringInput.builder().type(Input.Type.STRING).id("b").build()))
|
||||
.inputs(ImmutableList.of(StringInput.builder().type(Type.STRING).id("b").build()))
|
||||
.tasks(Collections.singletonList(Return.builder().id("test").type(Return.class.getName()).format("test").build()))
|
||||
.build();
|
||||
;
|
||||
@@ -446,7 +446,7 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
.id(flowId)
|
||||
.namespace(namespace)
|
||||
.tasks(Collections.singletonList(Return.builder().id("test").type(Return.class.getName()).format("test").build()))
|
||||
.inputs(ImmutableList.of(StringInput.builder().type(Input.Type.STRING).id("a").build()))
|
||||
.inputs(ImmutableList.of(StringInput.builder().type(Type.STRING).id("a").build()))
|
||||
.build();
|
||||
// create with repository
|
||||
flowRepository.create(first, first.generateSource(), taskDefaultService.injectDefaults(first));
|
||||
@@ -462,7 +462,7 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
.message("Hello World")
|
||||
.build()
|
||||
))
|
||||
.inputs(ImmutableList.of(StringInput.builder().type(Input.Type.STRING).id("b").build()))
|
||||
.inputs(ImmutableList.of(StringInput.builder().type(Type.STRING).id("b").build()))
|
||||
.build();
|
||||
|
||||
flowRepository.update(flowRev2, first, flowRev2.generateSource(), taskDefaultService.injectDefaults(flowRev2));
|
||||
|
||||
@@ -2,7 +2,7 @@ package io.kestra.core.runners;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.io.CharStreams;
|
||||
import io.kestra.core.exceptions.MissingRequiredInput;
|
||||
import io.kestra.core.exceptions.MissingRequiredArgument;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.State;
|
||||
@@ -297,7 +297,7 @@ public class InputsTest extends AbstractMemoryRunnerTest {
|
||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||
map.put("uri", "http:/bla");
|
||||
|
||||
MissingRequiredInput e = assertThrows(MissingRequiredInput.class, () -> {
|
||||
MissingRequiredArgument e = assertThrows(MissingRequiredArgument.class, () -> {
|
||||
Map<String, Object> typeds = typedInputs(map);
|
||||
});
|
||||
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.MissingRequiredInput;
|
||||
import io.kestra.core.exceptions.MissingRequiredArgument;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.tasks.common.EncryptedString;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.micronaut.core.annotation.NonNull;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
@@ -46,7 +47,7 @@ public class NoEncryptionConfiguredTest extends AbstractMemoryRunnerTest impleme
|
||||
TaskRun hello = execution.findTaskRunsByTaskId("hello").get(0);
|
||||
Map<String, String> valueOutput = (Map<String, String>) hello.getOutputs().get("value");
|
||||
assertThat(valueOutput.size(), is(2));
|
||||
assertThat(valueOutput.get("type"), is("io.kestra.core.models.tasks.common.EncryptedString"));
|
||||
assertThat(valueOutput.get("type"), is(EncryptedString.TYPE));
|
||||
// the value is not encrypted as there is no encryption key
|
||||
assertThat(valueOutput.get("value"), is("Hello World"));
|
||||
TaskRun returnTask = execution.findTaskRunsByTaskId("return").get(0);
|
||||
@@ -64,6 +65,6 @@ public class NoEncryptionConfiguredTest extends AbstractMemoryRunnerTest impleme
|
||||
.flowId(flow.getId())
|
||||
.build();
|
||||
|
||||
assertThrows(MissingRequiredInput.class, () -> runnerUtils.typedInputs(flow, execution, InputsTest.inputs));
|
||||
assertThrows(MissingRequiredArgument.class, () -> runnerUtils.typedInputs(flow, execution, InputsTest.inputs));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.executions.metrics.Counter;
|
||||
import io.kestra.core.models.executions.metrics.Timer;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.tasks.common.EncryptedString;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
@@ -237,7 +238,7 @@ class RunContextTest extends AbstractMemoryRunnerTest {
|
||||
TaskRun hello = execution.findTaskRunsByTaskId("hello").get(0);
|
||||
Map<String, String> valueOutput = (Map<String, String>) hello.getOutputs().get("value");
|
||||
assertThat(valueOutput.size(), is(2));
|
||||
assertThat(valueOutput.get("type"), is("io.kestra.core.models.tasks.common.EncryptedString"));
|
||||
assertThat(valueOutput.get("type"), is(EncryptedString.TYPE));
|
||||
// the value is encrypted so it's not the plaintext value of the task property
|
||||
assertThat(valueOutput.get("value"), not("Hello World"));
|
||||
TaskRun returnTask = execution.findTaskRunsByTaskId("return").get(0);
|
||||
|
||||
@@ -5,10 +5,7 @@ import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionTrigger;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.flows.TaskDefault;
|
||||
import io.kestra.core.models.flows.*;
|
||||
import io.kestra.core.models.flows.input.StringInput;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.PollingTriggerInterface;
|
||||
@@ -48,13 +45,13 @@ abstract public class AbstractSchedulerTest {
|
||||
.namespace("io.kestra.unittest")
|
||||
.inputs(List.of(
|
||||
StringInput.builder()
|
||||
.type(Input.Type.STRING)
|
||||
.type(Type.STRING)
|
||||
.id("testInputs")
|
||||
.required(false)
|
||||
.defaults("test")
|
||||
.build(),
|
||||
StringInput.builder()
|
||||
.type(Input.Type.STRING)
|
||||
.type(Type.STRING)
|
||||
.id("def")
|
||||
.required(false)
|
||||
.defaults("awesome")
|
||||
|
||||
@@ -4,12 +4,11 @@ import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.flows.input.StringInput;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.retrys.Constant;
|
||||
import io.kestra.core.models.triggers.types.Schedule;
|
||||
import io.kestra.core.models.validations.ModelValidator;
|
||||
import io.kestra.core.tasks.debugs.Return;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -23,7 +22,6 @@ import java.nio.charset.Charset;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
@@ -139,7 +137,7 @@ class YamlFlowParserTest {
|
||||
|
||||
assertThat(flow.getInputs().size(), is(1));
|
||||
assertThat(flow.getInputs().get(0).getId(), is("myInput"));
|
||||
assertThat(flow.getInputs().get(0).getType(), is(Input.Type.STRING));
|
||||
assertThat(flow.getInputs().get(0).getType(), is(Type.STRING));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package io.kestra.core.validations;
|
||||
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.flows.input.StringInput;
|
||||
import io.kestra.core.models.validations.ModelValidator;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
@@ -19,7 +19,7 @@ class InputTest {
|
||||
void inputValidation() {
|
||||
final StringInput validInput = StringInput.builder()
|
||||
.id("test")
|
||||
.type(Input.Type.STRING)
|
||||
.type(Type.STRING)
|
||||
.validator("[A-Z]+")
|
||||
.build();
|
||||
|
||||
|
||||
@@ -8,4 +8,5 @@ tasks:
|
||||
|
||||
outputs:
|
||||
- id: "key"
|
||||
value: "{{ invalid }}"
|
||||
value: "{{ invalid }}"
|
||||
type: STRING
|
||||
@@ -8,4 +8,5 @@ tasks:
|
||||
|
||||
outputs:
|
||||
- id: "key"
|
||||
value: "{{ outputs.return }}"
|
||||
value: "{{ outputs.return }}"
|
||||
type: STRING
|
||||
@@ -12,4 +12,5 @@ tasks:
|
||||
|
||||
outputs:
|
||||
- id: value
|
||||
value: "{{ outputs.return.value }}"
|
||||
value: "{{ outputs.return.value }}"
|
||||
type: STRING
|
||||
@@ -3,6 +3,7 @@ package io.kestra.webserver.controllers;
|
||||
import io.kestra.core.docs.*;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
@@ -77,7 +78,7 @@ public class PluginController {
|
||||
summary = "Get all types for an inputs"
|
||||
)
|
||||
public List<InputType> inputs() throws ClassNotFoundException {
|
||||
return Stream.of(Input.Type.values())
|
||||
return Stream.of(Type.values())
|
||||
.map(throwFunction(type -> new InputType(type.name(), type.cls().getName())))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
@@ -90,7 +91,7 @@ public class PluginController {
|
||||
description = "The schema will be output as [http://json-schema.org/draft-07/schema](Json Schema Draft 7)"
|
||||
)
|
||||
public MutableHttpResponse<DocumentationWithSchema> inputSchemas(
|
||||
@Parameter(description = "The schema needed") @PathVariable Input.Type type
|
||||
@Parameter(description = "The schema needed") @PathVariable Type type
|
||||
) throws ClassNotFoundException, IOException {
|
||||
ClassInputDocumentation classInputDocumentation = this.inputDocumentation(type);
|
||||
|
||||
@@ -107,7 +108,7 @@ public class PluginController {
|
||||
}
|
||||
|
||||
@Cacheable("default")
|
||||
protected ClassInputDocumentation inputDocumentation(Input.Type type) throws ClassNotFoundException {
|
||||
protected ClassInputDocumentation inputDocumentation(Type type) throws ClassNotFoundException {
|
||||
Class<? extends Input<?>> inputCls = type.cls();
|
||||
|
||||
return ClassInputDocumentation.of(jsonSchemaGenerator, inputCls);
|
||||
|
||||
@@ -5,7 +5,7 @@ import io.kestra.core.Helpers;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.flows.input.StringInput;
|
||||
import io.kestra.core.models.hierarchies.FlowGraph;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
@@ -743,7 +743,7 @@ class FlowControllerTest extends JdbcH2ControllerTest {
|
||||
return Flow.builder()
|
||||
.id(friendlyId)
|
||||
.namespace(namespace)
|
||||
.inputs(ImmutableList.of(StringInput.builder().type(Input.Type.STRING).id(inputName).build()))
|
||||
.inputs(ImmutableList.of(StringInput.builder().type(Type.STRING).id(inputName).build()))
|
||||
.tasks(Collections.singletonList(generateTask("test", "test")))
|
||||
.build();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user