Compare commits

..

1 Commits

Author SHA1 Message Date
ben8t
0a121c6979 doc(task): improve TemplatedTask example 2025-12-19 14:06:42 +01:00
140 changed files with 1409 additions and 2388 deletions

View File

@@ -51,7 +51,7 @@ updates:
storybook:
applies-to: version-updates
patterns: ["storybook*", "@storybook/*", "eslint-plugin-storybook"]
patterns: ["storybook*", "@storybook/*"]
vitest:
applies-to: version-updates
@@ -67,10 +67,10 @@ updates:
"@types/*",
"storybook*",
"@storybook/*",
"eslint-plugin-storybook",
"vitest",
"@vitest/*",
# Temporary exclusion of these packages from major updates
"eslint-plugin-storybook",
"eslint-plugin-vue",
]
@@ -84,7 +84,6 @@ updates:
"@types/*",
"storybook*",
"@storybook/*",
"eslint-plugin-storybook",
"vitest",
"@vitest/*",
# Temporary exclusion of these packages from minor updates
@@ -103,7 +102,6 @@ updates:
"@types/*",
"storybook*",
"@storybook/*",
"eslint-plugin-storybook",
"vitest",
"@vitest/*",
]

View File

@@ -29,8 +29,8 @@ start_time2=$(date +%s)
echo "cd ./ui"
cd ./ui
echo "npm ci"
npm ci
echo "npm i"
npm i
echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"

View File

@@ -21,7 +21,7 @@ plugins {
// test
id "com.adarshr.test-logger" version "4.0.0"
id "org.sonarqube" version "7.2.1.6560"
id "org.sonarqube" version "7.2.0.6526"
id 'jacoco-report-aggregation'
// helper
@@ -331,7 +331,7 @@ subprojects {
}
dependencies {
agent "org.aspectj:aspectjweaver:1.9.25.1"
agent "org.aspectj:aspectjweaver:1.9.25"
}
test {

View File

@@ -82,8 +82,8 @@ dependencies {
testImplementation "io.micronaut:micronaut-http-server-netty"
testImplementation "io.micronaut:micronaut-management"
testImplementation "org.testcontainers:testcontainers:1.21.4"
testImplementation "org.testcontainers:junit-jupiter:1.21.4"
testImplementation "org.testcontainers:testcontainers:1.21.3"
testImplementation "org.testcontainers:junit-jupiter:1.21.3"
testImplementation "org.bouncycastle:bcpkix-jdk18on"
testImplementation "org.wiremock:wiremock-jetty12"

View File

@@ -3,7 +3,6 @@ package io.kestra.core.docs;
import io.kestra.core.models.annotations.PluginSubGroup;
import io.kestra.core.plugins.RegisteredPlugin;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -118,17 +117,10 @@ public class Plugin {
.filter(not(io.kestra.core.models.Plugin::isInternal))
.filter(clazzFilter)
.filter(c -> !c.getName().startsWith("org.kestra."))
.map(c -> {
Schema schema = c.getAnnotation(Schema.class);
var title = Optional.ofNullable(schema).map(Schema::title).filter(t -> !t.isEmpty()).orElse(null);
var description = Optional.ofNullable(schema).map(Schema::description).filter(d -> !d.isEmpty()).orElse(null);
var deprecated = io.kestra.core.models.Plugin.isDeprecated(c) ? true : null;
return new PluginElementMetadata(c.getName(), deprecated, title, description);
})
.map(c -> new PluginElementMetadata(c.getName(), io.kestra.core.models.Plugin.isDeprecated(c) ? true : null))
.toList();
}
public record PluginElementMetadata(String cls, Boolean deprecated, String title, String description) {}
public record PluginElementMetadata(String cls, Boolean deprecated) {
}
}

View File

@@ -26,7 +26,6 @@ public record Label(
public static final String REPLAYED = SYSTEM_PREFIX + "replayed";
public static final String SIMULATED_EXECUTION = SYSTEM_PREFIX + "simulatedExecution";
public static final String TEST = SYSTEM_PREFIX + "test";
public static final String FROM = SYSTEM_PREFIX + "from";
/**
* Static helper method for converting a list of labels to a nested map.

View File

@@ -16,7 +16,6 @@ import jakarta.validation.constraints.NotNull;
public class Setting {
public static final String INSTANCE_UUID = "instance.uuid";
public static final String INSTANCE_VERSION = "instance.version";
public static final String INSTANCE_EDITION = "instance.edition";
@NotNull
private String key;

View File

@@ -1,5 +1,6 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -129,7 +130,7 @@ public class Flow extends AbstractFlow implements HasUID {
@Valid
@PluginProperty
List<SLA> sla;
@Schema(
title = "Conditions evaluated before the flow is executed.",
description = "A list of conditions that are evaluated before the flow is executed. If no checks are defined, the flow executes normally."
@@ -354,7 +355,7 @@ public class Flow extends AbstractFlow implements HasUID {
* To be conservative, a flow MUST not return any source.
*/
@Override
@Schema(hidden = true)
@JsonIgnore
public String getSource() {
return null;
}

View File

@@ -1,12 +1,14 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.micronaut.core.annotation.Introspected;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.Objects;
import java.util.regex.Pattern;
@SuperBuilder(toBuilder = true)
@Getter
@@ -46,7 +48,7 @@ public class FlowWithSource extends Flow {
}
@Override
@Schema(hidden = false)
@JsonIgnore(value = false)
public String getSource() {
return this.source;
}

View File

@@ -82,12 +82,6 @@ abstract public class AbstractTrigger implements TriggerInterface {
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
private boolean failOnTriggerError = false;
@PluginProperty(group = PluginProperty.CORE_GROUP)
@Schema(
title = "Specifies whether a trigger is allowed to start a new execution even if a previous run is still in progress."
)
private boolean allowConcurrent = false;
/**
* For backward compatibility: we rename minLogLevel to logLevel.
* @deprecated use {@link #logLevel} instead

View File

@@ -1,37 +1,22 @@
package io.kestra.core.models.triggers;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import java.time.ZonedDateTime;
import java.util.Map;
public interface Schedulable extends PollingTriggerInterface{
String PLUGIN_PROPERTY_RECOVER_MISSED_SCHEDULES = "recoverMissedSchedules";
@Schema(
title = "The inputs to pass to the scheduled flow"
)
@PluginProperty(dynamic = true)
Map<String, Object> getInputs();
@Schema(
title = "Action to take in the case of missed schedules",
description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
"The default is `ALL` unless a different value is configured using the global plugin configuration."
)
@PluginProperty
RecoverMissedSchedules getRecoverMissedSchedules();
/**
* Compute the previous evaluation of a trigger.
* This is used when a trigger misses some schedule to compute the next date to evaluate in the past.
*/
ZonedDateTime previousEvaluationDate(ConditionContext conditionContext) throws IllegalVariableEvaluationException;
RecoverMissedSchedules getRecoverMissedSchedules();
/**
* Load the default RecoverMissedSchedules from plugin property, or else ALL.
*/

View File

@@ -172,7 +172,7 @@ public class Trigger extends TriggerContext implements HasUID {
if (abstractTrigger instanceof PollingTriggerInterface pollingTriggerInterface) {
try {
nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, lastTrigger);
nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, Optional.empty());
} catch (InvalidTriggerConfigurationException e) {
disabled = true;
}

View File

@@ -6,9 +6,12 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionTrigger;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import java.time.ZonedDateTime;
import java.util.*;
public abstract class TriggerService {
@@ -48,6 +51,58 @@ public abstract class TriggerService {
return generateExecution(IdUtils.create(), trigger, context, executionTrigger, conditionContext);
}
/**
 * Generates a new {@link Execution} for a scheduled trigger.
 * <p>
 * The execution id is taken from the run context's trigger execution id, a correlation-id
 * label is added when none is supplied, and inputs are built by merging flow input defaults
 * with the explicitly provided inputs (explicit values win).
 *
 * @param trigger          the trigger that fired
 * @param conditionContext context holding the flow and the trigger's {@link RunContext}
 * @param context          the trigger context carrying tenant/namespace/flow identifiers
 * @param labels           labels to attach to the execution; may be null
 * @param inputs           explicit execution inputs; may be null
 * @param variables        variables used to build the {@link ExecutionTrigger}
 * @param scheduleDate     optional date the execution should be scheduled at; empty means run now
 * @return the generated execution
 */
public static Execution generateScheduledExecution(
    AbstractTrigger trigger,
    ConditionContext conditionContext,
    TriggerContext context,
    List<Label> labels,
    Map<String, Object> inputs,
    Map<String, Object> variables,
    Optional<ZonedDateTime> scheduleDate
) {
    RunContext runContext = conditionContext.getRunContext();
    ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, variables);
    List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(labels));
    if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
        // add a correlation ID if none exist
        executionLabels.add(new Label(Label.CORRELATION_ID, runContext.getTriggerExecutionId()));
    }
    Execution execution = Execution.builder()
        .id(runContext.getTriggerExecutionId())
        .tenantId(context.getTenantId())
        .namespace(context.getNamespace())
        .flowId(context.getFlowId())
        .flowRevision(conditionContext.getFlow().getRevision())
        .variables(conditionContext.getFlow().getVariables())
        .labels(executionLabels)
        .state(new State())
        .trigger(executionTrigger)
        // a null schedule date means the execution is not deferred
        .scheduleDate(scheduleDate.map(date -> date.toInstant()).orElse(null))
        .build();
    Map<String, Object> allInputs = new HashMap<>();
    // add flow inputs with default value
    var flow = conditionContext.getFlow();
    if (flow.getInputs() != null) {
        flow.getInputs().stream()
            .filter(input -> input.getDefaults() != null)
            .forEach(input -> allInputs.put(input.getId(), input.getDefaults()));
    }
    // explicit inputs take precedence over flow defaults
    if (inputs != null) {
        allInputs.putAll(inputs);
    }
    // add inputs and inject defaults
    if (!allInputs.isEmpty()) {
        FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
        execution = execution.withInputs(flowInputOutput.readExecutionInputs(conditionContext.getFlow(), execution, allInputs));
    }
    return execution;
}
private static Execution generateExecution(
String id,
AbstractTrigger trigger,
@@ -56,7 +111,6 @@ public abstract class TriggerService {
ConditionContext conditionContext
) {
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(trigger.getLabels()));
executionLabels.add(new Label(Label.FROM, "trigger"));
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
// add a correlation ID if none exist
executionLabels.add(new Label(Label.CORRELATION_ID, id));

View File

@@ -1,10 +1,10 @@
package io.kestra.core.repositories;
import io.kestra.core.models.Setting;
import jakarta.validation.ConstraintViolationException;
import java.util.List;
import java.util.Optional;
import jakarta.validation.ConstraintViolationException;
public interface SettingRepositoryInterface {
Optional<Setting> findByKey(String key);
@@ -13,7 +13,5 @@ public interface SettingRepositoryInterface {
Setting save(Setting setting) throws ConstraintViolationException;
Setting internalSave(Setting setting) throws ConstraintViolationException;
Setting delete(Setting setting);
}

View File

@@ -16,8 +16,8 @@ import java.util.function.Function;
public interface TriggerRepositoryInterface extends QueryBuilderInterface<Triggers.Fields> {
Optional<Trigger> findLast(TriggerContext trigger);
Optional<Trigger> findByUid(String uid);
Optional<Trigger> findByExecution(Execution execution);
List<Trigger> findAll(String tenantId);
List<Trigger> findAllForAllTenants();

View File

@@ -6,12 +6,10 @@ import com.google.common.base.CaseFormat;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.PluginConfigurations;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.Storage;
import io.kestra.core.storages.StorageInterface;
@@ -237,14 +235,6 @@ public class DefaultRunContext extends RunContext {
return runContext;
}
@Override
public RunContext cloneForPlugin(Plugin plugin) {
PluginConfigurations pluginConfigurations = applicationContext.getBean(PluginConfigurations.class);
DefaultRunContext runContext = clone();
runContext.pluginConfiguration = pluginConfigurations.getConfigurationByPluginTypeOrAliases(plugin.getType(), plugin.getClass());
return runContext;
}
/**
* {@inheritDoc}
*/
@@ -599,11 +589,6 @@ public class DefaultRunContext extends RunContext {
return localPath;
}
@Override
public InputAndOutput inputAndOutput() {
return new InputAndOutputImpl(this.applicationContext, this);
}
/**
* Builder class for constructing new {@link DefaultRunContext} objects.
*/

View File

@@ -189,11 +189,12 @@ public final class ExecutableUtils {
variables.put("taskRunIteration", currentTaskRun.getIteration());
}
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
Instant scheduleOnDate = runContext.render(scheduleDate).as(ZonedDateTime.class).map(date -> date.toInstant()).orElse(null);
Execution execution = Execution
.newExecution(
flow,
(f, e) -> runContext.inputAndOutput().readInputs(f, e, inputs),
(f, e) -> flowInputOutput.readExecutionInputs(f, e, inputs),
newLabels,
Optional.empty())
.withTrigger(ExecutionTrigger.builder()

View File

@@ -3,11 +3,13 @@ package io.kestra.core.runners;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Data;
import io.kestra.core.models.flows.DependsOn;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.Output;
import io.kestra.core.models.flows.RenderableInput;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.FileInput;
@@ -537,6 +539,30 @@ public class FlowInputOutput {
}
}
/**
 * Renders flow outputs against the given run context.
 * <p>
 * Required outputs (required is null or true) are rendered together, so any rendering
 * failure propagates to the caller. Optional outputs (required == false) are rendered
 * one by one: a failure is logged as a warning and the output value is set to null
 * instead of failing the whole rendering.
 *
 * @param outputs    the flow output definitions; may be null
 * @param runContext the context used to render output expressions
 * @return a map of output id to rendered value; empty when outputs is null
 * @throws IllegalVariableEvaluationException if a required output fails to render
 */
public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
    if (outputs == null) return Map.of();
    // render required outputs
    Map<String, Object> outputsById = outputs
        .stream()
        .filter(output -> output.getRequired() == null || output.getRequired())
        .collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
    outputsById = runContext.render(outputsById);
    // render optional outputs one by one to catch, log, and skip any error.
    for (io.kestra.core.models.flows.Output output : outputs) {
        if (Boolean.FALSE.equals(output.getRequired())) {
            try {
                outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
            } catch (Exception e) {
                runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
                outputsById.put(output.getId(), null);
            }
        }
    }
    return outputsById;
}
/**
* Mutable wrapper to hold a flow's input, and its resolved value.
*/

View File

@@ -1,29 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Output;
import java.util.List;
import java.util.Map;
/**
 * InputAndOutput could be used to work with flow execution inputs and outputs.
 */
public interface InputAndOutput {
    /**
     * Reads the inputs of a flow execution.
     *
     * @param flow      the flow the execution belongs to
     * @param execution the execution whose inputs are read
     * @param inputs    the raw input values
     * @return the resolved inputs, keyed by input id
     */
    Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs);
    /**
     * Processes the outputs of a flow execution (parse them based on their types).
     *
     * @param flow      the flow the execution belongs to
     * @param execution the execution whose outputs are processed
     * @param rOutputs  the rendered output values
     * @return the typed outputs, keyed by output id
     */
    Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs);
    /**
     * Render flow execution outputs.
     *
     * @param outputs the flow output definitions; may be null
     * @return the rendered outputs, keyed by output id
     * @throws IllegalVariableEvaluationException if a required output fails to render
     */
    Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException;
}

View File

@@ -1,56 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Output;
import io.micronaut.context.ApplicationContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Default {@link InputAndOutput} implementation that delegates input reading and output
 * typing to the {@link FlowInputOutput} bean resolved from the application context, and
 * renders outputs with the wrapped {@link RunContext}.
 */
class InputAndOutputImpl implements InputAndOutput {
    // delegate used for reading inputs and typing outputs
    private final FlowInputOutput flowInputOutput;
    // context used to render output expressions and to log rendering failures
    private final RunContext runContext;
    InputAndOutputImpl(ApplicationContext applicationContext, RunContext runContext) {
        this.flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
        this.runContext = runContext;
    }
    /** {@inheritDoc} */
    @Override
    public Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs) {
        return flowInputOutput.readExecutionInputs(flow, execution, inputs);
    }
    /** {@inheritDoc} */
    @Override
    public Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs) {
        return flowInputOutput.typedOutputs(flow, execution, rOutputs);
    }
    /**
     * {@inheritDoc}
     * Required outputs are rendered together (a failure propagates); optional outputs
     * (required == false) are rendered individually so a failure is logged and the
     * value replaced with null instead of failing everything.
     */
    @Override
    public Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException {
        if (outputs == null) return Map.of();
        // render required outputs
        Map<String, Object> outputsById = outputs
            .stream()
            .filter(output -> output.getRequired() == null || output.getRequired())
            .collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
        outputsById = runContext.render(outputsById);
        // render optional outputs one by one to catch, log, and skip any error.
        for (io.kestra.core.models.flows.Output output : outputs) {
            if (Boolean.FALSE.equals(output.getRequired())) {
                try {
                    outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
                } catch (Exception e) {
                    runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
                    outputsById.put(output.getId(), null);
                }
            }
        }
        return outputsById;
    }
}

View File

@@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
@@ -205,15 +204,4 @@ public abstract class RunContext implements PropertyContext {
* when Namespace ACLs are used (EE).
*/
public abstract AclChecker acl();
/**
* Clone this run context for a specific plugin.
* @return a new run context with the plugin configuration of the given plugin.
*/
public abstract RunContext cloneForPlugin(Plugin plugin);
/**
* @return an InputAndOutput that can be used to work with inputs and outputs.
*/
public abstract InputAndOutput inputAndOutput();
}

View File

@@ -1,8 +1,10 @@
package io.kestra.core.runners;
import com.google.common.collect.Lists;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.plugins.PluginConfigurations;
@@ -51,6 +53,20 @@ public class RunContextInitializer {
@Value("${kestra.encryption.secret-key}")
protected Optional<String> secretKey;
/**
 * Initializes the given {@link RunContext} for the given {@link Plugin}.
 *
 * @param runContext The {@link RunContext} to initialize.
 * @param plugin The {@link Plugin} whose configuration is injected into the context.
 * @return The initialized {@link RunContext}.
 */
public DefaultRunContext forPlugin(final DefaultRunContext runContext,
                                   final Plugin plugin) {
    runContext.init(applicationContext);
    // resolve the plugin-scoped configuration, also matching any declared aliases of the plugin type
    runContext.setPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(plugin.getType(), plugin.getClass()));
    return runContext;
}
/**
* Initializes the given {@link RunContext} for the given {@link WorkerTask} for executor.
*

View File

@@ -55,11 +55,11 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
public RunContextLogger(QueueInterface<LogEntry> logQueue, LogEntry logEntry, org.slf4j.event.Level loglevel, boolean logToFile) {
if (logEntry.getTaskId() != null) {
this.loggerName = baseLoggerName(logEntry) + "." + logEntry.getTaskId();
this.loggerName = "flow." + logEntry.getFlowId() + "." + logEntry.getTaskId();
} else if (logEntry.getTriggerId() != null) {
this.loggerName = baseLoggerName(logEntry) + "." + logEntry.getTriggerId();
this.loggerName = "flow." + logEntry.getFlowId() + "." + logEntry.getTriggerId();
} else {
this.loggerName = baseLoggerName(logEntry);
this.loggerName = "flow." + logEntry.getFlowId();
}
this.logQueue = logQueue;
@@ -68,10 +68,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
this.logToFile = logToFile;
}
private String baseLoggerName(LogEntry logEntry) {
return "flow." + logEntry.getTenantId() + "." + logEntry.getNamespace() + "." + logEntry.getFlowId();
}
private static List<LogEntry> logEntry(ILoggingEvent event, String message, org.slf4j.event.Level level, LogEntry logEntry) {
Iterable<String> split;

View File

@@ -81,24 +81,7 @@ public final class YamlParser {
throw toConstraintViolationException(input, resource, e);
}
}
/**
 * Turns a raw YAML parsing error message into a friendlier, user-facing one.
 * <p>
 * Known SnakeYAML/Jackson failure modes are mapped to actionable guidance; any other
 * message is kept but stripped of SnakeYAML internals. When the exception carries a
 * location, the line number is appended.
 *
 * @param originalMessage the raw parser error message
 * @param e               the parsing exception, used for its optional location
 * @return the cleaned-up message
 */
private static String formatYamlErrorMessage(String originalMessage, JsonProcessingException e) {
    String base;
    if (originalMessage.contains("Expected a field name")) {
        base = "YAML syntax error: Invalid structure. Check indentation and ensure all fields are properly formatted.";
    } else if (originalMessage.contains("MappingStartEvent")) {
        base = "YAML syntax error: Unexpected mapping start. Verify that scalar values are properly quoted if needed.";
    } else if (originalMessage.contains("Scalar value")) {
        base = "YAML syntax error: Expected a simple value but found complex structure. Check for unquoted special characters.";
    } else {
        // Return a generic but cleaner message for other YAML errors
        base = "YAML parsing error: " + originalMessage.replaceAll("org\\.yaml\\.snakeyaml.*", "").trim();
    }
    StringBuilder friendlyMessage = new StringBuilder(base);
    if (e.getLocation() != null) {
        int line = e.getLocation().getLineNr();
        friendlyMessage.append(String.format(" (at line %d)", line));
    }
    return friendlyMessage.toString();
}
@SuppressWarnings("unchecked")
public static <T> ConstraintViolationException toConstraintViolationException(T target, String resource, JsonProcessingException e) {
if (e.getCause() instanceof ConstraintViolationException constraintViolationException) {
@@ -138,12 +121,11 @@ public final class YamlParser {
)
));
} else {
String userFriendlyMessage = formatYamlErrorMessage(e.getMessage(), e);
return new ConstraintViolationException(
"Illegal " + resource + " source: " + userFriendlyMessage,
"Illegal " + resource + " source: " + e.getMessage(),
Collections.singleton(
ManualConstraintViolation.of(
userFriendlyMessage,
e.getCause() == null ? e.getMessage() : e.getMessage() + "\nCaused by: " + e.getCause().getMessage(),
target,
(Class<T>) target.getClass(),
"yaml",
@@ -154,3 +136,4 @@ public final class YamlParser {
}
}
}

View File

@@ -4,6 +4,7 @@ import com.cronutils.utils.VisibleForTesting;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
@@ -64,6 +65,16 @@ public class ConditionService {
return this.valid(flow, conditions, conditionContext);
}
/**
 * Check that all conditions are valid.
 * Warning, this method throws if a condition cannot be evaluated.
 *
 * @param conditions       the schedule conditions to evaluate
 * @param conditionContext the context the conditions are tested against
 * @return true only if every condition matches
 * @throws InternalException if a condition cannot be evaluated
 */
public boolean isValid(List<ScheduleCondition> conditions, ConditionContext conditionContext) throws InternalException {
    // throwPredicate rethrows checked exceptions raised by condition.test() out of the stream
    return conditions
        .stream()
        .allMatch(throwPredicate(condition -> condition.test(conditionContext)));
}
/**
* Check that all conditions are valid.
* Warning, this method throws if a condition cannot be evaluated.

View File

@@ -92,14 +92,7 @@ public class FlowService {
return flowRepository
.orElseThrow(() -> new IllegalStateException("Cannot perform operation on flow. Cause: No FlowRepository"));
}
/**
 * Normalizes a constraint-violation message for display to the user.
 * Messages already prefixed by the YAML parser ("Illegal flow source:") are returned
 * unchanged; any other message is prefixed with "Validation error: " for context.
 *
 * @param message the raw validation message
 * @return the user-facing message
 */
private static String formatValidationError(String message) {
    if (message.startsWith("Illegal flow source:")) {
        // Already formatted by YamlParser, return as-is
        return message;
    }
    // For other validation errors, provide context
    return "Validation error: " + message;
}
/**
* Evaluates all checks defined in the given flow using the provided inputs.
* <p>
@@ -181,12 +174,10 @@ public class FlowService {
modelValidator.validate(pluginDefaultService.injectAllDefaults(flow, false));
} catch (ConstraintViolationException e) {
String friendlyMessage = formatValidationError(e.getMessage());
validateConstraintViolationBuilder.constraints(friendlyMessage);
validateConstraintViolationBuilder.constraints(e.getMessage());
} catch (FlowProcessingException e) {
if (e.getCause() instanceof ConstraintViolationException cve) {
String friendlyMessage = formatValidationError(cve.getMessage());
validateConstraintViolationBuilder.constraints(friendlyMessage);
if (e.getCause() instanceof ConstraintViolationException) {
validateConstraintViolationBuilder.constraints(e.getMessage());
} else {
Throwable cause = e.getCause() != null ? e.getCause() : e;
validateConstraintViolationBuilder.constraints("Unable to validate the flow: " + cause.getMessage());
@@ -588,4 +579,4 @@ public class FlowService {
private IllegalStateException noRepositoryException() {
return new IllegalStateException("No repository found. Make sure the `kestra.repository.type` property is set.");
}
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.storages;
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
import io.kestra.core.services.NamespaceService;
import jakarta.annotation.Nullable;
import org.slf4j.Logger;
@@ -271,13 +272,7 @@ public class InternalStorage implements Storage {
return this.storage.put(context.getTenantId(), context.getNamespace(), resolve, new BufferedInputStream(inputStream));
}
@Override
public Optional<StorageContext.Task> getTaskStorageContext() {
return Optional.ofNullable((context instanceof StorageContext.Task task) ? task : null);
}
@Override
public List<FileAttributes> list(URI uri) throws IOException {
return this.storage.list(context.getTenantId(), context.getNamespace(), uri);
}
}

View File

@@ -173,6 +173,4 @@ public interface Storage {
* @return the task storage context
*/
Optional<StorageContext.Task> getTaskStorageContext();
List<FileAttributes> list(URI uri) throws IOException;
}

View File

@@ -1,39 +1,13 @@
package io.kestra.core.utils;
import io.kestra.core.models.Setting;
import io.kestra.core.repositories.SettingRepositoryInterface;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.Optional;
@Singleton
public class EditionProvider {
public Edition get() {
return Edition.OSS;
}
@Inject
private Optional<SettingRepositoryInterface> settingRepository; // repositories are not always there on unit tests
@PostConstruct
void start() {
// check the edition in the settings and update it if needed; although unused today, it would allow us to detect an incompatible update later if needed
settingRepository.ifPresent(settingRepositoryInterface -> persistEdition(settingRepositoryInterface, get()));
}
private void persistEdition(SettingRepositoryInterface settingRepositoryInterface, Edition edition) {
Optional<Setting> versionSetting = settingRepositoryInterface.findByKey(Setting.INSTANCE_EDITION);
if (versionSetting.isEmpty() || !versionSetting.get().getValue().equals(edition)) {
settingRepositoryInterface.save(Setting.builder()
.key(Setting.INSTANCE_EDITION)
.value(edition)
.build()
);
}
}
public enum Edition {
OSS,
EE

View File

@@ -11,11 +11,6 @@ import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
/**
* Utility class to create {@link java.util.concurrent.ExecutorService} instances.
* WARNING: those instances will use the {@link ThreadUncaughtExceptionHandler} which terminates Kestra if an error occurs in any thread,
* so it should not be used inside plugins.
*/
@Singleton
@Slf4j
public class ExecutorsUtils {

View File

@@ -10,7 +10,7 @@ import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
/**
* Utility class for server logging
* Utility class for logging
*/
public final class Logs {
@@ -18,7 +18,7 @@ public final class Logs {
private static final String EXECUTION_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[execution: {}] ";
private static final String TRIGGER_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[trigger: {}] ";
private static final String TASKRUN_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[task: {}] [execution: {}] [taskrun: {}] ";
private Logs() {}
public static void logExecution(FlowId flow, Logger logger, Level level, String message, Object... args) {
@@ -29,7 +29,7 @@ public final class Logs {
}
/**
* Log an {@link Execution} via the executor logger named: 'executor.{tenantId}.{namespace}.{flowId}'.
* Log an {@link Execution} via the execution logger named: 'execution.{flowId}'.
*/
public static void logExecution(Execution execution, Level level, String message, Object... args) {
Logger logger = logger(execution);
@@ -43,7 +43,7 @@ public final class Logs {
}
/**
* Log a {@link TriggerContext} via the scheduler logger named: 'trigger.{tenantId}.{namespace}.{flowId}.{triggerId}'.
* Log a {@link TriggerContext} via the trigger logger named: 'trigger.{flowId}.{triggerId}'.
*/
public static void logTrigger(TriggerContext triggerContext, Level level, String message, Object... args) {
Logger logger = logger(triggerContext);
@@ -57,7 +57,7 @@ public final class Logs {
}
/**
* Log a {@link TaskRun} via the worker logger named: 'worker.{tenantId}.{namespace}.{flowId}.{taskId}'.
* Log a {@link TaskRun} via the taskRun logger named: 'task.{flowId}.{taskId}'.
*/
public static void logTaskRun(TaskRun taskRun, Level level, String message, Object... args) {
String prefix = TASKRUN_PREFIX_WITH_TENANT;
@@ -73,19 +73,19 @@ public final class Logs {
private static Logger logger(TaskRun taskRun) {
return LoggerFactory.getLogger(
"worker." + taskRun.getTenantId() + "." + taskRun.getNamespace() + "." + taskRun.getFlowId() + "." + taskRun.getTaskId()
"task." + taskRun.getFlowId() + "." + taskRun.getTaskId()
);
}
private static Logger logger(TriggerContext triggerContext) {
return LoggerFactory.getLogger(
"scheduler." + triggerContext.getTenantId() + "." + triggerContext.getNamespace() + "." + triggerContext.getFlowId() + "." + triggerContext.getTriggerId()
"trigger." + triggerContext.getFlowId() + "." + triggerContext.getTriggerId()
);
}
private static Logger logger(Execution execution) {
return LoggerFactory.getLogger(
"executor." + execution.getTenantId() + "." + execution.getNamespace() + "." + execution.getFlowId()
"execution." + execution.getFlowId()
);
}
}

View File

@@ -120,10 +120,7 @@ public class MapUtils {
private static Collection<?> mergeCollections(Collection<?> colA, Collection<?> colB) {
List<Object> merged = new ArrayList<>(colA.size() + colB.size());
merged.addAll(colA);
if (!colB.isEmpty()) {
List<?> filtered = colB.stream().filter(it -> !colA.contains(it)).toList();
merged.addAll(filtered);
}
merged.addAll(colB);
return merged;
}

View File

@@ -1,12 +1,14 @@
package io.kestra.core.utils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.models.tasks.FileExistComportment;
import io.kestra.core.models.tasks.NamespaceFiles;
import io.kestra.core.runners.RunContext;
import io.kestra.core.storages.NamespaceFile;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.commons.lang3.time.StopWatch;
@@ -17,27 +19,28 @@ import java.io.InputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.*;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import static io.kestra.core.utils.Rethrow.throwConsumer;
public final class NamespaceFilesUtils {
private static final int maxThreads = Math.max(Runtime.getRuntime().availableProcessors() * 4, 32);
private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
0,
maxThreads,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat("namespace-files").build()
);;
@Singleton
public class NamespaceFilesUtils {
@Inject
private ExecutorsUtils executorsUtils;
private NamespaceFilesUtils() {
// utility class pattern
private ExecutorService executorService;
private int maxThreads;
@PostConstruct
public void postConstruct() {
this.maxThreads = Math.max(Runtime.getRuntime().availableProcessors() * 4, 32);
this.executorService = executorsUtils.maxCachedThreadPool(maxThreads, "namespace-file");
}
public static void loadNamespaceFiles(
public void loadNamespaceFiles(
RunContext runContext,
NamespaceFiles namespaceFiles
)
@@ -66,7 +69,7 @@ public final class NamespaceFilesUtils {
int parallelism = maxThreads / 2;
Flux.fromIterable(matchedNamespaceFiles)
.parallel(parallelism)
.runOn(Schedulers.fromExecutorService(EXECUTOR_SERVICE))
.runOn(Schedulers.fromExecutorService(executorService))
.doOnNext(throwConsumer(nsFile -> {
InputStream content = runContext.storage().getFile(nsFile.uri());
Path path = folderPerNamespace ?

View File

@@ -23,6 +23,7 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.StorageService;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageSplitInterface;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.validations.NoSystemLabelValidation;
@@ -539,7 +540,7 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
.numberOfBatches((Integer) taskRun.getOutputs().get(ExecutableUtils.TASK_VARIABLE_NUMBER_OF_BATCHES));
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
FileSerde.write(bos, runContext.inputAndOutput().renderOutputs(flow.getOutputs()));
FileSerde.write(bos, FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext));
URI uri = runContext.storage().putFile(
new ByteArrayInputStream(bos.toByteArray()),
URI.create((String) taskRun.getOutputs().get("uri"))
@@ -601,8 +602,9 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
String subflowOutputsBase = (String) taskOutput.get(ExecutableUtils.TASK_VARIABLE_SUBFLOW_OUTPUTS_BASE_URI);
URI subflowOutputsBaseUri = URI.create(StorageContext.KESTRA_PROTOCOL + subflowOutputsBase + "/");
if (runContext.storage().isFileExist(subflowOutputsBaseUri)) {
List<FileAttributes> list = runContext.storage().list(subflowOutputsBaseUri);;
StorageInterface storage = ((DefaultRunContext) runContext).getApplicationContext().getBean(StorageInterface.class);
if (storage.exists(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), subflowOutputsBaseUri)) {
List<FileAttributes> list = storage.list(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), subflowOutputsBaseUri);
if (!list.isEmpty()) {
// Merge outputs from each sub-flow into a single stored in the internal storage.

View File

@@ -63,8 +63,7 @@ import java.util.*;
- id: run_post_approval
type: io.kestra.plugin.scripts.shell.Commands
taskRunner:
type: io.kestra.plugin.core.runner.Process
runner: PROCESS
commands:
- echo "Manual approval received! Continuing the execution..."

View File

@@ -18,6 +18,7 @@ import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.FlowMetaStoreInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.SubflowExecution;
@@ -37,6 +38,7 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.slf4j.event.Level;
import java.time.ZonedDateTime;
import java.util.Collections;
@@ -244,11 +246,11 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
if (subflowOutputs != null && !subflowOutputs.isEmpty()) {
try {
var inputAndOutput = runContext.inputAndOutput();
Map<String, Object> rOutputs = inputAndOutput.renderOutputs(subflowOutputs);
Map<String, Object> rOutputs = FlowInputOutput.renderFlowOutputs(subflowOutputs, runContext);
if (flow.getOutputs() != null) {
rOutputs = inputAndOutput.typedOutputs(flow, execution, rOutputs);
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
if (flow.getOutputs() != null && flowInputOutput != null) {
rOutputs = flowInputOutput.typedOutputs(flow, execution, rOutputs);
}
builder.outputs(rOutputs);
} catch (Exception e) {

View File

@@ -260,7 +260,8 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
}
if (this.namespaceFiles != null && !Boolean.FALSE.equals(runContext.render(this.namespaceFiles.getEnabled()).as(Boolean.class).orElse(true))) {
NamespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
NamespaceFilesUtils namespaceFilesUtils = ((DefaultRunContext) runContext).getApplicationContext().getBean(NamespaceFilesUtils.class);
namespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
}
if (this.inputFiles != null) {

View File

@@ -26,28 +26,25 @@ import java.util.concurrent.atomic.AtomicLong;
@Getter
@NoArgsConstructor
@Schema(
title = "Purge namespace files for one or multiple namespaces.",
description = "This task purges namespace files (and their versions) stored in Kestra. You can restrict the purge to specific namespaces (or a namespace glob pattern), optionally include child namespaces, and filter files by a glob pattern. The purge strategy is controlled via `behavior` (e.g. keep the last N versions and/or delete versions older than a given date)."
title = "Delete expired keys globally for a specific namespace.",
description = "This task will delete expired keys from the Kestra KV store. By default, it will only delete expired keys, but you can choose to delete all keys by setting `expiredOnly` to false. You can also filter keys by a specific pattern and choose to include child namespaces."
)
@Plugin(
examples = {
@Example(
title = "Purge old versions of namespace files for a namespace tree.",
title = "Delete expired keys globally for a specific namespace, with or without including child namespaces.",
full = true,
code = """
id: purge_namespace_files
id: purge_kv_store
namespace: system
tasks:
- id: purge_files
type: io.kestra.plugin.core.namespace.PurgeFiles
- id: purge_kv
type: io.kestra.plugin.core.kv.PurgeKV
expiredOnly: true
namespaces:
- company
includeChildNamespaces: true
filePattern: "**/*.sql"
behavior:
type: version
before: "2025-01-01T00:00:00Z"
"""
)
}
@@ -119,7 +116,7 @@ public class PurgeFiles extends Task implements PurgeTask<NamespaceFile>, Runnab
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "The number of purged namespace file versions"
title = "The number of purged KV pairs"
)
private Long size;
}

View File

@@ -32,9 +32,17 @@ import lombok.experimental.SuperBuilder;
examples = {
@Example(
code = """
spec: |
type: io.kestra.plugin.core.http.Download
{{ task.property }}: {{ task.value }}
id: templated_task
namespace: company.team
variables:
property: uri
value: https://kestra.io
tasks:
- id: templated_task
type: io.kestra.plugin.core.templating.TemplatedTask
spec: |
type: io.kestra.plugin.core.http.Download
{{ vars.property }}: {{ vars.value }}
"""
)
},

View File

@@ -1,107 +0,0 @@
package io.kestra.plugin.core.trigger;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Label;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionTrigger;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Backfill;
import io.kestra.core.models.triggers.Schedulable;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.LabelService;
import io.kestra.core.utils.ListUtils;
import java.time.ZonedDateTime;
import java.time.chrono.ChronoZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
 * Factory class for constructing a new {@link Execution} from a {@link Schedulable} trigger.
 *
 * <p>All methods are static; the class holds no state. It is used by schedule-style triggers
 * to build either a normal scheduled execution or a FAILED placeholder execution when the
 * trigger itself could not be evaluated.
 *
 * @see io.kestra.plugin.core.trigger.Schedule
 * @see io.kestra.plugin.core.trigger.ScheduleOnDates
 */
final class SchedulableExecutionFactory {
/**
 * Builds an {@link Execution} already in the {@code FAILED} state for the given trigger context.
 * Used when the schedule condition/variables cannot be rendered, so the scheduler stops
 * re-evaluating the trigger every second.
 *
 * @param trigger          the schedulable trigger being evaluated
 * @param conditionContext context carrying the run context and the flow definition
 * @param triggerContext   context carrying tenant/namespace/flow identifiers and backfill info
 * @return a new execution in state FAILED, labeled via {@link #getLabels}
 * @throws IllegalVariableEvaluationException if label values cannot be rendered
 */
static Execution createFailedExecution(Schedulable trigger, ConditionContext conditionContext, TriggerContext triggerContext) throws IllegalVariableEvaluationException {
return Execution.builder()
.id(conditionContext.getRunContext().getTriggerExecutionId())
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())
.flowId(triggerContext.getFlowId())
.flowRevision(conditionContext.getFlow().getRevision())
.labels(SchedulableExecutionFactory.getLabels(trigger, conditionContext.getRunContext(), triggerContext.getBackfill(), conditionContext.getFlow()))
.state(new State().withState(State.Type.FAILED))
.build();
}
/**
 * Builds a scheduled {@link Execution} for the given trigger, merging trigger and backfill
 * labels/inputs and attaching the trigger variables.
 *
 * @param trigger          the schedulable trigger being evaluated
 * @param conditionContext context carrying the run context and the flow definition
 * @param triggerContext   context carrying tenant/namespace/flow identifiers and backfill info
 * @param variables        trigger output variables recorded on the execution's {@link ExecutionTrigger}
 * @param scheduleDate     the date the execution is scheduled for; may be {@code null} (no schedule date set)
 * @return a new execution in the initial state, with inputs read and defaults injected
 * @throws IllegalVariableEvaluationException if labels or inputs cannot be rendered
 */
static Execution createExecution(Schedulable trigger, ConditionContext conditionContext, TriggerContext triggerContext, Map<String, Object> variables, ZonedDateTime scheduleDate) throws IllegalVariableEvaluationException {
RunContext runContext = conditionContext.getRunContext();
ExecutionTrigger executionTrigger = ExecutionTrigger.of((AbstractTrigger) trigger, variables);
List<Label> labels = getLabels(trigger, runContext, triggerContext.getBackfill(), conditionContext.getFlow());
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(labels));
// mark the execution as originating from a trigger
executionLabels.add(new Label(Label.FROM, "trigger"));
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
// add a correlation ID if none exist
executionLabels.add(new Label(Label.CORRELATION_ID, runContext.getTriggerExecutionId()));
}
Execution execution = Execution.builder()
.id(runContext.getTriggerExecutionId())
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())
.flowId(triggerContext.getFlowId())
.flowRevision(conditionContext.getFlow().getRevision())
.variables(conditionContext.getFlow().getVariables())
.labels(executionLabels)
.state(new State())
.trigger(executionTrigger)
// only set a schedule date when one was provided; otherwise leave it null
.scheduleDate(Optional.ofNullable(scheduleDate).map(ChronoZonedDateTime::toInstant).orElse(null))
.build();
Map<String, Object> allInputs = getInputs(trigger, runContext, triggerContext.getBackfill());
// add inputs and inject defaults (FlowInputOutput handles defaults internally)
execution = execution.withInputs(runContext.inputAndOutput().readInputs(conditionContext.getFlow(), execution, allInputs));
return execution;
}
/**
 * Renders and merges the trigger's configured inputs with any backfill inputs.
 * Backfill inputs are applied last and therefore override trigger inputs on key collision.
 *
 * @throws IllegalVariableEvaluationException if input values cannot be rendered
 */
private static Map<String, Object> getInputs(Schedulable trigger, RunContext runContext, Backfill backfill) throws IllegalVariableEvaluationException {
Map<String, Object> inputs = new HashMap<>();
if (trigger.getInputs() != null) {
inputs.putAll(runContext.render(trigger.getInputs()));
}
if (backfill != null && backfill.getInputs() != null) {
inputs.putAll(runContext.render(backfill.getInputs()));
}
return inputs;
}
/**
 * Collects labels from the flow/trigger (via {@link LabelService#fromTrigger}) and appends any
 * backfill labels whose rendered value is non-null.
 *
 * @throws IllegalVariableEvaluationException if a label value cannot be rendered
 */
private static List<Label> getLabels(Schedulable trigger, RunContext runContext, Backfill backfill, FlowInterface flow) throws IllegalVariableEvaluationException {
List<Label> labels = LabelService.fromTrigger(runContext, flow, (AbstractTrigger) trigger);
if (backfill != null && backfill.getLabels() != null) {
for (Label label : backfill.getLabels()) {
final var value = runContext.render(label.value());
// skip labels whose value renders to null
if (value != null) {
labels.add(new Label(label.key(), value));
}
}
}
return labels;
}
}

View File

@@ -6,7 +6,9 @@ import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
@@ -14,8 +16,12 @@ import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.*;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.LabelService;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.validations.ScheduleValidation;
import io.kestra.core.validations.TimezoneId;
@@ -23,7 +29,6 @@ import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Null;
import lombok.AccessLevel;
import lombok.*;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
@@ -35,8 +40,6 @@ import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.throwPredicate;
@Slf4j
@SuperBuilder
@ToString
@@ -221,7 +224,11 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
@PluginProperty
@Deprecated
private List<ScheduleCondition> scheduleConditions;
@Schema(
title = "The inputs to pass to the scheduled flow"
)
@PluginProperty(dynamic = true)
private Map<String, Object> inputs;
@Schema(
@@ -241,7 +248,13 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
@PluginProperty
@Deprecated
private Map<String, Object> backfill;
@Schema(
title = "Action to take in the case of missed schedules",
description = "`ALL` will recover all missed schedules, `LAST` will only recover the last missed one, `NONE` will not recover any missed schedule.\n" +
"The default is `ALL` unless a different value is configured using the global plugin configuration."
)
@PluginProperty
private RecoverMissedSchedules recoverMissedSchedules;
@Override
@@ -390,11 +403,20 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
if (!conditionResults) {
return Optional.empty();
}
} catch (InternalException ie) {
} catch(InternalException ie) {
// validate schedule condition can fail to render variables
// in this case, we return a failed execution so the trigger is not evaluated each second
runContext.logger().error("Unable to evaluate the Schedule trigger '{}'", this.getId(), ie);
return Optional.of(SchedulableExecutionFactory.createFailedExecution(this, conditionContext, triggerContext));
Execution execution = Execution.builder()
.id(runContext.getTriggerExecutionId())
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())
.flowId(triggerContext.getFlowId())
.flowRevision(conditionContext.getFlow().getRevision())
.labels(generateLabels(runContext, conditionContext, backfill))
.state(new State().withState(State.Type.FAILED))
.build();
return Optional.of(execution);
}
// recalculate true output for previous and next based on conditions
@@ -408,12 +430,14 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
variables = scheduleDates.toMap();
}
Execution execution = SchedulableExecutionFactory.createExecution(
Execution execution = TriggerService.generateScheduledExecution(
this,
conditionContext,
triggerContext,
generateLabels(runContext, conditionContext, backfill),
generateInputs(runContext, backfill),
variables,
null
Optional.empty()
);
return Optional.of(execution);
@@ -424,6 +448,34 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
return parser.parse(this.cron);
}
private List<Label> generateLabels(RunContext runContext, ConditionContext conditionContext, Backfill backfill) throws IllegalVariableEvaluationException {
List<Label> labels = LabelService.fromTrigger(runContext, conditionContext.getFlow(), this);
if (backfill != null && backfill.getLabels() != null) {
for (Label label : backfill.getLabels()) {
final var value = runContext.render(label.value());
if (value != null) {
labels.add(new Label(label.key(), value));
}
}
}
return labels;
}
private Map<String, Object> generateInputs(RunContext runContext, Backfill backfill) throws IllegalVariableEvaluationException {
Map<String, Object> inputs = new HashMap<>();
if (this.inputs != null) {
inputs.putAll(runContext.render(this.inputs));
}
if (backfill != null && backfill.getInputs() != null) {
inputs.putAll(runContext.render(backfill.getInputs()));
}
return inputs;
}
private Optional<Output> scheduleDates(ExecutionTime executionTime, ZonedDateTime date) {
Optional<ZonedDateTime> next = executionTime.nextExecution(date.minus(Duration.ofSeconds(1)));
@@ -497,9 +549,9 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
Optional<ZonedDateTime> truePreviousNextDateWithCondition(ExecutionTime executionTime, ConditionContext conditionContext, ZonedDateTime toTestDate, boolean next) throws InternalException {
int upperYearBound = ZonedDateTime.now().getYear() + 10;
int lowerYearBound = ZonedDateTime.now().getYear() - 10;
while ((next && toTestDate.getYear() < upperYearBound) || (!next && toTestDate.getYear() > lowerYearBound)) {
Optional<ZonedDateTime> currentDate = next ?
executionTime.nextExecution(toTestDate) :
executionTime.lastExecution(toTestDate);
@@ -555,10 +607,11 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
private boolean validateScheduleCondition(ConditionContext conditionContext) throws InternalException {
if (conditions != null) {
return conditions.stream()
.filter(c -> c instanceof ScheduleCondition)
.map(c -> (ScheduleCondition) c)
.allMatch(throwPredicate(condition -> condition.test(conditionContext)));
ConditionService conditionService = ((DefaultRunContext)conditionContext.getRunContext()).getApplicationContext().getBean(ConditionService.class);
return conditionService.isValid(
conditions.stream().filter(c -> c instanceof ScheduleCondition).map(c -> (ScheduleCondition) c).toList(),
conditionContext
);
}
return true;

View File

@@ -10,6 +10,7 @@ import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.models.triggers.*;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.LabelService;
import io.kestra.core.validations.TimezoneId;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
@@ -22,10 +23,7 @@ import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.function.Predicate;
import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -47,7 +45,11 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
@Builder.Default
@Null
private final Duration interval = null;
@Schema(
title = "The inputs to pass to the scheduled flow"
)
@PluginProperty(dynamic = true)
private Map<String, Object> inputs;
@TimezoneId
@@ -61,24 +63,31 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
@NotNull
private Property<List<ZonedDateTime>> dates;
@Schema(
title = "Action to take in the case of missed schedules",
description = "`ALL` will recover all missed schedules, `LAST` will only recover the last missed one, `NONE` will not recover any missed schedule.\n" +
"The default is `ALL` unless a different value is configured using the global plugin configuration."
)
@PluginProperty
private RecoverMissedSchedules recoverMissedSchedules;
@Override
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext triggerContext) throws Exception {
RunContext runContext = conditionContext.getRunContext();
ZonedDateTime lastEvaluation = triggerContext.getDate();
Optional<ZonedDateTime> nextDate = nextDate(runContext, date -> date.isEqual(lastEvaluation) || date.isAfter(lastEvaluation));
if (nextDate.isPresent()) {
log.info("Schedule execution on {}", nextDate.get());
Execution execution = SchedulableExecutionFactory.createExecution(
Execution execution = TriggerService.generateScheduledExecution(
this,
conditionContext,
triggerContext,
LabelService.fromTrigger(runContext, conditionContext.getFlow(), this),
this.inputs != null ? runContext.render(this.inputs) : Collections.emptyMap(),
Collections.emptyMap(),
nextDate.orElse(null)
nextDate
);
return Optional.of(execution);
@@ -88,21 +97,29 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
}
@Override
public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> triggerContext) {
return triggerContext
.map(ctx -> ctx.getBackfill() != null ? ctx.getBackfill().getCurrentDate() : ctx.getDate())
.map(this::withTimeZone)
.or(() -> Optional.of(ZonedDateTime.now()))
.flatMap(dt -> {
try {
return nextDate(conditionContext.getRunContext(), date -> date.isAfter(dt));
} catch (IllegalVariableEvaluationException e) {
log.warn("Failed to evaluate schedule dates for trigger '{}': {}", this.getId(), e.getMessage());
throw new InvalidTriggerConfigurationException("Failed to evaluate schedule 'dates'. Cause: " + e.getMessage());
}
}).orElseGet(() -> ZonedDateTime.now().plusYears(1));
public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> last) {
try {
return last
.map(throwFunction(context ->
nextDate(conditionContext.getRunContext(), date -> date.isAfter(context.getDate()))
.orElse(ZonedDateTime.now().plusYears(1))
))
.orElse(conditionContext.getRunContext()
.render(dates)
.asList(ZonedDateTime.class)
.stream()
.sorted()
.findFirst()
.orElse(ZonedDateTime.now()))
.truncatedTo(ChronoUnit.SECONDS);
} catch (IllegalVariableEvaluationException e) {
log.warn("Failed to evaluate schedule dates for trigger '{}': {}", this.getId(), e.getMessage());
return ZonedDateTime.now().plusYears(1);
}
}
@Override
public ZonedDateTime nextEvaluationDate() {
// TODO this may be the next date from now?
@@ -122,17 +139,9 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
return previousDates.isEmpty() ? ZonedDateTime.now() : previousDates.getFirst();
}
private ZonedDateTime withTimeZone(ZonedDateTime date) {
if (this.timezone == null) {
return date;
}
return date.withZoneSameInstant(ZoneId.of(this.timezone));
}
private Optional<ZonedDateTime> nextDate(RunContext runContext, Predicate<ZonedDateTime> predicate) throws IllegalVariableEvaluationException {
return runContext.render(dates)
.asList(ZonedDateTime.class).stream().sorted()
.filter(predicate)
private Optional<ZonedDateTime> nextDate(RunContext runContext, Predicate<ZonedDateTime> filter) throws IllegalVariableEvaluationException {
return runContext.render(dates).asList(ZonedDateTime.class).stream().sorted()
.filter(date -> filter.test(date))
.map(throwFunction(date -> timezone == null ? date : date.withZoneSameInstant(ZoneId.of(runContext.render(timezone)))))
.findFirst()
.map(date -> date.truncatedTo(ChronoUnit.SECONDS));

View File

@@ -9,14 +9,10 @@
<property name="pattern" value="%date{HH:mm:ss}.%ms %highlight(%-5.5level) %magenta(%-12.36thread) %cyan(%-12.36logger{36}) %msg%n" />
<logger name="io.kestra" level="INFO" />
<!-- Flow execution logs - disabled by default -->
<logger name="flow" level="OFF" />
<!-- Server loggers -->
<logger name="worker" level="INFO" />
<logger name="executor" level="INFO" />
<logger name="scheduler" level="INFO" />
<logger name="flow" level="INFO" />
<logger name="task" level="INFO" />
<logger name="execution" level="INFO" />
<logger name="trigger" level="INFO" />
<logger name="io.kestra.ee.runner.kafka.services.KafkaConsumerService" level="WARN" />
<logger name="io.kestra.ee.runner.kafka.services.KafkaProducerService" level="WARN" />

View File

@@ -170,11 +170,10 @@ class JsonSchemaGeneratorTest {
Map<String, Object> jsonSchema = jsonSchemaGenerator.generate(AbstractTrigger.class, AbstractTrigger.class);
assertThat((Map<String, Object>) jsonSchema.get("properties"), allOf(
Matchers.aMapWithSize(4),
Matchers.aMapWithSize(3),
hasKey("conditions"),
hasKey("stopAfter"),
hasKey("type"),
hasKey("allowConcurrent")
hasKey("type")
));
});
}

View File

@@ -60,15 +60,6 @@ class SystemInformationReportTest {
return setting;
}
@Override
public Setting internalSave(Setting setting) throws ConstraintViolationException {
if (setting.getKey().equals(Setting.INSTANCE_UUID)) {
UUID = setting.getValue();
}
return setting;
}
@Override
public Setting delete(Setting setting) {
return setting;

View File

@@ -1,9 +1,9 @@
package io.kestra.core.repositories;
import com.devskiller.friendly_id.FriendlyId;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.InvalidQueryFiltersException;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.Label;
import io.kestra.core.models.QueryFilter;
@@ -24,6 +24,7 @@ import io.kestra.core.models.flows.State.Type;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.NamespaceUtils;
import io.kestra.core.utils.TestsUtils;
@@ -41,9 +42,10 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@@ -183,7 +185,6 @@ public abstract class AbstractExecutionRepositoryTest {
@ParameterizedTest
@MethodSource("filterCombinations")
@FlakyTest(description = "Filtering tests are sometimes returning 0")
void should_find_all(QueryFilter filter, int expectedSize){
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant, "executionTriggerId");

View File

@@ -10,83 +10,82 @@ import org.junit.jupiter.api.TestInstance;
@KestraTest(startRunner = true)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractRunnerConcurrencyTest {
public static final String TENANT_1 = "tenant1";
@Inject
protected FlowConcurrencyCaseTest flowConcurrencyCaseTest;
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-cancel.yml"}, tenantId = "concurrency-cancel")
@LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
void concurrencyCancel() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyCancel("concurrency-cancel");
flowConcurrencyCaseTest.flowConcurrencyCancel();
}
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-fail.yml"}, tenantId = "concurrency-fail")
@LoadFlows({"flows/valids/flow-concurrency-fail.yml"})
void concurrencyFail() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyFail("concurrency-fail");
flowConcurrencyCaseTest.flowConcurrencyFail();
}
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = "concurrency-queue")
@LoadFlows({"flows/valids/flow-concurrency-queue.yml"})
void concurrencyQueue() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueue("concurrency-queue");
flowConcurrencyCaseTest.flowConcurrencyQueue();
}
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-pause.yml"}, tenantId = "concurrency-queue-pause")
@LoadFlows({"flows/valids/flow-concurrency-queue-pause.yml"})
protected void concurrencyQueuePause() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueuePause("concurrency-queue-pause");
flowConcurrencyCaseTest.flowConcurrencyQueuePause();
}
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-cancel-pause.yml"}, tenantId = "concurrency-cancel-pause")
@LoadFlows({"flows/valids/flow-concurrency-cancel-pause.yml"})
protected void concurrencyCancelPause() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyCancelPause("concurrency-cancel-pause");
flowConcurrencyCaseTest.flowConcurrencyCancelPause();
}
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = "flow-concurrency-with-for-each-item")
@LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = TENANT_1)
protected void flowConcurrencyWithForEachItem() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem("flow-concurrency-with-for-each-item");
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem(TENANT_1);
}
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-fail.yml"}, tenantId = "concurrency-queue-restarted")
@LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
protected void concurrencyQueueRestarted() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted("concurrency-queue-restarted");
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
}
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-after-execution.yml"}, tenantId = "concurrency-queue-after-execution")
@LoadFlows({"flows/valids/flow-concurrency-queue-after-execution.yml"})
void concurrencyQueueAfterExecution() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution("concurrency-queue-after-execution");
flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution();
}
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"}, tenantId = "flow-concurrency-subflow")
@LoadFlows(value = {"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"}, tenantId = TENANT_1)
void flowConcurrencySubflow() throws Exception {
flowConcurrencyCaseTest.flowConcurrencySubflow("flow-concurrency-subflow");
flowConcurrencyCaseTest.flowConcurrencySubflow(TENANT_1);
}
@Test
@FlakyTest(description = "Only flaky in CI")
@LoadFlows(
value = {"flows/valids/flow-concurrency-parallel-subflow-kill.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-child.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-grandchild.yaml"},
tenantId = "flow-concurrency-parallel-subflow-kill"
)
@LoadFlows({"flows/valids/flow-concurrency-parallel-subflow-kill.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-child.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-grandchild.yaml"})
protected void flowConcurrencyParallelSubflowKill() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyParallelSubflowKill("flow-concurrency-parallel-subflow-kill");
flowConcurrencyCaseTest.flowConcurrencyParallelSubflowKill();
}
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-killed.yml"}, tenantId = "flow-concurrency-killed")
@LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"})
void flowConcurrencyKilled() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyKilled("flow-concurrency-killed");
flowConcurrencyCaseTest.flowConcurrencyKilled();
}
@Test
@FlakyTest(description = "Only flaky in CI")
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-killed.yml"}, tenantId = "flow-concurrency-queue-killed")
@LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"})
void flowConcurrencyQueueKilled() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueKilled("flow-concurrency-queue-killed");
flowConcurrencyCaseTest.flowConcurrencyQueueKilled();
}
}

View File

@@ -31,6 +31,7 @@ import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@Singleton
@@ -56,12 +57,12 @@ public class FlowConcurrencyCaseTest {
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;
public void flowConcurrencyCancel(String tenantId) throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
public void flowConcurrencyCancel() throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
try {
List<Execution> shouldFailExecutions = List.of(
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-cancel"),
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-cancel")
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel"),
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel")
);
assertThat(execution1.getState().isRunning()).isTrue();
@@ -72,12 +73,12 @@ public class FlowConcurrencyCaseTest {
}
}
public void flowConcurrencyFail(String tenantId) throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
public void flowConcurrencyFail() throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
try {
List<Execution> shouldFailExecutions = List.of(
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-fail"),
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-fail")
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail"),
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail")
);
assertThat(execution1.getState().isRunning()).isTrue();
@@ -88,10 +89,10 @@ public class FlowConcurrencyCaseTest {
}
}
public void flowConcurrencyQueue(String tenantId) throws QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
public void flowConcurrencyQueue() throws QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(tenantId, NAMESPACE, "flow-concurrency-queue", Optional.empty())
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
@@ -107,10 +108,10 @@ public class FlowConcurrencyCaseTest {
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
public void flowConcurrencyQueuePause(String tenantId) throws QueueException {
Execution execution1 = runnerUtils.runOneUntilPaused(tenantId, NAMESPACE, "flow-concurrency-queue-pause");
public void flowConcurrencyQueuePause() throws QueueException {
Execution execution1 = runnerUtils.runOneUntilPaused(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-pause");
Flow flow = flowRepository
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-pause", Optional.empty())
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-pause", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
@@ -126,10 +127,10 @@ public class FlowConcurrencyCaseTest {
assertThat(secondExecutionResult.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
public void flowConcurrencyCancelPause(String tenantId) throws QueueException {
Execution execution1 = runnerUtils.runOneUntilPaused(tenantId, NAMESPACE, "flow-concurrency-cancel-pause");
public void flowConcurrencyCancelPause() throws QueueException {
Execution execution1 = runnerUtils.runOneUntilPaused(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel-pause");
Flow flow = flowRepository
.findById(tenantId, NAMESPACE, "flow-concurrency-cancel-pause", Optional.empty())
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel-pause", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.CANCELLED), execution2);
@@ -165,11 +166,11 @@ public class FlowConcurrencyCaseTest {
.toList()).contains(Type.QUEUED);
}
public void flowConcurrencyQueueRestarted(String tenantId) throws Exception {
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE,
public void flowConcurrencyQueueRestarted() throws Exception {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE,
"flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-fail", Optional.empty())
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-fail", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.RUNNING), execution2);
@@ -178,10 +179,7 @@ public class FlowConcurrencyCaseTest {
// we restart the first one, it should be queued then fail again.
Execution failedExecution = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution1);
Execution restarted = executionService.restart(failedExecution, null);
Execution executionResult1 = runnerUtils.restartExecution(
e -> e.getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED) && e.getState().getCurrent().equals(Type.FAILED),
restarted
);
Execution executionResult1 = runnerUtils.restartExecution(e -> e.getState().getCurrent().equals(Type.FAILED), restarted);
Execution executionResult2 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution2);
assertThat(executionResult1.getState().getCurrent()).isEqualTo(Type.FAILED);
@@ -195,10 +193,10 @@ public class FlowConcurrencyCaseTest {
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
public void flowConcurrencyQueueAfterExecution(String tenantId) throws QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
public void flowConcurrencyQueueAfterExecution() throws QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-after-execution", Optional.empty())
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-after-execution", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
@@ -218,15 +216,15 @@ public class FlowConcurrencyCaseTest {
List<Execution> subFlowExecs = runnerUtils.awaitFlowExecutionNumber(2, tenantId, NAMESPACE, "flow-concurrency-cancel");
assertThat(subFlowExecs).extracting(e -> e.getState().getCurrent()).containsExactlyInAnyOrder(Type.SUCCESS, Type.CANCELLED);
// run another execution to be sure that everything works (purge is correctly done)
// run another execution to be sure that everything work (purge is correctly done)
Execution execution3 = runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-subflow");
assertThat(execution3.getState().getCurrent()).isEqualTo(Type.SUCCESS);
runnerUtils.awaitFlowExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), tenantId, NAMESPACE, "flow-concurrency-cancel");
}
public void flowConcurrencyParallelSubflowKill(String tenantId) throws QueueException {
Execution parent = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-parallel-subflow-kill", null, null, Duration.ofSeconds(30));
Execution queued = runnerUtils.awaitFlowExecution(e -> e.getState().isQueued(), tenantId, NAMESPACE, "flow-concurrency-parallel-subflow-kill-child");
public void flowConcurrencyParallelSubflowKill() throws QueueException {
Execution parent = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-parallel-subflow-kill", null, null, Duration.ofSeconds(30));
Execution queued = runnerUtils.awaitFlowExecution(e -> e.getState().isQueued(), MAIN_TENANT, NAMESPACE, "flow-concurrency-parallel-subflow-kill-child");
// Kill the parent
killQueue.emit(ExecutionKilledExecution
@@ -234,7 +232,7 @@ public class FlowConcurrencyCaseTest {
.state(ExecutionKilled.State.REQUESTED)
.executionId(parent.getId())
.isOnKillCascade(true)
.tenantId(tenantId)
.tenantId(MAIN_TENANT)
.build()
);
@@ -244,11 +242,11 @@ public class FlowConcurrencyCaseTest {
assertThat(terminated.getTaskRunList()).isNull();
}
public void flowConcurrencyKilled(String tenantId) throws QueueException, InterruptedException {
public void flowConcurrencyKilled() throws QueueException, InterruptedException {
Flow flow = flowRepository
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
.orElseThrow();
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
@@ -263,7 +261,7 @@ public class FlowConcurrencyCaseTest {
.state(ExecutionKilled.State.REQUESTED)
.executionId(execution1.getId())
.isOnKillCascade(true)
.tenantId(tenantId)
.tenantId(MAIN_TENANT)
.build()
);
@@ -281,19 +279,20 @@ public class FlowConcurrencyCaseTest {
assertThat(queued.getState().getCurrent()).isEqualTo(Type.QUEUED);
} finally {
// kill everything to avoid dangling executions
runnerUtils.killExecution(execution1);
runnerUtils.killExecution(execution2);
runnerUtils.killExecution(execution3);
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
runnerUtils.awaitFlowExecutionNumber(3, tenantId, NAMESPACE, "flow-concurrency-queue-killed");
runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed");
}
}
public void flowConcurrencyQueueKilled(String tenantId) throws QueueException, InterruptedException {
public void flowConcurrencyQueueKilled() throws QueueException, InterruptedException {
Flow flow = flowRepository
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
.orElseThrow();
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
@@ -308,7 +307,7 @@ public class FlowConcurrencyCaseTest {
.state(ExecutionKilled.State.REQUESTED)
.executionId(execution2.getId())
.isOnKillCascade(true)
.tenantId(tenantId)
.tenantId(MAIN_TENANT)
.build()
);
@@ -323,10 +322,11 @@ public class FlowConcurrencyCaseTest {
} finally {
// kill everything to avoid dangling executions
runnerUtils.killExecution(execution1);
runnerUtils.killExecution(execution2);
runnerUtils.killExecution(execution3);
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
runnerUtils.awaitFlowExecutionNumber(3, tenantId, NAMESPACE, "flow-concurrency-queue-killed");
runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed");
}
}

View File

@@ -1,24 +1,15 @@
package io.kestra.core.utils;
import io.kestra.core.models.Setting;
import io.kestra.core.repositories.SettingRepositoryInterface;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
@MicronautTest
@KestraTest
public class EditionProviderTest {
@Inject
private EditionProvider editionProvider;
@Inject
private SettingRepositoryInterface settingRepository;
protected EditionProvider.Edition expectedEdition() {
return EditionProvider.Edition.OSS;
}
@@ -26,10 +17,5 @@ public class EditionProviderTest {
@Test
void shouldReturnCurrentEdition() {
Assertions.assertEquals(expectedEdition(), editionProvider.get());
// check that the edition is persisted in settings
Optional<Setting> editionSettings = settingRepository.findByKey(Setting.INSTANCE_EDITION);
assertThat(editionSettings).isPresent();
assertThat(editionSettings.get().getValue()).isEqualTo(expectedEdition().name());
}
}

View File

@@ -1,107 +1,48 @@
package io.kestra.core.utils;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.TriggerContext;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import static org.assertj.core.api.Assertions.assertThat;
@Slf4j
class LogsTest {
private static final InMemoryAppender MEMORY_APPENDER = new InMemoryAppender();
@BeforeAll
static void setupLogger() {
Logger logger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
MEMORY_APPENDER.setContext((LoggerContext) LoggerFactory.getILoggerFactory());
MEMORY_APPENDER.start();
logger.addAppender(MEMORY_APPENDER);
}
@AfterEach
void clearLogs() {
MEMORY_APPENDER.clear();
}
@Test
void logFlow() {
var flow = Flow.builder().tenantId("tenant").namespace("namespace").id("flow").build();
var flow = Flow.builder().namespace("namespace").id("flow").build();
Logs.logExecution(flow, log, Level.INFO, "Some log");
Logs.logExecution(flow, log, Level.INFO, "Some log with an {}", "attribute");
Logs.logExecution(flow, log, Level.ERROR, "Some log with an {} and an error", "attribute", new RuntimeException("Test Exception"));
List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
assertThat(logs).hasSize(3);
}
@Test
void logExecution() {
var execution = Execution.builder().tenantId("tenant").namespace("namespace").flowId("flow").id("execution").build();
var execution = Execution.builder().namespace("namespace").flowId("flow").id("execution").build();
Logs.logExecution(execution, log, Level.INFO, "Some log");
Logs.logExecution(execution, log, Level.INFO, "Some log with an {}", "attribute");
Logs.logExecution(execution, Level.INFO, "Some log");
Logs.logExecution(execution, Level.INFO, "Some log with an {}", "attribute");
Logs.logExecution(execution, Level.INFO, "Some log");
List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
assertThat(logs).hasSize(3);
assertThat(logs.getFirst().getLoggerName()).isEqualTo("executor.tenant.namespace.flow");
}
@Test
void logTrigger() {
var trigger = TriggerContext.builder().tenantId("tenant").namespace("namespace").flowId("flow").triggerId("trigger").build();
var trigger = TriggerContext.builder().namespace("namespace").flowId("flow").triggerId("trigger").build();
Logs.logTrigger(trigger, log, Level.INFO, "Some log");
Logs.logTrigger(trigger, log, Level.INFO, "Some log with an {}", "attribute");
Logs.logTrigger(trigger, Level.INFO, "Some log");
Logs.logTrigger(trigger, Level.INFO, "Some log with an {}", "attribute");
Logs.logTrigger(trigger, Level.INFO, "Some log");
List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
assertThat(logs).hasSize(3);
assertThat(logs.getFirst().getLoggerName()).isEqualTo("scheduler.tenant.namespace.flow.trigger");
}
@Test
void logTaskRun() {
var taskRun = TaskRun.builder().tenantId("tenant").namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").build();
var taskRun = TaskRun.builder().namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").build();
Logs.logTaskRun(taskRun, Level.INFO, "Some log");
Logs.logTaskRun(taskRun, Level.INFO, "Some log with an {}", "attribute");
taskRun = TaskRun.builder().namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").value("value").build();
Logs.logTaskRun(taskRun, Level.INFO, "Some log");
Logs.logTaskRun(taskRun, Level.INFO, "Some log with an {}", "attribute");
List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
assertThat(logs).hasSize(4);
assertThat(logs.getFirst().getLoggerName()).isEqualTo("worker.tenant.namespace.flow.task");
}
private static class InMemoryAppender extends AppenderBase<ILoggingEvent> {
private final List<ILoggingEvent> logs = new CopyOnWriteArrayList<>();
@Override
protected void append(ILoggingEvent event) {
logs.add(event);
}
public List<ILoggingEvent> getLogs() {
return logs;
}
public void clear() {
logs.clear();
}
}
}

View File

@@ -216,23 +216,4 @@ class MapUtilsTest {
"k1.k4", "v2"
));
}
@Test
@SuppressWarnings("unchecked")
void mergeShouldNotDuplicateListElements() {
Map<String, Object> first = Map.of(
"key1", "value1",
"key2", List.of("something", "else")
);
Map<String, Object> second = Map.of(
"key2", List.of("something", "other"),
"key3", "value3"
);
Map<String, Object> results = MapUtils.merge(first, second);
assertThat(results).hasSize(3);
List<String> list = (List<String>) results.get("key2");
assertThat(list).hasSize(3);
}
}

View File

@@ -20,6 +20,7 @@ import org.junit.jupiter.api.parallel.ExecutionMode;
import reactor.core.publisher.Flux;
import java.io.ByteArrayInputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
@@ -44,6 +45,9 @@ class NamespaceFilesUtilsTest {
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
QueueInterface<LogEntry> workerTaskLogQueue;
@Inject
NamespaceFilesUtils namespaceFilesUtils;
@Inject
NamespaceFactory namespaceFactory;
@@ -62,7 +66,7 @@ class NamespaceFilesUtilsTest {
namespaceStorage.putFile(Path.of("/" + i + ".txt"), data);
}
NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().build());
namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().build());
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
receive.blockLast();
@@ -87,7 +91,7 @@ class NamespaceFilesUtilsTest {
namespaceStorage.putFile(Path.of("/" + i + ".txt"), data);
}
NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
receive.blockLast();
@@ -112,7 +116,7 @@ class NamespaceFilesUtilsTest {
namespaceStorage.putFile(Path.of("/folder2/test.txt"), data);
namespaceStorage.putFile(Path.of("/test.txt"), data);
NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
receive.blockLast();
@@ -137,7 +141,7 @@ class NamespaceFilesUtilsTest {
namespaceFactory.of(MAIN_TENANT, ns1, storageInterface).putFile(Path.of("/test.txt"), data);
namespaceFactory.of(MAIN_TENANT, ns2, storageInterface).putFile(Path.of("/test.txt"), data);
NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder()
namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder()
.namespaces(Property.ofValue(List.of(ns1, ns2)))
.folderPerNamespace(Property.ofValue(true))
.build());

View File

@@ -1,30 +0,0 @@
package io.kestra.core.utils;
import io.kestra.core.models.Setting;
import io.kestra.core.repositories.SettingRepositoryInterface;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
@MicronautTest
class VersionProviderTest {
@Inject
private VersionProvider versionProvider;
@Inject
private SettingRepositoryInterface settingRepository;
@Test
void shouldResolveVersion() {
assertThat(versionProvider.getVersion()).endsWith("-SNAPSHOT");
// check that the version is persisted in settings
Optional<Setting> versionSettings = settingRepository.findByKey(Setting.INSTANCE_VERSION);
assertThat(versionSettings).isPresent();
assertThat(versionSettings.get().getValue()).isEqualTo(versionProvider.getVersion());
}
}

View File

@@ -9,15 +9,9 @@ import io.kestra.core.utils.TestsUtils;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import io.kestra.core.models.validations.ValidateConstraintViolation;
import io.kestra.core.services.FlowService;
import jakarta.validation.ConstraintViolationException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonLocation;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.io.File;
import java.net.URL;
import java.util.Optional;
@@ -29,107 +23,6 @@ class FlowValidationTest {
@Inject
private ModelValidator modelValidator;
@Inject
private FlowService flowService;
private static final ObjectMapper mapper = new ObjectMapper();
// Helper class to create JsonProcessingException with location
private static class TestJsonProcessingException extends JsonProcessingException {
public TestJsonProcessingException(String msg, JsonLocation location) {
super(msg, location);
}
public TestJsonProcessingException(String msg) {
super(msg);
}
}
@Test
void testFormatYamlErrorMessage_WithExpectedFieldName() throws JsonProcessingException {
JsonProcessingException e = new TestJsonProcessingException("Expected a field name", new JsonLocation(null, 100, 5, 10));
Object dummyTarget = new Object(); // Dummy target for toConstraintViolationException
ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
assertThat(result.getMessage()).contains("YAML syntax error: Invalid structure").contains("(at line 5)");
}
@Test
void testFormatYamlErrorMessage_WithMappingStartEvent() throws JsonProcessingException {
JsonProcessingException e = new TestJsonProcessingException("MappingStartEvent", new JsonLocation(null, 200, 3, 5));
Object dummyTarget = new Object();
ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
assertThat(result.getMessage()).contains("YAML syntax error: Unexpected mapping start").contains("(at line 3)");
}
@Test
void testFormatYamlErrorMessage_WithScalarValue() throws JsonProcessingException {
JsonProcessingException e = new TestJsonProcessingException("Scalar value", new JsonLocation(null, 150, 7, 12));
Object dummyTarget = new Object();
ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
assertThat(result.getMessage()).contains("YAML syntax error: Expected a simple value").contains("(at line 7)");
}
@Test
void testFormatYamlErrorMessage_GenericError() throws JsonProcessingException {
JsonProcessingException e = new TestJsonProcessingException("Some other error", new JsonLocation(null, 50, 2, 8));
Object dummyTarget = new Object();
ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
assertThat(result.getMessage()).contains("YAML parsing error: Some other error").contains("(at line 2)");
}
@Test
void testFormatYamlErrorMessage_NoLocation() throws JsonProcessingException {
JsonProcessingException e = new TestJsonProcessingException("Expected a field name");
Object dummyTarget = new Object();
ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
assertThat(result.getMessage()).contains("YAML syntax error: Invalid structure").doesNotContain("at line");
}
@Test
void testValidateFlowWithYamlSyntaxError() {
String invalidYaml = """
id: test-flow
namespace: io.kestra.unittest
tasks:
- id:hello
type: io.kestra.plugin.core.log.Log
message: {{ abc }}
""";
List<ValidateConstraintViolation> results = flowService.validate("my-tenant", invalidYaml);
assertThat(results).hasSize(1);
assertThat(results.getFirst().getConstraints()).contains("YAML parsing error").contains("at line");
}
@Test
void testValidateFlowWithUndefinedVariable() {
String yamlWithUndefinedVar = """
id: test-flow
namespace: io.kestra.unittest
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: {{ undefinedVar }}
""";
List<ValidateConstraintViolation> results = flowService.validate("my-tenant", yamlWithUndefinedVar);
assertThat(results).hasSize(1);
assertThat(results.getFirst().getConstraints()).contains("Validation error");
}
@Test
void invalidRecursiveFlow() {
Flow flow = this.parse("flows/invalids/recursive-flow.yaml");
@@ -237,4 +130,4 @@ class FlowValidationTest {
return YamlParser.parse(file, Flow.class);
}
}
}

View File

@@ -8,7 +8,6 @@ import io.kestra.core.models.flows.Output;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.State.History;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.InputAndOutput;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.services.VariablesService;
import io.micronaut.context.ApplicationContext;
@@ -47,15 +46,11 @@ class SubflowTest {
@Mock
private ApplicationContext applicationContext;
@Mock
private InputAndOutput inputAndOutput;
@BeforeEach
void beforeEach() {
Mockito.when(applicationContext.getBean(VariablesService.class)).thenReturn(new VariablesService());
Mockito.when(runContext.logger()).thenReturn(LOG);
Mockito.when(runContext.getApplicationContext()).thenReturn(applicationContext);
Mockito.when(runContext.inputAndOutput()).thenReturn(inputAndOutput);
}
@Test
@@ -123,7 +118,7 @@ class SubflowTest {
Map<String, Object> outputs = Map.of("key", "value");
Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(outputs);
Mockito.when(inputAndOutput.renderOutputs(Mockito.anyList())).thenReturn(Map.of("key", "value"));
Subflow subflow = Subflow.builder()
.outputs(outputs)
@@ -164,7 +159,6 @@ class SubflowTest {
Output output = Output.builder().id("key").value("value").build();
Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(Map.of(output.getId(), output.getValue()));
Mockito.when(inputAndOutput.typedOutputs(Mockito.any(), Mockito.any(), Mockito.anyMap())).thenReturn(Map.of("key", "value"));
Flow flow = Flow.builder()
.outputs(List.of(output))
.build();

View File

@@ -57,7 +57,7 @@ class ScheduleOnDatesTest {
}
@Test
public void shouldReturnFirstDateWhenNextEvaluationDateAndNoExistingTriggerDate() {
public void shouldReturnFirstDateWhenNextEvaluationDateAndNoExistingTriggerDate() throws Exception {
// given
var now = ZonedDateTime.now();
var before = now.minusMinutes(1).truncatedTo(ChronoUnit.SECONDS);
@@ -75,7 +75,7 @@ class ScheduleOnDatesTest {
ZonedDateTime nextDate = scheduleOnDates.nextEvaluationDate(conditionContext, Optional.empty());
// then
assertThat(nextDate).isEqualTo(after);
assertThat(nextDate).isEqualTo(before);
}
@Test

View File

@@ -13,7 +13,6 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.flows.input.MultiselectInput;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.runners.RunContextFactory;
@@ -104,9 +103,8 @@ class ScheduleTest {
);
assertThat(evaluate.isPresent()).isTrue();
assertThat(evaluate.get().getLabels()).hasSize(4);
assertThat(evaluate.get().getLabels()).hasSize(3);
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
assertThat(evaluate.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
var vars = evaluate.get().getTrigger().getVariables();
var inputs = evaluate.get().getInputs();
@@ -139,9 +137,8 @@ class ScheduleTest {
);
assertThat(evaluate.isPresent()).isTrue();
assertThat(evaluate.get().getLabels()).hasSize(4);
assertThat(evaluate.get().getLabels()).hasSize(3);
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
assertThat(evaluate.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
var inputs = evaluate.get().getInputs();
@@ -478,81 +475,6 @@ class ScheduleTest {
assertThat(result.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
}
@Test
void successWithMultiselectInputDefaults() throws Exception {
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("0 0 1 * *").build();
ZonedDateTime date = ZonedDateTime.now()
.withDayOfMonth(1)
.withHour(0)
.withMinute(0)
.withSecond(0)
.truncatedTo(ChronoUnit.SECONDS)
.minusMonths(1);
Optional<Execution> evaluate = trigger.evaluate(
conditionContextWithMultiselectInput(trigger),
triggerContext(date, trigger));
assertThat(evaluate.isPresent()).isTrue();
var inputs = evaluate.get().getInputs();
// Verify MULTISELECT input with explicit defaults works correctly
assertThat(inputs.get("multiselectInput")).isEqualTo(List.of("option1", "option2"));
}
@Test
void successWithMultiselectInputAutoSelectFirst() throws Exception {
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("0 0 1 * *").build();
ZonedDateTime date = ZonedDateTime.now()
.withDayOfMonth(1)
.withHour(0)
.withMinute(0)
.withSecond(0)
.truncatedTo(ChronoUnit.SECONDS)
.minusMonths(1);
Optional<Execution> evaluate = trigger.evaluate(
conditionContextWithMultiselectAutoSelectFirst(trigger),
triggerContext(date, trigger));
assertThat(evaluate.isPresent()).isTrue();
var inputs = evaluate.get().getInputs();
// Verify MULTISELECT input with autoSelectFirst defaults to first option
assertThat(inputs.get("multiselectAutoSelect")).isEqualTo(List.of("first"));
}
@Test
void successWithMultiselectInputProvidedValue() throws Exception {
// Test that provided values override defaults for MULTISELECT
Schedule trigger = Schedule.builder()
.id("schedule")
.type(Schedule.class.getName())
.cron("0 0 1 * *")
.inputs(Map.of("multiselectInput", List.of("option3")))
.build();
ZonedDateTime date = ZonedDateTime.now()
.withDayOfMonth(1)
.withHour(0)
.withMinute(0)
.withSecond(0)
.truncatedTo(ChronoUnit.SECONDS)
.minusMonths(1);
Optional<Execution> evaluate = trigger.evaluate(
conditionContextWithMultiselectInput(trigger),
triggerContext(date, trigger));
assertThat(evaluate.isPresent()).isTrue();
var inputs = evaluate.get().getInputs();
// Verify provided value overrides defaults
assertThat(inputs.get("multiselectInput")).isEqualTo(List.of("option3"));
}
private ConditionContext conditionContext(AbstractTrigger trigger) {
Flow flow = Flow.builder()
.id(IdUtils.create())
@@ -582,79 +504,17 @@ class ScheduleTest {
.build();
}
private ConditionContext conditionContextWithMultiselectInput(AbstractTrigger trigger) {
Flow flow = Flow.builder()
.id(IdUtils.create())
.namespace("io.kestra.tests")
.labels(
List.of(
new Label("flow-label-1", "flow-label-1"),
new Label("flow-label-2", "flow-label-2")))
.variables(Map.of("custom_var", "VARIABLE VALUE"))
.inputs(List.of(
MultiselectInput.builder()
.id("multiselectInput")
.type(Type.MULTISELECT)
.values(List.of("option1", "option2", "option3"))
.defaults(Property.ofValue(List.of("option1", "option2")))
.build()))
.build();
TriggerContext triggerContext = TriggerContext.builder()
.namespace(flow.getNamespace())
.flowId(flow.getId())
.triggerId(trigger.getId())
.build();
return ConditionContext.builder()
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(),
triggerContext, trigger))
.flow(flow)
.build();
}
private ConditionContext conditionContextWithMultiselectAutoSelectFirst(AbstractTrigger trigger) {
Flow flow = Flow.builder()
.id(IdUtils.create())
.namespace("io.kestra.tests")
.labels(
List.of(
new Label("flow-label-1", "flow-label-1"),
new Label("flow-label-2", "flow-label-2")))
.variables(Map.of("custom_var", "VARIABLE VALUE"))
.inputs(List.of(
MultiselectInput.builder()
.id("multiselectAutoSelect")
.type(Type.MULTISELECT)
.values(List.of("first", "second", "third"))
.autoSelectFirst(true)
.build()))
.build();
TriggerContext triggerContext = TriggerContext.builder()
.namespace(flow.getNamespace())
.flowId(flow.getId())
.triggerId(trigger.getId())
.build();
return ConditionContext.builder()
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(),
triggerContext, trigger))
.flow(flow)
.build();
}
private ZonedDateTime dateFromVars(String date, ZonedDateTime expexted) {
return ZonedDateTime.parse(date).withZoneSameInstant(expexted.getZone());
}
@Test
void shouldGetNextExecutionDateWithConditionMatchingFutureDate() throws InternalException {
ZonedDateTime now = ZonedDateTime.now().withZoneSameLocal(ZoneId.of("Europe/Paris"));
OffsetTime before = now.minusHours(1).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
OffsetTime after = now.minusHours(4).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
Schedule trigger = Schedule.builder()
.id("schedule").type(Schedule.class.getName())
.cron("0 * * * *") // every hour
@@ -667,25 +527,25 @@ class ScheduleTest {
.build()
))
.build();
TriggerContext triggerContext = triggerContext(now, trigger).toBuilder().build();
ConditionContext conditionContext = ConditionContext.builder()
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(), triggerContext, trigger))
.build();
Optional<ZonedDateTime> result = trigger.truePreviousNextDateWithCondition(trigger.executionTime(), conditionContext, now, true);
assertThat(result).isNotEmpty();
}
@Test
void shouldGetNextExecutionDateWithConditionMatchingCurrentDate() throws InternalException {
ZonedDateTime now = ZonedDateTime.now().withZoneSameLocal(ZoneId.of("Europe/Paris"));
OffsetTime before = now.plusHours(2).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
OffsetTime after = now.minusHours(2).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
Schedule trigger = Schedule.builder()
.id("schedule").type(Schedule.class.getName())
.cron("*/30 * * * * *")
@@ -698,13 +558,13 @@ class ScheduleTest {
.build()
))
.build();
TriggerContext triggerContext = triggerContext(now, trigger).toBuilder().build();
ConditionContext conditionContext = ConditionContext.builder()
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(), triggerContext, trigger))
.build();
Optional<ZonedDateTime> result = trigger.truePreviousNextDateWithCondition(trigger.executionTime(), conditionContext, now, true);
assertThat(result).isNotEmpty();
}

View File

@@ -8,4 +8,4 @@ concurrency:
tasks:
- id: sleep
type: io.kestra.plugin.core.flow.Sleep
duration: PT2S
duration: PT10S

View File

@@ -402,11 +402,10 @@ public class ExecutorService {
if (flow.getOutputs() != null) {
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
var inputAndOutput = runContext.inputAndOutput();
try {
Map<String, Object> outputs = inputAndOutput.renderOutputs(flow.getOutputs());
outputs = inputAndOutput.typedOutputs(flow, executor.getExecution(), outputs);
Map<String, Object> outputs = FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext);
outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
newExecution = newExecution.withOutputs(outputs);
} catch (Exception e) {
Logs.logExecution(

View File

@@ -1,6 +1,6 @@
package io.kestra.runner.h2;
import io.kestra.jdbc.runner.JdbcConcurrencyRunnerTest;
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
public class H2RunnerConcurrencyTest extends JdbcConcurrencyRunnerTest {
public class H2RunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
}

View File

@@ -1,6 +1,6 @@
package io.kestra.runner.mysql;
import io.kestra.jdbc.runner.JdbcConcurrencyRunnerTest;
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
public class MysqlRunnerConcurrencyTest extends JdbcConcurrencyRunnerTest {
public class MysqlRunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
}

View File

@@ -1,6 +1,6 @@
package io.kestra.runner.postgres;
import io.kestra.jdbc.runner.JdbcConcurrencyRunnerTest;
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
public class PostgresRunnerConcurrencyTest extends JdbcConcurrencyRunnerTest {
public class PostgresRunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
}

View File

@@ -44,15 +44,9 @@ public abstract class AbstractJdbcSettingRepository extends AbstractJdbcCrudRepo
@Override
public Setting save(Setting setting) {
this.eventPublisher.publishEvent(new CrudEvent<>(setting, CrudEventType.UPDATE));
return internalSave(setting);
}
@Override
public Setting internalSave(Setting setting) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(setting);
this.jdbcRepository.persist(setting, fields);
this.eventPublisher.publishEvent(new CrudEvent<>(setting, CrudEventType.UPDATE));
return setting;
}

View File

@@ -72,12 +72,12 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo
@Override
public Optional<Trigger> findLast(TriggerContext trigger) {
return findByUid(trigger.uid());
return findOne(DSL.trueCondition(), field("key").eq(trigger.uid()));
}
@Override
public Optional<Trigger> findByUid(String uid) {
return findOne(DSL.trueCondition(), field("key").eq(uid));
public Optional<Trigger> findByExecution(Execution execution) {
return findOne(execution.getTenantId(), field("execution_id").eq(execution.getId()));
}
public List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContextInterface) {

View File

@@ -74,19 +74,15 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
* Decrement the concurrency limit counter.
* Must only be called when a flow having concurrency limit ends.
*/
public int decrement(FlowInterface flow) {
return this.jdbcRepository
public void decrement(FlowInterface flow) {
this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
.transaction(configuration -> {
var dslContext = DSL.using(configuration);
return fetchOne(dslContext, flow).map(
concurrencyLimit -> {
int newLimit = concurrencyLimit.getRunning() == 0 ? 0 : concurrencyLimit.getRunning() - 1;
update(dslContext, concurrencyLimit.withRunning(newLimit));
return newLimit;
}
).orElse(0);
fetchOne(dslContext, flow).ifPresent(
concurrencyLimit -> update(dslContext, concurrencyLimit.withRunning(concurrencyLimit.getRunning() == 0 ? 0 : concurrencyLimit.getRunning() - 1))
);
});
}

View File

@@ -13,7 +13,6 @@ import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.multipleflows.MultipleCondition;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.queues.QueueException;
@@ -1139,7 +1138,9 @@ public class JdbcExecutor implements ExecutorInterface {
execution.getTrigger().getId()
);
} else {
triggerRepository.findByUid(Trigger.uid(execution)).ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
triggerRepository
.findByExecution(execution)
.ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
}
}
@@ -1209,30 +1210,24 @@ public class JdbcExecutor implements ExecutorInterface {
// as we may receive multiple time killed execution (one when we kill it, then one for each running worker task), we limit to the first we receive: when the state transitionned from KILLING to KILLED
boolean killingThenKilled = execution.getState().getCurrent().isKilled() && executor.getOriginalState() == State.Type.KILLING;
if (!queuedThenKilled && !concurrencyShortCircuitState && (!execution.getState().getCurrent().isKilled() || killingThenKilled)) {
int newLimit = concurrencyLimitStorage.decrement(executor.getFlow());
// decrement execution concurrency limit and pop a new queued execution if needed
concurrencyLimitStorage.decrement(executor.getFlow());
if (executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
var finalFlow = executor.getFlow();
executionQueuedStorage.pop(executor.getFlow().getTenantId(),
executor.getFlow().getNamespace(),
executor.getFlow().getId(),
throwBiConsumer((dslContext, queued) -> {
var newExecution = queued.withState(State.Type.RUNNING);
concurrencyLimitStorage.increment(dslContext, finalFlow);
executionQueue.emit(newExecution);
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
if (newLimit < finalFlow.getConcurrency().getLimit()) {
executionQueuedStorage.pop(executor.getFlow().getTenantId(),
executor.getFlow().getNamespace(),
executor.getFlow().getId(),
throwBiConsumer((dslContext, queued) -> {
var newExecution = queued.withState(State.Type.RUNNING);
concurrencyLimitStorage.increment(dslContext, finalFlow);
executionQueue.emit(newExecution);
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
// process flow triggers to allow listening on RUNNING state after a QUEUED state
processFlowTriggers(newExecution);
})
);
} else {
log.error("Concurrency limit reached for flow {}.{} after decrementing the execution running count due to the terminated execution {}. No new executions will be dequeued.", executor.getFlow().getNamespace(), executor.getFlow().getId(), executor.getExecution().getId());
}
} else if (newLimit >= executor.getFlow().getConcurrency().getLimit()) {
log.error("Concurrency limit reached for flow {}.{} after decrementing the execution running count due to the terminated execution {}. This should not happen.", executor.getFlow().getNamespace(), executor.getFlow().getId(), executor.getExecution().getId());
// process flow triggers to allow listening on RUNNING state after a QUEUED state
processFlowTriggers(newExecution);
})
);
}
}
}
@@ -1240,7 +1235,11 @@ public class JdbcExecutor implements ExecutorInterface {
// purge the trigger: reset scheduler trigger at end
if (execution.getTrigger() != null) {
FlowWithSource flow = executor.getFlow();
triggerRepository.findByUid(Trigger.uid(execution)).ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
triggerRepository
.findByExecution(execution)
.ifPresent(trigger -> {
this.triggerState.update(executionService.resetExecution(flow, execution, trigger));
});
}
// Purge the workerTaskResultQueue and the workerJobQueue

View File

@@ -1,64 +0,0 @@
package io.kestra.jdbc.runner;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
import io.kestra.core.runners.ConcurrencyLimit;
import io.kestra.core.runners.TestRunnerUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
public abstract class JdbcConcurrencyRunnerTest extends AbstractRunnerConcurrencyTest {
public static final String NAMESPACE = "io.kestra.tests";
@Inject
private AbstractJdbcConcurrencyLimitStorage concurrencyLimitStorage;
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private ExecutionRepositoryInterface executionRepository;
@Inject
private TestRunnerUtils runnerUtils;
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = "flow-concurrency-queued-protection")
void flowConcurrencyQueuedProtection() throws QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning("flow-concurrency-queued-protection", NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
assertThat(execution1.getState().isRunning()).isTrue();
Flow flow = flowRepository
.findById("flow-concurrency-queued-protection", NAMESPACE, "flow-concurrency-queue", Optional.empty())
.orElseThrow();
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(State.Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.QUEUED);
// manually update the concurrency count so that queued protection kicks in and no new execution would be popped
ConcurrencyLimit concurrencyLimit = concurrencyLimitStorage.findById("flow-concurrency-queued-protection", NAMESPACE, "flow-concurrency-queue").orElseThrow();
concurrencyLimit = concurrencyLimit.withRunning(concurrencyLimit.getRunning() + 1);
concurrencyLimitStorage.update(concurrencyLimit);
Execution executionResult1 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(State.Type.SUCCESS), execution1);
assertThat(executionResult1.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
// we wait for a few ms and checked that the second execution is still queued
Thread.sleep(500);
Execution executionResult2 = executionRepository.findById("flow-concurrency-queued-protection", execution2.getId()).orElseThrow();
assertThat(executionResult2.getState().getCurrent()).isEqualTo(State.Type.QUEUED);
// we manually reset the concurrency count to avoid messing with any other tests
concurrencyLimitStorage.update(concurrencyLimit.withRunning(concurrencyLimit.getRunning() - 1));
}
}

View File

@@ -31,11 +31,11 @@ dependencies {
api enforcedPlatform("com.fasterxml.jackson:jackson-bom:$jacksonVersion")
api enforcedPlatform("org.slf4j:slf4j-api:$slf4jVersion")
api platform("io.micronaut.platform:micronaut-platform:4.9.4")
api platform("io.qameta.allure:allure-bom:2.32.0")
api platform("io.qameta.allure:allure-bom:2.31.0")
// we define cloud bom here for GCP, Azure and AWS so they are aligned for all plugins that use them (secret, storage, oss and ee plugins)
api platform('com.google.cloud:libraries-bom:26.73.0')
api platform('com.google.cloud:libraries-bom:26.72.0')
api platform("com.azure:azure-sdk-bom:1.3.3")
api platform('software.amazon.awssdk:bom:2.40.10')
api platform('software.amazon.awssdk:bom:2.40.5')
api platform("dev.langchain4j:langchain4j-bom:$langchain4jVersion")
api platform("dev.langchain4j:langchain4j-community-bom:$langchain4jCommunityVersion")
@@ -77,12 +77,12 @@ dependencies {
api "org.apache.kafka:kafka-clients:$kafkaVersion"
api "org.apache.kafka:kafka-streams:$kafkaVersion"
// AWS CRT is not included in the AWS BOM but needed for the S3 Transfer manager
api 'software.amazon.awssdk.crt:aws-crt:0.41.0'
api 'software.amazon.awssdk.crt:aws-crt:0.40.3'
// Other libs
api("org.projectlombok:lombok:1.18.42")
api("org.codehaus.janino:janino:3.1.12")
api group: 'org.apache.logging.log4j', name: 'log4j-to-slf4j', version: '2.25.3'
api group: 'org.apache.logging.log4j', name: 'log4j-to-slf4j', version: '2.25.2'
api group: 'org.slf4j', name: 'jul-to-slf4j', version: slf4jVersion
api group: 'org.slf4j', name: 'jcl-over-slf4j', version: slf4jVersion
api group: 'org.fusesource.jansi', name: 'jansi', version: '2.4.2'
@@ -99,11 +99,11 @@ dependencies {
api group: 'org.apache.maven.resolver', name: 'maven-resolver-transport-file', version: mavenResolverVersion
api group: 'org.apache.maven.resolver', name: 'maven-resolver-transport-apache', version: mavenResolverVersion
api 'com.github.oshi:oshi-core:6.9.1'
api 'io.pebbletemplates:pebble:4.1.0'
api 'io.pebbletemplates:pebble:4.0.0'
api group: 'co.elastic.logging', name: 'logback-ecs-encoder', version: '1.7.0'
api group: 'de.focus-shift', name: 'jollyday-core', version: jollydayVersion
api group: 'de.focus-shift', name: 'jollyday-jaxb', version: jollydayVersion
api 'nl.basjes.gitignore:gitignore-reader:1.14.1'
api 'nl.basjes.gitignore:gitignore-reader:1.13.0'
api group: 'dev.failsafe', name: 'failsafe', version: '3.3.2'
api group: 'com.cronutils', name: 'cron-utils', version: '9.2.1'
api group: 'com.github.victools', name: 'jsonschema-generator', version: jsonschemaVersion

View File

@@ -288,7 +288,7 @@ public abstract class AbstractScheduler implements Scheduler {
disableInvalidTrigger(workerTriggerResult.getTriggerContext(), e);
return;
}
this.handleEvaluateWorkerTriggerResult(triggerExecution, nextExecutionDate, workerTriggerResult.getTrigger());
this.handleEvaluateWorkerTriggerResult(triggerExecution, nextExecutionDate);
} else {
ZonedDateTime nextExecutionDate;
try {
@@ -715,8 +715,7 @@ public abstract class AbstractScheduler implements Scheduler {
Optional<SchedulerExecutionWithTrigger> schedulerExecutionWithTrigger = evaluateScheduleTrigger(f);
if (schedulerExecutionWithTrigger.isPresent()) {
this.handleEvaluateSchedulingTriggerResult(schedule, schedulerExecutionWithTrigger.get(), f.getConditionContext(), scheduleContext);
}
else{
} else {
// compute next date and save the trigger to avoid evaluating it each second
Trigger trigger = Trigger.fromEvaluateFailed(
f.getTriggerContext(),
@@ -751,7 +750,26 @@ public abstract class AbstractScheduler implements Scheduler {
// validate schedule condition can fail to render variables
// in this case, we send a failed execution so the trigger is not evaluated each second.
logger.error("Unable to evaluate the trigger '{}'", f.getAbstractTrigger().getId(), ie);
handleFailedEvaluatedTrigger(f, scheduleContext, ie);
Execution execution = Execution.builder()
.id(IdUtils.create())
.tenantId(f.getTriggerContext().getTenantId())
.namespace(f.getTriggerContext().getNamespace())
.flowId(f.getTriggerContext().getFlowId())
.flowRevision(f.getFlow().getRevision())
.labels(LabelService.labelsExcludingSystem(f.getFlow()))
.state(new State().withState(State.Type.FAILED))
.build();
ZonedDateTime nextExecutionDate;
try {
nextExecutionDate = this.nextEvaluationDate(f.getAbstractTrigger());
} catch (InvalidTriggerConfigurationException e2) {
logError(f, e2);
disableInvalidTrigger(f, e2);
return;
}
var trigger = f.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
}
});
});
@@ -768,7 +786,7 @@ public abstract class AbstractScheduler implements Scheduler {
}
private void handleEvaluateWorkerTriggerResult(SchedulerExecutionWithTrigger result, ZonedDateTime
nextExecutionDate, AbstractTrigger abstractTrigger) {
nextExecutionDate) {
Optional.ofNullable(result)
.ifPresent(executionWithTrigger -> {
log(executionWithTrigger);
@@ -779,12 +797,6 @@ public abstract class AbstractScheduler implements Scheduler {
nextExecutionDate
);
// if the trigger is allowed to run concurrently we do not attached the executio-id to the trigger state
// i.e., the trigger will not be locked
if (abstractTrigger.isAllowConcurrent()) {
trigger = trigger.toBuilder().executionId(null).build();
}
// Worker triggers result is evaluated in another thread with the workerTriggerResultQueue.
// We can then update the trigger directly.
this.saveLastTriggerAndEmitExecution(executionWithTrigger.getExecution(), trigger, triggerToSave -> this.triggerState.update(triggerToSave));
@@ -806,12 +818,6 @@ public abstract class AbstractScheduler implements Scheduler {
if (result.getExecution().getState().getCurrent() == State.Type.FAILED) {
trigger = trigger.resetExecution(State.Type.FAILED);
}
// if the trigger is allowed to run concurrently we do not attached the executio-id to the trigger state
// i.e., the trigger will not be locked
if (((AbstractTrigger)schedule).isAllowConcurrent()) {
trigger = trigger.toBuilder().executionId(null).build();
}
// Schedule triggers are being executed directly from the handle method within the context where triggers are locked.
// So we must save them by passing the scheduleContext.
@@ -977,43 +983,11 @@ public abstract class AbstractScheduler implements Scheduler {
));
} catch (Exception e) {
logError(flowWithTrigger, e);
Execution failedExecution = createFailedExecution( flowWithTrigger, e);
this.emitExecution(failedExecution, flowWithTrigger.getTriggerContext());
return Optional.empty();
}
});
}
private Execution createFailedExecution(FlowWithWorkerTrigger flowWithTrigger, Throwable e){
Execution execution = Execution.builder()
.id(IdUtils.create())
.tenantId(flowWithTrigger.getTriggerContext().getTenantId())
.namespace(flowWithTrigger.getTriggerContext().getNamespace())
.flowId(flowWithTrigger.getTriggerContext().getFlowId())
.flowRevision(flowWithTrigger.getFlow().getRevision())
.labels(LabelService.labelsExcludingSystem(flowWithTrigger.getFlow()))
.state(new State().withState(State.Type.FAILED))
.build();
Logger logger = runContextFactory.of(flowWithTrigger.getFlow(), execution).logger();
logger.error("[trigger: {}] [date: {}] Evaluate Failed with error '{}'" , flowWithTrigger.getAbstractTrigger().getId(), now(), e.getMessage());
return execution;
}
private void handleFailedEvaluatedTrigger(FlowWithWorkerTrigger flowWithTrigger, ScheduleContextInterface scheduleContext, Throwable e ){
Execution execution = createFailedExecution(flowWithTrigger, e);
ZonedDateTime nextExecutionDate;
try {
nextExecutionDate = this.nextEvaluationDate(flowWithTrigger.getAbstractTrigger());
} catch (InvalidTriggerConfigurationException e2) {
logError(flowWithTrigger, e2);
disableInvalidTrigger(flowWithTrigger, e2);
return;
}
var trigger = flowWithTrigger.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
trigger = trigger.checkBackfill();
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
}
private void logError(FlowWithWorkerTrigger flowWithWorkerTriggerNextDate, Throwable e) {
Logger logger = flowWithWorkerTriggerNextDate.getConditionContext().getRunContext().logger();

View File

@@ -91,7 +91,6 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
assertThat(queueCount.getCount()).isEqualTo(0L);
assertThat(last.get()).isNotNull();
assertTrue(last.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
assertTrue(last.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
}
}
@@ -137,7 +136,6 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
assertThat(queueCount.getCount()).isEqualTo(0L);
assertThat(last.get()).isNotNull();
assertTrue(last.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
assertTrue(last.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
// Assert that the trigger is now disabled.
// It needs to await on assertion as it will be disabled AFTER we receive a success execution.

View File

@@ -489,8 +489,9 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
Await.until(() -> this.triggerState.findLast(trigger).map(t -> t.getDisabled()).orElse(false).booleanValue(), Duration.ofMillis(100), Duration.ofSeconds(10));
}
}
@Test
void failedEvaluationFromFailedCondition() {
void failedEvaluationTest() {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
Schedule schedule = createScheduleTrigger("Europe/Paris", "* * * * *", "failedEvaluation", false)
@@ -526,61 +527,6 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
assertThat(execution).isNotNull();
assertThat(execution.getFlowId()).isEqualTo(flow.getId());
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
queueCount.countDown();
});
scheduler.run();
queueCount.await(1, TimeUnit.MINUTES);
// needed for RetryingTest to work since there is no context cleaning between method => we have to clear assertion receiver manually
receive.blockLast();
assertThat(queueCount.getCount()).isEqualTo(0L);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Test
void failedEvaluationFromInvalidExpression() {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
Schedule schedule = createScheduleTrigger("Europe/Paris", "* * * * *", "failedEvaluation", false)
.inputs(
Map.of("invalidExpressionInput", Expression.builder()
.type(Expression.class.getName())
.expression(Property.ofExpression("{{ now().hour == 0 ? 3 : 2 }}"))
.build()
)
)
.build();
FlowWithSource flow = createFlow(this.tenantId,Collections.singletonList(schedule));
doReturn(List.of(flow))
.when(flowListenersServiceSpy)
.flows();
// to avoid waiting too much before a trigger execution, we add a last trigger with a date now - 1m.
Trigger lastTrigger = Trigger
.builder()
.triggerId("failedEvaluation")
.tenantId(this.tenantId)
.flowId(flow.getId())
.namespace(flow.getNamespace())
.date(ZonedDateTime.now().minusMinutes(1L))
.build();
triggerState.create(lastTrigger);
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
assertThat(execution).isNotNull();
assertThat(execution.getFlowId()).isEqualTo(flow.getId());
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);

View File

@@ -104,7 +104,6 @@ public class SchedulerStreamingTest extends AbstractSchedulerTest {
assertThat(SchedulerStreamingTest.startedEvaluate.get(false), is(1));
assertThat(last.getTrigger().getVariables().get("startedEvaluate"), is(1));
assertTrue(last.getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
assertTrue(last.getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
}
);
}

View File

@@ -149,7 +149,8 @@ public class CommandsWrapper implements TaskCommands {
public <T extends TaskRunnerDetailResult> ScriptOutput run() throws Exception {
if (this.namespaceFiles != null && !Boolean.FALSE.equals(runContext.render(this.namespaceFiles.getEnabled()).as(Boolean.class).orElse(true))) {
NamespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
NamespaceFilesUtils namespaceFilesUtils = ((DefaultRunContext) runContext).getApplicationContext().getBean(NamespaceFilesUtils.class);
namespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
}
TaskRunner<T> realTaskRunner = this.getTaskRunner();
@@ -157,7 +158,9 @@ public class CommandsWrapper implements TaskCommands {
FilesService.inputFiles(runContext, realTaskRunner.additionalVars(runContext, this), this.inputFiles);
}
RunContext taskRunnerRunContext = runContext.cloneForPlugin(realTaskRunner);
RunContextInitializer initializer = ((DefaultRunContext) runContext).getApplicationContext().getBean(RunContextInitializer.class);
RunContext taskRunnerRunContext = initializer.forPlugin(((DefaultRunContext) runContext).clone(), realTaskRunner);
List<String> renderedCommands = this.renderCommands(runContext, commands);
List<String> renderedBeforeCommands = this.renderCommands(runContext, beforeCommands);

View File

@@ -6,7 +6,7 @@
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<link rel="icon" href="/favicon.ico">
<link rel="icon" type="image/png" sizes="192x192" href="/favicon-192x192.png">
<link rel="icon" type="image/png" sizes="192x192" href="/favicon-192x192.png">
<meta name="msapplication-TileColor" content="#192A4E">
<meta name="theme-color" content="#192A4E">
<link rel="stylesheet" href="/loader.css" />
@@ -26,6 +26,26 @@
document.getElementsByTagName("html")[0].classList.add(localStorage.getItem("theme"));
}
</script>
<!-- Optional but recommended for faster connection -->
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<!-- Load Google Fonts non-blocking -->
<link
rel="stylesheet"
href="https://fonts.googleapis.com/css2?family=Public+Sans:wght@300;400;600;700;800&family=Source+Code+Pro:wght@400;700;800&display=swap"
media="print"
onload="this.media='all'"
>
<!-- Fallback for when JavaScript is disabled -->
<noscript>
<link
rel="stylesheet"
href="https://fonts.googleapis.com/css2?family=Public+Sans:wght@300;400;600;700;800&family=Source+Code+Pro:wght@400;700;800&display=swap"
>
</noscript>
</head>
<body>
<noscript>

1177
ui/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -24,11 +24,11 @@
},
"dependencies": {
"@js-joda/core": "^5.6.5",
"@kestra-io/ui-libs": "^0.0.268",
"@kestra-io/ui-libs": "^0.0.264",
"@vue-flow/background": "^1.3.2",
"@vue-flow/controls": "^1.1.2",
"@vue-flow/core": "^1.48.0",
"@vueuse/core": "^14.1.0",
"@vue-flow/core": "^1.47.0",
"@vueuse/core": "^14.0.0",
"ansi-to-html": "^0.7.2",
"axios": "^1.13.2",
"bootstrap": "^5.3.8",
@@ -39,8 +39,8 @@
"cytoscape": "^3.33.0",
"dagre": "^0.8.5",
"dotenv": "^17.2.3",
"element-plus": "2.12.0",
"humanize-duration": "^3.33.2",
"element-plus": "2.11.8",
"humanize-duration": "^3.33.1",
"js-yaml": "^4.1.1",
"lodash": "^4.17.21",
"mailchecker": "^6.0.19",
@@ -57,33 +57,33 @@
"moment-timezone": "^0.5.46",
"nprogress": "^0.2.0",
"path-browserify": "^1.0.1",
"pdfjs-dist": "^5.4.449",
"pdfjs-dist": "^5.4.394",
"pinia": "^3.0.4",
"posthog-js": "^1.308.0",
"posthog-js": "^1.291.0",
"rapidoc": "^9.3.8",
"semver": "^7.7.3",
"shiki": "^3.20.0",
"vue": "^3.5.25",
"shiki": "^3.15.0",
"vue": "^3.5.24",
"vue-axios": "^3.5.2",
"vue-chartjs": "^5.3.3",
"vue-gtag": "^3.6.3",
"vue-i18n": "^11.2.2",
"vue-material-design-icons": "^5.3.1",
"vue-router": "^4.6.4",
"vue-sidebar-menu": "^5.9.1",
"vue-router": "^4.6.3",
"vue-sidebar-menu": "^5.8.0",
"vue-virtual-scroller": "^2.0.0-beta.8",
"vue3-popper": "^1.5.0",
"vue3-tour": "github:kestra-io/vue3-tour",
"xss": "^1.0.15",
"yaml": "^2.8.2"
"yaml": "^2.8.1"
},
"devDependencies": {
"@codecov/vite-plugin": "^1.9.1",
"@esbuild-plugins/node-modules-polyfill": "^0.2.2",
"@eslint/js": "^9.39.2",
"@playwright/test": "^1.57.0",
"@eslint/js": "^9.39.1",
"@playwright/test": "^1.56.1",
"@rushstack/eslint-patch": "^1.14.1",
"@shikijs/markdown-it": "^3.20.0",
"@shikijs/markdown-it": "^3.15.0",
"@storybook/addon-themes": "^9.1.16",
"@storybook/addon-vitest": "^9.1.16",
"@storybook/test-runner": "^0.23.0",
@@ -91,58 +91,58 @@
"@types/humanize-duration": "^3.27.4",
"@types/js-yaml": "^4.0.9",
"@types/moment": "^2.13.0",
"@types/node": "^25.0.3",
"@types/node": "^24.10.2",
"@types/nprogress": "^0.2.3",
"@types/path-browserify": "^1.0.3",
"@types/semver": "^7.7.1",
"@types/testing-library__jest-dom": "^6.0.0",
"@types/testing-library__user-event": "^4.2.0",
"@typescript-eslint/parser": "^8.50.0",
"@vitejs/plugin-vue": "^6.0.3",
"@typescript-eslint/parser": "^8.46.4",
"@vitejs/plugin-vue": "^6.0.2",
"@vitejs/plugin-vue-jsx": "^5.1.2",
"@vitest/browser": "^3.2.4",
"@vitest/coverage-v8": "^3.2.4",
"@vue/eslint-config-prettier": "^10.2.0",
"@vue/test-utils": "^2.4.6",
"@vueuse/router": "^14.1.0",
"@vueuse/router": "^14.0.0",
"change-case": "5.4.4",
"cross-env": "^10.1.0",
"eslint": "^9.39.2",
"eslint": "^9.39.1",
"eslint-plugin-storybook": "^9.1.16",
"eslint-plugin-vue": "^9.33.0",
"globals": "^16.5.0",
"husky": "^9.1.7",
"jsdom": "^27.3.0",
"lint-staged": "^16.2.7",
"jsdom": "^27.1.0",
"lint-staged": "^16.2.6",
"monaco-editor": "^0.52.2",
"monaco-yaml": "5.3.1",
"patch-package": "^8.0.1",
"playwright": "^1.55.0",
"prettier": "^3.7.4",
"rimraf": "^6.1.2",
"rolldown-vite": "^7.2.11",
"prettier": "^3.6.2",
"rimraf": "^6.1.0",
"rolldown-vite": "^7.2.5",
"rollup-plugin-copy": "^3.5.0",
"sass": "^1.97.0",
"sass": "^1.93.3",
"storybook": "^9.1.16",
"storybook-vue3-router": "^6.0.2",
"ts-node": "^10.9.2",
"typescript": "^5.9.3",
"typescript-eslint": "^8.50.0",
"typescript-eslint": "^8.46.4",
"uuid": "^13.0.0",
"vite": "npm:rolldown-vite@latest",
"vitest": "^3.2.4",
"vue-tsc": "^3.1.8"
"vue-tsc": "^3.1.4"
},
"optionalDependencies": {
"@esbuild/darwin-arm64": "^0.27.2",
"@esbuild/darwin-x64": "^0.27.2",
"@esbuild/linux-x64": "^0.27.2",
"@rollup/rollup-darwin-arm64": "^4.53.5",
"@rollup/rollup-darwin-x64": "^4.53.5",
"@rollup/rollup-linux-x64-gnu": "^4.53.5",
"@swc/core-darwin-arm64": "^1.15.5",
"@swc/core-darwin-x64": "^1.15.5",
"@swc/core-linux-x64-gnu": "^1.15.5"
"@esbuild/darwin-arm64": "^0.27.1",
"@esbuild/darwin-x64": "^0.27.1",
"@esbuild/linux-x64": "^0.27.1",
"@rollup/rollup-darwin-arm64": "^4.53.3",
"@rollup/rollup-darwin-x64": "^4.53.3",
"@rollup/rollup-linux-x64-gnu": "^4.53.3",
"@swc/core-darwin-arm64": "^1.15.3",
"@swc/core-darwin-x64": "^1.15.3",
"@swc/core-linux-x64-gnu": "^1.15.3"
},
"overrides": {
"bootstrap": {

View File

@@ -11,7 +11,7 @@
title?: string;
message?: string;
content?: {
message?: string;
message: string;
_embedded?: {
errors?: any[];
};
@@ -19,8 +19,8 @@
response?: {
status: number;
config: {
url?: string;
method?: string;
url: string;
method: string;
};
};
variant?: "success" | "warning" | "info" | "error" | "primary";
@@ -97,8 +97,8 @@
if (props.message.response) {
error.error.response = {};
error.error.request = {
method: props.message.response.config.method ?? "GET",
url: props.message.response.config.url ?? "unknown url",
method: props.message.response.config.method,
url: props.message.response.config.url,
};
if (props.message.response.status) {
@@ -111,11 +111,7 @@
notifications.value = ElNotification({
title: title.value || "Error",
message: h(ErrorToastContainer, {
message: {
content:{
message: props.message?.content?.message ?? ""
}
},
message: props.message,
items: items.value,
onClose: () => close()
}),

View File

@@ -204,26 +204,24 @@
className="row-action"
>
<template #default="scope">
<div class="action-container">
<el-button v-if="scope.row.executionId || scope.row.evaluateRunningDate">
<Kicon
:tooltip="$t(`unlock trigger.tooltip.${scope.row.executionId ? 'execution' : 'evaluation'}`)"
placement="left"
@click="triggerToUnlock = scope.row"
>
<LockOff />
</Kicon>
</el-button>
<el-button>
<Kicon
:tooltip="$t('delete trigger')"
placement="left"
@click="confirmDeleteTrigger(scope.row)"
>
<Delete />
</Kicon>
</el-button>
</div>
<el-button v-if="scope.row.executionId || scope.row.evaluateRunningDate">
<Kicon
:tooltip="$t(`unlock trigger.tooltip.${scope.row.executionId ? 'execution' : 'evaluation'}`)"
placement="left"
@click="triggerToUnlock = scope.row"
>
<LockOff />
</Kicon>
</el-button>
<el-button>
<Kicon
:tooltip="$t('delete trigger')"
placement="left"
@click="confirmDeleteTrigger(scope.row)"
>
<Delete />
</Kicon>
</el-button>
</template>
</el-table-column>
<el-table-column :label="$t('backfill')" columnKey="backfill">
@@ -857,12 +855,6 @@
align-items: center;
}
.action-container {
display: flex;
align-items: center;
gap: 5px;
}
.statusIcon {
font-size: large;
}
@@ -935,4 +927,4 @@
}
}
}
</style>
</style>

View File

@@ -271,9 +271,7 @@
width: 100%;
max-width: 400px;
padding: 1rem;
display: flex;
flex-direction: column;
justify-content: center;
.logo {
width: 250px;
margin-bottom: 40px;

View File

@@ -7,27 +7,35 @@
<el-steps :space="60" direction="vertical" :active="activeStep" finishStatus="success">
<el-step :icon="activeStep > 0 ? CheckBold : AccountPlus" :title="t('setup.steps.user')" :class="{'primary-icon': activeStep <= 0}" />
<el-step
:icon="activeStep > 1 ? CheckBold : MessageOutline"
:title="t('setup.steps.survey')"
:icon="activeStep > 1 ? CheckBold : Cogs"
:title="t('setup.steps.config')"
:class="{'primary-icon': activeStep <= 1}"
/>
<el-step
:icon="activeStep > 2 ? CheckBold : MessageOutline"
:title="t('setup.steps.survey')"
:class="{'primary-icon': activeStep <= 2}"
/>
<el-step :icon="LightningBolt" :title="t('setup.steps.complete')" class="primary-icon" />
</el-steps>
</el-col>
<el-col :xs="24" :md="16" class="setup-main">
<el-card class="setup-card">
<template #header v-if="activeStep !== 2">
<template #header v-if="activeStep !== 3">
<div class="card-header">
<el-text size="large" class="header-title" v-if="activeStep === 0">
{{ t('setup.titles.user') }}
</el-text>
<el-text size="large" class="header-title" v-else-if="activeStep === 1">
Welcome {{ userFormData.firstName }}
</el-text>
<el-text size="large" class="header-title" v-else-if="activeStep === 2">
{{ t('setup.titles.survey') }}
</el-text>
<el-text class="d-block mt-4">
{{ subtitles[activeStep] }}
</el-text>
<el-button v-if="activeStep === 1" class="skip-button" @click="handleSurveySkip()">
<el-button v-if="activeStep === 2" class="skip-button" @click="handleSurveySkip()">
{{ t('setup.survey.skip') }}
</el-button>
</div>
@@ -90,7 +98,45 @@
</div>
</div>
<div v-else-if="activeStep === 1">
<div class="d-flex flex-column gap-4" v-else-if="activeStep === 1">
<el-card v-if="isLoading">
<el-text>Loading configuration...</el-text>
</el-card>
<el-card v-else-if="setupConfigurationLines.length > 0">
<el-row
v-for="config in setupConfigurationLines"
:key="config.name"
class="lh-lg mt-1 mb-1 align-items-center gap-2"
>
<component :is="config.icon" />
<el-text size="small">
{{ t("setup.config." + config.name) }}
</el-text>
<el-divider class="m-auto" />
<Check class="text-success" v-if="config.value === true" />
<Close class="text-danger" v-else-if="config.value === false" />
<el-text v-else size="small">
{{ config.value === "NOT SETUP" ? config.value : config.value.toString().capitalize() }}
</el-text>
</el-row>
</el-card>
<el-card v-else>
<el-text>No configuration data available</el-text>
</el-card>
<el-text class="align-self-start">
{{ t("setup.confirm.config_title") }}
</el-text>
<div class="d-flex align-self-start">
<el-button @click="previousStep()">
{{ t("setup.confirm.not_valid") }}
</el-button>
<el-button type="primary" @click="initBasicAuth()">
{{ t("setup.confirm.valid") }}
</el-button>
</div>
</div>
<div v-else-if="activeStep === 2">
<el-form ref="surveyForm" labelPosition="top" :model="surveyData" :showMessage="false">
<el-form-item :label="t('setup.survey.company_size')">
<el-radio-group v-model="surveyData.companySize" class="survey-radio-group">
@@ -137,7 +183,7 @@
</div>
</div>
<div v-else-if="activeStep === 2" class="success-step">
<div v-else-if="activeStep === 3" class="success-step">
<img :src="success" alt="success" class="success-img">
<div class="success-content">
<h1 class="success-title">
@@ -166,12 +212,19 @@
import {useSurveySkip} from "../../composables/useSurveyData"
import {initPostHogForSetup, trackSetupEvent} from "../../composables/usePosthog"
import Cogs from "vue-material-design-icons/Cogs.vue"
import AccountPlus from "vue-material-design-icons/AccountPlus.vue"
import LightningBolt from "vue-material-design-icons/LightningBolt.vue"
import MessageOutline from "vue-material-design-icons/MessageOutline.vue"
import Logo from "../home/Logo.vue"
import Check from "vue-material-design-icons/Check.vue"
import Close from "vue-material-design-icons/Close.vue"
import CheckBold from "vue-material-design-icons/CheckBold.vue"
import InformationOutline from "vue-material-design-icons/InformationOutline.vue"
import Database from "vue-material-design-icons/Database.vue"
import CurrentDc from "vue-material-design-icons/CurrentDc.vue"
import CloudOutline from "vue-material-design-icons/CloudOutline.vue"
import Lock from "vue-material-design-icons/Lock.vue"
import success from "../../assets/success.svg"
import * as BasicAuth from "../../utils/basicAuth";
@@ -188,6 +241,12 @@
newsletter: boolean
}
interface ConfigLine {
name: string
icon: any
value: any
}
interface CompanySizeOption {
value: string
label: string
@@ -199,6 +258,8 @@
const {storeSurveySkipData} = useSurveySkip()
const activeStep = ref(0)
const isLoading = ref(true)
const usageData = ref<any>(null)
const userForm: Ref<any> = ref(null)
const surveyForm: Ref<any> = ref(null)
@@ -216,6 +277,7 @@
})
const formData = computed(() => userFormData.value)
const setupConfiguration = computed(() => usageData.value?.configurations ?? {})
const initializeSetup = async () => {
try {
@@ -232,8 +294,12 @@
localStorage.setItem("basicAuthSetupInProgress", "true")
localStorage.setItem("setupStartTime", Date.now().toString())
usageData.value = await miscStore.loadAllUsages()
} catch {
/* Silently handle config loading errors */
/* Silently handle usage data loading errors */
} finally {
isLoading.value = false
}
}
@@ -245,8 +311,23 @@
}
})
const setupConfigurationLines = computed<ConfigLine[]>(() => {
if (!setupConfiguration.value) return []
const configs = miscStore.configs
const basicAuthValue = activeStep.value >= 1 || configs?.isBasicAuthInitialized
return [
{name: "repository", icon: Database, value: setupConfiguration.value.repositoryType || "NOT SETUP"},
{name: "queue", icon: CurrentDc, value: setupConfiguration.value.queueType || "NOT SETUP"},
{name: "storage", icon: CloudOutline, value: setupConfiguration.value.storageType || "NOT SETUP"},
{name: "basicauth", icon: Lock, value: basicAuthValue}
]
})
const subtitles = computed(() => [
t("setup.subtitles.user"),
t("setup.subtitles.config"),
t("setup.subtitles.survey"),
])
@@ -308,6 +389,14 @@
return field?.validateState === "error" ? field.validateMessage : null
}
const nextStep = () => {
activeStep.value++
}
const previousStep = () => {
activeStep.value--
}
const handleUserFormSubmit = async () => {
try {
await miscStore.addBasicAuth({
@@ -321,6 +410,12 @@
await miscStore.loadConfigs()
try {
usageData.value = await miscStore.loadAllUsages()
} catch {
/* Silently handle usage data loading */
}
trackSetupEvent("setup_flow:account_created", {
user_firstname: userFormData.value.firstName,
user_lastname: userFormData.value.lastName,
@@ -330,7 +425,7 @@
localStorage.setItem("basicAuthUserCreated", "true")
activeStep.value = 1
nextStep()
} catch (error: any) {
trackSetupEvent("setup_flow:account_creation_failed", {
error_message: error.message || "Unknown error"
@@ -339,6 +434,10 @@
}
}
const initBasicAuth = () => {
nextStep()
}
const handleSurveyContinue = () => {
localStorage.setItem("basicAuthSurveyData", JSON.stringify(surveyData.value))
@@ -360,7 +459,7 @@
...surveySelections
}, userFormData.value)
activeStep.value = 2
nextStep()
}
const handleSurveySkip = () => {
@@ -382,7 +481,7 @@
...surveySelections
}, userFormData.value)
activeStep.value = 2
nextStep()
}
const completeSetup = () => {

View File

@@ -24,7 +24,6 @@ $checkbox-checked-color: #8405FF;
width: 100%;
margin: 0 auto;
padding-top: 2rem;
height: fit-content;
@media screen and (min-width: 992px) {
gap: 3rem;

View File

@@ -24,7 +24,7 @@
/>
</el-form-item>
<el-form-item>
<el-button @click="expandCollapseAll()" :disabled="raw_view" :icon="logDisplayButtonIcon">
<el-button @click="expandCollapseAll()" :disabled="raw_view">
{{ logDisplayButtonText }}
</el-button>
</el-form-item>
@@ -32,7 +32,7 @@
<el-tooltip
:content="!raw_view ? $t('logs_view.raw_details') : $t('logs_view.compact_details')"
>
<el-button @click="toggleViewType" :icon="logViewTypeButtonIcon">
<el-button @click="toggleViewType">
{{ !raw_view ? $t('logs_view.raw') : $t('logs_view.compact') }}
</el-button>
</el-tooltip>
@@ -121,10 +121,6 @@
import TaskRunDetails from "../logs/TaskRunDetails.vue";
import Download from "vue-material-design-icons/Download.vue";
import ContentCopy from "vue-material-design-icons/ContentCopy.vue";
import UnfoldMoreHorizontal from "vue-material-design-icons/UnfoldMoreHorizontal.vue";
import UnfoldLessHorizontal from "vue-material-design-icons/UnfoldLessHorizontal.vue";
import ViewList from "vue-material-design-icons/ViewList.vue";
import ViewGrid from "vue-material-design-icons/ViewGrid.vue";
import Kicon from "../Kicon.vue";
import LogLevelNavigator from "../logs/LogLevelNavigator.vue";
import {DynamicScroller, DynamicScrollerItem} from "vue-virtual-scroller";
@@ -139,7 +135,6 @@
import {mapStores} from "pinia";
import {useExecutionsStore} from "../../stores/executions";
import KSFilter from "../filter/components/KSFilter.vue";
import {storageKeys} from "../../utils/constants";
export default {
components: {
@@ -162,7 +157,7 @@
level: undefined,
filter: undefined,
openedTaskrunsCount: 0,
raw_view: (localStorage.getItem(storageKeys.LOGS_VIEW_TYPE) ?? "false").toLowerCase() === "true",
raw_view: false,
logIndicesByLevel: Object.fromEntries(LogUtils.levelOrLower(undefined).map(level => [level, []])),
logCursor: undefined
};
@@ -214,12 +209,6 @@
logDisplayButtonText() {
return this.openedTaskrunsCount === 0 ? this.$t("expand all") : this.$t("collapse all")
},
logDisplayButtonIcon() {
return this.openedTaskrunsCount === 0 ? UnfoldMoreHorizontal : UnfoldLessHorizontal;
},
logViewTypeButtonIcon() {
return this.raw_view ? ViewGrid : ViewList;
},
currentLevelOrLower() {
return LogUtils.levelOrLower(this.level);
},
@@ -303,7 +292,6 @@
toggleViewType() {
this.logCursor = undefined;
this.raw_view = !this.raw_view;
localStorage.setItem(storageKeys.LOGS_VIEW_TYPE, String(this.raw_view));
},
sortLogsByViewOrder(a, b) {
const aSplit = a.split("/");

View File

@@ -18,10 +18,10 @@
</template>
<template #footer>
<el-button @click="onCancel">
<el-button @click="isOpen = false">
{{ t("cancel") }}
</el-button>
<el-button type="primary" :loading="isSaving" @click="setLabels()">
<el-button type="primary" @click="setLabels()">
{{ t("ok") }}
</el-button>
</template>
@@ -32,7 +32,7 @@
<el-form-item :label="t('execution labels')">
<LabelInput
v-model:labels="executionLabels"
:existingLabels="executionLabels"
:existingLabels="execution.labels"
/>
</el-form-item>
</el-form>
@@ -86,7 +86,6 @@
const isOpen = ref(false);
const executionLabels = ref<Label[]>([]);
const isSaving = ref(false);
const enabled = computed(() => {
if (
@@ -101,12 +100,6 @@
return !State.isRunning(props.execution.state.current);
});
const onCancel = () => {
// discard temp and close dialog without mutating parent
isOpen.value = false;
executionLabels.value = [];
};
const setLabels = async () => {
const filtered = filterValidLabels(executionLabels.value);
@@ -115,42 +108,31 @@
return;
}
isSaving.value = true;
isOpen.value = false;
try {
const response = await executionsStore.setLabels({
labels: filtered.labels,
executionId: props.execution.id,
});
if (response && response.data) {
executionsStore.execution = response.data;
}
executionsStore.execution = response.data;
toast.success(t("Set labels done"));
// close and clear only after success
isOpen.value = false;
executionLabels.value = [];
} catch (err) {
console.error(err); // keep dialog open so user can fix / retry
} finally {
isSaving.value = false;
console.error(err); // Error handling is done by the store/interceptor
}
};
// initialize the temp clone only when opening the dialog
watch(isOpen, (open) => {
if (open) {
const toIgnore = miscStore.configs?.hiddenLabelsPrefixes || [];
const source = props.execution.labels || [];
watch(isOpen, () => {
executionLabels.value = [];
// deep clone so child edits never mutate the original
executionLabels.value = JSON.parse(JSON.stringify(source || []))
.filter((label: Label) => !toIgnore.some((prefix: string) => label.key?.startsWith(prefix)));
const toIgnore = miscStore.configs?.hiddenLabelsPrefixes || [];
} else {
// when dialog closed, clear temp state (safe-guard)
executionLabels.value = [];
if (props.execution.labels) {
executionLabels.value = props.execution.labels.filter(
(label) =>
!toIgnore.some((prefix: string) =>
label.key?.startsWith(prefix),
),
);
}
});
</script>

View File

@@ -478,6 +478,8 @@
<style scoped lang="scss">
@import "@kestra-io/ui-libs/src/scss/variables";
$font-size-sm: $font-size-base * 0.875; // TODO: Move it into varaibles file of ui-libs
#overview {
:deep(.el-splitter-panel:has(> .sidebar:first-child)) {
background-color: var(--ks-background-table-row);

View File

@@ -25,18 +25,22 @@ export function applyDefaultFilters(
includeScope,
legacyQuery,
}: DefaultFilterOptions = {}): { query: LocationQuery, change: boolean } {
if(currentQuery && Object.keys(currentQuery).length > 0) {
return {
query: currentQuery,
change: false,
}
}
const query = {...currentQuery};
let change = false;
if (namespace === undefined && defaultNamespace() && !hasFilterKey(query, NAMESPACE_FILTER_PREFIX)) {
query[legacyQuery ? "namespace" : `${NAMESPACE_FILTER_PREFIX}[PREFIX]`] = defaultNamespace();
change = true;
}
if (includeScope && !hasFilterKey(query, SCOPE_FILTER_PREFIX)) {
query[legacyQuery ? "scope" : `${SCOPE_FILTER_PREFIX}[EQUALS]`] = "USER";
change = true;
}
const TIME_FILTER_KEYS = /startDate|endDate|timeRange/;
@@ -44,10 +48,9 @@ export function applyDefaultFilters(
if (includeTimeRange && !Object.keys(query).some(key => TIME_FILTER_KEYS.test(key))) {
const defaultDuration = useMiscStore().configs?.chartDefaultDuration ?? "P30D";
query[legacyQuery ? "timeRange" : `${TIME_RANGE_FILTER_PREFIX}[EQUALS]`] = defaultDuration;
change = true;
}
return {query, change};
return {query, change: true};
}
export function useDefaultFilter(

View File

@@ -20,9 +20,6 @@
import {useVueTour} from "../../composables/useVueTour";
import type {BlueprintType} from "../../stores/blueprints"
import {useAuthStore} from "../../override/stores/auth";
import permission from "../../models/permission";
import action from "../../models/action";
const route = useRoute();
const {t} = useI18n();
@@ -32,21 +29,13 @@
const blueprintsStore = useBlueprintsStore();
const coreStore = useCoreStore();
const flowStore = useFlowStore();
const authStore = useAuthStore();
const setupFlow = async () => {
const blueprintId = route.query.blueprintId as string;
const blueprintSource = route.query.blueprintSource as BlueprintType;
const implicitDefaultNamespace = authStore.user.getNamespacesForAction(
permission.FLOW,
action.CREATE,
)[0];
let flowYaml = "";
const id = getRandomID();
const selectedNamespace = (route.query.namespace as string)
?? defaultNamespace()
?? implicitDefaultNamespace
?? "company.team";
const selectedNamespace = (route.query.namespace as string) || defaultNamespace() || "company.team";
if (route.query.copy && flowStore.flow) {
flowYaml = flowStore.flow.source;

View File

@@ -10,15 +10,15 @@
</el-alert>
</div>
<el-form labelPosition="top" :model="inputs" ref="form" @submit.prevent="false">
<InputsForm
:initialInputs="flow.inputs"
:selectedTrigger="selectedTrigger"
:flow="flow"
<InputsForm
:initialInputs="flow.inputs"
:selectedTrigger="selectedTrigger"
:flow="flow"
v-model="inputs"
:executeClicked="executeClicked"
@confirm="onSubmit($refs.form)"
@update:model-value-no-default="values => inputsNoDefaults=values"
@update:checks="values => checks=values"
@update:model-value-no-default="values => inputsNoDefaults=values"
@update:checks="values => checks=values"
/>
<el-collapse v-model="collapseName">
@@ -192,7 +192,6 @@
if (formRef && this.flowCanBeExecuted) {
this.checks = [];
this.executeClicked = false;
this.coreStore.message = null;
formRef.validate((valid) => {
if (!valid) {
return false;
@@ -208,7 +207,7 @@
this.executionLabels
.filter(label => label.key && label.value)
.map(label => `${label.key}:${label.value}`)
), "system.from:ui"],
)],
scheduleDate: this.scheduleDate
});
} else {
@@ -221,7 +220,7 @@
this.executionLabels
.filter(label => label.key && label.value)
.map(label => `${label.key}:${label.value}`)
), "system.from:ui"],
)],
scheduleDate: this.$moment(this.scheduleDate).tz(localStorage.getItem(storageKeys.TIMEZONE_STORAGE_KEY) ?? moment.tz.guess()).toISOString(true),
nextStep: true,
});

View File

@@ -176,20 +176,13 @@
:label="t('last execution date')"
>
<template #default="scope">
<router-link
v-if="lastExecutionByFlowReady && getLastExecution(scope.row)"
:to="{
name: 'executions/update',
params: {
namespace: scope.row.namespace,
flowId: scope.row.id,
id: getLastExecution(scope.row).id
}
}"
class="table-link"
>
<DateAgo :date="getLastExecution(scope.row)?.startDate" inverted />
</router-link>
<DateAgo
v-if="lastExecutionByFlowReady"
:inverted="true"
:date="getLastExecution(scope.row)
?.startDate
"
/>
</template>
</el-table-column>
@@ -200,22 +193,10 @@
>
<template #default="scope">
<div
v-if="lastExecutionByFlowReady && getLastExecution(scope.row)"
v-if="lastExecutionByFlowReady && getLastExecution(scope.row)?.status"
class="d-flex justify-content-between align-items-center"
>
<router-link
:to="{
name: 'executions/update',
params: {
namespace: scope.row.namespace,
flowId: scope.row.id,
id: getLastExecution(scope.row).id
}
}"
class="table-link"
>
<Status :status="getLastExecution(scope.row).status" size="small" />
</router-link>
<Status :status="getLastExecution(scope.row)?.status" size="small" />
</div>
</template>
</el-table-column>
@@ -710,16 +691,4 @@
align-items: flex-end;
}
}
.table-link {
cursor: pointer;
& :deep(button) {
cursor: pointer !important;
}
&:hover {
text-decoration: none;
}
}
</style>

View File

@@ -13,7 +13,9 @@
@show="handlePopoverShow"
>
<template #reference>
<TaskIcon :onlyIcon="true" :cls="trigger?.type" :icons="pluginsStore.icons" />
<el-button class="trigger-icon" @click="copyLink(trigger)" size="small">
<TaskIcon :onlyIcon="true" :cls="trigger?.type" :icons="pluginsStore.icons" />
</el-button>
</template>
<template #default>
<TriggerVars :data="trigger" :execution="execution" @on-copy="copyLink(trigger)" />

View File

@@ -1,7 +1,7 @@
<template>
<span ref="rootContainer">
<!-- Valid -->
<el-button v-if="!errors && !warnings && !infos" v-bind="$attrs" :link="link" :size="size" type="default" class="success square" disabled>
<el-button v-if="!errors && !warnings &&!infos" v-bind="$attrs" :link="link" :size="size" type="default" class="success square" disabled>
<CheckBoldIcon class="text-success" />
</el-button>
@@ -157,7 +157,6 @@
}
&.success {
cursor: default;
border-color: var(--ks-border-success);
}

View File

@@ -5,19 +5,19 @@
<ValidationError
class="validation"
tooltipPlacement="bottom-start"
:errors="flowStore.flowErrors"
:errors="flowErrors"
:warnings="flowWarnings"
:infos="flowStore.flowInfos"
:infos="flowInfos"
/>
<EditorButtons
:isCreating="flowStore.isCreating"
:isReadOnly="flowStore.isReadOnly"
:isReadOnly="isReadOnly"
:canDelete="true"
:isAllowedEdit="flowStore.isAllowedEdit"
:isAllowedEdit="isAllowedEdit"
:haveChange="haveChange"
:flowHaveTasks="Boolean(flowStore.flowHaveTasks)"
:errors="flowStore.flowErrors"
:flowHaveTasks="Boolean(flowHaveTasks)"
:errors="flowErrors"
:warnings="flowWarnings"
@save="save"
@copy="
@@ -49,6 +49,7 @@
import ValidationError from "../flows/ValidationError.vue";
import localUtils from "../../utils/utils";
import {useFlowOutdatedErrors} from "./flowOutdatedErrors";
import {useFlowStore} from "../../stores/flow";
import {useToast} from "../../utils/toast";
@@ -72,14 +73,22 @@
const route = useRoute()
const routeParams = computed(() => route.params)
const {translateError, translateErrorWithKey} = useFlowOutdatedErrors();
// If playground is not defined, enable it by default
const isSettingsPlaygroundEnabled = computed(() => localStorage.getItem("editorPlayground") === "false" ? false : true);
const isReadOnly = computed(() => flowStore.isReadOnly)
const isAllowedEdit = computed(() => flowStore.isAllowedEdit)
const flowHaveTasks = computed(() => flowStore.flowHaveTasks)
const flowErrors = computed(() => flowStore.flowErrors?.map(translateError));
const flowInfos = computed(() => flowStore.flowInfos)
const toast = useToast();
const flowWarnings = computed(() => {
const outdatedWarning =
flowStore.flowValidation?.outdated && !flowStore.isCreating
? flowStore.flowValidation?.constraints?.split(", ") ?? []
? [translateErrorWithKey(flowStore.flowValidation?.constraints ?? "")]
: [];
const deprecationWarnings =

View File

@@ -86,7 +86,6 @@
import {computed, onActivated, onMounted, ref, provide, onBeforeUnmount, watch, InjectionKey, inject} from "vue";
import {useRoute, useRouter} from "vue-router";
import {apiUrl} from "override/utils/route";
import type * as monaco from "monaco-editor/esm/vs/editor/editor.api";
import {EDITOR_CURSOR_INJECTION_KEY, EDITOR_WRAPPER_INJECTION_KEY} from "../no-code/injectionKeys";
import {usePluginsStore} from "../../stores/plugins";
@@ -166,6 +165,7 @@
pluginsStore.lazyLoadSchemaType({type: "flow"});
}
loadFile();
loadPluginsHash();
window.addEventListener("keydown", handleGlobalSave);
window.addEventListener("keydown", toggleAiShortcut);
if(route.query.ai === "open") {
@@ -215,15 +215,15 @@
const isCreating = computed(() => flowStore.isCreating);
const timeout = ref<any>(null);
const hash = ref<any>(null);
const editorContent = computed(() => {
return draftSource.value ?? source.value;
});
const pluginsStore = usePluginsStore();
const namespacesStore = useNamespacesStore();
const miscStore = useMiscStore();
const hash = computed<number>(() => miscStore.configs?.pluginsHash ?? 0);
const editorScrollKey = computed(() => {
if (props.flow) {
@@ -238,6 +238,11 @@
return undefined;
});
function loadPluginsHash() {
miscStore.loadConfigs().then(config => {
hash.value = config.pluginsHash;
});
}
const updateContent = inject(FILES_UPDATE_CONTENT_INJECTION_KEY);
@@ -274,10 +279,34 @@
clearTimeout(timeout.value);
});
function updatePluginDocumentation(event: {position: monaco.Position, model: monaco.editor.ITextModel}) {
const cls = YAML_UTILS.getTypeAtPosition(source.value, event.position, pluginsStore.allTypes);
const version = YAML_UTILS.getVersionAtPosition(source.value, event.position);
pluginsStore.updateDocumentation({cls, version, hash: hash.value});
function updatePluginDocumentation(event: any) {
const source = event.model.getValue();
const cursorOffset = event.model.getOffsetAt(event.position);
const isPlugin = (type: string) => pluginsStore.allTypes.includes(type);
const isInRange = (range: [number, number, number]) =>
cursorOffset >= range[0] && cursorOffset <= range[2];
const getRangeSize = (range: [number, number, number]) => range[2] - range[0];
const getElementFromRange = (typeElement: any) => {
const wrapper = YAML_UTILS.localizeElementAtIndex(source, typeElement.range[0]);
return wrapper?.value?.type && isPlugin(wrapper.value.type)
? wrapper.value
: {type: typeElement.type};
};
const selectedElement = YAML_UTILS.extractFieldFromMaps(source, "type", () => true, isPlugin)
.filter(el => el.range && isInRange(el.range))
.reduce((closest, current) =>
!closest || getRangeSize(current.range) < getRangeSize(closest.range)
? current
: closest
, null as any);
let result = selectedElement ? getElementFromRange(selectedElement) : undefined;
result = {...result, hash: hash.value, forceRefresh: true};
pluginsStore.updateDocumentation(result as Parameters<typeof pluginsStore.updateDocumentation>[0]);
};
const saveFlowYaml = async () => {

View File

@@ -86,8 +86,8 @@
flowStore.flowYaml = source
const result = await flowStore.onEdit({
source,
currentIsFlow,
editorViewType: "YAML",
topologyVisible: true,
})
if (currentIsFlow && source) {

View File

@@ -0,0 +1,22 @@
import {useI18n} from "vue-i18n";
/**
 * Composable translating "outdated flow" error messages.
 *
 * The backend flags translatable errors with a ">>>>" prefix followed by an
 * i18n key; any other message is passed through verbatim.
 */
export function useFlowOutdatedErrors(){
	const {t} = useI18n();

	// Marker prepended by the backend to errors that carry an i18n key.
	const KEY_PREFIX = ">>>>";

	/**
	 * Translate a raw error string. Prefixed errors are resolved through the
	 * i18n catalog; unprefixed ones are returned unchanged.
	 */
	function translateError(error: string): string {
		if (!error.startsWith(KEY_PREFIX)) {
			return error;
		}
		const key = error.substring(KEY_PREFIX.length).trim();
		return translateErrorWithKey(key);
	}

	/** Join the "<key>.description" and "<key>.details" catalog entries. */
	function translateErrorWithKey(key: string): string {
		return `${t(key + ".description")} ${t(key + ".details")}`
	}

	return {
		translateError,
		translateErrorWithKey
	}
}

View File

@@ -20,7 +20,7 @@
overflow-y: auto;
@media screen and (max-width: 992px) {
align-items: stretch;
align-items: flex-start;
}
}
</style>

View File

@@ -4,8 +4,8 @@
<div class="d-flex flex-column align-items-center gap-2 px-2">
<img :src>
<h2>{{ $t(`empty.${props.type}.title`) }}</h2>
<p v-html="$t(`empty.${props.type}.content`)" />
<h2>{{ t(`empty.${props.type}.title`) }}</h2>
<p v-html="t(`empty.${props.type}.content`)" />
<slot name="button" />
</div>
@@ -16,11 +16,13 @@
<script setup lang="ts">
import {computed} from "vue";
import {images} from "./images";
const props = defineProps({type: {type: String, required: true}});
import {useI18n} from "vue-i18n";
const {t} = useI18n({useScope: "global"});
import {images} from "./images";
const src = computed((): string => images[props.type]);
</script>

View File

@@ -29,7 +29,6 @@
filter=""
:excludeMetas="isFlowEdit ? ['namespace', 'flowId'] : []"
:log="log"
:class="{'log-0': i === 0}"
/>
</div>

View File

@@ -11,7 +11,7 @@
remote
remoteShowSuffix
:remoteMethod="onSearch"
:placeholder="$t('namespaces')"
:placeholder="t('namespaces')"
:suffixIcon="readOnly ? Lock : undefined"
>
<template #tag>
@@ -37,11 +37,14 @@
<script setup lang="ts">
import {computed, onMounted} from "vue"
import {useI18n} from "vue-i18n"
import {useNamespacesStore} from "override/stores/namespaces"
import DotsSquare from "vue-material-design-icons/DotsSquare.vue"
import Lock from "vue-material-design-icons/Lock.vue";
import {defaultNamespace} from "../../../composables/useNamespaces";
const {t} = useI18n();
withDefaults(defineProps<{
multiple?: boolean,
readOnly?: boolean,

View File

@@ -2,13 +2,16 @@
<button @click="emit('add', what)" class="py-2 adding" type="button">
{{
what
? $t("no_code.adding", {what})
: $t("no_code.adding_default")
? t("no_code.adding", {what})
: t("no_code.adding_default")
}}
</button>
</template>
<script setup lang="ts">
import {useI18n} from "vue-i18n";
const {t} = useI18n({useScope: "global"});
const emit = defineEmits<{
(e: "add", what: string | undefined): void;
}>();

View File

@@ -19,7 +19,7 @@
</el-form>
<div @click="() => onTaskEditorClick(taskModel)">
<TaskObject
v-loading="isLoading || isPluginSchemaLoading"
v-loading="isLoading"
v-if="(selectedTaskType || !isTaskDefinitionBasedOnType) && schema"
name="root"
:modelValue="taskModel"
@@ -51,7 +51,6 @@
import {getValueAtJsonPath, resolve$ref} from "../../../utils/utils";
import PlaygroundRunTaskButton from "../../inputs/PlaygroundRunTaskButton.vue";
import isEqual from "lodash/isEqual";
import {useMiscStore} from "../../../override/stores/misc";
const {t} = useI18n();
@@ -165,7 +164,7 @@
// when tab is opened, load the documentation
onActivated(() => {
if(selectedTaskType.value && parentPath !== "inputs"){
pluginsStore.updateDocumentation({type: selectedTaskType.value, ...taskModel.value});
pluginsStore.updateDocumentation(taskModel.value as Parameters<typeof pluginsStore.updateDocumentation>[0]);
}
});
@@ -217,24 +216,6 @@
return typeMap.value[selectedTaskType.value ?? ""] || [];
});
const versionedSchema = ref<Schemas|undefined>()
const isPluginSchemaLoading = ref(false)
watch([selectedTaskType, resolvedTypes], async ([val, types]) => {
if(types.length > 1 && val){
isPluginSchemaLoading.value = true;
try{
const {schema} = await pluginsStore.load({
cls: val,
version: taskModel.value?.version,
})
versionedSchema.value = schema?.properties
} finally {
isPluginSchemaLoading.value = false;
}
}
}, {immediate: true});
const resolvedType = computed<string>(() => {
if(resolvedTypes.value.length > 1 && selectedTaskType.value){
// find the resolvedType that match the current dataType
@@ -280,9 +261,9 @@
});
const resolvedLocalSchema = computed(() => {
return versionedSchema.value ?? (isTaskDefinitionBasedOnType.value
return isTaskDefinitionBasedOnType.value
? definitions.value?.[resolvedType.value] ?? {}
: schemaAtBlockPath.value)
: schemaAtBlockPath.value
});
const resolvedProperties = computed<Schemas["properties"] | undefined>(() => {
@@ -389,12 +370,10 @@
onTaskInput(value);
}
const miscStore = useMiscStore();
const hash = computed(() => miscStore.configs?.pluginsHash ?? 0);
const onTaskEditorClick = inject(ON_TASK_EDITOR_CLICK_INJECTION_KEY, (elt?: PartialNoCodeElement) => {
if(isPlugin.value && elt?.type){
pluginsStore.updateDocumentation({cls: elt.type, version: elt.version, hash: hash.value});
const type = elt?.type;
if(isPlugin.value && type){
pluginsStore.updateDocumentation({type});
}else{
pluginsStore.updateDocumentation();
}

Some files were not shown because too many files have changed in this diff Show More