Mirror of https://github.com/kestra-io/kestra.git (synced 2025-12-25 11:12:12 -05:00)

Compare commits: plugin/tem...docs/purge (84 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 42c8334e2e |  |
|  | 7ea95f393e |  |
|  | 6935900699 |  |
|  | 123d7fb426 |  |
|  | 0bc8e8d74a |  |
|  | e0c3cfa1f9 |  |
|  | 7f77b24ae0 |  |
|  | ec6820dc25 |  |
|  | d94193c143 |  |
|  | c9628047fa |  |
|  | 4cbc069af4 |  |
|  | eabe573fe6 |  |
|  | ecd64617c3 |  |
|  | a5650bca0f |  |
|  | ed59e262d4 |  |
|  | a5f9d54f7d |  |
|  | 47f4f43198 |  |
|  | 5d31c97f7f |  |
|  | f8107285c4 |  |
|  | 8dc8dc1796 |  |
|  | 834dfd2947 |  |
|  | 6edb88841f |  |
|  | 5653531628 |  |
|  | ee61276106 |  |
|  | abcf76f7b4 |  |
|  | 67ada7f61b |  |
|  | 0c13633f77 |  |
|  | a6cf2015ff |  |
|  | 2f9216c70b |  |
|  | 1903e6fac5 |  |
|  | 2d2cb00cab |  |
|  | 01b5441d16 |  |
|  | efc778e294 |  |
|  | 60235a4e73 |  |
|  | b167c52e76 |  |
|  | 216b124294 |  |
|  | b6e4df8de2 |  |
|  | 429e7c7945 |  |
|  | e302b4be4a |  |
|  | 8e7ad9ae25 |  |
|  | 41a11abf16 |  |
|  | 1be16d5e9d |  |
|  | e263224d7b |  |
|  | 12b89588a6 |  |
|  | eae5eb80cb |  |
|  | c0f6298484 |  |
|  | ba1d6b2232 |  |
|  | 048dcb80cc |  |
|  | a81de811d7 |  |
|  | a960a9f982 |  |
|  | c4d4fd935f |  |
|  | f063a5a2d9 |  |
|  | ac91d5605f |  |
|  | e3d3c3651b |  |
|  | 5b6836237e |  |
|  | 2f8284b133 |  |
|  | 42992fd7c3 |  |
|  | 3a481f93d3 |  |
|  | 7e964ae563 |  |
|  | 25e54edbc9 |  |
|  | e88dc7af76 |  |
|  | b7a027f0dc |  |
|  | 98141d6010 |  |
|  | bf119ab6df |  |
|  | 9bd6353b77 |  |
|  | c0ab581cf1 |  |
|  | 0f38e19663 |  |
|  | 0c14ea621c |  |
|  | fb14e57a7c |  |
|  | 09c707d865 |  |
|  | 86e08d71dd |  |
|  | 94c00cedeb |  |
|  | eb12832b1e |  |
|  | 687cefdfb9 |  |
|  | 8eae8aba72 |  |
|  | abdbb8d364 |  |
|  | 8a55ab3af6 |  |
|  | b7cb933e1e |  |
|  | 3af003e5e4 |  |
|  | c3861a5532 |  |
|  | ae1f10f45a |  |
|  | 612dccfb8c |  |
|  | 2ae8df2f5f |  |
|  | 1abfa74a16 |  |
.github/dependabot.yml (6 changes, vendored)
@@ -51,7 +51,7 @@ updates:
       storybook:
         applies-to: version-updates
-        patterns: ["storybook*", "@storybook/*"]
+        patterns: ["storybook*", "@storybook/*", "eslint-plugin-storybook"]

       vitest:
         applies-to: version-updates

@@ -67,10 +67,10 @@ updates:
           "@types/*",
           "storybook*",
           "@storybook/*",
+          "eslint-plugin-storybook",
           "vitest",
           "@vitest/*",
           # Temporary exclusion of these packages from major updates
-          "eslint-plugin-storybook",
           "eslint-plugin-vue",
         ]

@@ -84,6 +84,7 @@ updates:
           "@types/*",
           "storybook*",
           "@storybook/*",
+          "eslint-plugin-storybook",
           "vitest",
           "@vitest/*",
           # Temporary exclusion of these packages from minor updates

@@ -102,6 +103,7 @@ updates:
           "@types/*",
           "storybook*",
           "@storybook/*",
+          "eslint-plugin-storybook",
           "vitest",
           "@vitest/*",
         ]
@@ -29,8 +29,8 @@ start_time2=$(date +%s)

 echo "cd ./ui"
 cd ./ui
-echo "npm i"
-npm i
+echo "npm ci"
+npm ci

 echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
 ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
@@ -21,7 +21,7 @@ plugins {

     // test
     id "com.adarshr.test-logger" version "4.0.0"
-    id "org.sonarqube" version "7.2.0.6526"
+    id "org.sonarqube" version "7.2.1.6560"
     id 'jacoco-report-aggregation'

     // helper

@@ -331,7 +331,7 @@ subprojects {
     }

     dependencies {
-        agent "org.aspectj:aspectjweaver:1.9.25"
+        agent "org.aspectj:aspectjweaver:1.9.25.1"
     }

     test {
@@ -82,8 +82,8 @@ dependencies {
     testImplementation "io.micronaut:micronaut-http-server-netty"
     testImplementation "io.micronaut:micronaut-management"

-    testImplementation "org.testcontainers:testcontainers:1.21.3"
-    testImplementation "org.testcontainers:junit-jupiter:1.21.3"
+    testImplementation "org.testcontainers:testcontainers:1.21.4"
+    testImplementation "org.testcontainers:junit-jupiter:1.21.4"
     testImplementation "org.bouncycastle:bcpkix-jdk18on"

     testImplementation "org.wiremock:wiremock-jetty12"
@@ -3,6 +3,7 @@ package io.kestra.core.docs;
 import io.kestra.core.models.annotations.PluginSubGroup;
 import io.kestra.core.plugins.RegisteredPlugin;
 import io.micronaut.core.annotation.Nullable;
+import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.Data;
 import lombok.NoArgsConstructor;

@@ -117,10 +118,17 @@ public class Plugin {
             .filter(not(io.kestra.core.models.Plugin::isInternal))
             .filter(clazzFilter)
             .filter(c -> !c.getName().startsWith("org.kestra."))
-            .map(c -> new PluginElementMetadata(c.getName(), io.kestra.core.models.Plugin.isDeprecated(c) ? true : null))
+            .map(c -> {
+                Schema schema = c.getAnnotation(Schema.class);
+
+                var title = Optional.ofNullable(schema).map(Schema::title).filter(t -> !t.isEmpty()).orElse(null);
+                var description = Optional.ofNullable(schema).map(Schema::description).filter(d -> !d.isEmpty()).orElse(null);
+                var deprecated = io.kestra.core.models.Plugin.isDeprecated(c) ? true : null;
+
+                return new PluginElementMetadata(c.getName(), deprecated, title, description);
+            })
             .toList();
     }

-    public record PluginElementMetadata(String cls, Boolean deprecated) {
-    }
+    public record PluginElementMetadata(String cls, Boolean deprecated, String title, String description) {}
 }
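With this change, plugin element metadata is enriched from the element's `@Schema` annotation. As a rough illustration (the task class below is hypothetical, not part of this diff), an element annotated like this would now surface a title and description in its `PluginElementMetadata`:

```java
import io.swagger.v3.oas.annotations.media.Schema;

// Hypothetical plugin element, only to show where title/description come from:
// the new mapping code reads this annotation via c.getAnnotation(Schema.class).
@Schema(
    title = "Reverse a string",
    description = "Toy example; a blank title or description is normalized to null."
)
public class ReverseString {
}
```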
@@ -26,6 +26,7 @@ public record Label(
     public static final String REPLAYED = SYSTEM_PREFIX + "replayed";
     public static final String SIMULATED_EXECUTION = SYSTEM_PREFIX + "simulatedExecution";
     public static final String TEST = SYSTEM_PREFIX + "test";
+    public static final String FROM = SYSTEM_PREFIX + "from";

     /**
      * Static helper method for converting a list of labels to a nested map.
@@ -16,6 +16,7 @@ import jakarta.validation.constraints.NotNull;
 public class Setting {
     public static final String INSTANCE_UUID = "instance.uuid";
     public static final String INSTANCE_VERSION = "instance.version";
+    public static final String INSTANCE_EDITION = "instance.edition";

     @NotNull
     private String key;
@@ -1,6 +1,5 @@
 package io.kestra.core.models.flows;

-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;

@@ -130,7 +129,7 @@ public class Flow extends AbstractFlow implements HasUID {
     @Valid
     @PluginProperty
     List<SLA> sla;

     @Schema(
         title = "Conditions evaluated before the flow is executed.",
         description = "A list of conditions that are evaluated before the flow is executed. If no checks are defined, the flow executes normally."

@@ -355,7 +354,7 @@ public class Flow extends AbstractFlow implements HasUID {
      * To be conservative a flow MUST not return any source.
      */
     @Override
-    @JsonIgnore
+    @Schema(hidden = true)
     public String getSource() {
         return null;
     }
@@ -1,14 +1,12 @@
 package io.kestra.core.models.flows;

-import com.fasterxml.jackson.annotation.JsonIgnore;
 import io.micronaut.core.annotation.Introspected;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.ToString;
 import lombok.experimental.SuperBuilder;
+import io.swagger.v3.oas.annotations.media.Schema;

 import java.util.Objects;
-import java.util.regex.Pattern;

@@ -48,7 +46,7 @@ public class FlowWithSource extends Flow {
     }

     @Override
-    @JsonIgnore(value = false)
+    @Schema(hidden = false)
     public String getSource() {
         return this.source;
     }
@@ -82,6 +82,12 @@ abstract public class AbstractTrigger implements TriggerInterface {
     @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
     private boolean failOnTriggerError = false;

+    @PluginProperty(group = PluginProperty.CORE_GROUP)
+    @Schema(
+        title = "Specifies whether a trigger is allowed to start a new execution even if a previous run is still in progress."
+    )
+    private boolean allowConcurrent = false;
+
     /**
      * For backward compatibility: we rename minLogLevel to logLevel.
      * @deprecated use {@link #logLevel} instead
@@ -1,22 +1,37 @@
 package io.kestra.core.models.triggers;

 import io.kestra.core.exceptions.IllegalVariableEvaluationException;
+import io.kestra.core.models.annotations.PluginProperty;
 import io.kestra.core.models.conditions.ConditionContext;
 import io.kestra.core.runners.RunContext;
+import io.swagger.v3.oas.annotations.media.Schema;

 import java.time.ZonedDateTime;
+import java.util.Map;

 public interface Schedulable extends PollingTriggerInterface {
     String PLUGIN_PROPERTY_RECOVER_MISSED_SCHEDULES = "recoverMissedSchedules";

+    @Schema(
+        title = "The inputs to pass to the scheduled flow"
+    )
+    @PluginProperty(dynamic = true)
+    Map<String, Object> getInputs();
+
+    @Schema(
+        title = "Action to take in the case of missed schedules",
+        description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
+            "The default is `ALL` unless a different value is configured using the global plugin configuration."
+    )
+    @PluginProperty
+    RecoverMissedSchedules getRecoverMissedSchedules();
+
     /**
      * Compute the previous evaluation of a trigger.
      * This is used when a trigger misses some schedule to compute the next date to evaluate in the past.
      */
     ZonedDateTime previousEvaluationDate(ConditionContext conditionContext) throws IllegalVariableEvaluationException;

-    RecoverMissedSchedules getRecoverMissedSchedules();
-
     /**
      * Load the default RecoverMissedSchedules from plugin property, or else ALL.
      */
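For orientation, a minimal sketch of the surface an implementing trigger now has to provide; the class name is hypothetical and all unrelated members are elided:

```java
// Sketch of a Schedulable implementation after this change: concrete schedule
// triggers must expose their inputs and their missed-schedule recovery policy.
public abstract class MyScheduleTrigger extends AbstractTrigger implements Schedulable {
    private Map<String, Object> inputs;
    private RecoverMissedSchedules recoverMissedSchedules;

    @Override
    public Map<String, Object> getInputs() {
        return inputs;
    }

    @Override
    public RecoverMissedSchedules getRecoverMissedSchedules() {
        return recoverMissedSchedules;
    }
}
```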
@@ -172,7 +172,7 @@ public class Trigger extends TriggerContext implements HasUID {

     if (abstractTrigger instanceof PollingTriggerInterface pollingTriggerInterface) {
         try {
-            nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, Optional.empty());
+            nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, lastTrigger);
         } catch (InvalidTriggerConfigurationException e) {
             disabled = true;
         }
@@ -6,12 +6,9 @@ import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.executions.ExecutionTrigger;
 import io.kestra.core.models.tasks.Output;
 import io.kestra.core.models.flows.State;
-import io.kestra.core.runners.DefaultRunContext;
-import io.kestra.core.runners.FlowInputOutput;
 import io.kestra.core.runners.RunContext;
 import io.kestra.core.utils.IdUtils;
 import io.kestra.core.utils.ListUtils;
-import java.time.ZonedDateTime;
 import java.util.*;

 public abstract class TriggerService {

@@ -51,58 +48,6 @@ public abstract class TriggerService {
         return generateExecution(IdUtils.create(), trigger, context, executionTrigger, conditionContext);
     }

-    public static Execution generateScheduledExecution(
-        AbstractTrigger trigger,
-        ConditionContext conditionContext,
-        TriggerContext context,
-        List<Label> labels,
-        Map<String, Object> inputs,
-        Map<String, Object> variables,
-        Optional<ZonedDateTime> scheduleDate
-    ) {
-        RunContext runContext = conditionContext.getRunContext();
-        ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, variables);
-
-        List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(labels));
-        if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
-            // add a correlation ID if none exist
-            executionLabels.add(new Label(Label.CORRELATION_ID, runContext.getTriggerExecutionId()));
-        }
-        Execution execution = Execution.builder()
-            .id(runContext.getTriggerExecutionId())
-            .tenantId(context.getTenantId())
-            .namespace(context.getNamespace())
-            .flowId(context.getFlowId())
-            .flowRevision(conditionContext.getFlow().getRevision())
-            .variables(conditionContext.getFlow().getVariables())
-            .labels(executionLabels)
-            .state(new State())
-            .trigger(executionTrigger)
-            .scheduleDate(scheduleDate.map(date -> date.toInstant()).orElse(null))
-            .build();
-
-        Map<String, Object> allInputs = new HashMap<>();
-        // add flow inputs with default value
-        var flow = conditionContext.getFlow();
-        if (flow.getInputs() != null) {
-            flow.getInputs().stream()
-                .filter(input -> input.getDefaults() != null)
-                .forEach(input -> allInputs.put(input.getId(), input.getDefaults()));
-        }
-
-        if (inputs != null) {
-            allInputs.putAll(inputs);
-        }
-
-        // add inputs and inject defaults
-        if (!allInputs.isEmpty()) {
-            FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
-            execution = execution.withInputs(flowInputOutput.readExecutionInputs(conditionContext.getFlow(), execution, allInputs));
-        }
-
-        return execution;
-    }

     private static Execution generateExecution(
         String id,
         AbstractTrigger trigger,

@@ -111,6 +56,7 @@ public abstract class TriggerService {
         ConditionContext conditionContext
     ) {
         List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(trigger.getLabels()));
+        executionLabels.add(new Label(Label.FROM, "trigger"));
         if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
             // add a correlation ID if none exist
             executionLabels.add(new Label(Label.CORRELATION_ID, id));
@@ -1,10 +1,10 @@
 package io.kestra.core.repositories;

 import io.kestra.core.models.Setting;
+import jakarta.validation.ConstraintViolationException;

 import java.util.List;
 import java.util.Optional;
-import jakarta.validation.ConstraintViolationException;

 public interface SettingRepositoryInterface {
     Optional<Setting> findByKey(String key);

@@ -13,5 +13,7 @@ public interface SettingRepositoryInterface {

     Setting save(Setting setting) throws ConstraintViolationException;

+    Setting internalSave(Setting setting) throws ConstraintViolationException;
+
     Setting delete(Setting setting);
 }
@@ -16,8 +16,8 @@ import java.util.function.Function;
 public interface TriggerRepositoryInterface extends QueryBuilderInterface<Triggers.Fields> {
     Optional<Trigger> findLast(TriggerContext trigger);

-    Optional<Trigger> findByExecution(Execution execution);
+    Optional<Trigger> findByUid(String uid);

     List<Trigger> findAll(String tenantId);

     List<Trigger> findAllForAllTenants();
@@ -6,10 +6,12 @@ import com.google.common.base.CaseFormat;
 import com.google.common.collect.ImmutableMap;
 import io.kestra.core.exceptions.IllegalVariableEvaluationException;
 import io.kestra.core.metrics.MetricRegistry;
+import io.kestra.core.models.Plugin;
 import io.kestra.core.models.executions.AbstractMetricEntry;
 import io.kestra.core.models.property.Property;
 import io.kestra.core.models.tasks.Task;
 import io.kestra.core.models.triggers.AbstractTrigger;
+import io.kestra.core.plugins.PluginConfigurations;
 import io.kestra.core.services.KVStoreService;
 import io.kestra.core.storages.Storage;
 import io.kestra.core.storages.StorageInterface;

@@ -235,6 +237,14 @@ public class DefaultRunContext extends RunContext {
         return runContext;
     }

+    @Override
+    public RunContext cloneForPlugin(Plugin plugin) {
+        PluginConfigurations pluginConfigurations = applicationContext.getBean(PluginConfigurations.class);
+        DefaultRunContext runContext = clone();
+        runContext.pluginConfiguration = pluginConfigurations.getConfigurationByPluginTypeOrAliases(plugin.getType(), plugin.getClass());
+        return runContext;
+    }
+
     /**
      * {@inheritDoc}
      */

@@ -589,6 +599,11 @@ public class DefaultRunContext extends RunContext {
         return localPath;
     }

+    @Override
+    public InputAndOutput inputAndOutput() {
+        return new InputAndOutputImpl(this.applicationContext, this);
+    }
+
     /**
      * Builder class for constructing new {@link DefaultRunContext} objects.
      */
@@ -189,12 +189,11 @@ public final class ExecutableUtils {
         variables.put("taskRunIteration", currentTaskRun.getIteration());
     }

-    FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
     Instant scheduleOnDate = runContext.render(scheduleDate).as(ZonedDateTime.class).map(date -> date.toInstant()).orElse(null);
     Execution execution = Execution
         .newExecution(
             flow,
-            (f, e) -> flowInputOutput.readExecutionInputs(f, e, inputs),
+            (f, e) -> runContext.inputAndOutput().readInputs(f, e, inputs),
             newLabels,
             Optional.empty())
         .withTrigger(ExecutionTrigger.builder()
@@ -3,13 +3,11 @@ package io.kestra.core.runners;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.kestra.core.encryption.EncryptionService;
 import io.kestra.core.exceptions.IllegalVariableEvaluationException;
 import io.kestra.core.exceptions.KestraRuntimeException;
 import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.flows.Data;
 import io.kestra.core.models.flows.DependsOn;
 import io.kestra.core.models.flows.FlowInterface;
 import io.kestra.core.models.flows.Input;
 import io.kestra.core.models.flows.Output;
 import io.kestra.core.models.flows.RenderableInput;
 import io.kestra.core.models.flows.Type;
 import io.kestra.core.models.flows.input.FileInput;

@@ -539,30 +537,6 @@ public class FlowInputOutput {
         }
     }

-    public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
-        if (outputs == null) return Map.of();
-
-        // render required outputs
-        Map<String, Object> outputsById = outputs
-            .stream()
-            .filter(output -> output.getRequired() == null || output.getRequired())
-            .collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
-        outputsById = runContext.render(outputsById);
-
-        // render optional outputs one by one to catch, log, and skip any error.
-        for (io.kestra.core.models.flows.Output output : outputs) {
-            if (Boolean.FALSE.equals(output.getRequired())) {
-                try {
-                    outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
-                } catch (Exception e) {
-                    runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
-                    outputsById.put(output.getId(), null);
-                }
-            }
-        }
-        return outputsById;
-    }

     /**
      * Mutable wrapper to hold a flow's input, and it's resolved value.
      */
@@ -0,0 +1,29 @@
+package io.kestra.core.runners;
+
+import io.kestra.core.exceptions.IllegalVariableEvaluationException;
+import io.kestra.core.models.executions.Execution;
+import io.kestra.core.models.flows.FlowInterface;
+import io.kestra.core.models.flows.Output;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * InputAndOutput could be used to work with flow execution inputs and outputs.
+ */
+public interface InputAndOutput {
+    /**
+     * Reads the inputs of a flow execution.
+     */
+    Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs);
+
+    /**
+     * Processes the outputs of a flow execution (parse them based on their types).
+     */
+    Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs);
+
+    /**
+     * Render flow execution outputs.
+     */
+    Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException;
+}
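The facade is reached through the run context rather than by looking beans up from the application context; a sketch of a call site, where `flow`, `execution`, and `rawInputs` are assumed to be in scope:

```java
// Sketch: resolve inputs and render outputs through the new facade instead of
// fetching FlowInputOutput via getApplicationContext().getBean(...).
InputAndOutput io = runContext.inputAndOutput();
Map<String, Object> inputs = io.readInputs(flow, execution, rawInputs);
Map<String, Object> outputs = io.renderOutputs(flow.getOutputs()); // may throw IllegalVariableEvaluationException
```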
@@ -0,0 +1,56 @@
+package io.kestra.core.runners;
+
+import io.kestra.core.exceptions.IllegalVariableEvaluationException;
+import io.kestra.core.models.executions.Execution;
+import io.kestra.core.models.flows.FlowInterface;
+import io.kestra.core.models.flows.Output;
+import io.micronaut.context.ApplicationContext;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class InputAndOutputImpl implements InputAndOutput {
+    private final FlowInputOutput flowInputOutput;
+    private final RunContext runContext;
+
+    InputAndOutputImpl(ApplicationContext applicationContext, RunContext runContext) {
+        this.flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
+        this.runContext = runContext;
+    }
+
+    @Override
+    public Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs) {
+        return flowInputOutput.readExecutionInputs(flow, execution, inputs);
+    }
+
+    @Override
+    public Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs) {
+        return flowInputOutput.typedOutputs(flow, execution, rOutputs);
+    }
+
+    @Override
+    public Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException {
+        if (outputs == null) return Map.of();
+
+        // render required outputs
+        Map<String, Object> outputsById = outputs
+            .stream()
+            .filter(output -> output.getRequired() == null || output.getRequired())
+            .collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
+        outputsById = runContext.render(outputsById);
+
+        // render optional outputs one by one to catch, log, and skip any error.
+        for (io.kestra.core.models.flows.Output output : outputs) {
+            if (Boolean.FALSE.equals(output.getRequired())) {
+                try {
+                    outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
+                } catch (Exception e) {
+                    runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
+                    outputsById.put(output.getId(), null);
+                }
+            }
+        }
+        return outputsById;
+    }
+}
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import io.kestra.core.encryption.EncryptionService;
 import io.kestra.core.exceptions.IllegalVariableEvaluationException;
+import io.kestra.core.models.Plugin;
 import io.kestra.core.models.executions.AbstractMetricEntry;
 import io.kestra.core.models.property.Property;
 import io.kestra.core.models.property.PropertyContext;

@@ -204,4 +205,15 @@ public abstract class RunContext implements PropertyContext {
      * when Namespace ACLs are used (EE).
      */
     public abstract AclChecker acl();

+    /**
+     * Clone this run context for a specific plugin.
+     * @return a new run context with the plugin configuration of the given plugin.
+     */
+    public abstract RunContext cloneForPlugin(Plugin plugin);
+
+    /**
+     * @return an InputAndOutput that can be used to work with inputs and outputs.
+     */
+    public abstract InputAndOutput inputAndOutput();
 }
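A sketch of how the two new contracts might be used together by calling code; the `task` variable is an assumption (any `Plugin` instance works):

```java
// Sketch: run a plugin with a context scoped to its own plugin configuration,
// then resolve execution inputs without touching the application context.
RunContext scoped = runContext.cloneForPlugin(task);
Map<String, Object> inputs = scoped.inputAndOutput().readInputs(flow, execution, rawInputs);
```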
@@ -1,10 +1,8 @@
 package io.kestra.core.runners;

 import com.google.common.collect.Lists;
-import io.kestra.core.models.Plugin;
 import io.kestra.core.models.executions.TaskRun;
 import io.kestra.core.models.tasks.Task;
-import io.kestra.core.models.tasks.runners.TaskRunner;
 import io.kestra.core.models.triggers.AbstractTrigger;
 import io.kestra.core.models.triggers.TriggerContext;
 import io.kestra.core.plugins.PluginConfigurations;

@@ -53,20 +51,6 @@ public class RunContextInitializer {
     @Value("${kestra.encryption.secret-key}")
     protected Optional<String> secretKey;

-    /**
-     * Initializes the given {@link RunContext} for the given {@link Plugin}.
-     *
-     * @param runContext The {@link RunContext} to initialize.
-     * @param plugin The {@link TaskRunner} used for initialization.
-     * @return The {@link RunContext} to initialize
-     */
-    public DefaultRunContext forPlugin(final DefaultRunContext runContext,
-                                       final Plugin plugin) {
-        runContext.init(applicationContext);
-        runContext.setPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(plugin.getType(), plugin.getClass()));
-        return runContext;
-    }
-
     /**
      * Initializes the given {@link RunContext} for the given {@link WorkerTask} for executor.
      *
@@ -55,11 +55,11 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {

     public RunContextLogger(QueueInterface<LogEntry> logQueue, LogEntry logEntry, org.slf4j.event.Level loglevel, boolean logToFile) {
         if (logEntry.getTaskId() != null) {
-            this.loggerName = "flow." + logEntry.getFlowId() + "." + logEntry.getTaskId();
+            this.loggerName = baseLoggerName(logEntry) + "." + logEntry.getTaskId();
         } else if (logEntry.getTriggerId() != null) {
-            this.loggerName = "flow." + logEntry.getFlowId() + "." + logEntry.getTriggerId();
+            this.loggerName = baseLoggerName(logEntry) + "." + logEntry.getTriggerId();
         } else {
-            this.loggerName = "flow." + logEntry.getFlowId();
+            this.loggerName = baseLoggerName(logEntry);
         }

         this.logQueue = logQueue;

@@ -68,6 +68,10 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
         this.logToFile = logToFile;
     }

+    private String baseLoggerName(LogEntry logEntry) {
+        return "flow." + logEntry.getTenantId() + "." + logEntry.getNamespace() + "." + logEntry.getFlowId();
+    }
+
     private static List<LogEntry> logEntry(ILoggingEvent event, String message, org.slf4j.event.Level level, LogEntry logEntry) {
         Iterable<String> split;
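The net effect is that per-flow logger names are now qualified by tenant and namespace. A small illustration with made-up identifiers:

```java
// Made-up identifiers, purely to show the resulting logger names.
String tenantId = "prod", namespace = "company.team", flowId = "sync", taskId = "extract";

String before = "flow." + flowId + "." + taskId;
// -> "flow.sync.extract"
String after = "flow." + tenantId + "." + namespace + "." + flowId + "." + taskId;
// -> "flow.prod.company.team.sync.extract"
```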
@@ -81,7 +81,24 @@ public final class YamlParser {
             throw toConstraintViolationException(input, resource, e);
         }
     }

+    private static String formatYamlErrorMessage(String originalMessage, JsonProcessingException e) {
+        StringBuilder friendlyMessage = new StringBuilder();
+        if (originalMessage.contains("Expected a field name")) {
+            friendlyMessage.append("YAML syntax error: Invalid structure. Check indentation and ensure all fields are properly formatted.");
+        } else if (originalMessage.contains("MappingStartEvent")) {
+            friendlyMessage.append("YAML syntax error: Unexpected mapping start. Verify that scalar values are properly quoted if needed.");
+        } else if (originalMessage.contains("Scalar value")) {
+            friendlyMessage.append("YAML syntax error: Expected a simple value but found complex structure. Check for unquoted special characters.");
+        } else {
+            friendlyMessage.append("YAML parsing error: ").append(originalMessage.replaceAll("org\\.yaml\\.snakeyaml.*", "").trim());
+        }
+        if (e.getLocation() != null) {
+            int line = e.getLocation().getLineNr();
+            friendlyMessage.append(String.format(" (at line %d)", line));
+        }
+        // Return a generic but cleaner message for other YAML errors
+        return friendlyMessage.toString();
+    }
+
     @SuppressWarnings("unchecked")
     public static <T> ConstraintViolationException toConstraintViolationException(T target, String resource, JsonProcessingException e) {
         if (e.getCause() instanceof ConstraintViolationException constraintViolationException) {

@@ -121,11 +138,12 @@ public final class YamlParser {
                 )
             ));
         } else {
+            String userFriendlyMessage = formatYamlErrorMessage(e.getMessage(), e);
             return new ConstraintViolationException(
-                "Illegal " + resource + " source: " + e.getMessage(),
+                "Illegal " + resource + " source: " + userFriendlyMessage,
                 Collections.singleton(
                     ManualConstraintViolation.of(
-                        e.getCause() == null ? e.getMessage() : e.getMessage() + "\nCaused by: " + e.getCause().getMessage(),
+                        userFriendlyMessage,
                         target,
                         (Class<T>) target.getClass(),
                         "yaml",

@@ -136,4 +154,3 @@ public final class YamlParser {
     }
 }
-}
@@ -4,7 +4,6 @@ import com.cronutils.utils.VisibleForTesting;
 import io.kestra.core.exceptions.InternalException;
 import io.kestra.core.models.conditions.Condition;
 import io.kestra.core.models.conditions.ConditionContext;
-import io.kestra.core.models.conditions.ScheduleCondition;
 import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.flows.Flow;
 import io.kestra.core.models.flows.FlowInterface;

@@ -65,16 +64,6 @@ public class ConditionService {
         return this.valid(flow, conditions, conditionContext);
     }

-    /**
-     * Check that all conditions are valid.
-     * Warning, this method throws if a condition cannot be evaluated.
-     */
-    public boolean isValid(List<ScheduleCondition> conditions, ConditionContext conditionContext) throws InternalException {
-        return conditions
-            .stream()
-            .allMatch(throwPredicate(condition -> condition.test(conditionContext)));
-    }
-
     /**
      * Check that all conditions are valid.
      * Warning, this method throws if a condition cannot be evaluated.
@@ -92,7 +92,14 @@ public class FlowService {
         return flowRepository
             .orElseThrow(() -> new IllegalStateException("Cannot perform operation on flow. Cause: No FlowRepository"));
     }

+    private static String formatValidationError(String message) {
+        if (message.startsWith("Illegal flow source:")) {
+            // Already formatted by YamlParser, return as-is
+            return message;
+        }
+        // For other validation errors, provide context
+        return "Validation error: " + message;
+    }
+
     /**
      * Evaluates all checks defined in the given flow using the provided inputs.
      * <p>

@@ -174,10 +181,12 @@ public class FlowService {
             modelValidator.validate(pluginDefaultService.injectAllDefaults(flow, false));

         } catch (ConstraintViolationException e) {
-            validateConstraintViolationBuilder.constraints(e.getMessage());
+            String friendlyMessage = formatValidationError(e.getMessage());
+            validateConstraintViolationBuilder.constraints(friendlyMessage);
         } catch (FlowProcessingException e) {
-            if (e.getCause() instanceof ConstraintViolationException) {
-                validateConstraintViolationBuilder.constraints(e.getMessage());
+            if (e.getCause() instanceof ConstraintViolationException cve) {
+                String friendlyMessage = formatValidationError(cve.getMessage());
+                validateConstraintViolationBuilder.constraints(friendlyMessage);
             } else {
                 Throwable cause = e.getCause() != null ? e.getCause() : e;
                 validateConstraintViolationBuilder.constraints("Unable to validate the flow: " + cause.getMessage());

@@ -579,4 +588,4 @@ public class FlowService {
     private IllegalStateException noRepositoryException() {
         return new IllegalStateException("No repository found. Make sure the `kestra.repository.type` property is set.");
     }
 }
@@ -1,6 +1,5 @@
 package io.kestra.core.storages;

 import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
 import io.kestra.core.services.NamespaceService;
 import jakarta.annotation.Nullable;
 import org.slf4j.Logger;

@@ -272,7 +271,13 @@ public class InternalStorage implements Storage {
         return this.storage.put(context.getTenantId(), context.getNamespace(), resolve, new BufferedInputStream(inputStream));
     }

     @Override
     public Optional<StorageContext.Task> getTaskStorageContext() {
         return Optional.ofNullable((context instanceof StorageContext.Task task) ? task : null);
     }

+    @Override
+    public List<FileAttributes> list(URI uri) throws IOException {
+        return this.storage.list(context.getTenantId(), context.getNamespace(), uri);
+    }
 }
@@ -173,4 +173,6 @@ public interface Storage {
      * @return the task storage context
      */
     Optional<StorageContext.Task> getTaskStorageContext();
+
+    List<FileAttributes> list(URI uri) throws IOException;
 }
@@ -1,13 +1,39 @@
 package io.kestra.core.utils;

+import io.kestra.core.models.Setting;
+import io.kestra.core.repositories.SettingRepositoryInterface;
+import jakarta.annotation.PostConstruct;
+import jakarta.inject.Inject;
 import jakarta.inject.Singleton;

+import java.util.Optional;
+
 @Singleton
 public class EditionProvider {
     public Edition get() {
         return Edition.OSS;
     }

+    @Inject
+    private Optional<SettingRepositoryInterface> settingRepository; // repositories are not always there on unit tests
+
+    @PostConstruct
+    void start() {
+        // check the edition in the settings and update if needed, we didn't use it would allow us to detect incompatible update later if needed
+        settingRepository.ifPresent(settingRepositoryInterface -> persistEdition(settingRepositoryInterface, get()));
+    }
+
+    private void persistEdition(SettingRepositoryInterface settingRepositoryInterface, Edition edition) {
+        Optional<Setting> versionSetting = settingRepositoryInterface.findByKey(Setting.INSTANCE_EDITION);
+        if (versionSetting.isEmpty() || !versionSetting.get().getValue().equals(edition)) {
+            settingRepositoryInterface.save(Setting.builder()
+                .key(Setting.INSTANCE_EDITION)
+                .value(edition)
+                .build()
+            );
+        }
+    }
+
     public enum Edition {
         OSS,
         EE
@@ -11,6 +11,11 @@ import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
 import lombok.extern.slf4j.Slf4j;

+/**
+ * Utility class to create {@link java.util.concurrent.ExecutorService} with {@link java.util.concurrent.ExecutorService} instances.
+ * WARNING: those instances will use the {@link ThreadUncaughtExceptionHandler} which terminates Kestra if an error occurs in any thread,
+ * so it should not be used inside plugins.
+ */
 @Singleton
 @Slf4j
 public class ExecutorsUtils {
@@ -10,7 +10,7 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;

 /**
- * Utility class for logging
+ * Utility class for server logging
  */
 public final class Logs {

@@ -18,7 +18,7 @@ public final class Logs {
     private static final String EXECUTION_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[execution: {}] ";
     private static final String TRIGGER_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[trigger: {}] ";
     private static final String TASKRUN_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[task: {}] [execution: {}] [taskrun: {}] ";

     private Logs() {}

     public static void logExecution(FlowId flow, Logger logger, Level level, String message, Object... args) {

@@ -29,7 +29,7 @@ public final class Logs {
     }

     /**
-     * Log an {@link Execution} via the execution logger named: 'execution.{flowId}'.
+     * Log an {@link Execution} via the executor logger named: 'executor.{tenantId}.{namespace}.{flowId}'.
      */
     public static void logExecution(Execution execution, Level level, String message, Object... args) {
         Logger logger = logger(execution);

@@ -43,7 +43,7 @@ public final class Logs {
     }

     /**
-     * Log a {@link TriggerContext} via the trigger logger named: 'trigger.{flowId}.{triggereId}'.
+     * Log a {@link TriggerContext} via the scheduler logger named: 'trigger.{tenantId}.{namespace}.{flowId}.{triggerId}'.
      */
     public static void logTrigger(TriggerContext triggerContext, Level level, String message, Object... args) {
         Logger logger = logger(triggerContext);

@@ -57,7 +57,7 @@ public final class Logs {
     }

     /**
-     * Log a {@link TaskRun} via the taskRun logger named: 'task.{flowId}.{taskId}'.
+     * Log a {@link TaskRun} via the worker logger named: 'worker.{tenantId}.{namespace}.{flowId}.{taskId}'.
      */
     public static void logTaskRun(TaskRun taskRun, Level level, String message, Object... args) {
         String prefix = TASKRUN_PREFIX_WITH_TENANT;

@@ -73,19 +73,19 @@ public final class Logs {

     private static Logger logger(TaskRun taskRun) {
         return LoggerFactory.getLogger(
-            "task." + taskRun.getFlowId() + "." + taskRun.getTaskId()
+            "worker." + taskRun.getTenantId() + "." + taskRun.getNamespace() + "." + taskRun.getFlowId() + "." + taskRun.getTaskId()
         );
     }

     private static Logger logger(TriggerContext triggerContext) {
         return LoggerFactory.getLogger(
-            "trigger." + triggerContext.getFlowId() + "." + triggerContext.getTriggerId()
+            "scheduler." + triggerContext.getTenantId() + "." + triggerContext.getNamespace() + "." + triggerContext.getFlowId() + "." + triggerContext.getTriggerId()
         );
     }

     private static Logger logger(Execution execution) {
         return LoggerFactory.getLogger(
-            "execution." + execution.getFlowId()
+            "executor." + execution.getTenantId() + "." + execution.getNamespace() + "." + execution.getFlowId()
         );
     }
 }
@@ -120,7 +120,10 @@ public class MapUtils {
     private static Collection<?> mergeCollections(Collection<?> colA, Collection<?> colB) {
         List<Object> merged = new ArrayList<>(colA.size() + colB.size());
         merged.addAll(colA);
-        merged.addAll(colB);
+        if (!colB.isEmpty()) {
+            List<?> filtered = colB.stream().filter(it -> !colA.contains(it)).toList();
+            merged.addAll(filtered);
+        }
         return merged;
     }
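The merge now skips elements of `colB` that `colA` already contains. A self-contained sketch of the new semantics:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class MergeCollectionsDemo {
    // Mirrors the patched MapUtils.mergeCollections: keep all of colA, then
    // append only the elements of colB that are not already in colA.
    static Collection<?> merge(Collection<?> colA, Collection<?> colB) {
        List<Object> merged = new ArrayList<>(colA.size() + colB.size());
        merged.addAll(colA);
        if (!colB.isEmpty()) {
            List<?> filtered = colB.stream().filter(it -> !colA.contains(it)).toList();
            merged.addAll(filtered);
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of("a", "b"), List.of("b", "c")));
        // prints [a, b, c]; the old implementation would have produced [a, b, b, c]
    }
}
```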
@@ -1,14 +1,12 @@
 package io.kestra.core.utils;

+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.kestra.core.models.executions.metrics.Counter;
 import io.kestra.core.models.executions.metrics.Timer;
 import io.kestra.core.models.tasks.FileExistComportment;
 import io.kestra.core.models.tasks.NamespaceFiles;
 import io.kestra.core.runners.RunContext;
 import io.kestra.core.storages.NamespaceFile;
-import jakarta.annotation.PostConstruct;
-import jakarta.inject.Inject;
-import jakarta.inject.Singleton;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.apache.commons.lang3.time.StopWatch;

@@ -19,28 +17,27 @@ import java.io.InputStream;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.*;

 import static io.kestra.core.utils.Rethrow.throwConsumer;

-@Singleton
-public class NamespaceFilesUtils {
-    @Inject
-    private ExecutorsUtils executorsUtils;
+public final class NamespaceFilesUtils {
+    private static final int maxThreads = Math.max(Runtime.getRuntime().availableProcessors() * 4, 32);
+    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+        0,
+        maxThreads,
+        60L,
+        TimeUnit.SECONDS,
+        new SynchronousQueue<>(),
+        new ThreadFactoryBuilder().setNameFormat("namespace-files").build()
+    );

-    private ExecutorService executorService;
-    private int maxThreads;
-
-    @PostConstruct
-    public void postConstruct() {
-        this.maxThreads = Math.max(Runtime.getRuntime().availableProcessors() * 4, 32);
-        this.executorService = executorsUtils.maxCachedThreadPool(maxThreads, "namespace-file");
+    private NamespaceFilesUtils() {
+        // utility class pattern
     }

-    public void loadNamespaceFiles(
+    public static void loadNamespaceFiles(
         RunContext runContext,
         NamespaceFiles namespaceFiles
     )

@@ -69,7 +66,7 @@ public class NamespaceFilesUtils {
         int parallelism = maxThreads / 2;
         Flux.fromIterable(matchedNamespaceFiles)
             .parallel(parallelism)
-            .runOn(Schedulers.fromExecutorService(executorService))
+            .runOn(Schedulers.fromExecutorService(EXECUTOR_SERVICE))
             .doOnNext(throwConsumer(nsFile -> {
                 InputStream content = runContext.storage().getFile(nsFile.uri());
                 Path path = folderPerNamespace ?
@@ -23,7 +23,6 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
 import io.kestra.core.services.StorageService;
 import io.kestra.core.storages.FileAttributes;
 import io.kestra.core.storages.StorageContext;
-import io.kestra.core.storages.StorageInterface;
 import io.kestra.core.storages.StorageSplitInterface;
 import io.kestra.core.utils.GraphUtils;
 import io.kestra.core.validations.NoSystemLabelValidation;

@@ -540,7 +539,7 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
             .numberOfBatches((Integer) taskRun.getOutputs().get(ExecutableUtils.TASK_VARIABLE_NUMBER_OF_BATCHES));

         try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
-            FileSerde.write(bos, FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext));
+            FileSerde.write(bos, runContext.inputAndOutput().renderOutputs(flow.getOutputs()));
             URI uri = runContext.storage().putFile(
                 new ByteArrayInputStream(bos.toByteArray()),
                 URI.create((String) taskRun.getOutputs().get("uri"))

@@ -602,9 +601,8 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
         String subflowOutputsBase = (String) taskOutput.get(ExecutableUtils.TASK_VARIABLE_SUBFLOW_OUTPUTS_BASE_URI);
         URI subflowOutputsBaseUri = URI.create(StorageContext.KESTRA_PROTOCOL + subflowOutputsBase + "/");

-        StorageInterface storage = ((DefaultRunContext) runContext).getApplicationContext().getBean(StorageInterface.class);
-        if (storage.exists(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), subflowOutputsBaseUri)) {
-            List<FileAttributes> list = storage.list(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), subflowOutputsBaseUri);
+        if (runContext.storage().isFileExist(subflowOutputsBaseUri)) {
+            List<FileAttributes> list = runContext.storage().list(subflowOutputsBaseUri);

             if (!list.isEmpty()) {
                 // Merge outputs from each sub-flow into a single stored in the internal storage.
@@ -63,7 +63,8 @@ import java.util.*;

       - id: run_post_approval
         type: io.kestra.plugin.scripts.shell.Commands
-        runner: PROCESS
+        taskRunner:
+          type: io.kestra.plugin.core.runner.Process
         commands:
           - echo "Manual approval received! Continuing the execution..."
@@ -18,7 +18,6 @@ import io.kestra.core.models.tasks.ExecutableTask;
 import io.kestra.core.models.tasks.Task;
 import io.kestra.core.runners.DefaultRunContext;
 import io.kestra.core.runners.ExecutableUtils;
-import io.kestra.core.runners.FlowInputOutput;
 import io.kestra.core.runners.FlowMetaStoreInterface;
 import io.kestra.core.runners.RunContext;
 import io.kestra.core.runners.SubflowExecution;

@@ -38,7 +37,6 @@ import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.ToString;
 import lombok.experimental.SuperBuilder;
-import org.slf4j.event.Level;

 import java.time.ZonedDateTime;
 import java.util.Collections;

@@ -246,11 +244,11 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
         if (subflowOutputs != null && !subflowOutputs.isEmpty()) {
             try {
-                Map<String, Object> rOutputs = FlowInputOutput.renderFlowOutputs(subflowOutputs, runContext);
+                var inputAndOutput = runContext.inputAndOutput();
+                Map<String, Object> rOutputs = inputAndOutput.renderOutputs(subflowOutputs);

-                FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
-                if (flow.getOutputs() != null && flowInputOutput != null) {
-                    rOutputs = flowInputOutput.typedOutputs(flow, execution, rOutputs);
+                if (flow.getOutputs() != null) {
+                    rOutputs = inputAndOutput.typedOutputs(flow, execution, rOutputs);
                 }
                 builder.outputs(rOutputs);
             } catch (Exception e) {
@@ -260,8 +260,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
     }

     if (this.namespaceFiles != null && !Boolean.FALSE.equals(runContext.render(this.namespaceFiles.getEnabled()).as(Boolean.class).orElse(true))) {
-        NamespaceFilesUtils namespaceFilesUtils = ((DefaultRunContext) runContext).getApplicationContext().getBean(NamespaceFilesUtils.class);
-        namespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
+        NamespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
     }

     if (this.inputFiles != null) {
@@ -26,25 +26,28 @@ import java.util.concurrent.atomic.AtomicLong;
 @Getter
 @NoArgsConstructor
 @Schema(
-    title = "Delete expired keys globally for a specific namespace.",
-    description = "This task will delete expired keys from the Kestra KV store. By default, it will only delete expired keys, but you can choose to delete all keys by setting `expiredOnly` to false. You can also filter keys by a specific pattern and choose to include child namespaces."
+    title = "Purge namespace files for one or multiple namespaces.",
+    description = "This task purges namespace files (and their versions) stored in Kestra. You can restrict the purge to specific namespaces (or a namespace glob pattern), optionally include child namespaces, and filter files by a glob pattern. The purge strategy is controlled via `behavior` (e.g. keep the last N versions and/or delete versions older than a given date)."
 )
 @Plugin(
     examples = {
         @Example(
-            title = "Delete expired keys globally for a specific namespace, with or without including child namespaces.",
+            title = "Purge old versions of namespace files for a namespace tree.",
             full = true,
             code = """
-                id: purge_kv_store
+                id: purge_namespace_files
                 namespace: system

                 tasks:
-                  - id: purge_kv
-                    type: io.kestra.plugin.core.kv.PurgeKV
-                    expiredOnly: true
+                  - id: purge_files
+                    type: io.kestra.plugin.core.namespace.PurgeFiles
+                    namespaces:
+                      - company
+                    includeChildNamespaces: true
+                    filePattern: "**/*.sql"
+                    behavior:
+                      type: version
+                      before: "2025-01-01T00:00:00Z"
                 """
         )
     }

@@ -116,7 +119,7 @@ public class PurgeFiles extends Task implements PurgeTask<NamespaceFile>, Runnab
     @Getter
     public static class Output implements io.kestra.core.models.tasks.Output {
         @Schema(
-            title = "The number of purged KV pairs"
+            title = "The number of purged namespace file versions"
         )
         private Long size;
     }
@@ -0,0 +1,107 @@
+package io.kestra.plugin.core.trigger;
+
+import io.kestra.core.exceptions.IllegalVariableEvaluationException;
+import io.kestra.core.models.Label;
+import io.kestra.core.models.conditions.ConditionContext;
+import io.kestra.core.models.executions.Execution;
+import io.kestra.core.models.executions.ExecutionTrigger;
+import io.kestra.core.models.flows.FlowInterface;
+import io.kestra.core.models.flows.State;
+import io.kestra.core.models.triggers.AbstractTrigger;
+import io.kestra.core.models.triggers.Backfill;
+import io.kestra.core.models.triggers.Schedulable;
+import io.kestra.core.models.triggers.TriggerContext;
+import io.kestra.core.runners.RunContext;
+import io.kestra.core.services.LabelService;
+import io.kestra.core.utils.ListUtils;
+
+import java.time.ZonedDateTime;
+import java.time.chrono.ChronoZonedDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Factory class for constructing a new {@link Execution} from a {@link Schedulable} trigger.
+ *
+ * @see io.kestra.plugin.core.trigger.Schedule
+ * @see io.kestra.plugin.core.trigger.ScheduleOnDates
+ */
+final class SchedulableExecutionFactory {
+
+    static Execution createFailedExecution(Schedulable trigger, ConditionContext conditionContext, TriggerContext triggerContext) throws IllegalVariableEvaluationException {
+        return Execution.builder()
+            .id(conditionContext.getRunContext().getTriggerExecutionId())
+            .tenantId(triggerContext.getTenantId())
+            .namespace(triggerContext.getNamespace())
+            .flowId(triggerContext.getFlowId())
+            .flowRevision(conditionContext.getFlow().getRevision())
+            .labels(SchedulableExecutionFactory.getLabels(trigger, conditionContext.getRunContext(), triggerContext.getBackfill(), conditionContext.getFlow()))
+            .state(new State().withState(State.Type.FAILED))
+            .build();
+    }
+
+    static Execution createExecution(Schedulable trigger, ConditionContext conditionContext, TriggerContext triggerContext, Map<String, Object> variables, ZonedDateTime scheduleDate) throws IllegalVariableEvaluationException {
+        RunContext runContext = conditionContext.getRunContext();
+        ExecutionTrigger executionTrigger = ExecutionTrigger.of((AbstractTrigger) trigger, variables);
+
+        List<Label> labels = getLabels(trigger, runContext, triggerContext.getBackfill(), conditionContext.getFlow());
+
+        List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(labels));
+        executionLabels.add(new Label(Label.FROM, "trigger"));
+        if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
+            // add a correlation ID if none exist
+            executionLabels.add(new Label(Label.CORRELATION_ID, runContext.getTriggerExecutionId()));
+        }
+
+        Execution execution = Execution.builder()
+            .id(runContext.getTriggerExecutionId())
+            .tenantId(triggerContext.getTenantId())
+            .namespace(triggerContext.getNamespace())
+            .flowId(triggerContext.getFlowId())
+            .flowRevision(conditionContext.getFlow().getRevision())
+            .variables(conditionContext.getFlow().getVariables())
+            .labels(executionLabels)
+            .state(new State())
+            .trigger(executionTrigger)
+            .scheduleDate(Optional.ofNullable(scheduleDate).map(ChronoZonedDateTime::toInstant).orElse(null))
+            .build();
+
+        Map<String, Object> allInputs = getInputs(trigger, runContext, triggerContext.getBackfill());
+
+        // add inputs and inject defaults (FlowInputOutput handles defaults internally)
+        execution = execution.withInputs(runContext.inputAndOutput().readInputs(conditionContext.getFlow(), execution, allInputs));
+
+        return execution;
+    }
+
+    private static Map<String, Object> getInputs(Schedulable trigger, RunContext runContext, Backfill backfill) throws IllegalVariableEvaluationException {
+        Map<String, Object> inputs = new HashMap<>();
+
+        if (trigger.getInputs() != null) {
+            inputs.putAll(runContext.render(trigger.getInputs()));
+        }
+
+        if (backfill != null && backfill.getInputs() != null) {
+            inputs.putAll(runContext.render(backfill.getInputs()));
+        }
+
+        return inputs;
+    }
+
+    private static List<Label> getLabels(Schedulable trigger, RunContext runContext, Backfill backfill, FlowInterface flow) throws IllegalVariableEvaluationException {
+        List<Label> labels = LabelService.fromTrigger(runContext, flow, (AbstractTrigger) trigger);
+
+        if (backfill != null && backfill.getLabels() != null) {
+            for (Label label : backfill.getLabels()) {
+                final var value = runContext.render(label.value());
+                if (value != null) {
+                    labels.add(new Label(label.key(), value));
+                }
+            }
+        }
+        return labels;
+    }
+}
@@ -6,9 +6,7 @@ import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
@@ -16,12 +14,8 @@ import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.*;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.LabelService;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.validations.ScheduleValidation;
import io.kestra.core.validations.TimezoneId;
@@ -29,6 +23,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Null;
import lombok.AccessLevel;
import lombok.*;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
@@ -40,6 +35,8 @@ import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Stream;

import static io.kestra.core.utils.Rethrow.throwPredicate;

@Slf4j
@SuperBuilder
@ToString
@@ -224,11 +221,7 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
@PluginProperty
@Deprecated
private List<ScheduleCondition> scheduleConditions;

@Schema(
title = "The inputs to pass to the scheduled flow"
)
@PluginProperty(dynamic = true)

private Map<String, Object> inputs;

@Schema(
@@ -248,13 +241,7 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
@PluginProperty
@Deprecated
private Map<String, Object> backfill;

@Schema(
title = "Action to take in the case of missed schedules",
description = "`ALL` will recover all missed schedules, `LAST` will only recover the last missing one, `NONE` will not recover any missing schedule.\n" +
"The default is `ALL` unless a different value is configured using the global plugin configuration."
)
@PluginProperty

private RecoverMissedSchedules recoverMissedSchedules;

@Override
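The recoverMissedSchedules property above names three recovery modes. A hypothetical sketch of how ALL, LAST, and NONE could narrow a list of missed schedule dates; the real recovery logic lives in the scheduler and is not part of this diff:

import java.time.ZonedDateTime;
import java.util.List;

enum RecoverMissedSchedulesSketch {
    ALL, LAST, NONE;

    // Hypothetical helper: which missed dates should be replayed.
    List<ZonedDateTime> select(List<ZonedDateTime> missed) {
        return switch (this) {
            case ALL -> missed; // replay every missed schedule
            case LAST -> missed.isEmpty() ? missed : List.of(missed.getLast()); // only the most recent
            case NONE -> List.of(); // skip recovery entirely
        };
    }
}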
@@ -403,20 +390,11 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
if (!conditionResults) {
return Optional.empty();
}
} catch(InternalException ie) {
} catch (InternalException ie) {
// validate schedule condition can fail to render variables
// in this case, we return a failed execution so the trigger is not evaluated each second
runContext.logger().error("Unable to evaluate the Schedule trigger '{}'", this.getId(), ie);
Execution execution = Execution.builder()
.id(runContext.getTriggerExecutionId())
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())
.flowId(triggerContext.getFlowId())
.flowRevision(conditionContext.getFlow().getRevision())
.labels(generateLabels(runContext, conditionContext, backfill))
.state(new State().withState(State.Type.FAILED))
.build();
return Optional.of(execution);
return Optional.of(SchedulableExecutionFactory.createFailedExecution(this, conditionContext, triggerContext));
}

// recalculate true output for previous and next based on conditions
@@ -430,14 +408,12 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
variables = scheduleDates.toMap();
}

Execution execution = TriggerService.generateScheduledExecution(
Execution execution = SchedulableExecutionFactory.createExecution(
this,
conditionContext,
triggerContext,
generateLabels(runContext, conditionContext, backfill),
generateInputs(runContext, backfill),
variables,
Optional.empty()
null
);

return Optional.of(execution);
@@ -448,34 +424,6 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
return parser.parse(this.cron);
}

private List<Label> generateLabels(RunContext runContext, ConditionContext conditionContext, Backfill backfill) throws IllegalVariableEvaluationException {
List<Label> labels = LabelService.fromTrigger(runContext, conditionContext.getFlow(), this);

if (backfill != null && backfill.getLabels() != null) {
for (Label label : backfill.getLabels()) {
final var value = runContext.render(label.value());
if (value != null) {
labels.add(new Label(label.key(), value));
}
}
}

return labels;
}

private Map<String, Object> generateInputs(RunContext runContext, Backfill backfill) throws IllegalVariableEvaluationException {
Map<String, Object> inputs = new HashMap<>();

if (this.inputs != null) {
inputs.putAll(runContext.render(this.inputs));
}

if (backfill != null && backfill.getInputs() != null) {
inputs.putAll(runContext.render(backfill.getInputs()));
}

return inputs;
}
private Optional<Output> scheduleDates(ExecutionTime executionTime, ZonedDateTime date) {
Optional<ZonedDateTime> next = executionTime.nextExecution(date.minus(Duration.ofSeconds(1)));

@@ -549,9 +497,9 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
Optional<ZonedDateTime> truePreviousNextDateWithCondition(ExecutionTime executionTime, ConditionContext conditionContext, ZonedDateTime toTestDate, boolean next) throws InternalException {
int upperYearBound = ZonedDateTime.now().getYear() + 10;
int lowerYearBound = ZonedDateTime.now().getYear() - 10;

while ((next && toTestDate.getYear() < upperYearBound) || (!next && toTestDate.getYear() > lowerYearBound)) {

Optional<ZonedDateTime> currentDate = next ?
executionTime.nextExecution(toTestDate) :
executionTime.lastExecution(toTestDate);
@@ -607,11 +555,10 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut

private boolean validateScheduleCondition(ConditionContext conditionContext) throws InternalException {
if (conditions != null) {
ConditionService conditionService = ((DefaultRunContext)conditionContext.getRunContext()).getApplicationContext().getBean(ConditionService.class);
return conditionService.isValid(
conditions.stream().filter(c -> c instanceof ScheduleCondition).map(c -> (ScheduleCondition) c).toList(),
conditionContext
);
return conditions.stream()
.filter(c -> c instanceof ScheduleCondition)
.map(c -> (ScheduleCondition) c)
.allMatch(throwPredicate(condition -> condition.test(conditionContext)));
}

return true;

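The rewritten validateScheduleCondition above drops the ConditionService bean lookup and tests each ScheduleCondition directly; throwPredicate is what lets a condition test that throws a checked exception fit Stream.allMatch. A minimal sketch, assuming Rethrow.throwPredicate has roughly this shape:

import java.util.function.Predicate;

final class RethrowSketch {
    @FunctionalInterface
    interface CheckedPredicate<T> {
        boolean test(T t) throws Exception;
    }

    // Wraps a checked predicate so it fits Stream.allMatch; checked
    // exceptions surface as unchecked at the call site.
    static <T> Predicate<T> throwPredicate(CheckedPredicate<T> predicate) {
        return t -> {
            try {
                return predicate.test(t);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }
}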
@@ -10,7 +10,6 @@ import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.models.triggers.*;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.LabelService;
import io.kestra.core.validations.TimezoneId;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
@@ -23,7 +22,10 @@ import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -45,11 +47,7 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
@Builder.Default
@Null
private final Duration interval = null;

@Schema(
title = "The inputs to pass to the scheduled flow"
)
@PluginProperty(dynamic = true)

private Map<String, Object> inputs;

@TimezoneId
@@ -63,31 +61,24 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
@NotNull
private Property<List<ZonedDateTime>> dates;

@Schema(
title = "Action to take in the case of missed schedules",
description = "`ALL` will recover all missed schedules, `LAST` will only recover the last missing one, `NONE` will not recover any missing schedule.\n" +
"The default is `ALL` unless a different value is configured using the global plugin configuration."
)
@PluginProperty
private RecoverMissedSchedules recoverMissedSchedules;

@Override
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext triggerContext) throws Exception {
RunContext runContext = conditionContext.getRunContext();

ZonedDateTime lastEvaluation = triggerContext.getDate();
Optional<ZonedDateTime> nextDate = nextDate(runContext, date -> date.isEqual(lastEvaluation) || date.isAfter(lastEvaluation));

if (nextDate.isPresent()) {
log.info("Schedule execution on {}", nextDate.get());

Execution execution = TriggerService.generateScheduledExecution(
Execution execution = SchedulableExecutionFactory.createExecution(
this,
conditionContext,
triggerContext,
LabelService.fromTrigger(runContext, conditionContext.getFlow(), this),
this.inputs != null ? runContext.render(this.inputs) : Collections.emptyMap(),
Collections.emptyMap(),
nextDate
nextDate.orElse(null)
);

return Optional.of(execution);
@@ -97,29 +88,21 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
}

@Override
public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> last) {
try {
return last
.map(throwFunction(context ->
nextDate(conditionContext.getRunContext(), date -> date.isAfter(context.getDate()))
.orElse(ZonedDateTime.now().plusYears(1))
))
.orElse(conditionContext.getRunContext()
.render(dates)
.asList(ZonedDateTime.class)
.stream()
.sorted()
.findFirst()
.orElse(ZonedDateTime.now()))
.truncatedTo(ChronoUnit.SECONDS);
} catch (IllegalVariableEvaluationException e) {
log.warn("Failed to evaluate schedule dates for trigger '{}': {}", this.getId(), e.getMessage());
return ZonedDateTime.now().plusYears(1);
}
public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> triggerContext) {
return triggerContext
.map(ctx -> ctx.getBackfill() != null ? ctx.getBackfill().getCurrentDate() : ctx.getDate())
.map(this::withTimeZone)
.or(() -> Optional.of(ZonedDateTime.now()))
.flatMap(dt -> {
try {
return nextDate(conditionContext.getRunContext(), date -> date.isAfter(dt));
} catch (IllegalVariableEvaluationException e) {
log.warn("Failed to evaluate schedule dates for trigger '{}': {}", this.getId(), e.getMessage());
throw new InvalidTriggerConfigurationException("Failed to evaluate schedule 'dates'. Cause: " + e.getMessage());
}
}).orElseGet(() -> ZonedDateTime.now().plusYears(1));
}

@Override
public ZonedDateTime nextEvaluationDate() {
// TODO this may be the next date from now?
@@ -139,9 +122,17 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
return previousDates.isEmpty() ? ZonedDateTime.now() : previousDates.getFirst();
}

private Optional<ZonedDateTime> nextDate(RunContext runContext, Predicate<ZonedDateTime> filter) throws IllegalVariableEvaluationException {
return runContext.render(dates).asList(ZonedDateTime.class).stream().sorted()
.filter(date -> filter.test(date))
private ZonedDateTime withTimeZone(ZonedDateTime date) {
if (this.timezone == null) {
return date;
}
return date.withZoneSameInstant(ZoneId.of(this.timezone));
}

private Optional<ZonedDateTime> nextDate(RunContext runContext, Predicate<ZonedDateTime> predicate) throws IllegalVariableEvaluationException {
return runContext.render(dates)
.asList(ZonedDateTime.class).stream().sorted()
.filter(predicate)
.map(throwFunction(date -> timezone == null ? date : date.withZoneSameInstant(ZoneId.of(runContext.render(timezone)))))
.findFirst()
.map(date -> date.truncatedTo(ChronoUnit.SECONDS));

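The new withTimeZone helper leans on withZoneSameInstant, which swaps the zone while keeping the underlying instant; only the wall-clock rendering changes. A quick JDK-only illustration:

import java.time.ZoneId;
import java.time.ZonedDateTime;

class ZoneConversionSketch {
    public static void main(String[] args) {
        ZonedDateTime utc = ZonedDateTime.parse("2025-01-01T12:00:00Z");
        ZonedDateTime paris = utc.withZoneSameInstant(ZoneId.of("Europe/Paris"));
        System.out.println(paris); // 2025-01-01T13:00+01:00[Europe/Paris]
        // Same instant on the timeline, different local representation:
        System.out.println(utc.toInstant().equals(paris.toInstant())); // true
    }
}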
@@ -9,10 +9,14 @@
<property name="pattern" value="%date{HH:mm:ss}.%ms %highlight(%-5.5level) %magenta(%-12.36thread) %cyan(%-12.36logger{36}) %msg%n" />

<logger name="io.kestra" level="INFO" />
<logger name="flow" level="INFO" />
<logger name="task" level="INFO" />
<logger name="execution" level="INFO" />
<logger name="trigger" level="INFO" />

<!-- Flow execution logs - disabled by default -->
<logger name="flow" level="OFF" />

<!-- Server loggers -->
<logger name="worker" level="INFO" />
<logger name="executor" level="INFO" />
<logger name="scheduler" level="INFO" />

<logger name="io.kestra.ee.runner.kafka.services.KafkaConsumerService" level="WARN" />
<logger name="io.kestra.ee.runner.kafka.services.KafkaProducerService" level="WARN" />

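The logback change above turns flow execution logs off by default and keeps the worker, executor, and scheduler server loggers at INFO. Hierarchical logger names such as executor.tenant.namespace.flow (asserted in LogsTest further down) inherit their level from the matching first segment; a small illustration of that inheritance:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ServerLoggerSketch {
    public static void main(String[] args) {
        // Child of the "executor" logger, configured at INFO above.
        Logger executionLogger = LoggerFactory.getLogger("executor.tenant.namespace.flow");
        executionLogger.info("visible: inherits INFO from the 'executor' entry");

        // Child of the "flow" logger, which is OFF by default after this change.
        Logger flowLogger = LoggerFactory.getLogger("flow.tenant.namespace.flow");
        flowLogger.info("suppressed: inherits OFF from the 'flow' entry");
    }
}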
@@ -170,10 +170,11 @@ class JsonSchemaGeneratorTest {

Map<String, Object> jsonSchema = jsonSchemaGenerator.generate(AbstractTrigger.class, AbstractTrigger.class);
assertThat((Map<String, Object>) jsonSchema.get("properties"), allOf(
Matchers.aMapWithSize(3),
Matchers.aMapWithSize(4),
hasKey("conditions"),
hasKey("stopAfter"),
hasKey("type")
hasKey("type"),
hasKey("allowConcurrent")
));
});
}

@@ -60,6 +60,15 @@ class SystemInformationReportTest {
return setting;
}

@Override
public Setting internalSave(Setting setting) throws ConstraintViolationException {
if (setting.getKey().equals(Setting.INSTANCE_UUID)) {
UUID = setting.getValue();
}

return setting;
}

@Override
public Setting delete(Setting setting) {
return setting;

@@ -1,9 +1,9 @@
package io.kestra.core.repositories;

import com.devskiller.friendly_id.FriendlyId;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.InvalidQueryFiltersException;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.Label;
import io.kestra.core.models.QueryFilter;
@@ -24,7 +24,6 @@ import io.kestra.core.models.flows.State.Type;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.NamespaceUtils;
import io.kestra.core.utils.TestsUtils;
@@ -42,10 +41,9 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;

import java.io.IOException;
import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@@ -185,6 +183,7 @@ public abstract class AbstractExecutionRepositoryTest {

@ParameterizedTest
@MethodSource("filterCombinations")
@FlakyTest(description = "Filtering tests are sometimes returning 0")
void should_find_all(QueryFilter filter, int expectedSize){
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
inject(tenant, "executionTriggerId");

@@ -10,82 +10,83 @@ import org.junit.jupiter.api.TestInstance;
@KestraTest(startRunner = true)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractRunnerConcurrencyTest {
public static final String TENANT_1 = "tenant1";

@Inject
protected FlowConcurrencyCaseTest flowConcurrencyCaseTest;

@Test
@LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-cancel.yml"}, tenantId = "concurrency-cancel")
void concurrencyCancel() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyCancel();
flowConcurrencyCaseTest.flowConcurrencyCancel("concurrency-cancel");
}

@Test
@LoadFlows({"flows/valids/flow-concurrency-fail.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-fail.yml"}, tenantId = "concurrency-fail")
void concurrencyFail() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyFail();
flowConcurrencyCaseTest.flowConcurrencyFail("concurrency-fail");
}

@Test
@LoadFlows({"flows/valids/flow-concurrency-queue.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = "concurrency-queue")
void concurrencyQueue() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueue();
flowConcurrencyCaseTest.flowConcurrencyQueue("concurrency-queue");
}

@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-pause.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-pause.yml"}, tenantId = "concurrency-queue-pause")
protected void concurrencyQueuePause() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueuePause();
flowConcurrencyCaseTest.flowConcurrencyQueuePause("concurrency-queue-pause");
}

@Test
@LoadFlows({"flows/valids/flow-concurrency-cancel-pause.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-cancel-pause.yml"}, tenantId = "concurrency-cancel-pause")
protected void concurrencyCancelPause() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyCancelPause();
flowConcurrencyCaseTest.flowConcurrencyCancelPause("concurrency-cancel-pause");
}

@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = TENANT_1)
@LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = "flow-concurrency-with-for-each-item")
protected void flowConcurrencyWithForEachItem() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem(TENANT_1);
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem("flow-concurrency-with-for-each-item");
}

@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-fail.yml"}, tenantId = "concurrency-queue-restarted")
protected void concurrencyQueueRestarted() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted("concurrency-queue-restarted");
}

@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-after-execution.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-after-execution.yml"}, tenantId = "concurrency-queue-after-execution")
void concurrencyQueueAfterExecution() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution();
flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution("concurrency-queue-after-execution");
}

@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"}, tenantId = TENANT_1)
@LoadFlows(value = {"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"}, tenantId = "flow-concurrency-subflow")
void flowConcurrencySubflow() throws Exception {
flowConcurrencyCaseTest.flowConcurrencySubflow(TENANT_1);
flowConcurrencyCaseTest.flowConcurrencySubflow("flow-concurrency-subflow");
}

@Test
@FlakyTest(description = "Only flaky in CI")
@LoadFlows({"flows/valids/flow-concurrency-parallel-subflow-kill.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-child.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-grandchild.yaml"})
@LoadFlows(
value = {"flows/valids/flow-concurrency-parallel-subflow-kill.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-child.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-grandchild.yaml"},
tenantId = "flow-concurrency-parallel-subflow-kill"
)
protected void flowConcurrencyParallelSubflowKill() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyParallelSubflowKill();
flowConcurrencyCaseTest.flowConcurrencyParallelSubflowKill("flow-concurrency-parallel-subflow-kill");
}

@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-killed.yml"}, tenantId = "flow-concurrency-killed")
void flowConcurrencyKilled() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyKilled();
flowConcurrencyCaseTest.flowConcurrencyKilled("flow-concurrency-killed");
}

@Test
@FlakyTest(description = "Only flaky in CI")
@LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-killed.yml"}, tenantId = "flow-concurrency-queue-killed")
void flowConcurrencyQueueKilled() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueKilled();
flowConcurrencyCaseTest.flowConcurrencyQueueKilled("flow-concurrency-queue-killed");
}
}

@@ -31,7 +31,6 @@ import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;

import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;

@Singleton
@@ -57,12 +56,12 @@ public class FlowConcurrencyCaseTest {
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;

public void flowConcurrencyCancel() throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
public void flowConcurrencyCancel(String tenantId) throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
try {
List<Execution> shouldFailExecutions = List.of(
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel"),
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel")
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-cancel"),
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-cancel")
);
assertThat(execution1.getState().isRunning()).isTrue();

@@ -73,12 +72,12 @@ public class FlowConcurrencyCaseTest {
}
}

public void flowConcurrencyFail() throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
public void flowConcurrencyFail(String tenantId) throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
try {
List<Execution> shouldFailExecutions = List.of(
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail"),
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail")
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-fail"),
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-fail")
);

assertThat(execution1.getState().isRunning()).isTrue();
@@ -89,10 +88,10 @@ public class FlowConcurrencyCaseTest {
}
}

public void flowConcurrencyQueue() throws QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
public void flowConcurrencyQueue(String tenantId) throws QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue", Optional.empty())
.findById(tenantId, NAMESPACE, "flow-concurrency-queue", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
@@ -108,10 +107,10 @@ public class FlowConcurrencyCaseTest {
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}

public void flowConcurrencyQueuePause() throws QueueException {
Execution execution1 = runnerUtils.runOneUntilPaused(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-pause");
public void flowConcurrencyQueuePause(String tenantId) throws QueueException {
Execution execution1 = runnerUtils.runOneUntilPaused(tenantId, NAMESPACE, "flow-concurrency-queue-pause");
Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-pause", Optional.empty())
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-pause", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
@@ -127,10 +126,10 @@ public class FlowConcurrencyCaseTest {
assertThat(secondExecutionResult.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}

public void flowConcurrencyCancelPause() throws QueueException {
Execution execution1 = runnerUtils.runOneUntilPaused(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel-pause");
public void flowConcurrencyCancelPause(String tenantId) throws QueueException {
Execution execution1 = runnerUtils.runOneUntilPaused(tenantId, NAMESPACE, "flow-concurrency-cancel-pause");
Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel-pause", Optional.empty())
.findById(tenantId, NAMESPACE, "flow-concurrency-cancel-pause", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.CANCELLED), execution2);
@@ -166,11 +165,11 @@ public class FlowConcurrencyCaseTest {
.toList()).contains(Type.QUEUED);
}

public void flowConcurrencyQueueRestarted() throws Exception {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE,
public void flowConcurrencyQueueRestarted(String tenantId) throws Exception {
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE,
"flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-fail", Optional.empty())
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-fail", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.RUNNING), execution2);
@@ -179,7 +178,10 @@ public class FlowConcurrencyCaseTest {
// we restart the first one, it should be queued then fail again.
Execution failedExecution = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution1);
Execution restarted = executionService.restart(failedExecution, null);
Execution executionResult1 = runnerUtils.restartExecution(e -> e.getState().getCurrent().equals(Type.FAILED), restarted);
Execution executionResult1 = runnerUtils.restartExecution(
e -> e.getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED) && e.getState().getCurrent().equals(Type.FAILED),
restarted
);
Execution executionResult2 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution2);

assertThat(executionResult1.getState().getCurrent()).isEqualTo(Type.FAILED);
@@ -193,10 +195,10 @@ public class FlowConcurrencyCaseTest {
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}

public void flowConcurrencyQueueAfterExecution() throws QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
public void flowConcurrencyQueueAfterExecution(String tenantId) throws QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-after-execution", Optional.empty())
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-after-execution", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
@@ -216,15 +218,15 @@ public class FlowConcurrencyCaseTest {
List<Execution> subFlowExecs = runnerUtils.awaitFlowExecutionNumber(2, tenantId, NAMESPACE, "flow-concurrency-cancel");
assertThat(subFlowExecs).extracting(e -> e.getState().getCurrent()).containsExactlyInAnyOrder(Type.SUCCESS, Type.CANCELLED);

// run another execution to be sure that everything work (purge is correctly done)
// run another execution to be sure that everything works (purge is correctly done)
Execution execution3 = runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-subflow");
assertThat(execution3.getState().getCurrent()).isEqualTo(Type.SUCCESS);
runnerUtils.awaitFlowExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), tenantId, NAMESPACE, "flow-concurrency-cancel");
}

public void flowConcurrencyParallelSubflowKill() throws QueueException {
Execution parent = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-parallel-subflow-kill", null, null, Duration.ofSeconds(30));
Execution queued = runnerUtils.awaitFlowExecution(e -> e.getState().isQueued(), MAIN_TENANT, NAMESPACE, "flow-concurrency-parallel-subflow-kill-child");
public void flowConcurrencyParallelSubflowKill(String tenantId) throws QueueException {
Execution parent = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-parallel-subflow-kill", null, null, Duration.ofSeconds(30));
Execution queued = runnerUtils.awaitFlowExecution(e -> e.getState().isQueued(), tenantId, NAMESPACE, "flow-concurrency-parallel-subflow-kill-child");

// Kill the parent
killQueue.emit(ExecutionKilledExecution
@@ -232,7 +234,7 @@ public class FlowConcurrencyCaseTest {
.state(ExecutionKilled.State.REQUESTED)
.executionId(parent.getId())
.isOnKillCascade(true)
.tenantId(MAIN_TENANT)
.tenantId(tenantId)
.build()
);

@@ -242,11 +244,11 @@ public class FlowConcurrencyCaseTest {
assertThat(terminated.getTaskRunList()).isNull();
}

public void flowConcurrencyKilled() throws QueueException, InterruptedException {
public void flowConcurrencyKilled(String tenantId) throws QueueException, InterruptedException {
Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
.orElseThrow();
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));

@@ -261,7 +263,7 @@ public class FlowConcurrencyCaseTest {
.state(ExecutionKilled.State.REQUESTED)
.executionId(execution1.getId())
.isOnKillCascade(true)
.tenantId(MAIN_TENANT)
.tenantId(tenantId)
.build()
);

@@ -279,20 +281,19 @@ public class FlowConcurrencyCaseTest {
assertThat(queued.getState().getCurrent()).isEqualTo(Type.QUEUED);
} finally {
// kill everything to avoid dangling executions
runnerUtils.killExecution(execution1);
runnerUtils.killExecution(execution2);
runnerUtils.killExecution(execution3);

// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed");
runnerUtils.awaitFlowExecutionNumber(3, tenantId, NAMESPACE, "flow-concurrency-queue-killed");
}
}

public void flowConcurrencyQueueKilled() throws QueueException, InterruptedException {
public void flowConcurrencyQueueKilled(String tenantId) throws QueueException, InterruptedException {
Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
.orElseThrow();
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));

@@ -307,7 +308,7 @@ public class FlowConcurrencyCaseTest {
.state(ExecutionKilled.State.REQUESTED)
.executionId(execution2.getId())
.isOnKillCascade(true)
.tenantId(MAIN_TENANT)
.tenantId(tenantId)
.build()
);

@@ -322,11 +323,10 @@ public class FlowConcurrencyCaseTest {
} finally {
// kill everything to avoid dangling executions
runnerUtils.killExecution(execution1);
runnerUtils.killExecution(execution2);
runnerUtils.killExecution(execution3);

// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed");
runnerUtils.awaitFlowExecutionNumber(3, tenantId, NAMESPACE, "flow-concurrency-queue-killed");
}
}

@@ -1,15 +1,24 @@
package io.kestra.core.utils;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.Setting;
import io.kestra.core.repositories.SettingRepositoryInterface;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

@KestraTest
import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThat;

@MicronautTest
public class EditionProviderTest {
@Inject
private EditionProvider editionProvider;

@Inject
private SettingRepositoryInterface settingRepository;

protected EditionProvider.Edition expectedEdition() {
return EditionProvider.Edition.OSS;
}
@@ -17,5 +26,10 @@ public class EditionProviderTest {
@Test
void shouldReturnCurrentEdition() {
Assertions.assertEquals(expectedEdition(), editionProvider.get());

// check that the edition is persisted in settings
Optional<Setting> editionSettings = settingRepository.findByKey(Setting.INSTANCE_EDITION);
assertThat(editionSettings).isPresent();
assertThat(editionSettings.get().getValue()).isEqualTo(expectedEdition().name());
}
}

@@ -1,48 +1,107 @@
package io.kestra.core.utils;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.TriggerContext;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import static org.assertj.core.api.Assertions.assertThat;

@Slf4j
class LogsTest {

private static final InMemoryAppender MEMORY_APPENDER = new InMemoryAppender();

@BeforeAll
static void setupLogger() {
Logger logger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
MEMORY_APPENDER.setContext((LoggerContext) LoggerFactory.getILoggerFactory());
MEMORY_APPENDER.start();
logger.addAppender(MEMORY_APPENDER);
}

@AfterEach
void clearLogs() {
MEMORY_APPENDER.clear();
}

@Test
void logFlow() {
var flow = Flow.builder().namespace("namespace").id("flow").build();
var flow = Flow.builder().tenantId("tenant").namespace("namespace").id("flow").build();
Logs.logExecution(flow, log, Level.INFO, "Some log");
Logs.logExecution(flow, log, Level.INFO, "Some log with an {}", "attribute");
Logs.logExecution(flow, log, Level.ERROR, "Some log with an {} and an error", "attribute", new RuntimeException("Test Exception"));

List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
assertThat(logs).hasSize(3);
}

@Test
void logExecution() {
var execution = Execution.builder().namespace("namespace").flowId("flow").id("execution").build();
Logs.logExecution(execution, log, Level.INFO, "Some log");
Logs.logExecution(execution, log, Level.INFO, "Some log with an {}", "attribute");
var execution = Execution.builder().tenantId("tenant").namespace("namespace").flowId("flow").id("execution").build();
Logs.logExecution(execution, Level.INFO, "Some log");
Logs.logExecution(execution, Level.INFO, "Some log with an {}", "attribute");
Logs.logExecution(execution, Level.INFO, "Some log");

List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
assertThat(logs).hasSize(3);
assertThat(logs.getFirst().getLoggerName()).isEqualTo("executor.tenant.namespace.flow");
}

@Test
void logTrigger() {
var trigger = TriggerContext.builder().namespace("namespace").flowId("flow").triggerId("trigger").build();
Logs.logTrigger(trigger, log, Level.INFO, "Some log");
Logs.logTrigger(trigger, log, Level.INFO, "Some log with an {}", "attribute");
var trigger = TriggerContext.builder().tenantId("tenant").namespace("namespace").flowId("flow").triggerId("trigger").build();
Logs.logTrigger(trigger, Level.INFO, "Some log");
Logs.logTrigger(trigger, Level.INFO, "Some log with an {}", "attribute");
Logs.logTrigger(trigger, Level.INFO, "Some log");

List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
assertThat(logs).hasSize(3);
assertThat(logs.getFirst().getLoggerName()).isEqualTo("scheduler.tenant.namespace.flow.trigger");
}

@Test
void logTaskRun() {
var taskRun = TaskRun.builder().namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").build();
var taskRun = TaskRun.builder().tenantId("tenant").namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").build();
Logs.logTaskRun(taskRun, Level.INFO, "Some log");
Logs.logTaskRun(taskRun, Level.INFO, "Some log with an {}", "attribute");

taskRun = TaskRun.builder().namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").value("value").build();
Logs.logTaskRun(taskRun, Level.INFO, "Some log");
Logs.logTaskRun(taskRun, Level.INFO, "Some log with an {}", "attribute");

List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
assertThat(logs).hasSize(4);
assertThat(logs.getFirst().getLoggerName()).isEqualTo("worker.tenant.namespace.flow.task");
}

private static class InMemoryAppender extends AppenderBase<ILoggingEvent> {
private final List<ILoggingEvent> logs = new CopyOnWriteArrayList<>();

@Override
protected void append(ILoggingEvent event) {
logs.add(event);
}

public List<ILoggingEvent> getLogs() {
return logs;
}

public void clear() {
logs.clear();
}
}
}
@@ -216,4 +216,23 @@ class MapUtilsTest {
"k1.k4", "v2"
));
}

@Test
@SuppressWarnings("unchecked")
void mergeShouldNotDuplicateListElements() {
Map<String, Object> first = Map.of(
"key1", "value1",
"key2", List.of("something", "else")
);
Map<String, Object> second = Map.of(
"key2", List.of("something", "other"),
"key3", "value3"
);

Map<String, Object> results = MapUtils.merge(first, second);

assertThat(results).hasSize(3);
List<String> list = (List<String>) results.get("key2");
assertThat(list).hasSize(3);
}
}
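The new MapUtilsTest case above pins down list-merge semantics: merging ["something", "else"] with ["something", "other"] must yield three elements, i.e. a union that drops duplicates rather than a plain concatenation. A sketch of that behavior under the stated assumption (MapUtils internals are not shown in this diff):

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

final class ListMergeSketch {
    // Order-preserving union: left elements first, then unseen right elements.
    static <T> List<T> mergeLists(List<T> left, List<T> right) {
        LinkedHashSet<T> union = new LinkedHashSet<>(left);
        union.addAll(right);
        return new ArrayList<>(union);
    }

    public static void main(String[] args) {
        List<String> merged = mergeLists(
            List.of("something", "else"),
            List.of("something", "other")
        );
        System.out.println(merged); // [something, else, other] -> size 3, matching the test
    }
}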
@@ -20,7 +20,6 @@ import org.junit.jupiter.api.parallel.ExecutionMode;
import reactor.core.publisher.Flux;

import java.io.ByteArrayInputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
@@ -45,9 +44,6 @@ class NamespaceFilesUtilsTest {
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
QueueInterface<LogEntry> workerTaskLogQueue;

@Inject
NamespaceFilesUtils namespaceFilesUtils;

@Inject
NamespaceFactory namespaceFactory;

@@ -66,7 +62,7 @@ class NamespaceFilesUtilsTest {
namespaceStorage.putFile(Path.of("/" + i + ".txt"), data);
}

namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().build());
NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().build());

List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
receive.blockLast();
@@ -91,7 +87,7 @@ class NamespaceFilesUtilsTest {
namespaceStorage.putFile(Path.of("/" + i + ".txt"), data);
}

namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());

List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
receive.blockLast();
@@ -116,7 +112,7 @@ class NamespaceFilesUtilsTest {
namespaceStorage.putFile(Path.of("/folder2/test.txt"), data);
namespaceStorage.putFile(Path.of("/test.txt"), data);

namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());

List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
receive.blockLast();
@@ -141,7 +137,7 @@ class NamespaceFilesUtilsTest {
namespaceFactory.of(MAIN_TENANT, ns1, storageInterface).putFile(Path.of("/test.txt"), data);
namespaceFactory.of(MAIN_TENANT, ns2, storageInterface).putFile(Path.of("/test.txt"), data);

namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder()
NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder()
.namespaces(Property.ofValue(List.of(ns1, ns2)))
.folderPerNamespace(Property.ofValue(true))
.build());

@@ -0,0 +1,30 @@
package io.kestra.core.utils;

import io.kestra.core.models.Setting;
import io.kestra.core.repositories.SettingRepositoryInterface;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThat;

@MicronautTest
class VersionProviderTest {
@Inject
private VersionProvider versionProvider;

@Inject
private SettingRepositoryInterface settingRepository;

@Test
void shouldResolveVersion() {
assertThat(versionProvider.getVersion()).endsWith("-SNAPSHOT");

// check that the version is persisted in settings
Optional<Setting> versionSettings = settingRepository.findByKey(Setting.INSTANCE_VERSION);
assertThat(versionSettings).isPresent();
assertThat(versionSettings.get().getValue()).isEqualTo(versionProvider.getVersion());
}
}
@@ -9,9 +9,15 @@ import io.kestra.core.utils.TestsUtils;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import io.kestra.core.models.validations.ValidateConstraintViolation;
import io.kestra.core.services.FlowService;
import jakarta.validation.ConstraintViolationException;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonLocation;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;
import java.io.File;
import java.net.URL;
import java.util.Optional;
@@ -23,6 +29,107 @@ class FlowValidationTest {
@Inject
private ModelValidator modelValidator;

@Inject
private FlowService flowService;

private static final ObjectMapper mapper = new ObjectMapper();

// Helper class to create JsonProcessingException with location
private static class TestJsonProcessingException extends JsonProcessingException {
public TestJsonProcessingException(String msg, JsonLocation location) {
super(msg, location);
}
public TestJsonProcessingException(String msg) {
super(msg);
}
}

@Test
void testFormatYamlErrorMessage_WithExpectedFieldName() throws JsonProcessingException {
JsonProcessingException e = new TestJsonProcessingException("Expected a field name", new JsonLocation(null, 100, 5, 10));
Object dummyTarget = new Object(); // Dummy target for toConstraintViolationException

ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);

assertThat(result.getMessage()).contains("YAML syntax error: Invalid structure").contains("(at line 5)");
}

@Test
void testFormatYamlErrorMessage_WithMappingStartEvent() throws JsonProcessingException {
JsonProcessingException e = new TestJsonProcessingException("MappingStartEvent", new JsonLocation(null, 200, 3, 5));
Object dummyTarget = new Object();

ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);

assertThat(result.getMessage()).contains("YAML syntax error: Unexpected mapping start").contains("(at line 3)");
}

@Test
void testFormatYamlErrorMessage_WithScalarValue() throws JsonProcessingException {
JsonProcessingException e = new TestJsonProcessingException("Scalar value", new JsonLocation(null, 150, 7, 12));
Object dummyTarget = new Object();

ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);

assertThat(result.getMessage()).contains("YAML syntax error: Expected a simple value").contains("(at line 7)");
}

@Test
void testFormatYamlErrorMessage_GenericError() throws JsonProcessingException {
JsonProcessingException e = new TestJsonProcessingException("Some other error", new JsonLocation(null, 50, 2, 8));
Object dummyTarget = new Object();

ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);

assertThat(result.getMessage()).contains("YAML parsing error: Some other error").contains("(at line 2)");
}

@Test
void testFormatYamlErrorMessage_NoLocation() throws JsonProcessingException {
JsonProcessingException e = new TestJsonProcessingException("Expected a field name");
Object dummyTarget = new Object();

ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);

assertThat(result.getMessage()).contains("YAML syntax error: Invalid structure").doesNotContain("at line");
}

@Test
void testValidateFlowWithYamlSyntaxError() {
String invalidYaml = """
id: test-flow
namespace: io.kestra.unittest
tasks:
- id:hello
type: io.kestra.plugin.core.log.Log
message: {{ abc }}

""";
List<ValidateConstraintViolation> results = flowService.validate("my-tenant", invalidYaml);

assertThat(results).hasSize(1);
assertThat(results.getFirst().getConstraints()).contains("YAML parsing error").contains("at line");
}

@Test
void testValidateFlowWithUndefinedVariable() {
String yamlWithUndefinedVar = """
id: test-flow
namespace: io.kestra.unittest
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: {{ undefinedVar }}
""";

List<ValidateConstraintViolation> results = flowService.validate("my-tenant", yamlWithUndefinedVar);

assertThat(results).hasSize(1);
assertThat(results.getFirst().getConstraints()).contains("Validation error");
}

@Test
void invalidRecursiveFlow() {
Flow flow = this.parse("flows/invalids/recursive-flow.yaml");
@@ -130,4 +237,4 @@ class FlowValidationTest {

return YamlParser.parse(file, Flow.class);
}
}
}
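The five formatter tests above fix a message contract: known Jackson parse failures get a friendlier prefix, and the line number is appended only when a location exists. A hypothetical formatter consistent with those assertions (YamlParser's actual implementation is not part of this diff):

import com.fasterxml.jackson.core.JsonLocation;
import com.fasterxml.jackson.core.JsonProcessingException;

final class YamlErrorFormatterSketch {
    static String format(JsonProcessingException e) {
        String message = e.getOriginalMessage();
        String friendly;
        if (message.contains("Expected a field name")) {
            friendly = "YAML syntax error: Invalid structure";
        } else if (message.contains("MappingStartEvent")) {
            friendly = "YAML syntax error: Unexpected mapping start";
        } else if (message.contains("Scalar value")) {
            friendly = "YAML syntax error: Expected a simple value";
        } else {
            friendly = "YAML parsing error: " + message;
        }

        // Location is optional; only append "(at line N)" when one exists.
        JsonLocation location = e.getLocation();
        return location == null ? friendly : friendly + " (at line " + location.getLineNr() + ")";
    }
}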
@@ -8,6 +8,7 @@ import io.kestra.core.models.flows.Output;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.State.History;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.InputAndOutput;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.services.VariablesService;
import io.micronaut.context.ApplicationContext;
@@ -46,11 +47,15 @@ class SubflowTest {
@Mock
private ApplicationContext applicationContext;

@Mock
private InputAndOutput inputAndOutput;

@BeforeEach
void beforeEach() {
Mockito.when(applicationContext.getBean(VariablesService.class)).thenReturn(new VariablesService());
Mockito.when(runContext.logger()).thenReturn(LOG);
Mockito.when(runContext.getApplicationContext()).thenReturn(applicationContext);
Mockito.when(runContext.inputAndOutput()).thenReturn(inputAndOutput);
}

@Test
@@ -118,7 +123,7 @@ class SubflowTest {

Map<String, Object> outputs = Map.of("key", "value");
Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(outputs);

Mockito.when(inputAndOutput.renderOutputs(Mockito.anyList())).thenReturn(Map.of("key", "value"));

Subflow subflow = Subflow.builder()
.outputs(outputs)
@@ -159,6 +164,7 @@ class SubflowTest {

Output output = Output.builder().id("key").value("value").build();
Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(Map.of(output.getId(), output.getValue()));
Mockito.when(inputAndOutput.typedOutputs(Mockito.any(), Mockito.any(), Mockito.anyMap())).thenReturn(Map.of("key", "value"));
Flow flow = Flow.builder()
.outputs(List.of(output))
.build();

@@ -57,7 +57,7 @@ class ScheduleOnDatesTest {
}

@Test
public void shouldReturnFirstDateWhenNextEvaluationDateAndNoExistingTriggerDate() throws Exception {
public void shouldReturnFirstDateWhenNextEvaluationDateAndNoExistingTriggerDate() {
// given
var now = ZonedDateTime.now();
var before = now.minusMinutes(1).truncatedTo(ChronoUnit.SECONDS);
@@ -75,7 +75,7 @@ class ScheduleOnDatesTest {
ZonedDateTime nextDate = scheduleOnDates.nextEvaluationDate(conditionContext, Optional.empty());

// then
assertThat(nextDate).isEqualTo(before);
assertThat(nextDate).isEqualTo(after);
}

@Test

@@ -13,6 +13,7 @@ import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.flows.Flow;
 import io.kestra.core.models.flows.Type;
 import io.kestra.core.models.flows.input.StringInput;
+import io.kestra.core.models.flows.input.MultiselectInput;
 import io.kestra.core.models.triggers.AbstractTrigger;
 import io.kestra.core.models.triggers.TriggerContext;
 import io.kestra.core.runners.RunContextFactory;
@@ -103,8 +104,9 @@ class ScheduleTest {
         );
 
         assertThat(evaluate.isPresent()).isTrue();
-        assertThat(evaluate.get().getLabels()).hasSize(3);
+        assertThat(evaluate.get().getLabels()).hasSize(4);
         assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
+        assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
         assertThat(evaluate.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
         var vars = evaluate.get().getTrigger().getVariables();
         var inputs = evaluate.get().getInputs();
@@ -137,8 +139,9 @@ class ScheduleTest {
         );
 
         assertThat(evaluate.isPresent()).isTrue();
-        assertThat(evaluate.get().getLabels()).hasSize(3);
+        assertThat(evaluate.get().getLabels()).hasSize(4);
         assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
+        assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
         assertThat(evaluate.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
         var inputs = evaluate.get().getInputs();
 
@@ -475,6 +478,81 @@ class ScheduleTest {
         assertThat(result.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
     }
 
+    @Test
+    void successWithMultiselectInputDefaults() throws Exception {
+        Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("0 0 1 * *").build();
+
+        ZonedDateTime date = ZonedDateTime.now()
+            .withDayOfMonth(1)
+            .withHour(0)
+            .withMinute(0)
+            .withSecond(0)
+            .truncatedTo(ChronoUnit.SECONDS)
+            .minusMonths(1);
+
+        Optional<Execution> evaluate = trigger.evaluate(
+            conditionContextWithMultiselectInput(trigger),
+            triggerContext(date, trigger));
+
+        assertThat(evaluate.isPresent()).isTrue();
+        var inputs = evaluate.get().getInputs();
+
+        // Verify MULTISELECT input with explicit defaults works correctly
+        assertThat(inputs.get("multiselectInput")).isEqualTo(List.of("option1", "option2"));
+    }
+
+    @Test
+    void successWithMultiselectInputAutoSelectFirst() throws Exception {
+        Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("0 0 1 * *").build();
+
+        ZonedDateTime date = ZonedDateTime.now()
+            .withDayOfMonth(1)
+            .withHour(0)
+            .withMinute(0)
+            .withSecond(0)
+            .truncatedTo(ChronoUnit.SECONDS)
+            .minusMonths(1);
+
+        Optional<Execution> evaluate = trigger.evaluate(
+            conditionContextWithMultiselectAutoSelectFirst(trigger),
+            triggerContext(date, trigger));
+
+        assertThat(evaluate.isPresent()).isTrue();
+        var inputs = evaluate.get().getInputs();
+
+        // Verify MULTISELECT input with autoSelectFirst defaults to first option
+        assertThat(inputs.get("multiselectAutoSelect")).isEqualTo(List.of("first"));
+    }
+
+    @Test
+    void successWithMultiselectInputProvidedValue() throws Exception {
+        // Test that provided values override defaults for MULTISELECT
+        Schedule trigger = Schedule.builder()
+            .id("schedule")
+            .type(Schedule.class.getName())
+            .cron("0 0 1 * *")
+            .inputs(Map.of("multiselectInput", List.of("option3")))
+            .build();
+
+        ZonedDateTime date = ZonedDateTime.now()
+            .withDayOfMonth(1)
+            .withHour(0)
+            .withMinute(0)
+            .withSecond(0)
+            .truncatedTo(ChronoUnit.SECONDS)
+            .minusMonths(1);
+
+        Optional<Execution> evaluate = trigger.evaluate(
+            conditionContextWithMultiselectInput(trigger),
+            triggerContext(date, trigger));
+
+        assertThat(evaluate.isPresent()).isTrue();
+        var inputs = evaluate.get().getInputs();
+
+        // Verify provided value overrides defaults
+        assertThat(inputs.get("multiselectInput")).isEqualTo(List.of("option3"));
+    }
+
     private ConditionContext conditionContext(AbstractTrigger trigger) {
         Flow flow = Flow.builder()
             .id(IdUtils.create())
@@ -504,17 +582,79 @@ class ScheduleTest {
             .build();
     }
 
+    private ConditionContext conditionContextWithMultiselectInput(AbstractTrigger trigger) {
+        Flow flow = Flow.builder()
+            .id(IdUtils.create())
+            .namespace("io.kestra.tests")
+            .labels(
+                List.of(
+                    new Label("flow-label-1", "flow-label-1"),
+                    new Label("flow-label-2", "flow-label-2")))
+            .variables(Map.of("custom_var", "VARIABLE VALUE"))
+            .inputs(List.of(
+                MultiselectInput.builder()
+                    .id("multiselectInput")
+                    .type(Type.MULTISELECT)
+                    .values(List.of("option1", "option2", "option3"))
+                    .defaults(Property.ofValue(List.of("option1", "option2")))
+                    .build()))
+            .build();
+
+        TriggerContext triggerContext = TriggerContext.builder()
+            .namespace(flow.getNamespace())
+            .flowId(flow.getId())
+            .triggerId(trigger.getId())
+            .build();
+
+        return ConditionContext.builder()
+            .runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(),
+                triggerContext, trigger))
+            .flow(flow)
+            .build();
+    }
+
+    private ConditionContext conditionContextWithMultiselectAutoSelectFirst(AbstractTrigger trigger) {
+        Flow flow = Flow.builder()
+            .id(IdUtils.create())
+            .namespace("io.kestra.tests")
+            .labels(
+                List.of(
+                    new Label("flow-label-1", "flow-label-1"),
+                    new Label("flow-label-2", "flow-label-2")))
+            .variables(Map.of("custom_var", "VARIABLE VALUE"))
+            .inputs(List.of(
+                MultiselectInput.builder()
+                    .id("multiselectAutoSelect")
+                    .type(Type.MULTISELECT)
+                    .values(List.of("first", "second", "third"))
+                    .autoSelectFirst(true)
+                    .build()))
+            .build();
+
+        TriggerContext triggerContext = TriggerContext.builder()
+            .namespace(flow.getNamespace())
+            .flowId(flow.getId())
+            .triggerId(trigger.getId())
+            .build();
+
+        return ConditionContext.builder()
+            .runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(),
+                triggerContext, trigger))
+            .flow(flow)
+            .build();
+    }
+
     private ZonedDateTime dateFromVars(String date, ZonedDateTime expexted) {
         return ZonedDateTime.parse(date).withZoneSameInstant(expexted.getZone());
     }
 
 
     @Test
     void shouldGetNextExecutionDateWithConditionMatchingFutureDate() throws InternalException {
 
         ZonedDateTime now = ZonedDateTime.now().withZoneSameLocal(ZoneId.of("Europe/Paris"));
         OffsetTime before = now.minusHours(1).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
         OffsetTime after = now.minusHours(4).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
 
         Schedule trigger = Schedule.builder()
             .id("schedule").type(Schedule.class.getName())
             .cron("0 * * * *") // every hour
@@ -527,25 +667,25 @@ class ScheduleTest {
                 .build()
             ))
             .build();
 
         TriggerContext triggerContext = triggerContext(now, trigger).toBuilder().build();
 
         ConditionContext conditionContext = ConditionContext.builder()
             .runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(), triggerContext, trigger))
             .build();
 
         Optional<ZonedDateTime> result = trigger.truePreviousNextDateWithCondition(trigger.executionTime(), conditionContext, now, true);
         assertThat(result).isNotEmpty();
     }
 
     @Test
     void shouldGetNextExecutionDateWithConditionMatchingCurrentDate() throws InternalException {
 
         ZonedDateTime now = ZonedDateTime.now().withZoneSameLocal(ZoneId.of("Europe/Paris"));
 
         OffsetTime before = now.plusHours(2).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
         OffsetTime after = now.minusHours(2).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
 
         Schedule trigger = Schedule.builder()
             .id("schedule").type(Schedule.class.getName())
             .cron("*/30 * * * * *")
@@ -558,13 +698,13 @@ class ScheduleTest {
                 .build()
             ))
             .build();
 
         TriggerContext triggerContext = triggerContext(now, trigger).toBuilder().build();
 
         ConditionContext conditionContext = ConditionContext.builder()
             .runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(), triggerContext, trigger))
             .build();
 
         Optional<ZonedDateTime> result = trigger.truePreviousNextDateWithCondition(trigger.executionTime(), conditionContext, now, true);
         assertThat(result).isNotEmpty();
     }
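The three new ScheduleTest cases above pin down a precedence order for MULTISELECT inputs: a value provided on the trigger wins over the input's explicit `defaults`, which in turn win over `autoSelectFirst`. The helper below is a hypothetical standalone sketch of that precedence only; kestra's real resolution happens inside its input rendering and is not reproduced here.

    import java.util.List;

    // Hypothetical sketch of the precedence the new tests assert:
    // provided value > explicit defaults > autoSelectFirst > no selection.
    final class MultiselectDefaultsSketch {
        static List<String> resolve(List<String> provided, List<String> defaults,
                                    boolean autoSelectFirst, List<String> values) {
            if (provided != null) {
                return provided;                   // successWithMultiselectInputProvidedValue
            }
            if (defaults != null) {
                return defaults;                   // successWithMultiselectInputDefaults
            }
            if (autoSelectFirst && !values.isEmpty()) {
                return List.of(values.getFirst()); // successWithMultiselectInputAutoSelectFirst
            }
            return List.of();                      // nothing selected
        }
    }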
@@ -8,4 +8,4 @@ concurrency:
 tasks:
   - id: sleep
     type: io.kestra.plugin.core.flow.Sleep
-    duration: PT10S
+    duration: PT2S
@@ -402,10 +402,11 @@ public class ExecutorService {
 
         if (flow.getOutputs() != null) {
             RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
+            var inputAndOutput = runContext.inputAndOutput();
 
             try {
-                Map<String, Object> outputs = FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext);
-                outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
+                Map<String, Object> outputs = inputAndOutput.renderOutputs(flow.getOutputs());
+                outputs = inputAndOutput.typedOutputs(flow, executor.getExecution(), outputs);
                 newExecution = newExecution.withOutputs(outputs);
             } catch (Exception e) {
                 Logs.logExecution(
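This hunk, together with the SubflowTest changes above, swaps static FlowInputOutput calls for an InputAndOutput instance obtained from the run context. The upshot is testability: an instance hanging off RunContext can be stubbed with Mockito, a static call cannot without extra tooling. A simplified, hypothetical sketch of the seam; the real interface takes kestra's Flow/Execution/Output types:

    import java.util.List;
    import java.util.Map;

    // Hypothetical, simplified seam: tests can now hand the run context a fake.
    interface InputAndOutputSketch {
        Map<String, Object> renderOutputs(List<?> outputs) throws Exception;
    }

    // In SubflowTest this is exactly what the new stubs do:
    //   Mockito.when(runContext.inputAndOutput()).thenReturn(inputAndOutput);
    //   Mockito.when(inputAndOutput.renderOutputs(Mockito.anyList())).thenReturn(Map.of("key", "value"));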
@@ -1,6 +1,6 @@
 package io.kestra.runner.h2;
 
-import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
+import io.kestra.jdbc.runner.JdbcConcurrencyRunnerTest;
 
-public class H2RunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
+public class H2RunnerConcurrencyTest extends JdbcConcurrencyRunnerTest {
 }

@@ -1,6 +1,6 @@
 package io.kestra.runner.mysql;
 
-import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
+import io.kestra.jdbc.runner.JdbcConcurrencyRunnerTest;
 
-public class MysqlRunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
+public class MysqlRunnerConcurrencyTest extends JdbcConcurrencyRunnerTest {
 }

@@ -1,6 +1,6 @@
 package io.kestra.runner.postgres;
 
-import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
+import io.kestra.jdbc.runner.JdbcConcurrencyRunnerTest;
 
-public class PostgresRunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
+public class PostgresRunnerConcurrencyTest extends JdbcConcurrencyRunnerTest {
 }
@@ -44,9 +44,15 @@ public abstract class AbstractJdbcSettingRepository extends AbstractJdbcCrudRepo
 
     @Override
     public Setting save(Setting setting) {
+        this.eventPublisher.publishEvent(new CrudEvent<>(setting, CrudEventType.UPDATE));
+
+        return internalSave(setting);
+    }
+
+    @Override
+    public Setting internalSave(Setting setting) {
         Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(setting);
         this.jdbcRepository.persist(setting, fields);
-        this.eventPublisher.publishEvent(new CrudEvent<>(setting, CrudEventType.UPDATE));
 
         return setting;
     }
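The split above separates the side effect (publishing a CRUD event) from the persistence itself, so internal callers that must not emit events can go through internalSave. A minimal sketch of the shape, with hypothetical simplified types:

    // Minimal sketch: the public method adds the event side effect,
    // the internal one only persists. Types are simplified stand-ins.
    abstract class SettingRepositorySketch {
        record Setting(String key, Object value) {}

        Setting save(Setting setting) {
            publishUpdateEvent(setting);   // side effect for external callers
            return internalSave(setting);
        }

        Setting internalSave(Setting setting) {
            persist(setting);              // persistence only, no event
            return setting;
        }

        abstract void publishUpdateEvent(Setting setting);
        abstract void persist(Setting setting);
    }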
@@ -72,12 +72,12 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo
 
     @Override
     public Optional<Trigger> findLast(TriggerContext trigger) {
-        return findOne(DSL.trueCondition(), field("key").eq(trigger.uid()));
+        return findByUid(trigger.uid());
     }
 
     @Override
-    public Optional<Trigger> findByExecution(Execution execution) {
-        return findOne(execution.getTenantId(), field("execution_id").eq(execution.getId()));
+    public Optional<Trigger> findByUid(String uid) {
+        return findOne(DSL.trueCondition(), field("key").eq(uid));
     }
 
     public List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContextInterface) {
@@ -74,15 +74,19 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
      * Decrement the concurrency limit counter.
      * Must only be called when a flow having concurrency limit ends.
      */
-    public void decrement(FlowInterface flow) {
-        this.jdbcRepository
+    public int decrement(FlowInterface flow) {
+        return this.jdbcRepository
             .getDslContextWrapper()
-            .transaction(configuration -> {
+            .transactionResult(configuration -> {
                 var dslContext = DSL.using(configuration);
 
-                fetchOne(dslContext, flow).ifPresent(
-                    concurrencyLimit -> update(dslContext, concurrencyLimit.withRunning(concurrencyLimit.getRunning() == 0 ? 0 : concurrencyLimit.getRunning() - 1))
-                );
+                return fetchOne(dslContext, flow).map(
+                    concurrencyLimit -> {
+                        int newLimit = concurrencyLimit.getRunning() == 0 ? 0 : concurrencyLimit.getRunning() - 1;
+                        update(dslContext, concurrencyLimit.withRunning(newLimit));
+                        return newLimit;
+                    }
+                ).orElse(0);
             });
     }
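Two things change here: the update now runs through transactionResult so the new count is computed and returned atomically, and the count stays clamped so it never drops below zero. A standalone sketch of just the clamping arithmetic; the AtomicInteger is an assumption of the sketch standing in for the jOOQ transaction, not kestra's storage:

    import java.util.concurrent.atomic.AtomicInteger;

    // Standalone sketch: decrement never drops below zero, and the caller gets
    // the new count back so it can decide whether to pop a queued execution.
    final class ConcurrencyCounterSketch {
        private final AtomicInteger running = new AtomicInteger();

        int decrement() {
            return running.updateAndGet(current -> current == 0 ? 0 : current - 1);
        }

        public static void main(String[] args) {
            ConcurrencyCounterSketch counter = new ConcurrencyCounterSketch();
            System.out.println(counter.decrement()); // 0: already at zero, stays clamped
        }
    }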
@@ -13,6 +13,7 @@ import io.kestra.core.models.tasks.ExecutableTask;
 import io.kestra.core.models.tasks.Task;
 import io.kestra.core.models.tasks.WorkerGroup;
 import io.kestra.core.models.topologies.FlowTopology;
+import io.kestra.core.models.triggers.Trigger;
 import io.kestra.core.models.triggers.multipleflows.MultipleCondition;
 import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
 import io.kestra.core.queues.QueueException;
@@ -1138,9 +1139,7 @@ public class JdbcExecutor implements ExecutorInterface {
                 execution.getTrigger().getId()
             );
         } else {
-            triggerRepository
-                .findByExecution(execution)
-                .ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
+            triggerRepository.findByUid(Trigger.uid(execution)).ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
         }
     }
@@ -1210,24 +1209,30 @@ public class JdbcExecutor implements ExecutorInterface {
         // as we may receive a killed execution multiple times (once when we kill it, then once for each running worker task), we limit to the first we receive: when the state transitioned from KILLING to KILLED
         boolean killingThenKilled = execution.getState().getCurrent().isKilled() && executor.getOriginalState() == State.Type.KILLING;
         if (!queuedThenKilled && !concurrencyShortCircuitState && (!execution.getState().getCurrent().isKilled() || killingThenKilled)) {
             // decrement execution concurrency limit and pop a new queued execution if needed
-            concurrencyLimitStorage.decrement(executor.getFlow());
+            int newLimit = concurrencyLimitStorage.decrement(executor.getFlow());
 
             if (executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
                 var finalFlow = executor.getFlow();
-                executionQueuedStorage.pop(executor.getFlow().getTenantId(),
-                    executor.getFlow().getNamespace(),
-                    executor.getFlow().getId(),
-                    throwBiConsumer((dslContext, queued) -> {
-                        var newExecution = queued.withState(State.Type.RUNNING);
-                        concurrencyLimitStorage.increment(dslContext, finalFlow);
-                        executionQueue.emit(newExecution);
-                        metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
-
-                        // process flow triggers to allow listening on RUNNING state after a QUEUED state
-                        processFlowTriggers(newExecution);
-                    })
-                );
+                if (newLimit < finalFlow.getConcurrency().getLimit()) {
+                    executionQueuedStorage.pop(executor.getFlow().getTenantId(),
+                        executor.getFlow().getNamespace(),
+                        executor.getFlow().getId(),
+                        throwBiConsumer((dslContext, queued) -> {
+                            var newExecution = queued.withState(State.Type.RUNNING);
+                            concurrencyLimitStorage.increment(dslContext, finalFlow);
+                            executionQueue.emit(newExecution);
+                            metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
+
+                            // process flow triggers to allow listening on RUNNING state after a QUEUED state
+                            processFlowTriggers(newExecution);
+                        })
+                    );
+                } else {
+                    log.error("Concurrency limit reached for flow {}.{} after decrementing the execution running count due to the terminated execution {}. No new executions will be dequeued.", executor.getFlow().getNamespace(), executor.getFlow().getId(), executor.getExecution().getId());
+                }
+            } else if (newLimit >= executor.getFlow().getConcurrency().getLimit()) {
+                log.error("Concurrency limit reached for flow {}.{} after decrementing the execution running count due to the terminated execution {}. This should not happen.", executor.getFlow().getNamespace(), executor.getFlow().getId(), executor.getExecution().getId());
+            }
         }
     }
@@ -1235,11 +1240,7 @@ public class JdbcExecutor implements ExecutorInterface {
         // purge the trigger: reset scheduler trigger at end
         if (execution.getTrigger() != null) {
             FlowWithSource flow = executor.getFlow();
-            triggerRepository
-                .findByExecution(execution)
-                .ifPresent(trigger -> {
-                    this.triggerState.update(executionService.resetExecution(flow, execution, trigger));
-                });
+            triggerRepository.findByUid(Trigger.uid(execution)).ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
         }
 
         // Purge the workerTaskResultQueue and the workerJobQueue
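The returned count now gates the dequeue: a queued execution is only popped when the decremented running count is strictly below the configured limit, otherwise the executor logs and leaves the queue alone. That is the "queued protection" the new test below exercises by artificially inflating the stored count. A minimal sketch of the guard, assuming the two counts are already known:

    // Minimal sketch of the dequeue guard introduced above.
    final class DequeueGuardSketch {
        static boolean shouldPopQueued(int newLimit, int limit) {
            // strictly below the limit: a slot is free for a queued execution
            return newLimit < limit;
        }

        public static void main(String[] args) {
            System.out.println(shouldPopQueued(0, 1)); // true: normal completion frees a slot
            System.out.println(shouldPopQueued(1, 1)); // false: drifted counter, skip and log
        }
    }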
@@ -0,0 +1,64 @@
+package io.kestra.jdbc.runner;
+
+import io.kestra.core.junit.annotations.LoadFlows;
+import io.kestra.core.models.executions.Execution;
+import io.kestra.core.models.flows.Flow;
+import io.kestra.core.models.flows.State;
+import io.kestra.core.queues.QueueException;
+import io.kestra.core.repositories.ExecutionRepositoryInterface;
+import io.kestra.core.repositories.FlowRepositoryInterface;
+import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
+import io.kestra.core.runners.ConcurrencyLimit;
+import io.kestra.core.runners.TestRunnerUtils;
+import jakarta.inject.Inject;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public abstract class JdbcConcurrencyRunnerTest extends AbstractRunnerConcurrencyTest {
+    public static final String NAMESPACE = "io.kestra.tests";
+
+    @Inject
+    private AbstractJdbcConcurrencyLimitStorage concurrencyLimitStorage;
+
+    @Inject
+    private FlowRepositoryInterface flowRepository;
+
+    @Inject
+    private ExecutionRepositoryInterface executionRepository;
+
+    @Inject
+    private TestRunnerUtils runnerUtils;
+
+    @Test
+    @LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = "flow-concurrency-queued-protection")
+    void flowConcurrencyQueuedProtection() throws QueueException, InterruptedException {
+        Execution execution1 = runnerUtils.runOneUntilRunning("flow-concurrency-queued-protection", NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
+        assertThat(execution1.getState().isRunning()).isTrue();
+
+        Flow flow = flowRepository
+            .findById("flow-concurrency-queued-protection", NAMESPACE, "flow-concurrency-queue", Optional.empty())
+            .orElseThrow();
+        Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(State.Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
+        assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.QUEUED);
+
+        // manually update the concurrency count so that queued protection kicks in and no new execution would be popped
+        ConcurrencyLimit concurrencyLimit = concurrencyLimitStorage.findById("flow-concurrency-queued-protection", NAMESPACE, "flow-concurrency-queue").orElseThrow();
+        concurrencyLimit = concurrencyLimit.withRunning(concurrencyLimit.getRunning() + 1);
+        concurrencyLimitStorage.update(concurrencyLimit);
+
+        Execution executionResult1 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(State.Type.SUCCESS), execution1);
+        assertThat(executionResult1.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+
+        // wait a few ms and check that the second execution is still queued
+        Thread.sleep(500);
+        Execution executionResult2 = executionRepository.findById("flow-concurrency-queued-protection", execution2.getId()).orElseThrow();
+        assertThat(executionResult2.getState().getCurrent()).isEqualTo(State.Type.QUEUED);
+
+        // manually reset the concurrency count to avoid messing with any other tests
+        concurrencyLimitStorage.update(concurrencyLimit.withRunning(concurrencyLimit.getRunning() - 1));
+    }
+}
@@ -31,11 +31,11 @@ dependencies {
     api enforcedPlatform("com.fasterxml.jackson:jackson-bom:$jacksonVersion")
     api enforcedPlatform("org.slf4j:slf4j-api:$slf4jVersion")
     api platform("io.micronaut.platform:micronaut-platform:4.9.4")
-    api platform("io.qameta.allure:allure-bom:2.31.0")
+    api platform("io.qameta.allure:allure-bom:2.32.0")
     // we define cloud bom here for GCP, Azure and AWS so they are aligned for all plugins that use them (secret, storage, oss and ee plugins)
-    api platform('com.google.cloud:libraries-bom:26.72.0')
+    api platform('com.google.cloud:libraries-bom:26.73.0')
     api platform("com.azure:azure-sdk-bom:1.3.3")
-    api platform('software.amazon.awssdk:bom:2.40.5')
+    api platform('software.amazon.awssdk:bom:2.40.10')
     api platform("dev.langchain4j:langchain4j-bom:$langchain4jVersion")
     api platform("dev.langchain4j:langchain4j-community-bom:$langchain4jCommunityVersion")
@@ -77,12 +77,12 @@ dependencies {
     api "org.apache.kafka:kafka-clients:$kafkaVersion"
     api "org.apache.kafka:kafka-streams:$kafkaVersion"
     // AWS CRT is not included in the AWS BOM but needed for the S3 Transfer manager
-    api 'software.amazon.awssdk.crt:aws-crt:0.40.3'
+    api 'software.amazon.awssdk.crt:aws-crt:0.41.0'
 
     // Other libs
     api("org.projectlombok:lombok:1.18.42")
     api("org.codehaus.janino:janino:3.1.12")
-    api group: 'org.apache.logging.log4j', name: 'log4j-to-slf4j', version: '2.25.2'
+    api group: 'org.apache.logging.log4j', name: 'log4j-to-slf4j', version: '2.25.3'
     api group: 'org.slf4j', name: 'jul-to-slf4j', version: slf4jVersion
     api group: 'org.slf4j', name: 'jcl-over-slf4j', version: slf4jVersion
     api group: 'org.fusesource.jansi', name: 'jansi', version: '2.4.2'
@@ -99,11 +99,11 @@ dependencies {
     api group: 'org.apache.maven.resolver', name: 'maven-resolver-transport-file', version: mavenResolverVersion
     api group: 'org.apache.maven.resolver', name: 'maven-resolver-transport-apache', version: mavenResolverVersion
     api 'com.github.oshi:oshi-core:6.9.1'
-    api 'io.pebbletemplates:pebble:4.0.0'
+    api 'io.pebbletemplates:pebble:4.1.0'
     api group: 'co.elastic.logging', name: 'logback-ecs-encoder', version: '1.7.0'
     api group: 'de.focus-shift', name: 'jollyday-core', version: jollydayVersion
     api group: 'de.focus-shift', name: 'jollyday-jaxb', version: jollydayVersion
-    api 'nl.basjes.gitignore:gitignore-reader:1.13.0'
+    api 'nl.basjes.gitignore:gitignore-reader:1.14.1'
     api group: 'dev.failsafe', name: 'failsafe', version: '3.3.2'
     api group: 'com.cronutils', name: 'cron-utils', version: '9.2.1'
     api group: 'com.github.victools', name: 'jsonschema-generator', version: jsonschemaVersion
@@ -288,7 +288,7 @@ public abstract class AbstractScheduler implements Scheduler {
                 disableInvalidTrigger(workerTriggerResult.getTriggerContext(), e);
                 return;
             }
-            this.handleEvaluateWorkerTriggerResult(triggerExecution, nextExecutionDate);
+            this.handleEvaluateWorkerTriggerResult(triggerExecution, nextExecutionDate, workerTriggerResult.getTrigger());
         } else {
             ZonedDateTime nextExecutionDate;
             try {
@@ -715,7 +715,8 @@ public abstract class AbstractScheduler implements Scheduler {
                 Optional<SchedulerExecutionWithTrigger> schedulerExecutionWithTrigger = evaluateScheduleTrigger(f);
                 if (schedulerExecutionWithTrigger.isPresent()) {
                     this.handleEvaluateSchedulingTriggerResult(schedule, schedulerExecutionWithTrigger.get(), f.getConditionContext(), scheduleContext);
-                } else {
+                }
+                else {
                     // compute next date and save the trigger to avoid evaluating it each second
                     Trigger trigger = Trigger.fromEvaluateFailed(
                         f.getTriggerContext(),
@@ -750,26 +751,7 @@ public abstract class AbstractScheduler implements Scheduler {
                     // validate schedule condition can fail to render variables
                     // in this case, we send a failed execution so the trigger is not evaluated each second.
                     logger.error("Unable to evaluate the trigger '{}'", f.getAbstractTrigger().getId(), ie);
-                    Execution execution = Execution.builder()
-                        .id(IdUtils.create())
-                        .tenantId(f.getTriggerContext().getTenantId())
-                        .namespace(f.getTriggerContext().getNamespace())
-                        .flowId(f.getTriggerContext().getFlowId())
-                        .flowRevision(f.getFlow().getRevision())
-                        .labels(LabelService.labelsExcludingSystem(f.getFlow()))
-                        .state(new State().withState(State.Type.FAILED))
-                        .build();
-                    ZonedDateTime nextExecutionDate;
-                    try {
-                        nextExecutionDate = this.nextEvaluationDate(f.getAbstractTrigger());
-                    } catch (InvalidTriggerConfigurationException e2) {
-                        logError(f, e2);
-                        disableInvalidTrigger(f, e2);
-                        return;
-                    }
-
-                    var trigger = f.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
-                    this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
+                    handleFailedEvaluatedTrigger(f, scheduleContext, ie);
                 }
             });
         });
@@ -786,7 +768,7 @@ public abstract class AbstractScheduler implements Scheduler {
     }
 
     private void handleEvaluateWorkerTriggerResult(SchedulerExecutionWithTrigger result, ZonedDateTime
-        nextExecutionDate) {
+        nextExecutionDate, AbstractTrigger abstractTrigger) {
         Optional.ofNullable(result)
             .ifPresent(executionWithTrigger -> {
                 log(executionWithTrigger);
@@ -797,6 +779,12 @@ public abstract class AbstractScheduler implements Scheduler {
                     nextExecutionDate
                 );
 
+                // if the trigger is allowed to run concurrently, we do not attach the execution id to the trigger state,
+                // i.e., the trigger will not be locked
+                if (abstractTrigger.isAllowConcurrent()) {
+                    trigger = trigger.toBuilder().executionId(null).build();
+                }
+
                 // Worker triggers result is evaluated in another thread with the workerTriggerResultQueue.
                 // We can then update the trigger directly.
                 this.saveLastTriggerAndEmitExecution(executionWithTrigger.getExecution(), trigger, triggerToSave -> this.triggerState.update(triggerToSave));
@@ -818,6 +806,12 @@ public abstract class AbstractScheduler implements Scheduler {
         if (result.getExecution().getState().getCurrent() == State.Type.FAILED) {
             trigger = trigger.resetExecution(State.Type.FAILED);
         }
 
+        // if the trigger is allowed to run concurrently, we do not attach the execution id to the trigger state,
+        // i.e., the trigger will not be locked
+        if (((AbstractTrigger) schedule).isAllowConcurrent()) {
+            trigger = trigger.toBuilder().executionId(null).build();
+        }
+
         // Schedule triggers are being executed directly from the handle method within the context where triggers are locked.
         // So we must save them by passing the scheduleContext.
@@ -983,11 +977,43 @@ public abstract class AbstractScheduler implements Scheduler {
                 ));
             } catch (Exception e) {
                 logError(flowWithTrigger, e);
+                Execution failedExecution = createFailedExecution(flowWithTrigger, e);
+                this.emitExecution(failedExecution, flowWithTrigger.getTriggerContext());
                 return Optional.empty();
             }
         });
     }
 
+    private Execution createFailedExecution(FlowWithWorkerTrigger flowWithTrigger, Throwable e) {
+        Execution execution = Execution.builder()
+            .id(IdUtils.create())
+            .tenantId(flowWithTrigger.getTriggerContext().getTenantId())
+            .namespace(flowWithTrigger.getTriggerContext().getNamespace())
+            .flowId(flowWithTrigger.getTriggerContext().getFlowId())
+            .flowRevision(flowWithTrigger.getFlow().getRevision())
+            .labels(LabelService.labelsExcludingSystem(flowWithTrigger.getFlow()))
+            .state(new State().withState(State.Type.FAILED))
+            .build();
+        Logger logger = runContextFactory.of(flowWithTrigger.getFlow(), execution).logger();
+        logger.error("[trigger: {}] [date: {}] Evaluate Failed with error '{}'", flowWithTrigger.getAbstractTrigger().getId(), now(), e.getMessage());
+        return execution;
+    }
+
+    private void handleFailedEvaluatedTrigger(FlowWithWorkerTrigger flowWithTrigger, ScheduleContextInterface scheduleContext, Throwable e) {
+        Execution execution = createFailedExecution(flowWithTrigger, e);
+        ZonedDateTime nextExecutionDate;
+        try {
+            nextExecutionDate = this.nextEvaluationDate(flowWithTrigger.getAbstractTrigger());
+        } catch (InvalidTriggerConfigurationException e2) {
+            logError(flowWithTrigger, e2);
+            disableInvalidTrigger(flowWithTrigger, e2);
+            return;
+        }
+
+        var trigger = flowWithTrigger.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
+        trigger = trigger.checkBackfill();
+        this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
+    }
+
     private void logError(FlowWithWorkerTrigger flowWithWorkerTriggerNextDate, Throwable e) {
         Logger logger = flowWithWorkerTriggerNextDate.getConditionContext().getRunContext().logger();
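Both scheduler hunks add the same rule: when a trigger declares allowConcurrent, the execution id is stripped from the trigger state before saving, so the trigger is never locked to a single in-flight execution. A hypothetical simplified model of that locking convention:

    // Hypothetical simplified model: a trigger with a non-null executionId is
    // considered locked until that execution finishes.
    record TriggerStateSketch(String triggerId, String executionId) {
        TriggerStateSketch afterEmit(String newExecutionId, boolean allowConcurrent) {
            // concurrent triggers stay unlocked: no execution id is attached
            return new TriggerStateSketch(triggerId, allowConcurrent ? null : newExecutionId);
        }

        boolean locked() {
            return executionId != null;
        }
    }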
@@ -91,6 +91,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
         assertThat(queueCount.getCount()).isEqualTo(0L);
         assertThat(last.get()).isNotNull();
         assertTrue(last.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
+        assertTrue(last.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
     }
 }
@@ -136,6 +137,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
         assertThat(queueCount.getCount()).isEqualTo(0L);
         assertThat(last.get()).isNotNull();
         assertTrue(last.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
+        assertTrue(last.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
 
         // Assert that the trigger is now disabled.
         // It needs to await on assertion as it will be disabled AFTER we receive a success execution.
@@ -489,9 +489,8 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
             Await.until(() -> this.triggerState.findLast(trigger).map(t -> t.getDisabled()).orElse(false).booleanValue(), Duration.ofMillis(100), Duration.ofSeconds(10));
         }
     }
 
     @Test
-    void failedEvaluationTest() {
+    void failedEvaluationFromFailedCondition() {
         // mock flow listeners
         FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
         Schedule schedule = createScheduleTrigger("Europe/Paris", "* * * * *", "failedEvaluation", false)
@@ -527,6 +526,61 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
             // wait for execution
             Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
                 Execution execution = either.getLeft();
                 assertThat(execution).isNotNull();
                 assertThat(execution.getFlowId()).isEqualTo(flow.getId());
                 assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
 
                 queueCount.countDown();
             });
 
             scheduler.run();
 
             queueCount.await(1, TimeUnit.MINUTES);
             // needed for RetryingTest to work since there is no context cleaning between methods => we have to clear the assertion receiver manually
             receive.blockLast();
 
             assertThat(queueCount.getCount()).isEqualTo(0L);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
+    @Test
+    void failedEvaluationFromInvalidExpression() {
+        // mock flow listeners
+        FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
+        Schedule schedule = createScheduleTrigger("Europe/Paris", "* * * * *", "failedEvaluation", false)
+            .inputs(
+                Map.of("invalidExpressionInput", Expression.builder()
+                    .type(Expression.class.getName())
+                    .expression(Property.ofExpression("{{ now().hour == 0 ? 3 : 2 }}"))
+                    .build()
+                )
+            )
+            .build();
+        FlowWithSource flow = createFlow(this.tenantId, Collections.singletonList(schedule));
+        doReturn(List.of(flow))
+            .when(flowListenersServiceSpy)
+            .flows();
+
+        // to avoid waiting too much before a trigger execution, we add a last trigger with a date now - 1m.
+        Trigger lastTrigger = Trigger
+            .builder()
+            .triggerId("failedEvaluation")
+            .tenantId(this.tenantId)
+            .flowId(flow.getId())
+            .namespace(flow.getNamespace())
+            .date(ZonedDateTime.now().minusMinutes(1L))
+            .build();
+        triggerState.create(lastTrigger);
+
+        CountDownLatch queueCount = new CountDownLatch(1);
+
+        // scheduler
+        try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
+            // wait for execution
+            Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
+                Execution execution = either.getLeft();
+                assertThat(execution).isNotNull();
+                assertThat(execution.getFlowId()).isEqualTo(flow.getId());
+                assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
@@ -104,6 +104,7 @@ public class SchedulerStreamingTest extends AbstractSchedulerTest {
                 assertThat(SchedulerStreamingTest.startedEvaluate.get(false), is(1));
                 assertThat(last.getTrigger().getVariables().get("startedEvaluate"), is(1));
                 assertTrue(last.getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
+                assertTrue(last.getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
             }
         );
     }
@@ -149,8 +149,7 @@ public class CommandsWrapper implements TaskCommands {
 
     public <T extends TaskRunnerDetailResult> ScriptOutput run() throws Exception {
         if (this.namespaceFiles != null && !Boolean.FALSE.equals(runContext.render(this.namespaceFiles.getEnabled()).as(Boolean.class).orElse(true))) {
-            NamespaceFilesUtils namespaceFilesUtils = ((DefaultRunContext) runContext).getApplicationContext().getBean(NamespaceFilesUtils.class);
-            namespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
+            NamespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
         }
 
         TaskRunner<T> realTaskRunner = this.getTaskRunner();
@@ -158,9 +157,7 @@ public class CommandsWrapper implements TaskCommands {
             FilesService.inputFiles(runContext, realTaskRunner.additionalVars(runContext, this), this.inputFiles);
         }
 
-        RunContextInitializer initializer = ((DefaultRunContext) runContext).getApplicationContext().getBean(RunContextInitializer.class);
-
-        RunContext taskRunnerRunContext = initializer.forPlugin(((DefaultRunContext) runContext).clone(), realTaskRunner);
+        RunContext taskRunnerRunContext = runContext.cloneForPlugin(realTaskRunner);
 
         List<String> renderedCommands = this.renderCommands(runContext, commands);
         List<String> renderedBeforeCommands = this.renderCommands(runContext, beforeCommands);
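The CommandsWrapper hunk replaces two rounds of ApplicationContext plumbing (bean lookup, cast, clone) with a single runContext.cloneForPlugin(...) call, and NamespaceFilesUtils becomes a static utility. The before/after below paraphrases the diff with simplified, hypothetical signatures:

    // Hypothetical, simplified facade: callers no longer reach into the container.
    interface RunContextSketch {
        RunContextSketch cloneForPlugin(Object plugin);
    }

    // before: ((DefaultRunContext) runContext).getApplicationContext()
    //             .getBean(RunContextInitializer.class)
    //             .forPlugin(((DefaultRunContext) runContext).clone(), taskRunner);
    // after:  runContext.cloneForPlugin(taskRunner);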
@@ -6,7 +6,7 @@
     <meta http-equiv="X-UA-Compatible" content="IE=edge">
     <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
     <link rel="icon" href="/favicon.ico">
     <link rel="icon" type="image/png" sizes="192x192" href="/favicon-192x192.png">
     <meta name="msapplication-TileColor" content="#192A4E">
     <meta name="theme-color" content="#192A4E">
     <link rel="stylesheet" href="/loader.css" />
@@ -26,26 +26,6 @@
             document.getElementsByTagName("html")[0].classList.add(localStorage.getItem("theme"));
         }
     </script>
 
-    <!-- Optional but recommended for faster connection -->
-    <link rel="preconnect" href="https://fonts.googleapis.com">
-    <link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
-
-    <!-- Load Google Fonts non-blocking -->
-    <link
-        rel="stylesheet"
-        href="https://fonts.googleapis.com/css2?family=Public+Sans:wght@300;400;600;700;800&family=Source+Code+Pro:wght@400;700;800&display=swap"
-        media="print"
-        onload="this.media='all'"
-    >
-
-    <!-- Fallback for when JavaScript is disabled -->
-    <noscript>
-        <link
-            rel="stylesheet"
-            href="https://fonts.googleapis.com/css2?family=Public+Sans:wght@300;400;600;700;800&family=Source+Code+Pro:wght@400;700;800&display=swap"
-        >
-    </noscript>
 </head>
 <body>
     <noscript>
ui/package-lock.json (generated, 1179 changes): diff suppressed because it is too large.
@@ -24,11 +24,11 @@
     },
     "dependencies": {
         "@js-joda/core": "^5.6.5",
-        "@kestra-io/ui-libs": "^0.0.264",
+        "@kestra-io/ui-libs": "^0.0.268",
         "@vue-flow/background": "^1.3.2",
         "@vue-flow/controls": "^1.1.2",
-        "@vue-flow/core": "^1.47.0",
-        "@vueuse/core": "^14.0.0",
+        "@vue-flow/core": "^1.48.0",
+        "@vueuse/core": "^14.1.0",
         "ansi-to-html": "^0.7.2",
         "axios": "^1.13.2",
         "bootstrap": "^5.3.8",
@@ -39,8 +39,8 @@
         "cytoscape": "^3.33.0",
         "dagre": "^0.8.5",
         "dotenv": "^17.2.3",
-        "element-plus": "2.11.8",
-        "humanize-duration": "^3.33.1",
+        "element-plus": "2.12.0",
+        "humanize-duration": "^3.33.2",
         "js-yaml": "^4.1.1",
         "lodash": "^4.17.21",
         "mailchecker": "^6.0.19",
@@ -57,33 +57,33 @@
         "moment-timezone": "^0.5.46",
         "nprogress": "^0.2.0",
         "path-browserify": "^1.0.1",
-        "pdfjs-dist": "^5.4.394",
+        "pdfjs-dist": "^5.4.449",
         "pinia": "^3.0.4",
-        "posthog-js": "^1.291.0",
+        "posthog-js": "^1.308.0",
         "rapidoc": "^9.3.8",
         "semver": "^7.7.3",
-        "shiki": "^3.15.0",
-        "vue": "^3.5.24",
+        "shiki": "^3.20.0",
+        "vue": "^3.5.25",
         "vue-axios": "^3.5.2",
         "vue-chartjs": "^5.3.3",
         "vue-gtag": "^3.6.3",
         "vue-i18n": "^11.2.2",
         "vue-material-design-icons": "^5.3.1",
-        "vue-router": "^4.6.3",
-        "vue-sidebar-menu": "^5.8.0",
+        "vue-router": "^4.6.4",
+        "vue-sidebar-menu": "^5.9.1",
         "vue-virtual-scroller": "^2.0.0-beta.8",
         "vue3-popper": "^1.5.0",
         "vue3-tour": "github:kestra-io/vue3-tour",
         "xss": "^1.0.15",
-        "yaml": "^2.8.1"
+        "yaml": "^2.8.2"
     },
     "devDependencies": {
         "@codecov/vite-plugin": "^1.9.1",
         "@esbuild-plugins/node-modules-polyfill": "^0.2.2",
-        "@eslint/js": "^9.39.1",
-        "@playwright/test": "^1.56.1",
+        "@eslint/js": "^9.39.2",
+        "@playwright/test": "^1.57.0",
         "@rushstack/eslint-patch": "^1.14.1",
-        "@shikijs/markdown-it": "^3.15.0",
+        "@shikijs/markdown-it": "^3.20.0",
         "@storybook/addon-themes": "^9.1.16",
         "@storybook/addon-vitest": "^9.1.16",
         "@storybook/test-runner": "^0.23.0",
@@ -91,58 +91,58 @@
         "@types/humanize-duration": "^3.27.4",
         "@types/js-yaml": "^4.0.9",
         "@types/moment": "^2.13.0",
-        "@types/node": "^24.10.2",
+        "@types/node": "^25.0.3",
         "@types/nprogress": "^0.2.3",
         "@types/path-browserify": "^1.0.3",
         "@types/semver": "^7.7.1",
         "@types/testing-library__jest-dom": "^6.0.0",
         "@types/testing-library__user-event": "^4.2.0",
-        "@typescript-eslint/parser": "^8.46.4",
-        "@vitejs/plugin-vue": "^6.0.2",
+        "@typescript-eslint/parser": "^8.50.0",
+        "@vitejs/plugin-vue": "^6.0.3",
         "@vitejs/plugin-vue-jsx": "^5.1.2",
         "@vitest/browser": "^3.2.4",
         "@vitest/coverage-v8": "^3.2.4",
         "@vue/eslint-config-prettier": "^10.2.0",
         "@vue/test-utils": "^2.4.6",
-        "@vueuse/router": "^14.0.0",
+        "@vueuse/router": "^14.1.0",
         "change-case": "5.4.4",
         "cross-env": "^10.1.0",
-        "eslint": "^9.39.1",
+        "eslint": "^9.39.2",
         "eslint-plugin-storybook": "^9.1.16",
         "eslint-plugin-vue": "^9.33.0",
         "globals": "^16.5.0",
         "husky": "^9.1.7",
-        "jsdom": "^27.1.0",
-        "lint-staged": "^16.2.6",
+        "jsdom": "^27.3.0",
+        "lint-staged": "^16.2.7",
         "monaco-editor": "^0.52.2",
         "monaco-yaml": "5.3.1",
         "patch-package": "^8.0.1",
         "playwright": "^1.55.0",
-        "prettier": "^3.6.2",
-        "rimraf": "^6.1.0",
-        "rolldown-vite": "^7.2.5",
+        "prettier": "^3.7.4",
+        "rimraf": "^6.1.2",
+        "rolldown-vite": "^7.2.11",
         "rollup-plugin-copy": "^3.5.0",
-        "sass": "^1.93.3",
+        "sass": "^1.97.0",
         "storybook": "^9.1.16",
         "storybook-vue3-router": "^6.0.2",
         "ts-node": "^10.9.2",
         "typescript": "^5.9.3",
-        "typescript-eslint": "^8.46.4",
+        "typescript-eslint": "^8.50.0",
         "uuid": "^13.0.0",
         "vite": "npm:rolldown-vite@latest",
         "vitest": "^3.2.4",
-        "vue-tsc": "^3.1.4"
+        "vue-tsc": "^3.1.8"
     },
     "optionalDependencies": {
-        "@esbuild/darwin-arm64": "^0.27.1",
-        "@esbuild/darwin-x64": "^0.27.1",
-        "@esbuild/linux-x64": "^0.27.1",
-        "@rollup/rollup-darwin-arm64": "^4.53.3",
-        "@rollup/rollup-darwin-x64": "^4.53.3",
-        "@rollup/rollup-linux-x64-gnu": "^4.53.3",
-        "@swc/core-darwin-arm64": "^1.15.3",
-        "@swc/core-darwin-x64": "^1.15.3",
-        "@swc/core-linux-x64-gnu": "^1.15.3"
+        "@esbuild/darwin-arm64": "^0.27.2",
+        "@esbuild/darwin-x64": "^0.27.2",
+        "@esbuild/linux-x64": "^0.27.2",
+        "@rollup/rollup-darwin-arm64": "^4.53.5",
+        "@rollup/rollup-darwin-x64": "^4.53.5",
+        "@rollup/rollup-linux-x64-gnu": "^4.53.5",
+        "@swc/core-darwin-arm64": "^1.15.5",
+        "@swc/core-darwin-x64": "^1.15.5",
+        "@swc/core-linux-x64-gnu": "^1.15.5"
     },
     "overrides": {
         "bootstrap": {
@@ -11,7 +11,7 @@
     title?: string;
     message?: string;
     content?: {
-        message: string;
+        message?: string;
         _embedded?: {
             errors?: any[];
         };
@@ -19,8 +19,8 @@
     response?: {
         status: number;
         config: {
-            url: string;
-            method: string;
+            url?: string;
+            method?: string;
         };
     };
     variant?: "success" | "warning" | "info" | "error" | "primary";
@@ -97,8 +97,8 @@
     if (props.message.response) {
         error.error.response = {};
         error.error.request = {
-            method: props.message.response.config.method,
-            url: props.message.response.config.url,
+            method: props.message.response.config.method ?? "GET",
+            url: props.message.response.config.url ?? "unknown url",
         };
 
         if (props.message.response.status) {
@@ -111,7 +111,11 @@
     notifications.value = ElNotification({
         title: title.value || "Error",
         message: h(ErrorToastContainer, {
-            message: props.message,
+            message: {
+                content: {
+                    message: props.message?.content?.message ?? ""
+                }
+            },
             items: items.value,
             onClose: () => close()
         }),
@@ -204,24 +204,26 @@
                 className="row-action"
             >
                 <template #default="scope">
-                    <el-button v-if="scope.row.executionId || scope.row.evaluateRunningDate">
-                        <Kicon
-                            :tooltip="$t(`unlock trigger.tooltip.${scope.row.executionId ? 'execution' : 'evaluation'}`)"
-                            placement="left"
-                            @click="triggerToUnlock = scope.row"
-                        >
-                            <LockOff />
-                        </Kicon>
-                    </el-button>
-                    <el-button>
-                        <Kicon
-                            :tooltip="$t('delete trigger')"
-                            placement="left"
-                            @click="confirmDeleteTrigger(scope.row)"
-                        >
-                            <Delete />
-                        </Kicon>
-                    </el-button>
+                    <div class="action-container">
+                        <el-button v-if="scope.row.executionId || scope.row.evaluateRunningDate">
+                            <Kicon
+                                :tooltip="$t(`unlock trigger.tooltip.${scope.row.executionId ? 'execution' : 'evaluation'}`)"
+                                placement="left"
+                                @click="triggerToUnlock = scope.row"
+                            >
+                                <LockOff />
+                            </Kicon>
+                        </el-button>
+                        <el-button>
+                            <Kicon
+                                :tooltip="$t('delete trigger')"
+                                placement="left"
+                                @click="confirmDeleteTrigger(scope.row)"
+                            >
+                                <Delete />
+                            </Kicon>
+                        </el-button>
+                    </div>
                 </template>
             </el-table-column>
             <el-table-column :label="$t('backfill')" columnKey="backfill">
@@ -855,6 +857,12 @@
     align-items: center;
 }
 
+.action-container {
+    display: flex;
+    align-items: center;
+    gap: 5px;
+}
+
 .statusIcon {
     font-size: large;
 }
@@ -927,4 +935,4 @@
         }
     }
 }
 </style>
@@ -271,7 +271,9 @@
     width: 100%;
     max-width: 400px;
     padding: 1rem;
 
+    display: flex;
+    flex-direction: column;
+    justify-content: center;
     .logo {
         width: 250px;
         margin-bottom: 40px;
@@ -7,35 +7,27 @@
             <el-steps :space="60" direction="vertical" :active="activeStep" finishStatus="success">
                 <el-step :icon="activeStep > 0 ? CheckBold : AccountPlus" :title="t('setup.steps.user')" :class="{'primary-icon': activeStep <= 0}" />
-                <el-step
-                    :icon="activeStep > 1 ? CheckBold : Cogs"
-                    :title="t('setup.steps.config')"
-                    :class="{'primary-icon': activeStep <= 1}"
-                />
                 <el-step
-                    :icon="activeStep > 2 ? CheckBold : MessageOutline"
+                    :icon="activeStep > 1 ? CheckBold : MessageOutline"
                     :title="t('setup.steps.survey')"
-                    :class="{'primary-icon': activeStep <= 2}"
+                    :class="{'primary-icon': activeStep <= 1}"
                 />
                 <el-step :icon="LightningBolt" :title="t('setup.steps.complete')" class="primary-icon" />
             </el-steps>
         </el-col>
         <el-col :xs="24" :md="16" class="setup-main">
             <el-card class="setup-card">
-                <template #header v-if="activeStep !== 3">
+                <template #header v-if="activeStep !== 2">
                     <div class="card-header">
                         <el-text size="large" class="header-title" v-if="activeStep === 0">
                             {{ t('setup.titles.user') }}
                         </el-text>
                         <el-text size="large" class="header-title" v-else-if="activeStep === 1">
-                            Welcome {{ userFormData.firstName }}
-                        </el-text>
-                        <el-text size="large" class="header-title" v-else-if="activeStep === 2">
                             {{ t('setup.titles.survey') }}
                         </el-text>
                         <el-text class="d-block mt-4">
                             {{ subtitles[activeStep] }}
                         </el-text>
-                        <el-button v-if="activeStep === 2" class="skip-button" @click="handleSurveySkip()">
+                        <el-button v-if="activeStep === 1" class="skip-button" @click="handleSurveySkip()">
                             {{ t('setup.survey.skip') }}
                         </el-button>
                     </div>
@@ -98,45 +90,7 @@
                     </div>
                 </div>
 
-                <div class="d-flex flex-column gap-4" v-else-if="activeStep === 1">
-                    <el-card v-if="isLoading">
-                        <el-text>Loading configuration...</el-text>
-                    </el-card>
-                    <el-card v-else-if="setupConfigurationLines.length > 0">
-                        <el-row
-                            v-for="config in setupConfigurationLines"
-                            :key="config.name"
-                            class="lh-lg mt-1 mb-1 align-items-center gap-2"
-                        >
-                            <component :is="config.icon" />
-                            <el-text size="small">
-                                {{ t("setup.config." + config.name) }}
-                            </el-text>
-                            <el-divider class="m-auto" />
-                            <Check class="text-success" v-if="config.value === true" />
-                            <Close class="text-danger" v-else-if="config.value === false" />
-                            <el-text v-else size="small">
-                                {{ config.value === "NOT SETUP" ? config.value : config.value.toString().capitalize() }}
-                            </el-text>
-                        </el-row>
-                    </el-card>
-                    <el-card v-else>
-                        <el-text>No configuration data available</el-text>
-                    </el-card>
-                    <el-text class="align-self-start">
-                        {{ t("setup.confirm.config_title") }}
-                    </el-text>
-                    <div class="d-flex align-self-start">
-                        <el-button @click="previousStep()">
-                            {{ t("setup.confirm.not_valid") }}
-                        </el-button>
-                        <el-button type="primary" @click="initBasicAuth()">
-                            {{ t("setup.confirm.valid") }}
-                        </el-button>
-                    </div>
-                </div>
-
-                <div v-else-if="activeStep === 2">
+                <div v-else-if="activeStep === 1">
                     <el-form ref="surveyForm" labelPosition="top" :model="surveyData" :showMessage="false">
                         <el-form-item :label="t('setup.survey.company_size')">
                             <el-radio-group v-model="surveyData.companySize" class="survey-radio-group">
@@ -183,7 +137,7 @@
                     </div>
                 </div>
 
-                <div v-else-if="activeStep === 3" class="success-step">
+                <div v-else-if="activeStep === 2" class="success-step">
                     <img :src="success" alt="success" class="success-img">
                     <div class="success-content">
                         <h1 class="success-title">
@@ -212,19 +166,12 @@
 import {useSurveySkip} from "../../composables/useSurveyData"
 import {initPostHogForSetup, trackSetupEvent} from "../../composables/usePosthog"
 
-import Cogs from "vue-material-design-icons/Cogs.vue"
 import AccountPlus from "vue-material-design-icons/AccountPlus.vue"
 import LightningBolt from "vue-material-design-icons/LightningBolt.vue"
 import MessageOutline from "vue-material-design-icons/MessageOutline.vue"
 import Logo from "../home/Logo.vue"
-import Check from "vue-material-design-icons/Check.vue"
-import Close from "vue-material-design-icons/Close.vue"
 import CheckBold from "vue-material-design-icons/CheckBold.vue"
 import InformationOutline from "vue-material-design-icons/InformationOutline.vue"
-import Database from "vue-material-design-icons/Database.vue"
-import CurrentDc from "vue-material-design-icons/CurrentDc.vue"
-import CloudOutline from "vue-material-design-icons/CloudOutline.vue"
-import Lock from "vue-material-design-icons/Lock.vue"
 import success from "../../assets/success.svg"
 import * as BasicAuth from "../../utils/basicAuth";
@@ -241,12 +188,6 @@
     newsletter: boolean
 }
 
-interface ConfigLine {
-    name: string
-    icon: any
-    value: any
-}
-
 interface CompanySizeOption {
     value: string
     label: string
@@ -258,8 +199,6 @@
 const {storeSurveySkipData} = useSurveySkip()
 
 const activeStep = ref(0)
-const isLoading = ref(true)
-const usageData = ref<any>(null)
 const userForm: Ref<any> = ref(null)
 const surveyForm: Ref<any> = ref(null)
@@ -277,7 +216,6 @@
 })
 
 const formData = computed(() => userFormData.value)
-const setupConfiguration = computed(() => usageData.value?.configurations ?? {})
 
 const initializeSetup = async () => {
     try {
@@ -294,12 +232,8 @@
 
         localStorage.setItem("basicAuthSetupInProgress", "true")
         localStorage.setItem("setupStartTime", Date.now().toString())
 
-        usageData.value = await miscStore.loadAllUsages()
     } catch {
-        /* Silently handle usage data loading errors */
-    } finally {
-        isLoading.value = false
+        /* Silently handle config loading errors */
     }
 }
@@ -311,23 +245,8 @@
     }
 })
 
-const setupConfigurationLines = computed<ConfigLine[]>(() => {
-    if (!setupConfiguration.value) return []
-    const configs = miscStore.configs
-
-    const basicAuthValue = activeStep.value >= 1 || configs?.isBasicAuthInitialized
-
-    return [
-        {name: "repository", icon: Database, value: setupConfiguration.value.repositoryType || "NOT SETUP"},
-        {name: "queue", icon: CurrentDc, value: setupConfiguration.value.queueType || "NOT SETUP"},
-        {name: "storage", icon: CloudOutline, value: setupConfiguration.value.storageType || "NOT SETUP"},
-        {name: "basicauth", icon: Lock, value: basicAuthValue}
-    ]
-})
-
 const subtitles = computed(() => [
     t("setup.subtitles.user"),
     t("setup.subtitles.config"),
     t("setup.subtitles.survey"),
 ])
@@ -389,14 +308,6 @@
     return field?.validateState === "error" ? field.validateMessage : null
 }
 
-const nextStep = () => {
-    activeStep.value++
-}
-
-const previousStep = () => {
-    activeStep.value--
-}
-
 const handleUserFormSubmit = async () => {
     try {
         await miscStore.addBasicAuth({
@@ -410,12 +321,6 @@
 
         await miscStore.loadConfigs()
 
-        try {
-            usageData.value = await miscStore.loadAllUsages()
-        } catch {
-            /* Silently handle usage data loading */
-        }
-
         trackSetupEvent("setup_flow:account_created", {
             user_firstname: userFormData.value.firstName,
             user_lastname: userFormData.value.lastName,
@@ -425,7 +330,7 @@
 
         localStorage.setItem("basicAuthUserCreated", "true")
 
-        nextStep()
+        activeStep.value = 1
     } catch (error: any) {
         trackSetupEvent("setup_flow:account_creation_failed", {
             error_message: error.message || "Unknown error"
@@ -434,10 +339,6 @@
     }
 }
 
-const initBasicAuth = () => {
-    nextStep()
-}
-
 const handleSurveyContinue = () => {
     localStorage.setItem("basicAuthSurveyData", JSON.stringify(surveyData.value))
@@ -459,7 +360,7 @@
         ...surveySelections
     }, userFormData.value)
 
-    nextStep()
+    activeStep.value = 2
 }
 
 const handleSurveySkip = () => {
@@ -481,7 +382,7 @@
         ...surveySelections
     }, userFormData.value)
 
-    nextStep()
+    activeStep.value = 2
 }
 
 const completeSetup = () => {
@@ -24,6 +24,7 @@ $checkbox-checked-color: #8405FF;
     width: 100%;
     margin: 0 auto;
     padding-top: 2rem;
+    height: fit-content;
 
     @media screen and (min-width: 992px) {
         gap: 3rem;
@@ -24,7 +24,7 @@
/>
</el-form-item>
<el-form-item>
<el-button @click="expandCollapseAll()" :disabled="raw_view">
<el-button @click="expandCollapseAll()" :disabled="raw_view" :icon="logDisplayButtonIcon">
{{ logDisplayButtonText }}
</el-button>
</el-form-item>
@@ -32,7 +32,7 @@
<el-tooltip
:content="!raw_view ? $t('logs_view.raw_details') : $t('logs_view.compact_details')"
>
<el-button @click="toggleViewType">
<el-button @click="toggleViewType" :icon="logViewTypeButtonIcon">
{{ !raw_view ? $t('logs_view.raw') : $t('logs_view.compact') }}
</el-button>
</el-tooltip>
@@ -121,6 +121,10 @@
import TaskRunDetails from "../logs/TaskRunDetails.vue";
import Download from "vue-material-design-icons/Download.vue";
import ContentCopy from "vue-material-design-icons/ContentCopy.vue";
import UnfoldMoreHorizontal from "vue-material-design-icons/UnfoldMoreHorizontal.vue";
import UnfoldLessHorizontal from "vue-material-design-icons/UnfoldLessHorizontal.vue";
import ViewList from "vue-material-design-icons/ViewList.vue";
import ViewGrid from "vue-material-design-icons/ViewGrid.vue";
import Kicon from "../Kicon.vue";
import LogLevelNavigator from "../logs/LogLevelNavigator.vue";
import {DynamicScroller, DynamicScrollerItem} from "vue-virtual-scroller";
@@ -135,6 +139,7 @@
import {mapStores} from "pinia";
import {useExecutionsStore} from "../../stores/executions";
import KSFilter from "../filter/components/KSFilter.vue";
import {storageKeys} from "../../utils/constants";

export default {
components: {
@@ -157,7 +162,7 @@
level: undefined,
filter: undefined,
openedTaskrunsCount: 0,
raw_view: false,
raw_view: (localStorage.getItem(storageKeys.LOGS_VIEW_TYPE) ?? "false").toLowerCase() === "true",
logIndicesByLevel: Object.fromEntries(LogUtils.levelOrLower(undefined).map(level => [level, []])),
logCursor: undefined
};
@@ -209,6 +214,12 @@
logDisplayButtonText() {
return this.openedTaskrunsCount === 0 ? this.$t("expand all") : this.$t("collapse all")
},
logDisplayButtonIcon() {
return this.openedTaskrunsCount === 0 ? UnfoldMoreHorizontal : UnfoldLessHorizontal;
},
logViewTypeButtonIcon() {
return this.raw_view ? ViewGrid : ViewList;
},
currentLevelOrLower() {
return LogUtils.levelOrLower(this.level);
},
@@ -292,6 +303,7 @@
toggleViewType() {
this.logCursor = undefined;
this.raw_view = !this.raw_view;
localStorage.setItem(storageKeys.LOGS_VIEW_TYPE, String(this.raw_view));
},
sortLogsByViewOrder(a, b) {
const aSplit = a.split("/");

@@ -18,10 +18,10 @@
</template>

<template #footer>
<el-button @click="isOpen = false">
<el-button @click="onCancel">
{{ t("cancel") }}
</el-button>
<el-button type="primary" @click="setLabels()">
<el-button type="primary" :loading="isSaving" @click="setLabels()">
{{ t("ok") }}
</el-button>
</template>
@@ -32,7 +32,7 @@
<el-form-item :label="t('execution labels')">
<LabelInput
v-model:labels="executionLabels"
:existingLabels="execution.labels"
:existingLabels="executionLabels"
/>
</el-form-item>
</el-form>
@@ -86,6 +86,7 @@

const isOpen = ref(false);
const executionLabels = ref<Label[]>([]);
const isSaving = ref(false);

const enabled = computed(() => {
if (
@@ -100,6 +101,12 @@
return !State.isRunning(props.execution.state.current);
});

const onCancel = () => {
// discard temp and close dialog without mutating parent
isOpen.value = false;
executionLabels.value = [];
};

const setLabels = async () => {
const filtered = filterValidLabels(executionLabels.value);

@@ -108,31 +115,42 @@
return;
}

isOpen.value = false;
isSaving.value = true;
try {
const response = await executionsStore.setLabels({
labels: filtered.labels,
executionId: props.execution.id,
});
executionsStore.execution = response.data;

if (response && response.data) {
executionsStore.execution = response.data;
}

toast.success(t("Set labels done"));

// close and clear only after success
isOpen.value = false;
executionLabels.value = [];
} catch (err) {
console.error(err); // Error handling is done by the store/interceptor
console.error(err); // keep dialog open so user can fix / retry
} finally {
isSaving.value = false;
}
};

watch(isOpen, () => {
executionLabels.value = [];
// initialize the temp clone only when opening the dialog
watch(isOpen, (open) => {
if (open) {
const toIgnore = miscStore.configs?.hiddenLabelsPrefixes || [];
const source = props.execution.labels || [];

const toIgnore = miscStore.configs?.hiddenLabelsPrefixes || [];
// deep clone so child edits never mutate the original
executionLabels.value = JSON.parse(JSON.stringify(source || []))
.filter((label: Label) => !toIgnore.some((prefix: string) => label.key?.startsWith(prefix)));

if (props.execution.labels) {
executionLabels.value = props.execution.labels.filter(
(label) =>
!toIgnore.some((prefix: string) =>
label.key?.startsWith(prefix),
),
);
} else {
// when dialog closed, clear temp state (safe-guard)
executionLabels.value = [];
}
});
</script>

@@ -478,8 +478,6 @@
<style scoped lang="scss">
@import "@kestra-io/ui-libs/src/scss/variables";

$font-size-sm: $font-size-base * 0.875; // TODO: Move it into variables file of ui-libs

#overview {
:deep(.el-splitter-panel:has(> .sidebar:first-child)) {
background-color: var(--ks-background-table-row);

@@ -25,22 +25,18 @@ export function applyDefaultFilters(
includeScope,
legacyQuery,
}: DefaultFilterOptions = {}): { query: LocationQuery, change: boolean } {

if(currentQuery && Object.keys(currentQuery).length > 0) {
return {
query: currentQuery,
change: false,
}
}

const query = {...currentQuery};
let change = false;

if (namespace === undefined && defaultNamespace() && !hasFilterKey(query, NAMESPACE_FILTER_PREFIX)) {
query[legacyQuery ? "namespace" : `${NAMESPACE_FILTER_PREFIX}[PREFIX]`] = defaultNamespace();
change = true;
}

if (includeScope && !hasFilterKey(query, SCOPE_FILTER_PREFIX)) {
query[legacyQuery ? "scope" : `${SCOPE_FILTER_PREFIX}[EQUALS]`] = "USER";
change = true;
}

const TIME_FILTER_KEYS = /startDate|endDate|timeRange/;
@@ -48,9 +44,10 @@ export function applyDefaultFilters(
if (includeTimeRange && !Object.keys(query).some(key => TIME_FILTER_KEYS.test(key))) {
const defaultDuration = useMiscStore().configs?.chartDefaultDuration ?? "P30D";
query[legacyQuery ? "timeRange" : `${TIME_RANGE_FILTER_PREFIX}[EQUALS]`] = defaultDuration;
change = true;
}

return {query, change: true};
return {query, change};
}

export function useDefaultFilter(

@@ -20,6 +20,9 @@
import {useVueTour} from "../../composables/useVueTour";

import type {BlueprintType} from "../../stores/blueprints"
import {useAuthStore} from "../../override/stores/auth";
import permission from "../../models/permission";
import action from "../../models/action";

const route = useRoute();
const {t} = useI18n();
@@ -29,13 +32,21 @@
const blueprintsStore = useBlueprintsStore();
const coreStore = useCoreStore();
const flowStore = useFlowStore();
const authStore = useAuthStore();

const setupFlow = async () => {
const blueprintId = route.query.blueprintId as string;
const blueprintSource = route.query.blueprintSource as BlueprintType;
const implicitDefaultNamespace = authStore.user.getNamespacesForAction(
permission.FLOW,
action.CREATE,
)[0];
let flowYaml = "";
const id = getRandomID();
const selectedNamespace = (route.query.namespace as string) || defaultNamespace() || "company.team";
const selectedNamespace = (route.query.namespace as string)
?? defaultNamespace()
?? implicitDefaultNamespace
?? "company.team";

if (route.query.copy && flowStore.flow) {
flowYaml = flowStore.flow.source;

@@ -10,15 +10,15 @@
</el-alert>
</div>
<el-form labelPosition="top" :model="inputs" ref="form" @submit.prevent="false">
<InputsForm
:initialInputs="flow.inputs"
:selectedTrigger="selectedTrigger"
:flow="flow"
<InputsForm
:initialInputs="flow.inputs"
:selectedTrigger="selectedTrigger"
:flow="flow"
v-model="inputs"
:executeClicked="executeClicked"
@confirm="onSubmit($refs.form)"
@update:model-value-no-default="values => inputsNoDefaults=values"
@update:checks="values => checks=values"
@update:model-value-no-default="values => inputsNoDefaults=values"
@update:checks="values => checks=values"
/>

<el-collapse v-model="collapseName">
@@ -192,6 +192,7 @@
if (formRef && this.flowCanBeExecuted) {
this.checks = [];
this.executeClicked = false;
this.coreStore.message = null;
formRef.validate((valid) => {
if (!valid) {
return false;
@@ -207,7 +208,7 @@
this.executionLabels
.filter(label => label.key && label.value)
.map(label => `${label.key}:${label.value}`)
)],
), "system.from:ui"],
scheduleDate: this.scheduleDate
});
} else {
@@ -220,7 +221,7 @@
this.executionLabels
.filter(label => label.key && label.value)
.map(label => `${label.key}:${label.value}`)
)],
), "system.from:ui"],
scheduleDate: this.$moment(this.scheduleDate).tz(localStorage.getItem(storageKeys.TIMEZONE_STORAGE_KEY) ?? moment.tz.guess()).toISOString(true),
nextStep: true,
});

@@ -176,13 +176,20 @@
:label="t('last execution date')"
>
<template #default="scope">
<DateAgo
v-if="lastExecutionByFlowReady"
:inverted="true"
:date="getLastExecution(scope.row)
?.startDate
"
/>
<router-link
v-if="lastExecutionByFlowReady && getLastExecution(scope.row)"
:to="{
name: 'executions/update',
params: {
namespace: scope.row.namespace,
flowId: scope.row.id,
id: getLastExecution(scope.row).id
}
}"
class="table-link"
>
<DateAgo :date="getLastExecution(scope.row)?.startDate" inverted />
</router-link>
</template>
</el-table-column>

@@ -193,10 +200,22 @@
>
<template #default="scope">
<div
v-if="lastExecutionByFlowReady && getLastExecution(scope.row)?.status"
v-if="lastExecutionByFlowReady && getLastExecution(scope.row)"
class="d-flex justify-content-between align-items-center"
>
<Status :status="getLastExecution(scope.row)?.status" size="small" />
<router-link
:to="{
name: 'executions/update',
params: {
namespace: scope.row.namespace,
flowId: scope.row.id,
id: getLastExecution(scope.row).id
}
}"
class="table-link"
>
<Status :status="getLastExecution(scope.row).status" size="small" />
</router-link>
</div>
</template>
</el-table-column>
@@ -691,4 +710,16 @@
align-items: flex-end;
}
}

.table-link {
cursor: pointer;

& :deep(button) {
cursor: pointer !important;
}

&:hover {
text-decoration: none;
}
}
</style>
@@ -13,9 +13,7 @@
@show="handlePopoverShow"
>
<template #reference>
<el-button class="trigger-icon" @click="copyLink(trigger)" size="small">
<TaskIcon :onlyIcon="true" :cls="trigger?.type" :icons="pluginsStore.icons" />
</el-button>
<TaskIcon :onlyIcon="true" :cls="trigger?.type" :icons="pluginsStore.icons" />
</template>
<template #default>
<TriggerVars :data="trigger" :execution="execution" @on-copy="copyLink(trigger)" />

@@ -1,7 +1,7 @@
<template>
<span ref="rootContainer">
<!-- Valid -->
<el-button v-if="!errors && !warnings &&!infos" v-bind="$attrs" :link="link" :size="size" type="default" class="success square" disabled>
<el-button v-if="!errors && !warnings && !infos" v-bind="$attrs" :link="link" :size="size" type="default" class="success square" disabled>
<CheckBoldIcon class="text-success" />
</el-button>

@@ -157,6 +157,7 @@
}

&.success {
cursor: default;
border-color: var(--ks-border-success);
}

@@ -5,19 +5,19 @@
<ValidationError
class="validation"
tooltipPlacement="bottom-start"
:errors="flowErrors"
:errors="flowStore.flowErrors"
:warnings="flowWarnings"
:infos="flowInfos"
:infos="flowStore.flowInfos"
/>

<EditorButtons
:isCreating="flowStore.isCreating"
:isReadOnly="isReadOnly"
:isReadOnly="flowStore.isReadOnly"
:canDelete="true"
:isAllowedEdit="isAllowedEdit"
:isAllowedEdit="flowStore.isAllowedEdit"
:haveChange="haveChange"
:flowHaveTasks="Boolean(flowHaveTasks)"
:errors="flowErrors"
:flowHaveTasks="Boolean(flowStore.flowHaveTasks)"
:errors="flowStore.flowErrors"
:warnings="flowWarnings"
@save="save"
@copy="
@@ -49,7 +49,6 @@
import ValidationError from "../flows/ValidationError.vue";

import localUtils from "../../utils/utils";
import {useFlowOutdatedErrors} from "./flowOutdatedErrors";
import {useFlowStore} from "../../stores/flow";
import {useToast} from "../../utils/toast";

@@ -73,22 +72,14 @@
const route = useRoute()
const routeParams = computed(() => route.params)

const {translateError, translateErrorWithKey} = useFlowOutdatedErrors();

// If playground is not defined, enable it by default
const isSettingsPlaygroundEnabled = computed(() => localStorage.getItem("editorPlayground") === "false" ? false : true);

const isReadOnly = computed(() => flowStore.isReadOnly)
const isAllowedEdit = computed(() => flowStore.isAllowedEdit)
const flowHaveTasks = computed(() => flowStore.flowHaveTasks)
const flowErrors = computed(() => flowStore.flowErrors?.map(translateError));
const flowInfos = computed(() => flowStore.flowInfos)
const toast = useToast();
const flowWarnings = computed(() => {

const outdatedWarning =
flowStore.flowValidation?.outdated && !flowStore.isCreating
? [translateErrorWithKey(flowStore.flowValidation?.constraints ?? "")]
? flowStore.flowValidation?.constraints?.split(", ") ?? []
: [];

const deprecationWarnings =

@@ -86,6 +86,7 @@
import {computed, onActivated, onMounted, ref, provide, onBeforeUnmount, watch, InjectionKey, inject} from "vue";
import {useRoute, useRouter} from "vue-router";
import {apiUrl} from "override/utils/route";
import type * as monaco from "monaco-editor/esm/vs/editor/editor.api";

import {EDITOR_CURSOR_INJECTION_KEY, EDITOR_WRAPPER_INJECTION_KEY} from "../no-code/injectionKeys";
import {usePluginsStore} from "../../stores/plugins";
@@ -165,7 +166,6 @@
pluginsStore.lazyLoadSchemaType({type: "flow"});
}
loadFile();
loadPluginsHash();
window.addEventListener("keydown", handleGlobalSave);
window.addEventListener("keydown", toggleAiShortcut);
if(route.query.ai === "open") {
@@ -215,15 +215,15 @@
const isCreating = computed(() => flowStore.isCreating);

const timeout = ref<any>(null);
const hash = ref<any>(null);

const editorContent = computed(() => {
return draftSource.value ?? source.value;
});

const pluginsStore = usePluginsStore();
const namespacesStore = useNamespacesStore();
const miscStore = useMiscStore();
const hash = computed<number>(() => miscStore.configs?.pluginsHash ?? 0);

const editorScrollKey = computed(() => {
if (props.flow) {
@@ -238,11 +238,6 @@
return undefined;
});

function loadPluginsHash() {
miscStore.loadConfigs().then(config => {
hash.value = config.pluginsHash;
});
}

const updateContent = inject(FILES_UPDATE_CONTENT_INJECTION_KEY);

@@ -279,34 +274,10 @@
clearTimeout(timeout.value);
});

function updatePluginDocumentation(event: any) {
const source = event.model.getValue();
const cursorOffset = event.model.getOffsetAt(event.position);

const isPlugin = (type: string) => pluginsStore.allTypes.includes(type);
const isInRange = (range: [number, number, number]) =>
cursorOffset >= range[0] && cursorOffset <= range[2];
const getRangeSize = (range: [number, number, number]) => range[2] - range[0];

const getElementFromRange = (typeElement: any) => {
const wrapper = YAML_UTILS.localizeElementAtIndex(source, typeElement.range[0]);
return wrapper?.value?.type && isPlugin(wrapper.value.type)
? wrapper.value
: {type: typeElement.type};
};

const selectedElement = YAML_UTILS.extractFieldFromMaps(source, "type", () => true, isPlugin)
.filter(el => el.range && isInRange(el.range))
.reduce((closest, current) =>
!closest || getRangeSize(current.range) < getRangeSize(closest.range)
? current
: closest
, null as any);

let result = selectedElement ? getElementFromRange(selectedElement) : undefined;
result = {...result, hash: hash.value, forceRefresh: true};
pluginsStore.updateDocumentation(result as Parameters<typeof pluginsStore.updateDocumentation>[0]);
function updatePluginDocumentation(event: {position: monaco.Position, model: monaco.editor.ITextModel}) {
const cls = YAML_UTILS.getTypeAtPosition(source.value, event.position, pluginsStore.allTypes);
const version = YAML_UTILS.getVersionAtPosition(source.value, event.position);
pluginsStore.updateDocumentation({cls, version, hash: hash.value});
};

const saveFlowYaml = async () => {

@@ -86,8 +86,8 @@
flowStore.flowYaml = source
const result = await flowStore.onEdit({
source,
currentIsFlow,
editorViewType: "YAML",
topologyVisible: true,
})

if (currentIsFlow && source) {

@@ -1,22 +0,0 @@
import {useI18n} from "vue-i18n";

export function useFlowOutdatedErrors(){
const {t} = useI18n();
function translateError(error: string): string {
if(error.startsWith(">>>>")){
const key = error.substring(4).trim();
return translateErrorWithKey(key);
} else {
return error;
}
}

function translateErrorWithKey(key: string): string {
return `${t(key + ".description")} ${t(key + ".details")}`
}

return {
translateError,
translateErrorWithKey
}
}
@@ -20,7 +20,7 @@
overflow-y: auto;

@media screen and (max-width: 992px) {
align-items: flex-start;
align-items: stretch;
}
}
</style>
@@ -4,8 +4,8 @@
<div class="d-flex flex-column align-items-center gap-2 px-2">
<img :src>

<h2>{{ t(`empty.${props.type}.title`) }}</h2>
<p v-html="t(`empty.${props.type}.content`)" />
<h2>{{ $t(`empty.${props.type}.title`) }}</h2>
<p v-html="$t(`empty.${props.type}.content`)" />

<slot name="button" />
</div>
@@ -16,13 +16,11 @@

<script setup lang="ts">
import {computed} from "vue";

import {images} from "./images";

const props = defineProps({type: {type: String, required: true}});

import {useI18n} from "vue-i18n";
const {t} = useI18n({useScope: "global"});

import {images} from "./images";
const src = computed((): string => images[props.type]);
</script>

@@ -29,6 +29,7 @@
filter=""
:excludeMetas="isFlowEdit ? ['namespace', 'flowId'] : []"
:log="log"
:class="{'log-0': i === 0}"
/>
</div>

@@ -11,7 +11,7 @@
remote
remoteShowSuffix
:remoteMethod="onSearch"
:placeholder="t('namespaces')"
:placeholder="$t('namespaces')"
:suffixIcon="readOnly ? Lock : undefined"
>
<template #tag>
@@ -37,14 +37,11 @@

<script setup lang="ts">
import {computed, onMounted} from "vue"
import {useI18n} from "vue-i18n"
import {useNamespacesStore} from "override/stores/namespaces"
import DotsSquare from "vue-material-design-icons/DotsSquare.vue"
import Lock from "vue-material-design-icons/Lock.vue";
import {defaultNamespace} from "../../../composables/useNamespaces";

const {t} = useI18n();

withDefaults(defineProps<{
multiple?: boolean,
readOnly?: boolean,

@@ -2,16 +2,13 @@
<button @click="emit('add', what)" class="py-2 adding" type="button">
{{
what
? t("no_code.adding", {what})
: t("no_code.adding_default")
? $t("no_code.adding", {what})
: $t("no_code.adding_default")
}}
</button>
</template>

<script setup lang="ts">
import {useI18n} from "vue-i18n";
const {t} = useI18n({useScope: "global"});

const emit = defineEmits<{
(e: "add", what: string | undefined): void;
}>();

@@ -19,7 +19,7 @@
</el-form>
<div @click="() => onTaskEditorClick(taskModel)">
<TaskObject
v-loading="isLoading"
v-loading="isLoading || isPluginSchemaLoading"
v-if="(selectedTaskType || !isTaskDefinitionBasedOnType) && schema"
name="root"
:modelValue="taskModel"
@@ -51,6 +51,7 @@
import {getValueAtJsonPath, resolve$ref} from "../../../utils/utils";
import PlaygroundRunTaskButton from "../../inputs/PlaygroundRunTaskButton.vue";
import isEqual from "lodash/isEqual";
import {useMiscStore} from "../../../override/stores/misc";

const {t} = useI18n();

@@ -164,7 +165,7 @@
// when tab is opened, load the documentation
onActivated(() => {
if(selectedTaskType.value && parentPath !== "inputs"){
pluginsStore.updateDocumentation(taskModel.value as Parameters<typeof pluginsStore.updateDocumentation>[0]);
pluginsStore.updateDocumentation({type: selectedTaskType.value, ...taskModel.value});
}
});

@@ -216,6 +217,24 @@
return typeMap.value[selectedTaskType.value ?? ""] || [];
});

const versionedSchema = ref<Schemas|undefined>()
const isPluginSchemaLoading = ref(false)

watch([selectedTaskType, resolvedTypes], async ([val, types]) => {
if(types.length > 1 && val){
isPluginSchemaLoading.value = true;
try{
const {schema} = await pluginsStore.load({
cls: val,
version: taskModel.value?.version,
})
versionedSchema.value = schema?.properties
} finally {
isPluginSchemaLoading.value = false;
}
}
}, {immediate: true});

const resolvedType = computed<string>(() => {
if(resolvedTypes.value.length > 1 && selectedTaskType.value){
// find the resolvedType that matches the current dataType
@@ -261,9 +280,9 @@
});

const resolvedLocalSchema = computed(() => {
return isTaskDefinitionBasedOnType.value
return versionedSchema.value ?? (isTaskDefinitionBasedOnType.value
? definitions.value?.[resolvedType.value] ?? {}
: schemaAtBlockPath.value
: schemaAtBlockPath.value)
});

const resolvedProperties = computed<Schemas["properties"] | undefined>(() => {
@@ -370,10 +389,12 @@
onTaskInput(value);
}

const miscStore = useMiscStore();
const hash = computed(() => miscStore.configs?.pluginsHash ?? 0);

const onTaskEditorClick = inject(ON_TASK_EDITOR_CLICK_INJECTION_KEY, (elt?: PartialNoCodeElement) => {
const type = elt?.type;
if(isPlugin.value && type){
pluginsStore.updateDocumentation({type});
if(isPlugin.value && elt?.type){
pluginsStore.updateDocumentation({cls: elt.type, version: elt.version, hash: hash.value});
}else{
pluginsStore.updateDocumentation();
}

@@ -4,14 +4,12 @@
type="button"
@click="emit('click')"
>
<CloseIcon class="clear-icon" />{{ t("no_code.clearSelection") }}
<CloseIcon class="clear-icon" />{{ $t("no_code.clearSelection") }}
</button>
</template>

<script setup lang="ts">
import CloseIcon from "vue-material-design-icons/Close.vue";
import {useI18n} from "vue-i18n";
const {t} = useI18n();
const emit = defineEmits(["click"]);
</script>
Some files were not shown because too many files have changed in this diff.