mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 05:00:31 -05:00
Compare commits
95 Commits
plugin/tem
...
refactor/v
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
27c35a9be9 | ||
|
|
4784e459d6 | ||
|
|
2abea0fcde | ||
|
|
5d5165b7b9 | ||
|
|
44d0c10713 | ||
|
|
167734e32a | ||
|
|
24e61c81c0 | ||
|
|
379764a033 | ||
|
|
d55dd275c3 | ||
|
|
f409657e8a | ||
|
|
22f0b3ffdf | ||
|
|
0d99dc6862 | ||
|
|
fd3adc48b8 | ||
|
|
1a8a47c8cd | ||
|
|
7ea95f393e | ||
|
|
6935900699 | ||
|
|
0bc8e8d74a | ||
|
|
7f77b24ae0 | ||
|
|
ec6820dc25 | ||
|
|
d94193c143 | ||
|
|
c9628047fa | ||
|
|
4cbc069af4 | ||
|
|
eabe573fe6 | ||
|
|
ecd64617c3 | ||
|
|
a5650bca0f | ||
|
|
ed59e262d4 | ||
|
|
a5f9d54f7d | ||
|
|
47f4f43198 | ||
|
|
5d31c97f7f | ||
|
|
f8107285c4 | ||
|
|
8dc8dc1796 | ||
|
|
834dfd2947 | ||
|
|
6edb88841f | ||
|
|
5653531628 | ||
|
|
ee61276106 | ||
|
|
abcf76f7b4 | ||
|
|
67ada7f61b | ||
|
|
0c13633f77 | ||
|
|
a6cf2015ff | ||
|
|
2f9216c70b | ||
|
|
1903e6fac5 | ||
|
|
2d2cb00cab | ||
|
|
01b5441d16 | ||
|
|
efc778e294 | ||
|
|
60235a4e73 | ||
|
|
b167c52e76 | ||
|
|
216b124294 | ||
|
|
b6e4df8de2 | ||
|
|
429e7c7945 | ||
|
|
e302b4be4a | ||
|
|
8e7ad9ae25 | ||
|
|
41a11abf16 | ||
|
|
1be16d5e9d | ||
|
|
e263224d7b | ||
|
|
12b89588a6 | ||
|
|
eae5eb80cb | ||
|
|
c0f6298484 | ||
|
|
ba1d6b2232 | ||
|
|
048dcb80cc | ||
|
|
a81de811d7 | ||
|
|
a960a9f982 | ||
|
|
c4d4fd935f | ||
|
|
f063a5a2d9 | ||
|
|
ac91d5605f | ||
|
|
e3d3c3651b | ||
|
|
5b6836237e | ||
|
|
2f8284b133 | ||
|
|
42992fd7c3 | ||
|
|
3a481f93d3 | ||
|
|
7e964ae563 | ||
|
|
25e54edbc9 | ||
|
|
e88dc7af76 | ||
|
|
b7a027f0dc | ||
|
|
98141d6010 | ||
|
|
bf119ab6df | ||
|
|
9bd6353b77 | ||
|
|
c0ab581cf1 | ||
|
|
0f38e19663 | ||
|
|
0c14ea621c | ||
|
|
fb14e57a7c | ||
|
|
09c707d865 | ||
|
|
86e08d71dd | ||
|
|
94c00cedeb | ||
|
|
eb12832b1e | ||
|
|
687cefdfb9 | ||
|
|
8eae8aba72 | ||
|
|
abdbb8d364 | ||
|
|
8a55ab3af6 | ||
|
|
b7cb933e1e | ||
|
|
3af003e5e4 | ||
|
|
c3861a5532 | ||
|
|
ae1f10f45a | ||
|
|
612dccfb8c | ||
|
|
2ae8df2f5f | ||
|
|
1abfa74a16 |
6
.github/dependabot.yml
vendored
6
.github/dependabot.yml
vendored
@@ -51,7 +51,7 @@ updates:
|
||||
|
||||
storybook:
|
||||
applies-to: version-updates
|
||||
patterns: ["storybook*", "@storybook/*"]
|
||||
patterns: ["storybook*", "@storybook/*", "eslint-plugin-storybook"]
|
||||
|
||||
vitest:
|
||||
applies-to: version-updates
|
||||
@@ -67,10 +67,10 @@ updates:
|
||||
"@types/*",
|
||||
"storybook*",
|
||||
"@storybook/*",
|
||||
"eslint-plugin-storybook",
|
||||
"vitest",
|
||||
"@vitest/*",
|
||||
# Temporary exclusion of these packages from major updates
|
||||
"eslint-plugin-storybook",
|
||||
"eslint-plugin-vue",
|
||||
]
|
||||
|
||||
@@ -84,6 +84,7 @@ updates:
|
||||
"@types/*",
|
||||
"storybook*",
|
||||
"@storybook/*",
|
||||
"eslint-plugin-storybook",
|
||||
"vitest",
|
||||
"@vitest/*",
|
||||
# Temporary exclusion of these packages from minor updates
|
||||
@@ -102,6 +103,7 @@ updates:
|
||||
"@types/*",
|
||||
"storybook*",
|
||||
"@storybook/*",
|
||||
"eslint-plugin-storybook",
|
||||
"vitest",
|
||||
"@vitest/*",
|
||||
]
|
||||
|
||||
2
.github/workflows/vulnerabilities-check.yml
vendored
2
.github/workflows/vulnerabilities-check.yml
vendored
@@ -43,7 +43,7 @@ jobs:
|
||||
|
||||
# Upload dependency check report
|
||||
- name: Upload dependency check report
|
||||
uses: actions/upload-artifact@v5
|
||||
uses: actions/upload-artifact@v6
|
||||
if: ${{ always() }}
|
||||
with:
|
||||
name: dependency-check-report
|
||||
|
||||
@@ -29,8 +29,8 @@ start_time2=$(date +%s)
|
||||
|
||||
echo "cd ./ui"
|
||||
cd ./ui
|
||||
echo "npm i"
|
||||
npm i
|
||||
echo "npm ci"
|
||||
npm ci
|
||||
|
||||
echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
|
||||
./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
|
||||
|
||||
@@ -21,7 +21,7 @@ plugins {
|
||||
|
||||
// test
|
||||
id "com.adarshr.test-logger" version "4.0.0"
|
||||
id "org.sonarqube" version "7.2.0.6526"
|
||||
id "org.sonarqube" version "7.2.1.6560"
|
||||
id 'jacoco-report-aggregation'
|
||||
|
||||
// helper
|
||||
@@ -331,7 +331,7 @@ subprojects {
|
||||
}
|
||||
|
||||
dependencies {
|
||||
agent "org.aspectj:aspectjweaver:1.9.25"
|
||||
agent "org.aspectj:aspectjweaver:1.9.25.1"
|
||||
}
|
||||
|
||||
test {
|
||||
|
||||
@@ -42,7 +42,7 @@ import picocli.CommandLine.Option;
|
||||
@Introspected
|
||||
public abstract class AbstractCommand implements Callable<Integer> {
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
protected ApplicationContext applicationContext;
|
||||
|
||||
@Inject
|
||||
private EndpointDefaultConfiguration endpointConfiguration;
|
||||
|
||||
@@ -18,7 +18,8 @@ import picocli.CommandLine;
|
||||
FlowDotCommand.class,
|
||||
FlowExportCommand.class,
|
||||
FlowUpdateCommand.class,
|
||||
FlowUpdatesCommand.class
|
||||
FlowUpdatesCommand.class,
|
||||
FlowsSyncFromSourceCommand.class
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import jakarta.inject.Inject;
|
||||
import java.util.List;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "syncFromSource",
|
||||
description = "Update a single flow",
|
||||
mixinStandardHelpOptions = true
|
||||
)
|
||||
@Slf4j
|
||||
public class FlowsSyncFromSourceCommand extends AbstractApiCommand {
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
FlowRepositoryInterface repository = applicationContext.getBean(FlowRepositoryInterface.class);
|
||||
String tenant = tenantService.getTenantId(tenantId);
|
||||
|
||||
List<FlowWithSource> persistedFlows = repository.findAllWithSource(tenant);
|
||||
|
||||
int count = 0;
|
||||
for (FlowWithSource persistedFlow : persistedFlows) {
|
||||
// Ensure exactly one trailing newline. We need this new line
|
||||
// because when we update a flow from its source,
|
||||
// we don't update it if no change is detected.
|
||||
// The goal here is to force an update from the source for every flows
|
||||
GenericFlow flow = GenericFlow.fromYaml(tenant,persistedFlow.getSource() + System.lineSeparator());
|
||||
repository.update(flow, persistedFlow);
|
||||
stdOut("- %s.%s".formatted(flow.getNamespace(), flow.getId()));
|
||||
count++;
|
||||
}
|
||||
stdOut("%s flow(s) successfully updated!".formatted(count));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
protected boolean loadExternalPlugins() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class FlowsSyncFromSourceCommandTest {
|
||||
@Test
|
||||
void updateAllFlowsFromSource() {
|
||||
URL directory = FlowUpdatesCommandTest.class.getClassLoader().getResource("flows");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--plugins",
|
||||
"/tmp", // pass this arg because it can cause failure
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"--delete",
|
||||
directory.getPath(),
|
||||
};
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString()).contains("successfully updated !");
|
||||
out.reset();
|
||||
|
||||
FlowRepositoryInterface repository = ctx.getBean(FlowRepositoryInterface.class);
|
||||
List<Flow> flows = repository.findAll(MAIN_TENANT);
|
||||
for (Flow flow : flows) {
|
||||
assertThat(flow.getRevision()).isEqualTo(1);
|
||||
}
|
||||
|
||||
args = new String[]{
|
||||
"--plugins",
|
||||
"/tmp", // pass this arg because it can cause failure
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word"
|
||||
|
||||
};
|
||||
PicocliRunner.call(FlowsSyncFromSourceCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString()).contains("4 flow(s) successfully updated!");
|
||||
assertThat(out.toString()).contains("- io.kestra.outsider.quattro");
|
||||
assertThat(out.toString()).contains("- io.kestra.cli.second");
|
||||
assertThat(out.toString()).contains("- io.kestra.cli.third");
|
||||
assertThat(out.toString()).contains("- io.kestra.cli.first");
|
||||
|
||||
flows = repository.findAll(MAIN_TENANT);
|
||||
for (Flow flow : flows) {
|
||||
assertThat(flow.getRevision()).isEqualTo(2);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -82,8 +82,8 @@ dependencies {
|
||||
testImplementation "io.micronaut:micronaut-http-server-netty"
|
||||
testImplementation "io.micronaut:micronaut-management"
|
||||
|
||||
testImplementation "org.testcontainers:testcontainers:1.21.3"
|
||||
testImplementation "org.testcontainers:junit-jupiter:1.21.3"
|
||||
testImplementation "org.testcontainers:testcontainers:1.21.4"
|
||||
testImplementation "org.testcontainers:junit-jupiter:1.21.4"
|
||||
testImplementation "org.bouncycastle:bcpkix-jdk18on"
|
||||
|
||||
testImplementation "org.wiremock:wiremock-jetty12"
|
||||
|
||||
@@ -3,6 +3,7 @@ package io.kestra.core.docs;
|
||||
import io.kestra.core.models.annotations.PluginSubGroup;
|
||||
import io.kestra.core.plugins.RegisteredPlugin;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@@ -117,10 +118,17 @@ public class Plugin {
|
||||
.filter(not(io.kestra.core.models.Plugin::isInternal))
|
||||
.filter(clazzFilter)
|
||||
.filter(c -> !c.getName().startsWith("org.kestra."))
|
||||
.map(c -> new PluginElementMetadata(c.getName(), io.kestra.core.models.Plugin.isDeprecated(c) ? true : null))
|
||||
.map(c -> {
|
||||
Schema schema = c.getAnnotation(Schema.class);
|
||||
|
||||
var title = Optional.ofNullable(schema).map(Schema::title).filter(t -> !t.isEmpty()).orElse(null);
|
||||
var description = Optional.ofNullable(schema).map(Schema::description).filter(d -> !d.isEmpty()).orElse(null);
|
||||
var deprecated = io.kestra.core.models.Plugin.isDeprecated(c) ? true : null;
|
||||
|
||||
return new PluginElementMetadata(c.getName(), deprecated, title, description);
|
||||
})
|
||||
.toList();
|
||||
}
|
||||
|
||||
public record PluginElementMetadata(String cls, Boolean deprecated) {
|
||||
}
|
||||
public record PluginElementMetadata(String cls, Boolean deprecated, String title, String description) {}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
import io.kestra.core.models.flows.Data;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Exception that can be thrown when Inputs/Outputs have validation problems.
|
||||
*/
|
||||
public class InputOutputValidationException extends KestraRuntimeException {
|
||||
public InputOutputValidationException(String message) {
|
||||
super(message);
|
||||
}
|
||||
public static InputOutputValidationException of( String message, Input<?> input){
|
||||
String inputMessage = "Invalid value for input" + " `" + input.getId() + "`. Cause: " + message;
|
||||
return new InputOutputValidationException(inputMessage);
|
||||
}
|
||||
public static InputOutputValidationException of( String message, Output output){
|
||||
String outputMessage = "Invalid value for output" + " `" + output.getId() + "`. Cause: " + message;
|
||||
return new InputOutputValidationException(outputMessage);
|
||||
}
|
||||
public static InputOutputValidationException of(String message){
|
||||
return new InputOutputValidationException(message);
|
||||
}
|
||||
|
||||
public static InputOutputValidationException merge(Set<InputOutputValidationException> exceptions){
|
||||
String combinedMessage = exceptions.stream()
|
||||
.map(InputOutputValidationException::getMessage)
|
||||
.collect(Collectors.joining(System.lineSeparator()));
|
||||
throw new InputOutputValidationException(combinedMessage);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,6 +1,8 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* The top-level {@link KestraRuntimeException} for non-recoverable errors.
|
||||
|
||||
@@ -26,6 +26,7 @@ public record Label(
|
||||
public static final String REPLAYED = SYSTEM_PREFIX + "replayed";
|
||||
public static final String SIMULATED_EXECUTION = SYSTEM_PREFIX + "simulatedExecution";
|
||||
public static final String TEST = SYSTEM_PREFIX + "test";
|
||||
public static final String FROM = SYSTEM_PREFIX + "from";
|
||||
|
||||
/**
|
||||
* Static helper method for converting a list of labels to a nested map.
|
||||
|
||||
@@ -16,6 +16,7 @@ import jakarta.validation.constraints.NotNull;
|
||||
public class Setting {
|
||||
public static final String INSTANCE_UUID = "instance.uuid";
|
||||
public static final String INSTANCE_VERSION = "instance.version";
|
||||
public static final String INSTANCE_EDITION = "instance.edition";
|
||||
|
||||
@NotNull
|
||||
private String key;
|
||||
|
||||
@@ -3,9 +3,7 @@ package io.kestra.core.models.executions;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import io.kestra.core.models.TenantInterface;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
@@ -95,8 +93,16 @@ public class TaskRun implements TenantInterface {
|
||||
this.forceExecution
|
||||
);
|
||||
}
|
||||
public TaskRun withStateAndAttempt(State.Type state) {
|
||||
List<TaskRunAttempt> newAttempts = new ArrayList<>(this.attempts != null ? this.attempts : List.of());
|
||||
|
||||
if (newAttempts.isEmpty()) {
|
||||
newAttempts.add(TaskRunAttempt.builder().state(new State(state)).build());
|
||||
} else {
|
||||
TaskRunAttempt updatedLast = newAttempts.getLast().withState(state);
|
||||
newAttempts.set(newAttempts.size() - 1, updatedLast);
|
||||
}
|
||||
|
||||
public TaskRun replaceState(State newState) {
|
||||
return new TaskRun(
|
||||
this.tenantId,
|
||||
this.id,
|
||||
@@ -106,9 +112,9 @@ public class TaskRun implements TenantInterface {
|
||||
this.taskId,
|
||||
this.parentTaskRunId,
|
||||
this.value,
|
||||
this.attempts,
|
||||
newAttempts,
|
||||
this.outputs,
|
||||
newState,
|
||||
this.state.withState(state),
|
||||
this.iteration,
|
||||
this.dynamic,
|
||||
this.forceExecution
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
/**
|
||||
* Interface for defining an identifiable and typed data.
|
||||
@@ -29,16 +27,4 @@ public interface Data {
|
||||
*/
|
||||
String getDisplayName();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
default ConstraintViolationException toConstraintViolationException(String message, Object value) {
|
||||
Class<Data> cls = (Class<Data>) this.getClass();
|
||||
|
||||
return ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid " + (this instanceof Output ? "output" : "input") + " for `" + getId() + "`, " + message + ", but received `" + value + "`",
|
||||
this,
|
||||
cls,
|
||||
this.getId(),
|
||||
value
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
@@ -130,7 +129,7 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
@Valid
|
||||
@PluginProperty
|
||||
List<SLA> sla;
|
||||
|
||||
|
||||
@Schema(
|
||||
title = "Conditions evaluated before the flow is executed.",
|
||||
description = "A list of conditions that are evaluated before the flow is executed. If no checks are defined, the flow executes normally."
|
||||
@@ -355,7 +354,7 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
* To be conservative a flow MUST not return any source.
|
||||
*/
|
||||
@Override
|
||||
@JsonIgnore
|
||||
@Schema(hidden = true)
|
||||
public String getSource() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -1,14 +1,12 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@@ -48,7 +46,7 @@ public class FlowWithSource extends Flow {
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonIgnore(value = false)
|
||||
@Schema(hidden = false)
|
||||
public String getSource() {
|
||||
return this.source;
|
||||
}
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
package io.kestra.core.models.flows.input;
|
||||
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Represents an input along with its associated value and validation state.
|
||||
*
|
||||
@@ -12,15 +14,15 @@ import jakarta.validation.constraints.NotNull;
|
||||
* @param value The provided value for the input.
|
||||
* @param enabled {@code true} if the input is enabled; {@code false} otherwise.
|
||||
* @param isDefault {@code true} if the provided value is the default; {@code false} otherwise.
|
||||
* @param exception The validation exception, if the input value is invalid; {@code null} otherwise.
|
||||
* @param exceptions The validation exceptions, if the input value is invalid; {@code null} otherwise.
|
||||
*/
|
||||
public record InputAndValue(
|
||||
Input<?> input,
|
||||
Object value,
|
||||
boolean enabled,
|
||||
boolean isDefault,
|
||||
ConstraintViolationException exception) {
|
||||
|
||||
Set<InputOutputValidationException> exceptions) {
|
||||
|
||||
/**
|
||||
* Creates a new {@link InputAndValue} instance.
|
||||
*
|
||||
|
||||
@@ -7,6 +7,7 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import io.kestra.core.validations.Regex;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.ConstraintViolation;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Builder;
|
||||
@@ -14,10 +15,7 @@ import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
|
||||
@SuperBuilder
|
||||
@@ -77,30 +75,35 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
|
||||
|
||||
@Override
|
||||
public void validate(List<String> inputs) throws ConstraintViolationException {
|
||||
Set<ConstraintViolation<?>> violations = new HashSet<>();
|
||||
|
||||
if (values != null && options != null) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
violations.add( ManualConstraintViolation.of(
|
||||
"you can't define both `values` and `options`",
|
||||
this,
|
||||
MultiselectInput.class,
|
||||
getId(),
|
||||
""
|
||||
);
|
||||
));
|
||||
}
|
||||
|
||||
if (!this.getAllowCustomValue()) {
|
||||
for (String input : inputs) {
|
||||
List<@Regex String> finalValues = this.values != null ? this.values : this.options;
|
||||
if (!finalValues.contains(input)) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"it must match the values `" + finalValues + "`",
|
||||
violations.add(ManualConstraintViolation.of(
|
||||
"value `" + input + "` doesn't match the values `" + finalValues + "`",
|
||||
this,
|
||||
MultiselectInput.class,
|
||||
getId(),
|
||||
input
|
||||
);
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!violations.isEmpty()) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(violations);
|
||||
}
|
||||
}
|
||||
|
||||
/** {@inheritDoc} **/
|
||||
@@ -145,7 +148,7 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
|
||||
|
||||
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid expression result. Expected a list of strings, but received " + type,
|
||||
"Invalid expression result. Expected a list of strings",
|
||||
this,
|
||||
MultiselectInput.class,
|
||||
getId(),
|
||||
|
||||
@@ -125,7 +125,7 @@ public class SelectInput extends Input<String> implements RenderableInput {
|
||||
|
||||
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid expression result. Expected a list of strings, but received " + type,
|
||||
"Invalid expression result. Expected a list of strings",
|
||||
this,
|
||||
SelectInput.class,
|
||||
getId(),
|
||||
|
||||
@@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextProperty;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
@@ -156,9 +157,9 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property, then convert it to its target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#as(Class)
|
||||
* @see RunContextProperty#as(Class)
|
||||
*/
|
||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return as(property, context, clazz, Map.of());
|
||||
@@ -167,25 +168,57 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property with additional variables, then convert it to its target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
|
||||
* @see RunContextProperty#as(Class, Map)
|
||||
*/
|
||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.skipCache || property.value == null) {
|
||||
String rendered = context.render(property.expression, variables);
|
||||
property.value = MAPPER.convertValue(rendered, clazz);
|
||||
property.value = deserialize(rendered, clazz);
|
||||
}
|
||||
|
||||
return property.value;
|
||||
}
|
||||
|
||||
private static <T> T deserialize(Object rendered, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
try {
|
||||
return MAPPER.convertValue(rendered, clazz);
|
||||
} catch (IllegalArgumentException e) {
|
||||
if (rendered instanceof String str) {
|
||||
try {
|
||||
return MAPPER.readValue(str, clazz);
|
||||
} catch (JsonProcessingException ex) {
|
||||
throw new IllegalVariableEvaluationException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> T deserialize(Object rendered, JavaType type) throws IllegalVariableEvaluationException {
|
||||
try {
|
||||
return MAPPER.convertValue(rendered, type);
|
||||
} catch (IllegalArgumentException e) {
|
||||
if (rendered instanceof String str) {
|
||||
try {
|
||||
return MAPPER.readValue(str, type);
|
||||
} catch (JsonProcessingException ex) {
|
||||
throw new IllegalVariableEvaluationException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Render a property then convert it as a list of target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
|
||||
* @see RunContextProperty#asList(Class)
|
||||
*/
|
||||
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz) throws IllegalVariableEvaluationException {
|
||||
return asList(property, context, itemClazz, Map.of());
|
||||
@@ -194,37 +227,39 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property with additional variables, then convert it as a list of target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
|
||||
* @see RunContextProperty#asList(Class, Map)
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.skipCache || property.value == null) {
|
||||
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
|
||||
try {
|
||||
String trimmedExpression = property.expression.trim();
|
||||
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a list.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = MAPPER.readValue(context.render(property.expression, variables), type);
|
||||
}
|
||||
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
|
||||
else {
|
||||
List<?> asRawList = MAPPER.readValue(property.expression, List.class);
|
||||
property.value = (T) asRawList.stream()
|
||||
.map(throwFunction(item -> {
|
||||
if (item instanceof String str) {
|
||||
return MAPPER.convertValue(context.render(str, variables), itemClazz);
|
||||
} else if (item instanceof Map map) {
|
||||
return MAPPER.convertValue(context.render(map, variables), itemClazz);
|
||||
}
|
||||
return item;
|
||||
}))
|
||||
.toList();
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
String trimmedExpression = property.expression.trim();
|
||||
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a list.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = deserialize(context.render(property.expression, variables), type);
|
||||
}
|
||||
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
|
||||
else {
|
||||
List<?> asRawList = deserialize(property.expression, List.class);
|
||||
property.value = (T) asRawList.stream()
|
||||
.map(throwFunction(item -> {
|
||||
Object rendered = null;
|
||||
if (item instanceof String str) {
|
||||
rendered = context.render(str, variables);
|
||||
} else if (item instanceof Map map) {
|
||||
rendered = context.render(map, variables);
|
||||
}
|
||||
|
||||
if (rendered != null) {
|
||||
return deserialize(rendered, itemClazz);
|
||||
}
|
||||
|
||||
return item;
|
||||
}))
|
||||
.toList();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,9 +269,9 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property then convert it as a map of target types.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class)
|
||||
* @see RunContextProperty#asMap(Class, Class)
|
||||
*/
|
||||
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
|
||||
return asMap(property, runContext, keyClass, valueClass, Map.of());
|
||||
@@ -248,7 +283,7 @@ public class Property<T> {
|
||||
* This method is safe to be used as many times as you want as the rendering and conversion will be cached.
|
||||
* Warning, due to the caching mechanism, this method is not thread-safe.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class, Map)
|
||||
* @see RunContextProperty#asMap(Class, Class, Map)
|
||||
*/
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
@@ -260,12 +295,12 @@ public class Property<T> {
|
||||
// We need to detect if the expression is already a map or if it's a pebble expression (for eg. referencing a variable containing a map).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a map.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = MAPPER.readValue(runContext.render(property.expression, variables), targetMapType);
|
||||
property.value = deserialize(runContext.render(property.expression, variables), targetMapType);
|
||||
}
|
||||
// Otherwise if it's already a map we read it as a map first then render it from run context which handle map rendering by rendering each entry of the map (otherwise it will fail with nested expressions in values for eg.)
|
||||
else {
|
||||
Map asRawMap = MAPPER.readValue(property.expression, Map.class);
|
||||
property.value = MAPPER.convertValue(runContext.render(asRawMap, variables), targetMapType);
|
||||
property.value = deserialize(runContext.render(asRawMap, variables), targetMapType);
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
|
||||
@@ -82,6 +82,12 @@ abstract public class AbstractTrigger implements TriggerInterface {
|
||||
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
|
||||
private boolean failOnTriggerError = false;
|
||||
|
||||
@PluginProperty(group = PluginProperty.CORE_GROUP)
|
||||
@Schema(
|
||||
title = "Specifies whether a trigger is allowed to start a new execution even if a previous run is still in progress."
|
||||
)
|
||||
private boolean allowConcurrent = false;
|
||||
|
||||
/**
|
||||
* For backward compatibility: we rename minLogLevel to logLevel.
|
||||
* @deprecated use {@link #logLevel} instead
|
||||
|
||||
@@ -1,22 +1,37 @@
|
||||
package io.kestra.core.models.triggers;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Map;
|
||||
|
||||
public interface Schedulable extends PollingTriggerInterface{
|
||||
String PLUGIN_PROPERTY_RECOVER_MISSED_SCHEDULES = "recoverMissedSchedules";
|
||||
|
||||
@Schema(
|
||||
title = "The inputs to pass to the scheduled flow"
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
Map<String, Object> getInputs();
|
||||
|
||||
@Schema(
|
||||
title = "Action to take in the case of missed schedules",
|
||||
description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
|
||||
"The default is `ALL` unless a different value is configured using the global plugin configuration."
|
||||
)
|
||||
@PluginProperty
|
||||
RecoverMissedSchedules getRecoverMissedSchedules();
|
||||
|
||||
/**
|
||||
* Compute the previous evaluation of a trigger.
|
||||
* This is used when a trigger misses some schedule to compute the next date to evaluate in the past.
|
||||
*/
|
||||
ZonedDateTime previousEvaluationDate(ConditionContext conditionContext) throws IllegalVariableEvaluationException;
|
||||
|
||||
RecoverMissedSchedules getRecoverMissedSchedules();
|
||||
|
||||
|
||||
/**
|
||||
* Load the default RecoverMissedSchedules from plugin property, or else ALL.
|
||||
*/
|
||||
|
||||
@@ -172,7 +172,7 @@ public class Trigger extends TriggerContext implements HasUID {
|
||||
|
||||
if (abstractTrigger instanceof PollingTriggerInterface pollingTriggerInterface) {
|
||||
try {
|
||||
nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, Optional.empty());
|
||||
nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, lastTrigger);
|
||||
} catch (InvalidTriggerConfigurationException e) {
|
||||
disabled = true;
|
||||
}
|
||||
|
||||
@@ -6,12 +6,9 @@ import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionTrigger;
|
||||
import io.kestra.core.models.tasks.Output;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.FlowInputOutput;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.*;
|
||||
|
||||
public abstract class TriggerService {
|
||||
@@ -51,58 +48,6 @@ public abstract class TriggerService {
|
||||
return generateExecution(IdUtils.create(), trigger, context, executionTrigger, conditionContext);
|
||||
}
|
||||
|
||||
public static Execution generateScheduledExecution(
|
||||
AbstractTrigger trigger,
|
||||
ConditionContext conditionContext,
|
||||
TriggerContext context,
|
||||
List<Label> labels,
|
||||
Map<String, Object> inputs,
|
||||
Map<String, Object> variables,
|
||||
Optional<ZonedDateTime> scheduleDate
|
||||
) {
|
||||
RunContext runContext = conditionContext.getRunContext();
|
||||
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, variables);
|
||||
|
||||
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(labels));
|
||||
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
|
||||
// add a correlation ID if none exist
|
||||
executionLabels.add(new Label(Label.CORRELATION_ID, runContext.getTriggerExecutionId()));
|
||||
}
|
||||
Execution execution = Execution.builder()
|
||||
.id(runContext.getTriggerExecutionId())
|
||||
.tenantId(context.getTenantId())
|
||||
.namespace(context.getNamespace())
|
||||
.flowId(context.getFlowId())
|
||||
.flowRevision(conditionContext.getFlow().getRevision())
|
||||
.variables(conditionContext.getFlow().getVariables())
|
||||
.labels(executionLabels)
|
||||
.state(new State())
|
||||
.trigger(executionTrigger)
|
||||
.scheduleDate(scheduleDate.map(date -> date.toInstant()).orElse(null))
|
||||
.build();
|
||||
|
||||
Map<String, Object> allInputs = new HashMap<>();
|
||||
// add flow inputs with default value
|
||||
var flow = conditionContext.getFlow();
|
||||
if (flow.getInputs() != null) {
|
||||
flow.getInputs().stream()
|
||||
.filter(input -> input.getDefaults() != null)
|
||||
.forEach(input -> allInputs.put(input.getId(), input.getDefaults()));
|
||||
}
|
||||
|
||||
if (inputs != null) {
|
||||
allInputs.putAll(inputs);
|
||||
}
|
||||
|
||||
// add inputs and inject defaults
|
||||
if (!allInputs.isEmpty()) {
|
||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
|
||||
execution = execution.withInputs(flowInputOutput.readExecutionInputs(conditionContext.getFlow(), execution, allInputs));
|
||||
}
|
||||
|
||||
return execution;
|
||||
}
|
||||
|
||||
private static Execution generateExecution(
|
||||
String id,
|
||||
AbstractTrigger trigger,
|
||||
@@ -111,6 +56,7 @@ public abstract class TriggerService {
|
||||
ConditionContext conditionContext
|
||||
) {
|
||||
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(trigger.getLabels()));
|
||||
executionLabels.add(new Label(Label.FROM, "trigger"));
|
||||
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
|
||||
// add a correlation ID if none exist
|
||||
executionLabels.add(new Label(Label.CORRELATION_ID, id));
|
||||
|
||||
@@ -67,6 +67,11 @@ public class ManualConstraintViolation<T> implements ConstraintViolation<T> {
|
||||
invalidValue
|
||||
)));
|
||||
}
|
||||
public static <T> ConstraintViolationException toConstraintViolationException(
|
||||
Set<? extends ConstraintViolation<?>> constraintViolations
|
||||
) {
|
||||
return new ConstraintViolationException(constraintViolations);
|
||||
}
|
||||
|
||||
public String getMessageTemplate() {
|
||||
return "{messageTemplate}";
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.models.Setting;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
public interface SettingRepositoryInterface {
|
||||
Optional<Setting> findByKey(String key);
|
||||
@@ -13,5 +13,7 @@ public interface SettingRepositoryInterface {
|
||||
|
||||
Setting save(Setting setting) throws ConstraintViolationException;
|
||||
|
||||
Setting internalSave(Setting setting) throws ConstraintViolationException;
|
||||
|
||||
Setting delete(Setting setting);
|
||||
}
|
||||
|
||||
@@ -16,8 +16,8 @@ import java.util.function.Function;
|
||||
public interface TriggerRepositoryInterface extends QueryBuilderInterface<Triggers.Fields> {
|
||||
Optional<Trigger> findLast(TriggerContext trigger);
|
||||
|
||||
Optional<Trigger> findByExecution(Execution execution);
|
||||
|
||||
Optional<Trigger> findByUid(String uid);
|
||||
|
||||
List<Trigger> findAll(String tenantId);
|
||||
|
||||
List<Trigger> findAllForAllTenants();
|
||||
|
||||
@@ -6,10 +6,12 @@ import com.google.common.base.CaseFormat;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.plugins.PluginConfigurations;
|
||||
import io.kestra.core.services.KVStoreService;
|
||||
import io.kestra.core.storages.Storage;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
@@ -235,6 +237,14 @@ public class DefaultRunContext extends RunContext {
|
||||
return runContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RunContext cloneForPlugin(Plugin plugin) {
|
||||
PluginConfigurations pluginConfigurations = applicationContext.getBean(PluginConfigurations.class);
|
||||
DefaultRunContext runContext = clone();
|
||||
runContext.pluginConfiguration = pluginConfigurations.getConfigurationByPluginTypeOrAliases(plugin.getType(), plugin.getClass());
|
||||
return runContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@@ -589,6 +599,11 @@ public class DefaultRunContext extends RunContext {
|
||||
return localPath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputAndOutput inputAndOutput() {
|
||||
return new InputAndOutputImpl(this.applicationContext, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Builder class for constructing new {@link DefaultRunContext} objects.
|
||||
*/
|
||||
|
||||
@@ -189,12 +189,11 @@ public final class ExecutableUtils {
|
||||
variables.put("taskRunIteration", currentTaskRun.getIteration());
|
||||
}
|
||||
|
||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
|
||||
Instant scheduleOnDate = runContext.render(scheduleDate).as(ZonedDateTime.class).map(date -> date.toInstant()).orElse(null);
|
||||
Execution execution = Execution
|
||||
.newExecution(
|
||||
flow,
|
||||
(f, e) -> flowInputOutput.readExecutionInputs(f, e, inputs),
|
||||
(f, e) -> runContext.inputAndOutput().readInputs(f, e, inputs),
|
||||
newLabels,
|
||||
Optional.empty())
|
||||
.withTrigger(ExecutionTrigger.builder()
|
||||
|
||||
@@ -3,13 +3,13 @@ package io.kestra.core.runners;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.core.encryption.EncryptionService;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.exceptions.KestraRuntimeException;
|
||||
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Data;
|
||||
import io.kestra.core.models.flows.DependsOn;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
import io.kestra.core.models.flows.RenderableInput;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.flows.input.FileInput;
|
||||
@@ -19,7 +19,6 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.models.property.URIFetcher;
|
||||
import io.kestra.core.models.tasks.common.EncryptedString;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
@@ -209,8 +208,8 @@ public class FlowInputOutput {
|
||||
.filter(InputAndValue::enabled)
|
||||
.map(it -> {
|
||||
//TODO check to return all exception at-once.
|
||||
if (it.exception() != null) {
|
||||
throw it.exception();
|
||||
if (it.exceptions() != null && !it.exceptions().isEmpty()) {
|
||||
throw InputOutputValidationException.merge(it.exceptions());
|
||||
}
|
||||
return new AbstractMap.SimpleEntry<>(it.input().getId(), it.value());
|
||||
})
|
||||
@@ -294,13 +293,9 @@ public class FlowInputOutput {
|
||||
try {
|
||||
isInputEnabled = Boolean.TRUE.equals(runContext.renderTyped(dependsOnCondition.get()));
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
resolvable.resolveWithError(ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid condition: " + e.getMessage(),
|
||||
input,
|
||||
(Class<Input>)input.getClass(),
|
||||
input.getId(),
|
||||
this
|
||||
));
|
||||
resolvable.resolveWithError(
|
||||
InputOutputValidationException.of("Invalid condition: " + e.getMessage())
|
||||
);
|
||||
isInputEnabled = false;
|
||||
}
|
||||
}
|
||||
@@ -333,7 +328,7 @@ public class FlowInputOutput {
|
||||
// validate and parse input value
|
||||
if (value == null) {
|
||||
if (input.getRequired()) {
|
||||
resolvable.resolveWithError(input.toConstraintViolationException("missing required input", null));
|
||||
resolvable.resolveWithError(InputOutputValidationException.of("Missing required input:" + input.getId()));
|
||||
} else {
|
||||
resolvable.resolveWithValue(null);
|
||||
}
|
||||
@@ -343,17 +338,18 @@ public class FlowInputOutput {
|
||||
parsedInput.ifPresent(parsed -> ((Input) resolvable.get().input()).validate(parsed.getValue()));
|
||||
parsedInput.ifPresent(typed -> resolvable.resolveWithValue(typed.getValue()));
|
||||
} catch (ConstraintViolationException e) {
|
||||
ConstraintViolationException exception = e.getConstraintViolations().size() == 1 ?
|
||||
input.toConstraintViolationException(List.copyOf(e.getConstraintViolations()).getFirst().getMessage(), value) :
|
||||
input.toConstraintViolationException(e.getMessage(), value);
|
||||
resolvable.resolveWithError(exception);
|
||||
Input<?> finalInput = input;
|
||||
Set<InputOutputValidationException> exceptions = e.getConstraintViolations().stream()
|
||||
.map(c-> InputOutputValidationException.of(c.getMessage(), finalInput))
|
||||
.collect(Collectors.toSet());
|
||||
resolvable.resolveWithError(exceptions);
|
||||
}
|
||||
}
|
||||
} catch (ConstraintViolationException e) {
|
||||
resolvable.resolveWithError(e);
|
||||
} catch (Exception e) {
|
||||
ConstraintViolationException exception = input.toConstraintViolationException(e instanceof IllegalArgumentException ? e.getMessage() : e.toString(), resolvable.get().value());
|
||||
resolvable.resolveWithError(exception);
|
||||
} catch (IllegalArgumentException e){
|
||||
resolvable.resolveWithError(InputOutputValidationException.of(e.getMessage(), input));
|
||||
}
|
||||
catch (Exception e) {
|
||||
resolvable.resolveWithError(InputOutputValidationException.of(e.getMessage()));
|
||||
}
|
||||
|
||||
return resolvable.get();
|
||||
@@ -441,8 +437,12 @@ public class FlowInputOutput {
|
||||
}
|
||||
return entry;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
throw output.toConstraintViolationException(e.getMessage(), current);
|
||||
}
|
||||
catch (IllegalArgumentException e){
|
||||
throw InputOutputValidationException.of(e.getMessage(), output);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw InputOutputValidationException.of(e.getMessage());
|
||||
}
|
||||
})
|
||||
.filter(Optional::isPresent)
|
||||
@@ -505,7 +505,7 @@ public class FlowInputOutput {
|
||||
if (matcher.matches()) {
|
||||
yield current.toString();
|
||||
} else {
|
||||
throw new IllegalArgumentException("Expected `URI` but received `" + current + "`");
|
||||
throw new IllegalArgumentException("Invalid URI format.");
|
||||
}
|
||||
}
|
||||
case ARRAY, MULTISELECT -> {
|
||||
@@ -535,34 +535,10 @@ public class FlowInputOutput {
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw e;
|
||||
} catch (Throwable e) {
|
||||
throw new Exception("Expected `" + type + "` but received `" + current + "` with errors:\n```\n" + e.getMessage() + "\n```");
|
||||
throw new Exception(" errors:\n```\n" + e.getMessage() + "\n```");
|
||||
}
|
||||
}
|
||||
|
||||
public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
|
||||
if (outputs == null) return Map.of();
|
||||
|
||||
// render required outputs
|
||||
Map<String, Object> outputsById = outputs
|
||||
.stream()
|
||||
.filter(output -> output.getRequired() == null || output.getRequired())
|
||||
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
|
||||
outputsById = runContext.render(outputsById);
|
||||
|
||||
// render optional outputs one by one to catch, log, and skip any error.
|
||||
for (io.kestra.core.models.flows.Output output : outputs) {
|
||||
if (Boolean.FALSE.equals(output.getRequired())) {
|
||||
try {
|
||||
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
|
||||
} catch (Exception e) {
|
||||
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
|
||||
outputsById.put(output.getId(), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
return outputsById;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mutable wrapper to hold a flow's input, and it's resolved value.
|
||||
*/
|
||||
@@ -591,27 +567,30 @@ public class FlowInputOutput {
|
||||
}
|
||||
|
||||
public void isDefault(boolean isDefault) {
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exceptions());
|
||||
}
|
||||
|
||||
public void setInput(final Input<?> input) {
|
||||
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exception());
|
||||
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exceptions());
|
||||
}
|
||||
|
||||
public void resolveWithEnabled(boolean enabled) {
|
||||
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exceptions());
|
||||
markAsResolved();
|
||||
}
|
||||
|
||||
public void resolveWithValue(@Nullable Object value) {
|
||||
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exceptions());
|
||||
markAsResolved();
|
||||
}
|
||||
|
||||
public void resolveWithError(@Nullable ConstraintViolationException exception) {
|
||||
public void resolveWithError(@Nullable Set<InputOutputValidationException> exception) {
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), this.input.isDefault(), exception);
|
||||
markAsResolved();
|
||||
}
|
||||
private void resolveWithError(@Nullable InputOutputValidationException exception){
|
||||
resolveWithError(Collections.singleton(exception));
|
||||
}
|
||||
|
||||
private void markAsResolved() {
|
||||
this.isResolved = true;
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* InputAndOutput could be used to work with flow execution inputs and outputs.
|
||||
*/
|
||||
public interface InputAndOutput {
|
||||
/**
|
||||
* Reads the inputs of a flow execution.
|
||||
*/
|
||||
Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs);
|
||||
|
||||
/**
|
||||
* Processes the outputs of a flow execution (parse them based on their types).
|
||||
*/
|
||||
Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs);
|
||||
|
||||
/**
|
||||
* Render flow execution outputs.
|
||||
*/
|
||||
Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException;
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
class InputAndOutputImpl implements InputAndOutput {
|
||||
private final FlowInputOutput flowInputOutput;
|
||||
private final RunContext runContext;
|
||||
|
||||
InputAndOutputImpl(ApplicationContext applicationContext, RunContext runContext) {
|
||||
this.flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
|
||||
this.runContext = runContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs) {
|
||||
return flowInputOutput.readExecutionInputs(flow, execution, inputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs) {
|
||||
return flowInputOutput.typedOutputs(flow, execution, rOutputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException {
|
||||
if (outputs == null) return Map.of();
|
||||
|
||||
// render required outputs
|
||||
Map<String, Object> outputsById = outputs
|
||||
.stream()
|
||||
.filter(output -> output.getRequired() == null || output.getRequired())
|
||||
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
|
||||
outputsById = runContext.render(outputsById);
|
||||
|
||||
// render optional outputs one by one to catch, log, and skip any error.
|
||||
for (io.kestra.core.models.flows.Output output : outputs) {
|
||||
if (Boolean.FALSE.equals(output.getRequired())) {
|
||||
try {
|
||||
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
|
||||
} catch (Exception e) {
|
||||
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
|
||||
outputsById.put(output.getId(), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
return outputsById;
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import io.kestra.core.encryption.EncryptionService;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
@@ -204,4 +205,15 @@ public abstract class RunContext implements PropertyContext {
|
||||
* when Namespace ACLs are used (EE).
|
||||
*/
|
||||
public abstract AclChecker acl();
|
||||
|
||||
/**
|
||||
* Clone this run context for a specific plugin.
|
||||
* @return a new run context with the plugin configuration of the given plugin.
|
||||
*/
|
||||
public abstract RunContext cloneForPlugin(Plugin plugin);
|
||||
|
||||
/**
|
||||
* @return an InputAndOutput that can be used to work with inputs and outputs.
|
||||
*/
|
||||
public abstract InputAndOutput inputAndOutput();
|
||||
}
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.runners.TaskRunner;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.plugins.PluginConfigurations;
|
||||
@@ -53,20 +51,6 @@ public class RunContextInitializer {
|
||||
@Value("${kestra.encryption.secret-key}")
|
||||
protected Optional<String> secretKey;
|
||||
|
||||
/**
|
||||
* Initializes the given {@link RunContext} for the given {@link Plugin}.
|
||||
*
|
||||
* @param runContext The {@link RunContext} to initialize.
|
||||
* @param plugin The {@link TaskRunner} used for initialization.
|
||||
* @return The {@link RunContext} to initialize
|
||||
*/
|
||||
public DefaultRunContext forPlugin(final DefaultRunContext runContext,
|
||||
final Plugin plugin) {
|
||||
runContext.init(applicationContext);
|
||||
runContext.setPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(plugin.getType(), plugin.getClass()));
|
||||
return runContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the given {@link RunContext} for the given {@link WorkerTask} for executor.
|
||||
*
|
||||
|
||||
@@ -55,11 +55,11 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
|
||||
|
||||
public RunContextLogger(QueueInterface<LogEntry> logQueue, LogEntry logEntry, org.slf4j.event.Level loglevel, boolean logToFile) {
|
||||
if (logEntry.getTaskId() != null) {
|
||||
this.loggerName = "flow." + logEntry.getFlowId() + "." + logEntry.getTaskId();
|
||||
this.loggerName = baseLoggerName(logEntry) + "." + logEntry.getTaskId();
|
||||
} else if (logEntry.getTriggerId() != null) {
|
||||
this.loggerName = "flow." + logEntry.getFlowId() + "." + logEntry.getTriggerId();
|
||||
this.loggerName = baseLoggerName(logEntry) + "." + logEntry.getTriggerId();
|
||||
} else {
|
||||
this.loggerName = "flow." + logEntry.getFlowId();
|
||||
this.loggerName = baseLoggerName(logEntry);
|
||||
}
|
||||
|
||||
this.logQueue = logQueue;
|
||||
@@ -68,6 +68,10 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
|
||||
this.logToFile = logToFile;
|
||||
}
|
||||
|
||||
private String baseLoggerName(LogEntry logEntry) {
|
||||
return "flow." + logEntry.getTenantId() + "." + logEntry.getNamespace() + "." + logEntry.getFlowId();
|
||||
}
|
||||
|
||||
private static List<LogEntry> logEntry(ILoggingEvent event, String message, org.slf4j.event.Level level, LogEntry logEntry) {
|
||||
Iterable<String> split;
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.micronaut.core.annotation.Nullable;
|
||||
import io.pebbletemplates.pebble.PebbleEngine;
|
||||
import io.pebbletemplates.pebble.extension.Extension;
|
||||
import io.pebbletemplates.pebble.extension.Function;
|
||||
import io.pebbletemplates.pebble.lexer.Syntax;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@@ -37,6 +38,13 @@ public class PebbleEngineFactory {
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public PebbleEngine createWithCustomSyntax(Syntax syntax, Class<? extends Extension> extension) {
|
||||
PebbleEngine.Builder builder = newPebbleEngineBuilder()
|
||||
.syntax(syntax);
|
||||
this.applicationContext.getBeansOfType(extension).forEach(builder::extension);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public PebbleEngine createWithMaskedFunctions(VariableRenderer renderer, final List<String> functionsToMask) {
|
||||
|
||||
PebbleEngine.Builder builder = newPebbleEngineBuilder();
|
||||
|
||||
@@ -35,6 +35,10 @@ public final class YamlParser {
|
||||
return read(input, cls, type(cls));
|
||||
}
|
||||
|
||||
public static <T> T parse(String input, Class<T> cls, Boolean strict) {
|
||||
return strict ? read(input, cls, type(cls)) : readNonStrict(input, cls, type(cls));
|
||||
}
|
||||
|
||||
public static <T> T parse(Map<String, Object> input, Class<T> cls, Boolean strict) {
|
||||
ObjectMapper currentMapper = strict ? STRICT_MAPPER : NON_STRICT_MAPPER;
|
||||
|
||||
@@ -81,7 +85,31 @@ public final class YamlParser {
|
||||
throw toConstraintViolationException(input, resource, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> T readNonStrict(String input, Class<T> objectClass, String resource) {
|
||||
try {
|
||||
return NON_STRICT_MAPPER.readValue(input, objectClass);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw toConstraintViolationException(input, resource, e);
|
||||
}
|
||||
}
|
||||
private static String formatYamlErrorMessage(String originalMessage, JsonProcessingException e) {
|
||||
StringBuilder friendlyMessage = new StringBuilder();
|
||||
if (originalMessage.contains("Expected a field name")) {
|
||||
friendlyMessage.append("YAML syntax error: Invalid structure. Check indentation and ensure all fields are properly formatted.");
|
||||
} else if (originalMessage.contains("MappingStartEvent")) {
|
||||
friendlyMessage.append("YAML syntax error: Unexpected mapping start. Verify that scalar values are properly quoted if needed.");
|
||||
} else if (originalMessage.contains("Scalar value")) {
|
||||
friendlyMessage.append("YAML syntax error: Expected a simple value but found complex structure. Check for unquoted special characters.");
|
||||
} else {
|
||||
friendlyMessage.append("YAML parsing error: ").append(originalMessage.replaceAll("org\\.yaml\\.snakeyaml.*", "").trim());
|
||||
}
|
||||
if (e.getLocation() != null) {
|
||||
int line = e.getLocation().getLineNr();
|
||||
friendlyMessage.append(String.format(" (at line %d)", line));
|
||||
}
|
||||
// Return a generic but cleaner message for other YAML errors
|
||||
return friendlyMessage.toString();
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> ConstraintViolationException toConstraintViolationException(T target, String resource, JsonProcessingException e) {
|
||||
if (e.getCause() instanceof ConstraintViolationException constraintViolationException) {
|
||||
@@ -121,11 +149,12 @@ public final class YamlParser {
|
||||
)
|
||||
));
|
||||
} else {
|
||||
String userFriendlyMessage = formatYamlErrorMessage(e.getMessage(), e);
|
||||
return new ConstraintViolationException(
|
||||
"Illegal " + resource + " source: " + e.getMessage(),
|
||||
"Illegal " + resource + " source: " + userFriendlyMessage,
|
||||
Collections.singleton(
|
||||
ManualConstraintViolation.of(
|
||||
e.getCause() == null ? e.getMessage() : e.getMessage() + "\nCaused by: " + e.getCause().getMessage(),
|
||||
userFriendlyMessage,
|
||||
target,
|
||||
(Class<T>) target.getClass(),
|
||||
"yaml",
|
||||
@@ -136,4 +165,3 @@ public final class YamlParser {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ import com.cronutils.utils.VisibleForTesting;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.conditions.ScheduleCondition;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
@@ -65,16 +64,6 @@ public class ConditionService {
|
||||
return this.valid(flow, conditions, conditionContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that all conditions are valid.
|
||||
* Warning, this method throws if a condition cannot be evaluated.
|
||||
*/
|
||||
public boolean isValid(List<ScheduleCondition> conditions, ConditionContext conditionContext) throws InternalException {
|
||||
return conditions
|
||||
.stream()
|
||||
.allMatch(throwPredicate(condition -> condition.test(conditionContext)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that all conditions are valid.
|
||||
* Warning, this method throws if a condition cannot be evaluated.
|
||||
|
||||
@@ -754,7 +754,7 @@ public class ExecutionService {
|
||||
var parentTaskRun = execution.findTaskRunByTaskRunId(taskRun.getParentTaskRunId());
|
||||
Execution newExecution = execution;
|
||||
if (parentTaskRun.getState().getCurrent() != State.Type.KILLED) {
|
||||
newExecution = newExecution.withTaskRun(parentTaskRun.withState(State.Type.KILLED));
|
||||
newExecution = newExecution.withTaskRun(parentTaskRun.withStateAndAttempt(State.Type.KILLED));
|
||||
}
|
||||
if (parentTaskRun.getParentTaskRunId() != null) {
|
||||
return killParentTaskruns(parentTaskRun, newExecution);
|
||||
|
||||
@@ -92,7 +92,14 @@ public class FlowService {
|
||||
return flowRepository
|
||||
.orElseThrow(() -> new IllegalStateException("Cannot perform operation on flow. Cause: No FlowRepository"));
|
||||
}
|
||||
|
||||
private static String formatValidationError(String message) {
|
||||
if (message.startsWith("Illegal flow source:")) {
|
||||
// Already formatted by YamlParser, return as-is
|
||||
return message;
|
||||
}
|
||||
// For other validation errors, provide context
|
||||
return "Validation error: " + message;
|
||||
}
|
||||
/**
|
||||
* Evaluates all checks defined in the given flow using the provided inputs.
|
||||
* <p>
|
||||
@@ -174,10 +181,12 @@ public class FlowService {
|
||||
modelValidator.validate(pluginDefaultService.injectAllDefaults(flow, false));
|
||||
|
||||
} catch (ConstraintViolationException e) {
|
||||
validateConstraintViolationBuilder.constraints(e.getMessage());
|
||||
String friendlyMessage = formatValidationError(e.getMessage());
|
||||
validateConstraintViolationBuilder.constraints(friendlyMessage);
|
||||
} catch (FlowProcessingException e) {
|
||||
if (e.getCause() instanceof ConstraintViolationException) {
|
||||
validateConstraintViolationBuilder.constraints(e.getMessage());
|
||||
if (e.getCause() instanceof ConstraintViolationException cve) {
|
||||
String friendlyMessage = formatValidationError(cve.getMessage());
|
||||
validateConstraintViolationBuilder.constraints(friendlyMessage);
|
||||
} else {
|
||||
Throwable cause = e.getCause() != null ? e.getCause() : e;
|
||||
validateConstraintViolationBuilder.constraints("Unable to validate the flow: " + cause.getMessage());
|
||||
@@ -579,4 +588,4 @@ public class FlowService {
|
||||
private IllegalStateException noRepositoryException() {
|
||||
return new IllegalStateException("No repository found. Make sure the `kestra.repository.type` property is set.");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
package io.kestra.core.storages;
|
||||
|
||||
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
|
||||
import io.kestra.core.services.NamespaceService;
|
||||
import jakarta.annotation.Nullable;
|
||||
import org.slf4j.Logger;
|
||||
@@ -272,7 +271,13 @@ public class InternalStorage implements Storage {
|
||||
return this.storage.put(context.getTenantId(), context.getNamespace(), resolve, new BufferedInputStream(inputStream));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<StorageContext.Task> getTaskStorageContext() {
|
||||
return Optional.ofNullable((context instanceof StorageContext.Task task) ? task : null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<FileAttributes> list(URI uri) throws IOException {
|
||||
return this.storage.list(context.getTenantId(), context.getNamespace(), uri);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,4 +173,6 @@ public interface Storage {
|
||||
* @return the task storage context
|
||||
*/
|
||||
Optional<StorageContext.Task> getTaskStorageContext();
|
||||
|
||||
List<FileAttributes> list(URI uri) throws IOException;
|
||||
}
|
||||
|
||||
@@ -1,13 +1,39 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import io.kestra.core.models.Setting;
|
||||
import io.kestra.core.repositories.SettingRepositoryInterface;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
@Singleton
|
||||
public class EditionProvider {
|
||||
public Edition get() {
|
||||
return Edition.OSS;
|
||||
}
|
||||
|
||||
@Inject
|
||||
private Optional<SettingRepositoryInterface> settingRepository; // repositories are not always there on unit tests
|
||||
|
||||
@PostConstruct
|
||||
void start() {
|
||||
// check the edition in the settings and update if needed, we didn't use it would allow us to detect incompatible update later if needed
|
||||
settingRepository.ifPresent(settingRepositoryInterface -> persistEdition(settingRepositoryInterface, get()));
|
||||
}
|
||||
|
||||
private void persistEdition(SettingRepositoryInterface settingRepositoryInterface, Edition edition) {
|
||||
Optional<Setting> versionSetting = settingRepositoryInterface.findByKey(Setting.INSTANCE_EDITION);
|
||||
if (versionSetting.isEmpty() || !versionSetting.get().getValue().equals(edition)) {
|
||||
settingRepositoryInterface.save(Setting.builder()
|
||||
.key(Setting.INSTANCE_EDITION)
|
||||
.value(edition)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public enum Edition {
|
||||
OSS,
|
||||
EE
|
||||
|
||||
@@ -11,6 +11,11 @@ import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* Utility class to create {@link java.util.concurrent.ExecutorService} with {@link java.util.concurrent.ExecutorService} instances.
|
||||
* WARNING: those instances will use the {@link ThreadUncaughtExceptionHandler} which terminates Kestra if an error occurs in any thread,
|
||||
* so it should not be used inside plugins.
|
||||
*/
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class ExecutorsUtils {
|
||||
|
||||
@@ -70,4 +70,12 @@ public class ListUtils {
|
||||
.map(Object::toString)
|
||||
.toList();
|
||||
}
|
||||
|
||||
public static <T> List<List<T>> partition(List<T> list, int size) {
|
||||
List<List<T>> parts = new ArrayList<>();
|
||||
for (int i = 0; i < list.size(); i += size) {
|
||||
parts.add(list.subList(i, Math.min(i + size, list.size())));
|
||||
}
|
||||
return parts;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
/**
|
||||
* Utility class for logging
|
||||
* Utility class for server logging
|
||||
*/
|
||||
public final class Logs {
|
||||
|
||||
@@ -18,7 +18,7 @@ public final class Logs {
|
||||
private static final String EXECUTION_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[execution: {}] ";
|
||||
private static final String TRIGGER_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[trigger: {}] ";
|
||||
private static final String TASKRUN_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[task: {}] [execution: {}] [taskrun: {}] ";
|
||||
|
||||
|
||||
private Logs() {}
|
||||
|
||||
public static void logExecution(FlowId flow, Logger logger, Level level, String message, Object... args) {
|
||||
@@ -29,7 +29,7 @@ public final class Logs {
|
||||
}
|
||||
|
||||
/**
|
||||
* Log an {@link Execution} via the execution logger named: 'execution.{flowId}'.
|
||||
* Log an {@link Execution} via the executor logger named: 'executor.{tenantId}.{namespace}.{flowId}'.
|
||||
*/
|
||||
public static void logExecution(Execution execution, Level level, String message, Object... args) {
|
||||
Logger logger = logger(execution);
|
||||
@@ -43,7 +43,7 @@ public final class Logs {
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a {@link TriggerContext} via the trigger logger named: 'trigger.{flowId}.{triggereId}'.
|
||||
* Log a {@link TriggerContext} via the scheduler logger named: 'trigger.{tenantId}.{namespace}.{flowId}.{triggerId}'.
|
||||
*/
|
||||
public static void logTrigger(TriggerContext triggerContext, Level level, String message, Object... args) {
|
||||
Logger logger = logger(triggerContext);
|
||||
@@ -57,7 +57,7 @@ public final class Logs {
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a {@link TaskRun} via the taskRun logger named: 'task.{flowId}.{taskId}'.
|
||||
* Log a {@link TaskRun} via the worker logger named: 'worker.{tenantId}.{namespace}.{flowId}.{taskId}'.
|
||||
*/
|
||||
public static void logTaskRun(TaskRun taskRun, Level level, String message, Object... args) {
|
||||
String prefix = TASKRUN_PREFIX_WITH_TENANT;
|
||||
@@ -73,19 +73,19 @@ public final class Logs {
|
||||
|
||||
private static Logger logger(TaskRun taskRun) {
|
||||
return LoggerFactory.getLogger(
|
||||
"task." + taskRun.getFlowId() + "." + taskRun.getTaskId()
|
||||
"worker." + taskRun.getTenantId() + "." + taskRun.getNamespace() + "." + taskRun.getFlowId() + "." + taskRun.getTaskId()
|
||||
);
|
||||
}
|
||||
|
||||
private static Logger logger(TriggerContext triggerContext) {
|
||||
return LoggerFactory.getLogger(
|
||||
"trigger." + triggerContext.getFlowId() + "." + triggerContext.getTriggerId()
|
||||
"scheduler." + triggerContext.getTenantId() + "." + triggerContext.getNamespace() + "." + triggerContext.getFlowId() + "." + triggerContext.getTriggerId()
|
||||
);
|
||||
}
|
||||
|
||||
private static Logger logger(Execution execution) {
|
||||
return LoggerFactory.getLogger(
|
||||
"execution." + execution.getFlowId()
|
||||
"executor." + execution.getTenantId() + "." + execution.getNamespace() + "." + execution.getFlowId()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,7 +120,10 @@ public class MapUtils {
|
||||
private static Collection<?> mergeCollections(Collection<?> colA, Collection<?> colB) {
|
||||
List<Object> merged = new ArrayList<>(colA.size() + colB.size());
|
||||
merged.addAll(colA);
|
||||
merged.addAll(colB);
|
||||
if (!colB.isEmpty()) {
|
||||
List<?> filtered = colB.stream().filter(it -> !colA.contains(it)).toList();
|
||||
merged.addAll(filtered);
|
||||
}
|
||||
return merged;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,14 +1,12 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import io.kestra.core.models.executions.metrics.Counter;
|
||||
import io.kestra.core.models.executions.metrics.Timer;
|
||||
import io.kestra.core.models.tasks.FileExistComportment;
|
||||
import io.kestra.core.models.tasks.NamespaceFiles;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.storages.NamespaceFile;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.time.DurationFormatUtils;
|
||||
import org.apache.commons.lang3.time.StopWatch;
|
||||
@@ -19,28 +17,27 @@ import java.io.InputStream;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
|
||||
@Singleton
|
||||
public class NamespaceFilesUtils {
|
||||
@Inject
|
||||
private ExecutorsUtils executorsUtils;
|
||||
public final class NamespaceFilesUtils {
|
||||
private static final int maxThreads = Math.max(Runtime.getRuntime().availableProcessors() * 4, 32);
|
||||
private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
|
||||
0,
|
||||
maxThreads,
|
||||
60L,
|
||||
TimeUnit.SECONDS,
|
||||
new SynchronousQueue<>(),
|
||||
new ThreadFactoryBuilder().setNameFormat("namespace-files").build()
|
||||
);;
|
||||
|
||||
private ExecutorService executorService;
|
||||
private int maxThreads;
|
||||
|
||||
@PostConstruct
|
||||
public void postConstruct() {
|
||||
this.maxThreads = Math.max(Runtime.getRuntime().availableProcessors() * 4, 32);
|
||||
this.executorService = executorsUtils.maxCachedThreadPool(maxThreads, "namespace-file");
|
||||
private NamespaceFilesUtils() {
|
||||
// utility class pattern
|
||||
}
|
||||
|
||||
public void loadNamespaceFiles(
|
||||
public static void loadNamespaceFiles(
|
||||
RunContext runContext,
|
||||
NamespaceFiles namespaceFiles
|
||||
)
|
||||
@@ -69,7 +66,7 @@ public class NamespaceFilesUtils {
|
||||
int parallelism = maxThreads / 2;
|
||||
Flux.fromIterable(matchedNamespaceFiles)
|
||||
.parallel(parallelism)
|
||||
.runOn(Schedulers.fromExecutorService(executorService))
|
||||
.runOn(Schedulers.fromExecutorService(EXECUTOR_SERVICE))
|
||||
.doOnNext(throwConsumer(nsFile -> {
|
||||
InputStream content = runContext.storage().getFile(nsFile.uri());
|
||||
Path path = folderPerNamespace ?
|
||||
|
||||
@@ -23,7 +23,6 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
|
||||
import io.kestra.core.services.StorageService;
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.storages.StorageSplitInterface;
|
||||
import io.kestra.core.utils.GraphUtils;
|
||||
import io.kestra.core.validations.NoSystemLabelValidation;
|
||||
@@ -540,7 +539,7 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
|
||||
.numberOfBatches((Integer) taskRun.getOutputs().get(ExecutableUtils.TASK_VARIABLE_NUMBER_OF_BATCHES));
|
||||
|
||||
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
|
||||
FileSerde.write(bos, FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext));
|
||||
FileSerde.write(bos, runContext.inputAndOutput().renderOutputs(flow.getOutputs()));
|
||||
URI uri = runContext.storage().putFile(
|
||||
new ByteArrayInputStream(bos.toByteArray()),
|
||||
URI.create((String) taskRun.getOutputs().get("uri"))
|
||||
@@ -602,9 +601,8 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
|
||||
String subflowOutputsBase = (String) taskOutput.get(ExecutableUtils.TASK_VARIABLE_SUBFLOW_OUTPUTS_BASE_URI);
|
||||
URI subflowOutputsBaseUri = URI.create(StorageContext.KESTRA_PROTOCOL + subflowOutputsBase + "/");
|
||||
|
||||
StorageInterface storage = ((DefaultRunContext) runContext).getApplicationContext().getBean(StorageInterface.class);
|
||||
if (storage.exists(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), subflowOutputsBaseUri)) {
|
||||
List<FileAttributes> list = storage.list(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), subflowOutputsBaseUri);
|
||||
if (runContext.storage().isFileExist(subflowOutputsBaseUri)) {
|
||||
List<FileAttributes> list = runContext.storage().list(subflowOutputsBaseUri);;
|
||||
|
||||
if (!list.isEmpty()) {
|
||||
// Merge outputs from each sub-flow into a single stored in the internal storage.
|
||||
|
||||
@@ -63,7 +63,8 @@ import java.util.*;
|
||||
|
||||
- id: run_post_approval
|
||||
type: io.kestra.plugin.scripts.shell.Commands
|
||||
runner: PROCESS
|
||||
taskRunner:
|
||||
type: io.kestra.plugin.core.runner.Process
|
||||
commands:
|
||||
- echo "Manual approval received! Continuing the execution..."
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ import io.kestra.core.models.tasks.ExecutableTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.ExecutableUtils;
|
||||
import io.kestra.core.runners.FlowInputOutput;
|
||||
import io.kestra.core.runners.FlowMetaStoreInterface;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.SubflowExecution;
|
||||
@@ -38,7 +37,6 @@ import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Collections;
|
||||
@@ -246,11 +244,11 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
|
||||
|
||||
if (subflowOutputs != null && !subflowOutputs.isEmpty()) {
|
||||
try {
|
||||
Map<String, Object> rOutputs = FlowInputOutput.renderFlowOutputs(subflowOutputs, runContext);
|
||||
var inputAndOutput = runContext.inputAndOutput();
|
||||
Map<String, Object> rOutputs = inputAndOutput.renderOutputs(subflowOutputs);
|
||||
|
||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
|
||||
if (flow.getOutputs() != null && flowInputOutput != null) {
|
||||
rOutputs = flowInputOutput.typedOutputs(flow, execution, rOutputs);
|
||||
if (flow.getOutputs() != null) {
|
||||
rOutputs = inputAndOutput.typedOutputs(flow, execution, rOutputs);
|
||||
}
|
||||
builder.outputs(rOutputs);
|
||||
} catch (Exception e) {
|
||||
|
||||
@@ -260,8 +260,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
|
||||
}
|
||||
|
||||
if (this.namespaceFiles != null && !Boolean.FALSE.equals(runContext.render(this.namespaceFiles.getEnabled()).as(Boolean.class).orElse(true))) {
|
||||
NamespaceFilesUtils namespaceFilesUtils = ((DefaultRunContext) runContext).getApplicationContext().getBean(NamespaceFilesUtils.class);
|
||||
namespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
|
||||
NamespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
|
||||
}
|
||||
|
||||
if (this.inputFiles != null) {
|
||||
|
||||
@@ -2,10 +2,8 @@ package io.kestra.plugin.core.namespace;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.NamespaceFile;
|
||||
import io.kestra.plugin.core.kv.Version;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@@ -32,17 +32,9 @@ import lombok.experimental.SuperBuilder;
|
||||
examples = {
|
||||
@Example(
|
||||
code = """
|
||||
id: templated_task
|
||||
namespace: company.team
|
||||
variables:
|
||||
property: uri
|
||||
value: https://kestra.io
|
||||
tasks:
|
||||
- id: templated_task
|
||||
type: io.kestra.plugin.core.templating.TemplatedTask
|
||||
spec: |
|
||||
type: io.kestra.plugin.core.http.Download
|
||||
{{ vars.property }}: {{ vars.value }}
|
||||
spec: |
|
||||
type: io.kestra.plugin.core.http.Download
|
||||
{{ task.property }}: {{ task.value }}
|
||||
"""
|
||||
)
|
||||
},
|
||||
|
||||
@@ -0,0 +1,107 @@
|
||||
package io.kestra.plugin.core.trigger;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionTrigger;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.Backfill;
|
||||
import io.kestra.core.models.triggers.Schedulable;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.services.LabelService;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.chrono.ChronoZonedDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Factory class for constructing a new {@link Execution} from a {@link Schedulable} trigger.
|
||||
*
|
||||
* @see io.kestra.plugin.core.trigger.Schedule
|
||||
* @see io.kestra.plugin.core.trigger.ScheduleOnDates
|
||||
*/
|
||||
final class SchedulableExecutionFactory {
|
||||
|
||||
static Execution createFailedExecution(Schedulable trigger, ConditionContext conditionContext, TriggerContext triggerContext) throws IllegalVariableEvaluationException {
|
||||
return Execution.builder()
|
||||
.id(conditionContext.getRunContext().getTriggerExecutionId())
|
||||
.tenantId(triggerContext.getTenantId())
|
||||
.namespace(triggerContext.getNamespace())
|
||||
.flowId(triggerContext.getFlowId())
|
||||
.flowRevision(conditionContext.getFlow().getRevision())
|
||||
.labels(SchedulableExecutionFactory.getLabels(trigger, conditionContext.getRunContext(), triggerContext.getBackfill(), conditionContext.getFlow()))
|
||||
.state(new State().withState(State.Type.FAILED))
|
||||
.build();
|
||||
}
|
||||
|
||||
static Execution createExecution(Schedulable trigger, ConditionContext conditionContext, TriggerContext triggerContext, Map<String, Object> variables, ZonedDateTime scheduleDate) throws IllegalVariableEvaluationException {
|
||||
RunContext runContext = conditionContext.getRunContext();
|
||||
ExecutionTrigger executionTrigger = ExecutionTrigger.of((AbstractTrigger) trigger, variables);
|
||||
|
||||
List<Label> labels = getLabels(trigger, runContext, triggerContext.getBackfill(), conditionContext.getFlow());
|
||||
|
||||
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(labels));
|
||||
executionLabels.add(new Label(Label.FROM, "trigger"));
|
||||
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
|
||||
// add a correlation ID if none exist
|
||||
executionLabels.add(new Label(Label.CORRELATION_ID, runContext.getTriggerExecutionId()));
|
||||
}
|
||||
|
||||
Execution execution = Execution.builder()
|
||||
.id(runContext.getTriggerExecutionId())
|
||||
.tenantId(triggerContext.getTenantId())
|
||||
.namespace(triggerContext.getNamespace())
|
||||
.flowId(triggerContext.getFlowId())
|
||||
.flowRevision(conditionContext.getFlow().getRevision())
|
||||
.variables(conditionContext.getFlow().getVariables())
|
||||
.labels(executionLabels)
|
||||
.state(new State())
|
||||
.trigger(executionTrigger)
|
||||
.scheduleDate(Optional.ofNullable(scheduleDate).map(ChronoZonedDateTime::toInstant).orElse(null))
|
||||
.build();
|
||||
|
||||
Map<String, Object> allInputs = getInputs(trigger, runContext, triggerContext.getBackfill());
|
||||
|
||||
// add inputs and inject defaults (FlowInputOutput handles defaults internally)
|
||||
execution = execution.withInputs(runContext.inputAndOutput().readInputs(conditionContext.getFlow(), execution, allInputs));
|
||||
|
||||
return execution;
|
||||
}
|
||||
|
||||
private static Map<String, Object> getInputs(Schedulable trigger, RunContext runContext, Backfill backfill) throws IllegalVariableEvaluationException {
|
||||
Map<String, Object> inputs = new HashMap<>();
|
||||
|
||||
if (trigger.getInputs() != null) {
|
||||
inputs.putAll(runContext.render(trigger.getInputs()));
|
||||
}
|
||||
|
||||
if (backfill != null && backfill.getInputs() != null) {
|
||||
inputs.putAll(runContext.render(backfill.getInputs()));
|
||||
}
|
||||
|
||||
return inputs;
|
||||
}
|
||||
|
||||
private static List<Label> getLabels(Schedulable trigger, RunContext runContext, Backfill backfill, FlowInterface flow) throws IllegalVariableEvaluationException {
|
||||
List<Label> labels = LabelService.fromTrigger(runContext, flow, (AbstractTrigger) trigger);
|
||||
|
||||
if (backfill != null && backfill.getLabels() != null) {
|
||||
for (Label label : backfill.getLabels()) {
|
||||
final var value = runContext.render(label.value());
|
||||
if (value != null) {
|
||||
labels.add(new Label(label.key(), value));
|
||||
}
|
||||
}
|
||||
}
|
||||
return labels;
|
||||
}
|
||||
}
|
||||
@@ -6,9 +6,7 @@ import com.cronutils.model.time.ExecutionTime;
|
||||
import com.cronutils.parser.CronParser;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
@@ -16,12 +14,8 @@ import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.conditions.ScheduleCondition;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.triggers.*;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.services.ConditionService;
|
||||
import io.kestra.core.services.LabelService;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.validations.ScheduleValidation;
|
||||
import io.kestra.core.validations.TimezoneId;
|
||||
@@ -29,6 +23,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Null;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -40,6 +35,8 @@ import java.time.temporal.ChronoUnit;
|
||||
import java.util.*;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwPredicate;
|
||||
|
||||
@Slf4j
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@@ -224,11 +221,7 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
||||
@PluginProperty
|
||||
@Deprecated
|
||||
private List<ScheduleCondition> scheduleConditions;
|
||||
|
||||
@Schema(
|
||||
title = "The inputs to pass to the scheduled flow"
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
|
||||
private Map<String, Object> inputs;
|
||||
|
||||
@Schema(
|
||||
@@ -248,13 +241,7 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
||||
@PluginProperty
|
||||
@Deprecated
|
||||
private Map<String, Object> backfill;
|
||||
|
||||
@Schema(
|
||||
title = "Action to take in the case of missed schedules",
|
||||
description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
|
||||
"The default is `ALL` unless a different value is configured using the global plugin configuration."
|
||||
)
|
||||
@PluginProperty
|
||||
|
||||
private RecoverMissedSchedules recoverMissedSchedules;
|
||||
|
||||
@Override
|
||||
@@ -403,20 +390,11 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
||||
if (!conditionResults) {
|
||||
return Optional.empty();
|
||||
}
|
||||
} catch(InternalException ie) {
|
||||
} catch (InternalException ie) {
|
||||
// validate schedule condition can fail to render variables
|
||||
// in this case, we return a failed execution so the trigger is not evaluated each second
|
||||
runContext.logger().error("Unable to evaluate the Schedule trigger '{}'", this.getId(), ie);
|
||||
Execution execution = Execution.builder()
|
||||
.id(runContext.getTriggerExecutionId())
|
||||
.tenantId(triggerContext.getTenantId())
|
||||
.namespace(triggerContext.getNamespace())
|
||||
.flowId(triggerContext.getFlowId())
|
||||
.flowRevision(conditionContext.getFlow().getRevision())
|
||||
.labels(generateLabels(runContext, conditionContext, backfill))
|
||||
.state(new State().withState(State.Type.FAILED))
|
||||
.build();
|
||||
return Optional.of(execution);
|
||||
return Optional.of(SchedulableExecutionFactory.createFailedExecution(this, conditionContext, triggerContext));
|
||||
}
|
||||
|
||||
// recalculate true output for previous and next based on conditions
|
||||
@@ -430,14 +408,12 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
||||
variables = scheduleDates.toMap();
|
||||
}
|
||||
|
||||
Execution execution = TriggerService.generateScheduledExecution(
|
||||
Execution execution = SchedulableExecutionFactory.createExecution(
|
||||
this,
|
||||
conditionContext,
|
||||
triggerContext,
|
||||
generateLabels(runContext, conditionContext, backfill),
|
||||
generateInputs(runContext, backfill),
|
||||
variables,
|
||||
Optional.empty()
|
||||
null
|
||||
);
|
||||
|
||||
return Optional.of(execution);
|
||||
@@ -448,34 +424,6 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
||||
return parser.parse(this.cron);
|
||||
}
|
||||
|
||||
private List<Label> generateLabels(RunContext runContext, ConditionContext conditionContext, Backfill backfill) throws IllegalVariableEvaluationException {
|
||||
List<Label> labels = LabelService.fromTrigger(runContext, conditionContext.getFlow(), this);
|
||||
|
||||
if (backfill != null && backfill.getLabels() != null) {
|
||||
for (Label label : backfill.getLabels()) {
|
||||
final var value = runContext.render(label.value());
|
||||
if (value != null) {
|
||||
labels.add(new Label(label.key(), value));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return labels;
|
||||
}
|
||||
|
||||
private Map<String, Object> generateInputs(RunContext runContext, Backfill backfill) throws IllegalVariableEvaluationException {
|
||||
Map<String, Object> inputs = new HashMap<>();
|
||||
|
||||
if (this.inputs != null) {
|
||||
inputs.putAll(runContext.render(this.inputs));
|
||||
}
|
||||
|
||||
if (backfill != null && backfill.getInputs() != null) {
|
||||
inputs.putAll(runContext.render(backfill.getInputs()));
|
||||
}
|
||||
|
||||
return inputs;
|
||||
}
|
||||
private Optional<Output> scheduleDates(ExecutionTime executionTime, ZonedDateTime date) {
|
||||
Optional<ZonedDateTime> next = executionTime.nextExecution(date.minus(Duration.ofSeconds(1)));
|
||||
|
||||
@@ -549,9 +497,9 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
||||
Optional<ZonedDateTime> truePreviousNextDateWithCondition(ExecutionTime executionTime, ConditionContext conditionContext, ZonedDateTime toTestDate, boolean next) throws InternalException {
|
||||
int upperYearBound = ZonedDateTime.now().getYear() + 10;
|
||||
int lowerYearBound = ZonedDateTime.now().getYear() - 10;
|
||||
|
||||
|
||||
while ((next && toTestDate.getYear() < upperYearBound) || (!next && toTestDate.getYear() > lowerYearBound)) {
|
||||
|
||||
|
||||
Optional<ZonedDateTime> currentDate = next ?
|
||||
executionTime.nextExecution(toTestDate) :
|
||||
executionTime.lastExecution(toTestDate);
|
||||
@@ -607,11 +555,10 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
||||
|
||||
private boolean validateScheduleCondition(ConditionContext conditionContext) throws InternalException {
|
||||
if (conditions != null) {
|
||||
ConditionService conditionService = ((DefaultRunContext)conditionContext.getRunContext()).getApplicationContext().getBean(ConditionService.class);
|
||||
return conditionService.isValid(
|
||||
conditions.stream().filter(c -> c instanceof ScheduleCondition).map(c -> (ScheduleCondition) c).toList(),
|
||||
conditionContext
|
||||
);
|
||||
return conditions.stream()
|
||||
.filter(c -> c instanceof ScheduleCondition)
|
||||
.map(c -> (ScheduleCondition) c)
|
||||
.allMatch(throwPredicate(condition -> condition.test(conditionContext)));
|
||||
}
|
||||
|
||||
return true;
|
||||
|
||||
@@ -10,7 +10,6 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.VoidOutput;
|
||||
import io.kestra.core.models.triggers.*;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.services.LabelService;
|
||||
import io.kestra.core.validations.TimezoneId;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
@@ -23,7 +22,10 @@ import java.time.Duration;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.*;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
@@ -45,11 +47,7 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
|
||||
@Builder.Default
|
||||
@Null
|
||||
private final Duration interval = null;
|
||||
|
||||
@Schema(
|
||||
title = "The inputs to pass to the scheduled flow"
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
|
||||
private Map<String, Object> inputs;
|
||||
|
||||
@TimezoneId
|
||||
@@ -63,31 +61,24 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
|
||||
@NotNull
|
||||
private Property<List<ZonedDateTime>> dates;
|
||||
|
||||
@Schema(
|
||||
title = "Action to take in the case of missed schedules",
|
||||
description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
|
||||
"The default is `ALL` unless a different value is configured using the global plugin configuration."
|
||||
)
|
||||
@PluginProperty
|
||||
private RecoverMissedSchedules recoverMissedSchedules;
|
||||
|
||||
@Override
|
||||
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext triggerContext) throws Exception {
|
||||
RunContext runContext = conditionContext.getRunContext();
|
||||
|
||||
ZonedDateTime lastEvaluation = triggerContext.getDate();
|
||||
Optional<ZonedDateTime> nextDate = nextDate(runContext, date -> date.isEqual(lastEvaluation) || date.isAfter(lastEvaluation));
|
||||
|
||||
if (nextDate.isPresent()) {
|
||||
log.info("Schedule execution on {}", nextDate.get());
|
||||
|
||||
Execution execution = TriggerService.generateScheduledExecution(
|
||||
Execution execution = SchedulableExecutionFactory.createExecution(
|
||||
this,
|
||||
conditionContext,
|
||||
triggerContext,
|
||||
LabelService.fromTrigger(runContext, conditionContext.getFlow(), this),
|
||||
this.inputs != null ? runContext.render(this.inputs) : Collections.emptyMap(),
|
||||
Collections.emptyMap(),
|
||||
nextDate
|
||||
nextDate.orElse(null)
|
||||
);
|
||||
|
||||
return Optional.of(execution);
|
||||
@@ -97,29 +88,21 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
|
||||
}
|
||||
|
||||
@Override
|
||||
public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> last) {
|
||||
try {
|
||||
return last
|
||||
.map(throwFunction(context ->
|
||||
nextDate(conditionContext.getRunContext(), date -> date.isAfter(context.getDate()))
|
||||
.orElse(ZonedDateTime.now().plusYears(1))
|
||||
))
|
||||
.orElse(conditionContext.getRunContext()
|
||||
.render(dates)
|
||||
.asList(ZonedDateTime.class)
|
||||
.stream()
|
||||
.sorted()
|
||||
.findFirst()
|
||||
.orElse(ZonedDateTime.now()))
|
||||
.truncatedTo(ChronoUnit.SECONDS);
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
log.warn("Failed to evaluate schedule dates for trigger '{}': {}", this.getId(), e.getMessage());
|
||||
return ZonedDateTime.now().plusYears(1);
|
||||
}
|
||||
public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> triggerContext) {
|
||||
return triggerContext
|
||||
.map(ctx -> ctx.getBackfill() != null ? ctx.getBackfill().getCurrentDate() : ctx.getDate())
|
||||
.map(this::withTimeZone)
|
||||
.or(() -> Optional.of(ZonedDateTime.now()))
|
||||
.flatMap(dt -> {
|
||||
try {
|
||||
return nextDate(conditionContext.getRunContext(), date -> date.isAfter(dt));
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
log.warn("Failed to evaluate schedule dates for trigger '{}': {}", this.getId(), e.getMessage());
|
||||
throw new InvalidTriggerConfigurationException("Failed to evaluate schedule 'dates'. Cause: " + e.getMessage());
|
||||
}
|
||||
}).orElseGet(() -> ZonedDateTime.now().plusYears(1));
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public ZonedDateTime nextEvaluationDate() {
|
||||
// TODO this may be the next date from now?
|
||||
@@ -139,9 +122,17 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
|
||||
return previousDates.isEmpty() ? ZonedDateTime.now() : previousDates.getFirst();
|
||||
}
|
||||
|
||||
private Optional<ZonedDateTime> nextDate(RunContext runContext, Predicate<ZonedDateTime> filter) throws IllegalVariableEvaluationException {
|
||||
return runContext.render(dates).asList(ZonedDateTime.class).stream().sorted()
|
||||
.filter(date -> filter.test(date))
|
||||
private ZonedDateTime withTimeZone(ZonedDateTime date) {
|
||||
if (this.timezone == null) {
|
||||
return date;
|
||||
}
|
||||
return date.withZoneSameInstant(ZoneId.of(this.timezone));
|
||||
}
|
||||
|
||||
private Optional<ZonedDateTime> nextDate(RunContext runContext, Predicate<ZonedDateTime> predicate) throws IllegalVariableEvaluationException {
|
||||
return runContext.render(dates)
|
||||
.asList(ZonedDateTime.class).stream().sorted()
|
||||
.filter(predicate)
|
||||
.map(throwFunction(date -> timezone == null ? date : date.withZoneSameInstant(ZoneId.of(runContext.render(timezone)))))
|
||||
.findFirst()
|
||||
.map(date -> date.truncatedTo(ChronoUnit.SECONDS));
|
||||
|
||||
@@ -9,10 +9,14 @@
|
||||
<property name="pattern" value="%date{HH:mm:ss}.%ms %highlight(%-5.5level) %magenta(%-12.36thread) %cyan(%-12.36logger{36}) %msg%n" />
|
||||
|
||||
<logger name="io.kestra" level="INFO" />
|
||||
<logger name="flow" level="INFO" />
|
||||
<logger name="task" level="INFO" />
|
||||
<logger name="execution" level="INFO" />
|
||||
<logger name="trigger" level="INFO" />
|
||||
|
||||
<!-- Flow execution logs - disabled by default -->
|
||||
<logger name="flow" level="OFF" />
|
||||
|
||||
<!-- Server loggers -->
|
||||
<logger name="worker" level="INFO" />
|
||||
<logger name="executor" level="INFO" />
|
||||
<logger name="scheduler" level="INFO" />
|
||||
|
||||
<logger name="io.kestra.ee.runner.kafka.services.KafkaConsumerService" level="WARN" />
|
||||
<logger name="io.kestra.ee.runner.kafka.services.KafkaProducerService" level="WARN" />
|
||||
|
||||
@@ -170,10 +170,11 @@ class JsonSchemaGeneratorTest {
|
||||
|
||||
Map<String, Object> jsonSchema = jsonSchemaGenerator.generate(AbstractTrigger.class, AbstractTrigger.class);
|
||||
assertThat((Map<String, Object>) jsonSchema.get("properties"), allOf(
|
||||
Matchers.aMapWithSize(3),
|
||||
Matchers.aMapWithSize(4),
|
||||
hasKey("conditions"),
|
||||
hasKey("stopAfter"),
|
||||
hasKey("type")
|
||||
hasKey("type"),
|
||||
hasKey("allowConcurrent")
|
||||
));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1,24 +1,36 @@
|
||||
package io.kestra.core.models.property;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import io.kestra.core.context.TestRunContextFactory;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.serializers.FileSerde;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.NamespaceFile;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.plugin.core.namespace.Version;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.event.Level;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static java.util.Map.entry;
|
||||
@@ -362,10 +374,43 @@ class PropertyTest {
|
||||
assertThat(output.getMessages().getFirst().getValue()).isEqualTo("value1");
|
||||
}
|
||||
|
||||
@Test
|
||||
void jsonSubtype() throws JsonProcessingException, IllegalVariableEvaluationException {
|
||||
Optional<WithSubtype> rendered = runContextFactory.of().render(
|
||||
Property.<WithSubtype>ofExpression(JacksonMapper.ofJson().writeValueAsString(new MySubtype()))
|
||||
).as(WithSubtype.class);
|
||||
|
||||
assertThat(rendered).isPresent();
|
||||
assertThat(rendered.get()).isInstanceOf(MySubtype.class);
|
||||
|
||||
List<WithSubtype> renderedList = runContextFactory.of().render(
|
||||
Property.<List<WithSubtype>>ofExpression(JacksonMapper.ofJson().writeValueAsString(List.of(new MySubtype())))
|
||||
).asList(WithSubtype.class);
|
||||
assertThat(renderedList).hasSize(1);
|
||||
assertThat(renderedList.get(0)).isInstanceOf(MySubtype.class);
|
||||
}
|
||||
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
|
||||
@JsonSubTypes({
|
||||
@JsonSubTypes.Type(value = MySubtype.class, name = "mySubtype")
|
||||
})
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Introspected
|
||||
public abstract static class WithSubtype {
|
||||
abstract public String getType();
|
||||
}
|
||||
|
||||
@Getter
|
||||
public static class MySubtype extends WithSubtype {
|
||||
private final String type = "mySubtype";
|
||||
}
|
||||
|
||||
|
||||
@Builder
|
||||
@Getter
|
||||
private static class TestObj {
|
||||
private String key;
|
||||
private String value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,6 +60,15 @@ class SystemInformationReportTest {
|
||||
return setting;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Setting internalSave(Setting setting) throws ConstraintViolationException {
|
||||
if (setting.getKey().equals(Setting.INSTANCE_UUID)) {
|
||||
UUID = setting.getValue();
|
||||
}
|
||||
|
||||
return setting;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Setting delete(Setting setting) {
|
||||
return setting;
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import com.devskiller.friendly_id.FriendlyId;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.InvalidQueryFiltersException;
|
||||
import io.kestra.core.junit.annotations.FlakyTest;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
@@ -24,7 +24,6 @@ import io.kestra.core.models.flows.State.Type;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.NamespaceUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
@@ -42,10 +41,9 @@ import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.*;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Function;
|
||||
@@ -185,6 +183,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("filterCombinations")
|
||||
@FlakyTest(description = "Filtering tests are sometimes returning 0")
|
||||
void should_find_all(QueryFilter filter, int expectedSize){
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
inject(tenant, "executionTriggerId");
|
||||
|
||||
@@ -10,82 +10,84 @@ import org.junit.jupiter.api.TestInstance;
|
||||
@KestraTest(startRunner = true)
|
||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||
public abstract class AbstractRunnerConcurrencyTest {
|
||||
public static final String TENANT_1 = "tenant1";
|
||||
|
||||
@Inject
|
||||
protected FlowConcurrencyCaseTest flowConcurrencyCaseTest;
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-cancel.yml"}, tenantId = "concurrency-cancel")
|
||||
void concurrencyCancel() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyCancel();
|
||||
flowConcurrencyCaseTest.flowConcurrencyCancel("concurrency-cancel");
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-fail.yml"})
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-fail.yml"}, tenantId = "concurrency-fail")
|
||||
void concurrencyFail() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyFail();
|
||||
flowConcurrencyCaseTest.flowConcurrencyFail("concurrency-fail");
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-queue.yml"})
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = "concurrency-queue")
|
||||
void concurrencyQueue() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyQueue();
|
||||
flowConcurrencyCaseTest.flowConcurrencyQueue("concurrency-queue");
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-queue-pause.yml"})
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-pause.yml"}, tenantId = "concurrency-queue-pause")
|
||||
protected void concurrencyQueuePause() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyQueuePause();
|
||||
flowConcurrencyCaseTest.flowConcurrencyQueuePause("concurrency-queue-pause");
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-cancel-pause.yml"})
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-cancel-pause.yml"}, tenantId = "concurrency-cancel-pause")
|
||||
protected void concurrencyCancelPause() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyCancelPause();
|
||||
flowConcurrencyCaseTest.flowConcurrencyCancelPause("concurrency-cancel-pause");
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = TENANT_1)
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = "flow-concurrency-with-for-each-item")
|
||||
protected void flowConcurrencyWithForEachItem() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem(TENANT_1);
|
||||
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem("flow-concurrency-with-for-each-item");
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-fail.yml"}, tenantId = "concurrency-queue-restarted")
|
||||
protected void concurrencyQueueRestarted() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
|
||||
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted("concurrency-queue-restarted");
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-queue-after-execution.yml"})
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-after-execution.yml"}, tenantId = "concurrency-queue-after-execution")
|
||||
void concurrencyQueueAfterExecution() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution();
|
||||
flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution("concurrency-queue-after-execution");
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"}, tenantId = TENANT_1)
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"}, tenantId = "flow-concurrency-subflow")
|
||||
void flowConcurrencySubflow() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencySubflow(TENANT_1);
|
||||
flowConcurrencyCaseTest.flowConcurrencySubflow("flow-concurrency-subflow");
|
||||
}
|
||||
|
||||
@Test
|
||||
@FlakyTest(description = "Only flaky in CI")
|
||||
@LoadFlows({"flows/valids/flow-concurrency-parallel-subflow-kill.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-child.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-grandchild.yaml"})
|
||||
@LoadFlows(
|
||||
value = {"flows/valids/flow-concurrency-parallel-subflow-kill.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-child.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-grandchild.yaml"},
|
||||
tenantId = "flow-concurrency-parallel-subflow-kill"
|
||||
)
|
||||
protected void flowConcurrencyParallelSubflowKill() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyParallelSubflowKill();
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"})
|
||||
void flowConcurrencyKilled() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyKilled();
|
||||
flowConcurrencyCaseTest.flowConcurrencyParallelSubflowKill("flow-concurrency-parallel-subflow-kill");
|
||||
}
|
||||
|
||||
@Test
|
||||
@FlakyTest(description = "Only flaky in CI")
|
||||
@LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"})
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-killed.yml"}, tenantId = "flow-concurrency-killed")
|
||||
void flowConcurrencyKilled() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyKilled("flow-concurrency-killed");
|
||||
}
|
||||
|
||||
@Test
|
||||
@FlakyTest(description = "Only flaky in CI")
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-killed.yml"}, tenantId = "flow-concurrency-queue-killed")
|
||||
void flowConcurrencyQueueKilled() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyQueueKilled();
|
||||
flowConcurrencyCaseTest.flowConcurrencyQueueKilled("flow-concurrency-queue-killed");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
@@ -466,4 +467,20 @@ class ExecutionServiceTest {
|
||||
assertThat(restart.getTaskRunList()).hasSize(2);
|
||||
assertThat(restart.findTaskRunsByTaskId("make_error").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/each-pause.yaml"})
|
||||
void killExecutionWithFlowableTask() throws Exception {
|
||||
Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "each-pause");
|
||||
|
||||
TaskRun childTaskRun = execution.getTaskRunList().stream().filter(tr -> tr.getTaskId().equals("pause")).toList().getFirst();
|
||||
|
||||
Execution killed = executionService.killParentTaskruns(childTaskRun,execution);
|
||||
|
||||
TaskRun parentTaskRun = killed.getTaskRunList().stream().filter(tr -> tr.getTaskId().equals("each_task")).toList().getFirst();
|
||||
|
||||
assertThat(parentTaskRun.getState().getCurrent()).isEqualTo(State.Type.KILLED);
|
||||
assertThat(parentTaskRun.getAttempts().getLast().getState().getCurrent()).isEqualTo(State.Type.KILLED);
|
||||
|
||||
}
|
||||
}
|
||||
@@ -31,7 +31,6 @@ import java.util.Optional;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@Singleton
|
||||
@@ -57,12 +56,12 @@ public class FlowConcurrencyCaseTest {
|
||||
@Named(QueueFactoryInterface.KILL_NAMED)
|
||||
protected QueueInterface<ExecutionKilled> killQueue;
|
||||
|
||||
public void flowConcurrencyCancel() throws TimeoutException, QueueException {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
|
||||
public void flowConcurrencyCancel(String tenantId) throws TimeoutException, QueueException {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
|
||||
try {
|
||||
List<Execution> shouldFailExecutions = List.of(
|
||||
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel"),
|
||||
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel")
|
||||
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-cancel"),
|
||||
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-cancel")
|
||||
);
|
||||
assertThat(execution1.getState().isRunning()).isTrue();
|
||||
|
||||
@@ -73,12 +72,12 @@ public class FlowConcurrencyCaseTest {
|
||||
}
|
||||
}
|
||||
|
||||
public void flowConcurrencyFail() throws TimeoutException, QueueException {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
|
||||
public void flowConcurrencyFail(String tenantId) throws TimeoutException, QueueException {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
|
||||
try {
|
||||
List<Execution> shouldFailExecutions = List.of(
|
||||
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail"),
|
||||
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail")
|
||||
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-fail"),
|
||||
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-fail")
|
||||
);
|
||||
|
||||
assertThat(execution1.getState().isRunning()).isTrue();
|
||||
@@ -89,10 +88,10 @@ public class FlowConcurrencyCaseTest {
|
||||
}
|
||||
}
|
||||
|
||||
public void flowConcurrencyQueue() throws QueueException {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
|
||||
public void flowConcurrencyQueue(String tenantId) throws QueueException {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
|
||||
Flow flow = flowRepository
|
||||
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue", Optional.empty())
|
||||
.findById(tenantId, NAMESPACE, "flow-concurrency-queue", Optional.empty())
|
||||
.orElseThrow();
|
||||
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
||||
Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
|
||||
@@ -108,10 +107,10 @@ public class FlowConcurrencyCaseTest {
|
||||
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
|
||||
}
|
||||
|
||||
public void flowConcurrencyQueuePause() throws QueueException {
|
||||
Execution execution1 = runnerUtils.runOneUntilPaused(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-pause");
|
||||
public void flowConcurrencyQueuePause(String tenantId) throws QueueException {
|
||||
Execution execution1 = runnerUtils.runOneUntilPaused(tenantId, NAMESPACE, "flow-concurrency-queue-pause");
|
||||
Flow flow = flowRepository
|
||||
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-pause", Optional.empty())
|
||||
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-pause", Optional.empty())
|
||||
.orElseThrow();
|
||||
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
||||
Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
|
||||
@@ -127,10 +126,10 @@ public class FlowConcurrencyCaseTest {
|
||||
assertThat(secondExecutionResult.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
|
||||
}
|
||||
|
||||
public void flowConcurrencyCancelPause() throws QueueException {
|
||||
Execution execution1 = runnerUtils.runOneUntilPaused(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel-pause");
|
||||
public void flowConcurrencyCancelPause(String tenantId) throws QueueException {
|
||||
Execution execution1 = runnerUtils.runOneUntilPaused(tenantId, NAMESPACE, "flow-concurrency-cancel-pause");
|
||||
Flow flow = flowRepository
|
||||
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel-pause", Optional.empty())
|
||||
.findById(tenantId, NAMESPACE, "flow-concurrency-cancel-pause", Optional.empty())
|
||||
.orElseThrow();
|
||||
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
||||
Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.CANCELLED), execution2);
|
||||
@@ -166,11 +165,11 @@ public class FlowConcurrencyCaseTest {
|
||||
.toList()).contains(Type.QUEUED);
|
||||
}
|
||||
|
||||
public void flowConcurrencyQueueRestarted() throws Exception {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE,
|
||||
public void flowConcurrencyQueueRestarted(String tenantId) throws Exception {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE,
|
||||
"flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
|
||||
Flow flow = flowRepository
|
||||
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-fail", Optional.empty())
|
||||
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-fail", Optional.empty())
|
||||
.orElseThrow();
|
||||
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
||||
runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.RUNNING), execution2);
|
||||
@@ -179,7 +178,10 @@ public class FlowConcurrencyCaseTest {
|
||||
// we restart the first one, it should be queued then fail again.
|
||||
Execution failedExecution = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution1);
|
||||
Execution restarted = executionService.restart(failedExecution, null);
|
||||
Execution executionResult1 = runnerUtils.restartExecution(e -> e.getState().getCurrent().equals(Type.FAILED), restarted);
|
||||
Execution executionResult1 = runnerUtils.restartExecution(
|
||||
e -> e.getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED) && e.getState().getCurrent().equals(Type.FAILED),
|
||||
restarted
|
||||
);
|
||||
Execution executionResult2 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution2);
|
||||
|
||||
assertThat(executionResult1.getState().getCurrent()).isEqualTo(Type.FAILED);
|
||||
@@ -193,10 +195,10 @@ public class FlowConcurrencyCaseTest {
|
||||
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
|
||||
}
|
||||
|
||||
public void flowConcurrencyQueueAfterExecution() throws QueueException {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
|
||||
public void flowConcurrencyQueueAfterExecution(String tenantId) throws QueueException {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
|
||||
Flow flow = flowRepository
|
||||
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-after-execution", Optional.empty())
|
||||
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-after-execution", Optional.empty())
|
||||
.orElseThrow();
|
||||
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
||||
Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
|
||||
@@ -216,15 +218,15 @@ public class FlowConcurrencyCaseTest {
|
||||
List<Execution> subFlowExecs = runnerUtils.awaitFlowExecutionNumber(2, tenantId, NAMESPACE, "flow-concurrency-cancel");
|
||||
assertThat(subFlowExecs).extracting(e -> e.getState().getCurrent()).containsExactlyInAnyOrder(Type.SUCCESS, Type.CANCELLED);
|
||||
|
||||
// run another execution to be sure that everything work (purge is correctly done)
|
||||
// run another execution to be sure that everything works (purge is correctly done)
|
||||
Execution execution3 = runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-subflow");
|
||||
assertThat(execution3.getState().getCurrent()).isEqualTo(Type.SUCCESS);
|
||||
runnerUtils.awaitFlowExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), tenantId, NAMESPACE, "flow-concurrency-cancel");
|
||||
}
|
||||
|
||||
public void flowConcurrencyParallelSubflowKill() throws QueueException {
|
||||
Execution parent = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-parallel-subflow-kill", null, null, Duration.ofSeconds(30));
|
||||
Execution queued = runnerUtils.awaitFlowExecution(e -> e.getState().isQueued(), MAIN_TENANT, NAMESPACE, "flow-concurrency-parallel-subflow-kill-child");
|
||||
public void flowConcurrencyParallelSubflowKill(String tenantId) throws QueueException {
|
||||
Execution parent = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-parallel-subflow-kill", null, null, Duration.ofSeconds(30));
|
||||
Execution queued = runnerUtils.awaitFlowExecution(e -> e.getState().isQueued(), tenantId, NAMESPACE, "flow-concurrency-parallel-subflow-kill-child");
|
||||
|
||||
// Kill the parent
|
||||
killQueue.emit(ExecutionKilledExecution
|
||||
@@ -232,7 +234,7 @@ public class FlowConcurrencyCaseTest {
|
||||
.state(ExecutionKilled.State.REQUESTED)
|
||||
.executionId(parent.getId())
|
||||
.isOnKillCascade(true)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tenantId(tenantId)
|
||||
.build()
|
||||
);
|
||||
|
||||
@@ -242,11 +244,11 @@ public class FlowConcurrencyCaseTest {
|
||||
assertThat(terminated.getTaskRunList()).isNull();
|
||||
}
|
||||
|
||||
public void flowConcurrencyKilled() throws QueueException, InterruptedException {
|
||||
public void flowConcurrencyKilled(String tenantId) throws QueueException, InterruptedException {
|
||||
Flow flow = flowRepository
|
||||
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
|
||||
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
|
||||
.orElseThrow();
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
|
||||
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
||||
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
||||
|
||||
@@ -261,7 +263,7 @@ public class FlowConcurrencyCaseTest {
|
||||
.state(ExecutionKilled.State.REQUESTED)
|
||||
.executionId(execution1.getId())
|
||||
.isOnKillCascade(true)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tenantId(tenantId)
|
||||
.build()
|
||||
);
|
||||
|
||||
@@ -279,20 +281,19 @@ public class FlowConcurrencyCaseTest {
|
||||
assertThat(queued.getState().getCurrent()).isEqualTo(Type.QUEUED);
|
||||
} finally {
|
||||
// kill everything to avoid dangling executions
|
||||
runnerUtils.killExecution(execution1);
|
||||
runnerUtils.killExecution(execution2);
|
||||
runnerUtils.killExecution(execution3);
|
||||
|
||||
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
|
||||
runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed");
|
||||
runnerUtils.awaitFlowExecutionNumber(3, tenantId, NAMESPACE, "flow-concurrency-queue-killed");
|
||||
}
|
||||
}
|
||||
|
||||
public void flowConcurrencyQueueKilled() throws QueueException, InterruptedException {
|
||||
public void flowConcurrencyQueueKilled(String tenantId) throws QueueException, InterruptedException {
|
||||
Flow flow = flowRepository
|
||||
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
|
||||
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
|
||||
.orElseThrow();
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
|
||||
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
||||
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
||||
|
||||
@@ -307,7 +308,7 @@ public class FlowConcurrencyCaseTest {
|
||||
.state(ExecutionKilled.State.REQUESTED)
|
||||
.executionId(execution2.getId())
|
||||
.isOnKillCascade(true)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.tenantId(tenantId)
|
||||
.build()
|
||||
);
|
||||
|
||||
@@ -322,11 +323,10 @@ public class FlowConcurrencyCaseTest {
|
||||
} finally {
|
||||
// kill everything to avoid dangling executions
|
||||
runnerUtils.killExecution(execution1);
|
||||
runnerUtils.killExecution(execution2);
|
||||
runnerUtils.killExecution(execution3);
|
||||
|
||||
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
|
||||
runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed");
|
||||
runnerUtils.awaitFlowExecutionNumber(3, tenantId, NAMESPACE, "flow-concurrency-queue-killed");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -239,7 +239,7 @@ class FlowInputOutputTest {
|
||||
// Then
|
||||
Assertions.assertEquals(2, values.size());
|
||||
Assertions.assertFalse(values.get(1).enabled());
|
||||
Assertions.assertNotNull(values.get(1).exception());
|
||||
Assertions.assertNotNull(values.get(1).exceptions());
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -257,7 +257,7 @@ class FlowInputOutputTest {
|
||||
List<InputAndValue> values = flowInputOutput.validateExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, data).block();
|
||||
|
||||
// Then
|
||||
Assertions.assertNull(values.getFirst().exception());
|
||||
Assertions.assertNull(values.getFirst().exceptions());
|
||||
Assertions.assertFalse(storageInterface.exists(MAIN_TENANT, null, URI.create(values.getFirst().value().toString())));
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.runners;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.io.CharStreams;
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
@@ -137,8 +138,8 @@ public class InputsTest {
|
||||
void missingRequired() {
|
||||
HashMap<String, Object> inputs = new HashMap<>(InputsTest.inputs);
|
||||
inputs.put("string", null);
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(inputs, MAIN_TENANT));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `string`, missing required input, but received `null`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(inputs, MAIN_TENANT));
|
||||
assertThat(e.getMessage()).contains("Missing required input:string");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -232,9 +233,9 @@ public class InputsTest {
|
||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||
map.put("validatedString", "foo");
|
||||
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant4"));
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant4"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedString`, it must match the pattern");
|
||||
assertThat(e.getMessage()).contains( "Invalid value for input `validatedString`. Cause: it must match the pattern");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -242,15 +243,15 @@ public class InputsTest {
|
||||
void inputValidatedIntegerBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedInt", "9");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant5"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedInt`, it must be more than `10`, but received `9`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant5"));
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedInt`. Cause: it must be more than `10`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedInt", "21");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant5"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant5"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedInt`, it must be less than `20`, but received `21`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedInt`. Cause: it must be less than `20`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -258,15 +259,15 @@ public class InputsTest {
|
||||
void inputValidatedDateBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedDate", "2022-01-01");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant6"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDate`, it must be after `2023-01-01`, but received `2022-01-01`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant6"));
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDate`. Cause: it must be after `2023-01-01`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedDate", "2024-01-01");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant6"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant6"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDate`, it must be before `2023-12-31`, but received `2024-01-01`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDate`. Cause: it must be before `2023-12-31`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -274,15 +275,15 @@ public class InputsTest {
|
||||
void inputValidatedDateTimeBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedDateTime", "2022-01-01T00:00:00Z");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant7"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDateTime`, it must be after `2023-01-01T00:00:00Z`, but received `2022-01-01T00:00:00Z`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant7"));
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDateTime`. Cause: it must be after `2023-01-01T00:00:00Z`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedDateTime", "2024-01-01T00:00:00Z");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant7"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant7"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDateTime`, it must be before `2023-12-31T23:59:59Z`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDateTime`. Cause: it must be before `2023-12-31T23:59:59Z`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -290,15 +291,15 @@ public class InputsTest {
|
||||
void inputValidatedDurationBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedDuration", "PT1S");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant8"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDuration`, It must be more than `PT10S`, but received `PT1S`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant8"));
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDuration`. Cause: It must be more than `PT10S`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedDuration", "PT30S");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant8"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant8"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDuration`, It must be less than `PT20S`, but received `PT30S`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDuration`. Cause: It must be less than `PT20S`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -306,15 +307,15 @@ public class InputsTest {
|
||||
void inputValidatedFloatBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedFloat", "0.01");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant9"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedFloat`, it must be more than `0.1`, but received `0.01`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant9"));
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedFloat`. Cause: it must be more than `0.1`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedFloat", "1.01");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant9"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant9"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedFloat`, it must be less than `0.5`, but received `1.01`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedFloat`. Cause: it must be less than `0.5`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -322,15 +323,15 @@ public class InputsTest {
|
||||
void inputValidatedTimeBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedTime", "00:00:01");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant10"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedTime`, it must be after `01:00`, but received `00:00:01`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant10"));
|
||||
assertThat(e.getMessage()).contains( "Invalid value for input `validatedTime`. Cause: it must be after `01:00`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedTime", "14:00:00");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant10"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant10"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedTime`, it must be before `11:59:59`, but received `14:00:00`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedTime`. Cause: it must be before `11:59:59`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -339,9 +340,9 @@ public class InputsTest {
|
||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||
map.put("uri", "http:/bla");
|
||||
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant11"));
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant11"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `uri`, Expected `URI` but received `http:/bla`, but received `http:/bla`");
|
||||
assertThat(e.getMessage()).contains( "Invalid value for input `uri`. Cause: Invalid URI format." );
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -350,9 +351,9 @@ public class InputsTest {
|
||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||
map.put("enum", "INVALID");
|
||||
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant12"));
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant12"));
|
||||
|
||||
assertThat(e.getMessage()).isEqualTo("enum: Invalid input for `enum`, it must match the values `[ENUM_VALUE, OTHER_ONE]`, but received `INVALID`");
|
||||
assertThat(e.getMessage()).isEqualTo("Invalid value for input `enum`. Cause: it must match the values `[ENUM_VALUE, OTHER_ONE]`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -361,9 +362,9 @@ public class InputsTest {
|
||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||
map.put("array", "[\"s1\", \"s2\"]");
|
||||
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant13"));
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant13"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `array`, Unable to parse array element as `INT` on `s1`, but received `[\"s1\", \"s2\"]`");
|
||||
assertThat(e.getMessage()).contains( "Invalid value for input `array`. Cause: Unable to parse array element as `INT` on `s1`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -467,7 +468,20 @@ public class InputsTest {
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat((String) execution.findTaskRunsByTaskId("file").getFirst().getOutputs().get("value")).isEqualTo(file.toString());
|
||||
}
|
||||
@Test
|
||||
@LoadFlows(value = "flows/invalids/inputs-with-multiple-constraint-violations.yaml")
|
||||
void multipleConstraintViolations() {
|
||||
InputOutputValidationException ex = assertThrows(InputOutputValidationException.class, ()-> runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "inputs-with-multiple-constraint-violations", null,
|
||||
(f, e) ->flowIO.readExecutionInputs(f, e , Map.of("multi", List.of("F", "H")) )));
|
||||
|
||||
List<String> messages = Arrays.asList(ex.getMessage().split(System.lineSeparator()));
|
||||
|
||||
assertThat(messages).containsExactlyInAnyOrder(
|
||||
"Invalid value for input `multi`. Cause: you can't define both `values` and `options`",
|
||||
"Invalid value for input `multi`. Cause: value `F` doesn't match the values `[A, B, C]`",
|
||||
"Invalid value for input `multi`. Cause: value `H` doesn't match the values `[A, B, C]`"
|
||||
);
|
||||
}
|
||||
private URI createFile() throws IOException {
|
||||
File tempFile = File.createTempFile("file", ".txt");
|
||||
Files.write(tempFile.toPath(), "Hello World".getBytes());
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.junit.annotations.ExecuteFlow;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
@@ -71,6 +72,6 @@ public class NoEncryptionConfiguredTest implements TestPropertyProvider {
|
||||
.flowId(flow.getId())
|
||||
.build();
|
||||
|
||||
assertThrows(ConstraintViolationException.class, () -> flowIO.readExecutionInputs(flow, execution, InputsTest.inputs));
|
||||
assertThrows(InputOutputValidationException.class, () -> flowIO.readExecutionInputs(flow, execution, InputsTest.inputs));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,15 +1,24 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.Setting;
|
||||
import io.kestra.core.repositories.SettingRepositoryInterface;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@KestraTest
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@MicronautTest
|
||||
public class EditionProviderTest {
|
||||
@Inject
|
||||
private EditionProvider editionProvider;
|
||||
|
||||
@Inject
|
||||
private SettingRepositoryInterface settingRepository;
|
||||
|
||||
protected EditionProvider.Edition expectedEdition() {
|
||||
return EditionProvider.Edition.OSS;
|
||||
}
|
||||
@@ -17,5 +26,10 @@ public class EditionProviderTest {
|
||||
@Test
|
||||
void shouldReturnCurrentEdition() {
|
||||
Assertions.assertEquals(expectedEdition(), editionProvider.get());
|
||||
|
||||
// check that the edition is persisted in settings
|
||||
Optional<Setting> editionSettings = settingRepository.findByKey(Setting.INSTANCE_EDITION);
|
||||
assertThat(editionSettings).isPresent();
|
||||
assertThat(editionSettings.get().getValue()).isEqualTo(expectedEdition().name());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,48 +1,107 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import ch.qos.logback.classic.Logger;
|
||||
import ch.qos.logback.classic.LoggerContext;
|
||||
import ch.qos.logback.classic.spi.ILoggingEvent;
|
||||
import ch.qos.logback.core.AppenderBase;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@Slf4j
|
||||
class LogsTest {
|
||||
|
||||
|
||||
private static final InMemoryAppender MEMORY_APPENDER = new InMemoryAppender();
|
||||
|
||||
@BeforeAll
|
||||
static void setupLogger() {
|
||||
Logger logger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
|
||||
MEMORY_APPENDER.setContext((LoggerContext) LoggerFactory.getILoggerFactory());
|
||||
MEMORY_APPENDER.start();
|
||||
logger.addAppender(MEMORY_APPENDER);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void clearLogs() {
|
||||
MEMORY_APPENDER.clear();
|
||||
}
|
||||
|
||||
@Test
|
||||
void logFlow() {
|
||||
var flow = Flow.builder().namespace("namespace").id("flow").build();
|
||||
var flow = Flow.builder().tenantId("tenant").namespace("namespace").id("flow").build();
|
||||
Logs.logExecution(flow, log, Level.INFO, "Some log");
|
||||
Logs.logExecution(flow, log, Level.INFO, "Some log with an {}", "attribute");
|
||||
Logs.logExecution(flow, log, Level.ERROR, "Some log with an {} and an error", "attribute", new RuntimeException("Test Exception"));
|
||||
|
||||
List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
|
||||
assertThat(logs).hasSize(3);
|
||||
}
|
||||
|
||||
@Test
|
||||
void logExecution() {
|
||||
var execution = Execution.builder().namespace("namespace").flowId("flow").id("execution").build();
|
||||
Logs.logExecution(execution, log, Level.INFO, "Some log");
|
||||
Logs.logExecution(execution, log, Level.INFO, "Some log with an {}", "attribute");
|
||||
var execution = Execution.builder().tenantId("tenant").namespace("namespace").flowId("flow").id("execution").build();
|
||||
Logs.logExecution(execution, Level.INFO, "Some log");
|
||||
Logs.logExecution(execution, Level.INFO, "Some log with an {}", "attribute");
|
||||
Logs.logExecution(execution, Level.INFO, "Some log");
|
||||
|
||||
List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
|
||||
assertThat(logs).hasSize(3);
|
||||
assertThat(logs.getFirst().getLoggerName()).isEqualTo("executor.tenant.namespace.flow");
|
||||
}
|
||||
|
||||
@Test
|
||||
void logTrigger() {
|
||||
var trigger = TriggerContext.builder().namespace("namespace").flowId("flow").triggerId("trigger").build();
|
||||
Logs.logTrigger(trigger, log, Level.INFO, "Some log");
|
||||
Logs.logTrigger(trigger, log, Level.INFO, "Some log with an {}", "attribute");
|
||||
var trigger = TriggerContext.builder().tenantId("tenant").namespace("namespace").flowId("flow").triggerId("trigger").build();
|
||||
Logs.logTrigger(trigger, Level.INFO, "Some log");
|
||||
Logs.logTrigger(trigger, Level.INFO, "Some log with an {}", "attribute");
|
||||
Logs.logTrigger(trigger, Level.INFO, "Some log");
|
||||
|
||||
List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
|
||||
assertThat(logs).hasSize(3);
|
||||
assertThat(logs.getFirst().getLoggerName()).isEqualTo("scheduler.tenant.namespace.flow.trigger");
|
||||
}
|
||||
|
||||
@Test
|
||||
void logTaskRun() {
|
||||
var taskRun = TaskRun.builder().namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").build();
|
||||
var taskRun = TaskRun.builder().tenantId("tenant").namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").build();
|
||||
Logs.logTaskRun(taskRun, Level.INFO, "Some log");
|
||||
Logs.logTaskRun(taskRun, Level.INFO, "Some log with an {}", "attribute");
|
||||
|
||||
taskRun = TaskRun.builder().namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").value("value").build();
|
||||
Logs.logTaskRun(taskRun, Level.INFO, "Some log");
|
||||
Logs.logTaskRun(taskRun, Level.INFO, "Some log with an {}", "attribute");
|
||||
|
||||
List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
|
||||
assertThat(logs).hasSize(4);
|
||||
assertThat(logs.getFirst().getLoggerName()).isEqualTo("worker.tenant.namespace.flow.task");
|
||||
}
|
||||
|
||||
private static class InMemoryAppender extends AppenderBase<ILoggingEvent> {
|
||||
private final List<ILoggingEvent> logs = new CopyOnWriteArrayList<>();
|
||||
|
||||
@Override
|
||||
protected void append(ILoggingEvent event) {
|
||||
logs.add(event);
|
||||
}
|
||||
|
||||
public List<ILoggingEvent> getLogs() {
|
||||
return logs;
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
logs.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -216,4 +216,23 @@ class MapUtilsTest {
|
||||
"k1.k4", "v2"
|
||||
));
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
void mergeShouldNotDuplicateListElements() {
|
||||
Map<String, Object> first = Map.of(
|
||||
"key1", "value1",
|
||||
"key2", List.of("something", "else")
|
||||
);
|
||||
Map<String, Object> second = Map.of(
|
||||
"key2", List.of("something", "other"),
|
||||
"key3", "value3"
|
||||
);
|
||||
|
||||
Map<String, Object> results = MapUtils.merge(first, second);
|
||||
|
||||
assertThat(results).hasSize(3);
|
||||
List<String> list = (List<String>) results.get("key2");
|
||||
assertThat(list).hasSize(3);
|
||||
}
|
||||
}
|
||||
@@ -20,7 +20,6 @@ import org.junit.jupiter.api.parallel.ExecutionMode;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
@@ -45,9 +44,6 @@ class NamespaceFilesUtilsTest {
|
||||
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
|
||||
QueueInterface<LogEntry> workerTaskLogQueue;
|
||||
|
||||
@Inject
|
||||
NamespaceFilesUtils namespaceFilesUtils;
|
||||
|
||||
@Inject
|
||||
NamespaceFactory namespaceFactory;
|
||||
|
||||
@@ -66,7 +62,7 @@ class NamespaceFilesUtilsTest {
|
||||
namespaceStorage.putFile(Path.of("/" + i + ".txt"), data);
|
||||
}
|
||||
|
||||
namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().build());
|
||||
NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().build());
|
||||
|
||||
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
|
||||
receive.blockLast();
|
||||
@@ -91,7 +87,7 @@ class NamespaceFilesUtilsTest {
|
||||
namespaceStorage.putFile(Path.of("/" + i + ".txt"), data);
|
||||
}
|
||||
|
||||
namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
|
||||
NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
|
||||
|
||||
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
|
||||
receive.blockLast();
|
||||
@@ -116,7 +112,7 @@ class NamespaceFilesUtilsTest {
|
||||
namespaceStorage.putFile(Path.of("/folder2/test.txt"), data);
|
||||
namespaceStorage.putFile(Path.of("/test.txt"), data);
|
||||
|
||||
namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
|
||||
NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
|
||||
|
||||
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
|
||||
receive.blockLast();
|
||||
@@ -141,7 +137,7 @@ class NamespaceFilesUtilsTest {
|
||||
namespaceFactory.of(MAIN_TENANT, ns1, storageInterface).putFile(Path.of("/test.txt"), data);
|
||||
namespaceFactory.of(MAIN_TENANT, ns2, storageInterface).putFile(Path.of("/test.txt"), data);
|
||||
|
||||
namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder()
|
||||
NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder()
|
||||
.namespaces(Property.ofValue(List.of(ns1, ns2)))
|
||||
.folderPerNamespace(Property.ofValue(true))
|
||||
.build());
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import io.kestra.core.models.Setting;
|
||||
import io.kestra.core.repositories.SettingRepositoryInterface;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@MicronautTest
|
||||
class VersionProviderTest {
|
||||
@Inject
|
||||
private VersionProvider versionProvider;
|
||||
|
||||
@Inject
|
||||
private SettingRepositoryInterface settingRepository;
|
||||
|
||||
@Test
|
||||
void shouldResolveVersion() {
|
||||
assertThat(versionProvider.getVersion()).endsWith("-SNAPSHOT");
|
||||
|
||||
// check that the version is persisted in settings
|
||||
Optional<Setting> versionSettings = settingRepository.findByKey(Setting.INSTANCE_VERSION);
|
||||
assertThat(versionSettings).isPresent();
|
||||
assertThat(versionSettings.get().getValue()).isEqualTo(versionProvider.getVersion());
|
||||
}
|
||||
}
|
||||
@@ -9,9 +9,15 @@ import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import io.kestra.core.models.validations.ValidateConstraintViolation;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.JsonLocation;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import java.util.List;
|
||||
import java.io.File;
|
||||
import java.net.URL;
|
||||
import java.util.Optional;
|
||||
@@ -23,6 +29,107 @@ class FlowValidationTest {
|
||||
@Inject
|
||||
private ModelValidator modelValidator;
|
||||
|
||||
@Inject
|
||||
private FlowService flowService;
|
||||
|
||||
private static final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
// Helper class to create JsonProcessingException with location
|
||||
private static class TestJsonProcessingException extends JsonProcessingException {
|
||||
public TestJsonProcessingException(String msg, JsonLocation location) {
|
||||
super(msg, location);
|
||||
}
|
||||
public TestJsonProcessingException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void testFormatYamlErrorMessage_WithExpectedFieldName() throws JsonProcessingException {
|
||||
JsonProcessingException e = new TestJsonProcessingException("Expected a field name", new JsonLocation(null, 100, 5, 10));
|
||||
Object dummyTarget = new Object(); // Dummy target for toConstraintViolationException
|
||||
|
||||
ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
|
||||
|
||||
assertThat(result.getMessage()).contains("YAML syntax error: Invalid structure").contains("(at line 5)");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFormatYamlErrorMessage_WithMappingStartEvent() throws JsonProcessingException {
|
||||
JsonProcessingException e = new TestJsonProcessingException("MappingStartEvent", new JsonLocation(null, 200, 3, 5));
|
||||
Object dummyTarget = new Object();
|
||||
|
||||
ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
|
||||
|
||||
assertThat(result.getMessage()).contains("YAML syntax error: Unexpected mapping start").contains("(at line 3)");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFormatYamlErrorMessage_WithScalarValue() throws JsonProcessingException {
|
||||
JsonProcessingException e = new TestJsonProcessingException("Scalar value", new JsonLocation(null, 150, 7, 12));
|
||||
Object dummyTarget = new Object();
|
||||
|
||||
ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
|
||||
|
||||
assertThat(result.getMessage()).contains("YAML syntax error: Expected a simple value").contains("(at line 7)");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFormatYamlErrorMessage_GenericError() throws JsonProcessingException {
|
||||
JsonProcessingException e = new TestJsonProcessingException("Some other error", new JsonLocation(null, 50, 2, 8));
|
||||
Object dummyTarget = new Object();
|
||||
|
||||
ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
|
||||
|
||||
assertThat(result.getMessage()).contains("YAML parsing error: Some other error").contains("(at line 2)");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFormatYamlErrorMessage_NoLocation() throws JsonProcessingException {
|
||||
JsonProcessingException e = new TestJsonProcessingException("Expected a field name");
|
||||
Object dummyTarget = new Object();
|
||||
|
||||
ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
|
||||
|
||||
assertThat(result.getMessage()).contains("YAML syntax error: Invalid structure").doesNotContain("at line");
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void testValidateFlowWithYamlSyntaxError() {
|
||||
String invalidYaml = """
|
||||
id: test-flow
|
||||
namespace: io.kestra.unittest
|
||||
tasks:
|
||||
- id:hello
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: {{ abc }}
|
||||
|
||||
""";
|
||||
List<ValidateConstraintViolation> results = flowService.validate("my-tenant", invalidYaml);
|
||||
|
||||
assertThat(results).hasSize(1);
|
||||
assertThat(results.getFirst().getConstraints()).contains("YAML parsing error").contains("at line");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testValidateFlowWithUndefinedVariable() {
|
||||
String yamlWithUndefinedVar = """
|
||||
id: test-flow
|
||||
namespace: io.kestra.unittest
|
||||
tasks:
|
||||
- id: hello
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: {{ undefinedVar }}
|
||||
""";
|
||||
|
||||
List<ValidateConstraintViolation> results = flowService.validate("my-tenant", yamlWithUndefinedVar);
|
||||
|
||||
assertThat(results).hasSize(1);
|
||||
assertThat(results.getFirst().getConstraints()).contains("Validation error");
|
||||
}
|
||||
|
||||
@Test
|
||||
void invalidRecursiveFlow() {
|
||||
Flow flow = this.parse("flows/invalids/recursive-flow.yaml");
|
||||
@@ -130,4 +237,4 @@ class FlowValidationTest {
|
||||
|
||||
return YamlParser.parse(file, Flow.class);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.plugin.core.flow;
|
||||
|
||||
import com.google.common.io.CharStreams;
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.junit.annotations.ExecuteFlow;
|
||||
import io.kestra.core.junit.annotations.FlakyTest;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
@@ -328,12 +329,12 @@ public class PauseTest {
|
||||
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.PAUSED);
|
||||
|
||||
ConstraintViolationException e = assertThrows(
|
||||
ConstraintViolationException.class,
|
||||
InputOutputValidationException e = assertThrows(
|
||||
InputOutputValidationException.class,
|
||||
() -> executionService.resume(execution, flow, State.Type.RUNNING, Mono.empty(), Pause.Resumed.now()).block()
|
||||
);
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `asked`, missing required input, but received `null`");
|
||||
assertThat(e.getMessage()).contains( "Missing required input:asked");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.kestra.core.models.flows.Output;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.flows.State.History;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.InputAndOutput;
|
||||
import io.kestra.core.runners.SubflowExecutionResult;
|
||||
import io.kestra.core.services.VariablesService;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
@@ -46,11 +47,15 @@ class SubflowTest {
|
||||
@Mock
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@Mock
|
||||
private InputAndOutput inputAndOutput;
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() {
|
||||
Mockito.when(applicationContext.getBean(VariablesService.class)).thenReturn(new VariablesService());
|
||||
Mockito.when(runContext.logger()).thenReturn(LOG);
|
||||
Mockito.when(runContext.getApplicationContext()).thenReturn(applicationContext);
|
||||
Mockito.when(runContext.inputAndOutput()).thenReturn(inputAndOutput);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -118,7 +123,7 @@ class SubflowTest {
|
||||
|
||||
Map<String, Object> outputs = Map.of("key", "value");
|
||||
Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(outputs);
|
||||
|
||||
Mockito.when(inputAndOutput.renderOutputs(Mockito.anyList())).thenReturn(Map.of("key", "value"));
|
||||
|
||||
Subflow subflow = Subflow.builder()
|
||||
.outputs(outputs)
|
||||
@@ -159,6 +164,7 @@ class SubflowTest {
|
||||
|
||||
Output output = Output.builder().id("key").value("value").build();
|
||||
Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(Map.of(output.getId(), output.getValue()));
|
||||
Mockito.when(inputAndOutput.typedOutputs(Mockito.any(), Mockito.any(), Mockito.anyMap())).thenReturn(Map.of("key", "value"));
|
||||
Flow flow = Flow.builder()
|
||||
.outputs(List.of(output))
|
||||
.build();
|
||||
|
||||
@@ -57,7 +57,7 @@ class ScheduleOnDatesTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReturnFirstDateWhenNextEvaluationDateAndNoExistingTriggerDate() throws Exception {
|
||||
public void shouldReturnFirstDateWhenNextEvaluationDateAndNoExistingTriggerDate() {
|
||||
// given
|
||||
var now = ZonedDateTime.now();
|
||||
var before = now.minusMinutes(1).truncatedTo(ChronoUnit.SECONDS);
|
||||
@@ -75,7 +75,7 @@ class ScheduleOnDatesTest {
|
||||
ZonedDateTime nextDate = scheduleOnDates.nextEvaluationDate(conditionContext, Optional.empty());
|
||||
|
||||
// then
|
||||
assertThat(nextDate).isEqualTo(before);
|
||||
assertThat(nextDate).isEqualTo(after);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -13,6 +13,7 @@ import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.flows.input.StringInput;
|
||||
import io.kestra.core.models.flows.input.MultiselectInput;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
@@ -103,8 +104,9 @@ class ScheduleTest {
|
||||
);
|
||||
|
||||
assertThat(evaluate.isPresent()).isTrue();
|
||||
assertThat(evaluate.get().getLabels()).hasSize(3);
|
||||
assertThat(evaluate.get().getLabels()).hasSize(4);
|
||||
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
|
||||
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
|
||||
assertThat(evaluate.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
|
||||
var vars = evaluate.get().getTrigger().getVariables();
|
||||
var inputs = evaluate.get().getInputs();
|
||||
@@ -137,8 +139,9 @@ class ScheduleTest {
|
||||
);
|
||||
|
||||
assertThat(evaluate.isPresent()).isTrue();
|
||||
assertThat(evaluate.get().getLabels()).hasSize(3);
|
||||
assertThat(evaluate.get().getLabels()).hasSize(4);
|
||||
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
|
||||
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
|
||||
assertThat(evaluate.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
|
||||
var inputs = evaluate.get().getInputs();
|
||||
|
||||
@@ -475,6 +478,81 @@ class ScheduleTest {
|
||||
assertThat(result.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
|
||||
}
|
||||
|
||||
@Test
|
||||
void successWithMultiselectInputDefaults() throws Exception {
|
||||
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("0 0 1 * *").build();
|
||||
|
||||
ZonedDateTime date = ZonedDateTime.now()
|
||||
.withDayOfMonth(1)
|
||||
.withHour(0)
|
||||
.withMinute(0)
|
||||
.withSecond(0)
|
||||
.truncatedTo(ChronoUnit.SECONDS)
|
||||
.minusMonths(1);
|
||||
|
||||
Optional<Execution> evaluate = trigger.evaluate(
|
||||
conditionContextWithMultiselectInput(trigger),
|
||||
triggerContext(date, trigger));
|
||||
|
||||
assertThat(evaluate.isPresent()).isTrue();
|
||||
var inputs = evaluate.get().getInputs();
|
||||
|
||||
// Verify MULTISELECT input with explicit defaults works correctly
|
||||
assertThat(inputs.get("multiselectInput")).isEqualTo(List.of("option1", "option2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void successWithMultiselectInputAutoSelectFirst() throws Exception {
|
||||
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("0 0 1 * *").build();
|
||||
|
||||
ZonedDateTime date = ZonedDateTime.now()
|
||||
.withDayOfMonth(1)
|
||||
.withHour(0)
|
||||
.withMinute(0)
|
||||
.withSecond(0)
|
||||
.truncatedTo(ChronoUnit.SECONDS)
|
||||
.minusMonths(1);
|
||||
|
||||
Optional<Execution> evaluate = trigger.evaluate(
|
||||
conditionContextWithMultiselectAutoSelectFirst(trigger),
|
||||
triggerContext(date, trigger));
|
||||
|
||||
assertThat(evaluate.isPresent()).isTrue();
|
||||
var inputs = evaluate.get().getInputs();
|
||||
|
||||
// Verify MULTISELECT input with autoSelectFirst defaults to first option
|
||||
assertThat(inputs.get("multiselectAutoSelect")).isEqualTo(List.of("first"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void successWithMultiselectInputProvidedValue() throws Exception {
|
||||
// Test that provided values override defaults for MULTISELECT
|
||||
Schedule trigger = Schedule.builder()
|
||||
.id("schedule")
|
||||
.type(Schedule.class.getName())
|
||||
.cron("0 0 1 * *")
|
||||
.inputs(Map.of("multiselectInput", List.of("option3")))
|
||||
.build();
|
||||
|
||||
ZonedDateTime date = ZonedDateTime.now()
|
||||
.withDayOfMonth(1)
|
||||
.withHour(0)
|
||||
.withMinute(0)
|
||||
.withSecond(0)
|
||||
.truncatedTo(ChronoUnit.SECONDS)
|
||||
.minusMonths(1);
|
||||
|
||||
Optional<Execution> evaluate = trigger.evaluate(
|
||||
conditionContextWithMultiselectInput(trigger),
|
||||
triggerContext(date, trigger));
|
||||
|
||||
assertThat(evaluate.isPresent()).isTrue();
|
||||
var inputs = evaluate.get().getInputs();
|
||||
|
||||
// Verify provided value overrides defaults
|
||||
assertThat(inputs.get("multiselectInput")).isEqualTo(List.of("option3"));
|
||||
}
|
||||
|
||||
private ConditionContext conditionContext(AbstractTrigger trigger) {
|
||||
Flow flow = Flow.builder()
|
||||
.id(IdUtils.create())
|
||||
@@ -504,17 +582,79 @@ class ScheduleTest {
|
||||
.build();
|
||||
}
|
||||
|
||||
private ConditionContext conditionContextWithMultiselectInput(AbstractTrigger trigger) {
|
||||
Flow flow = Flow.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.tests")
|
||||
.labels(
|
||||
List.of(
|
||||
new Label("flow-label-1", "flow-label-1"),
|
||||
new Label("flow-label-2", "flow-label-2")))
|
||||
.variables(Map.of("custom_var", "VARIABLE VALUE"))
|
||||
.inputs(List.of(
|
||||
MultiselectInput.builder()
|
||||
.id("multiselectInput")
|
||||
.type(Type.MULTISELECT)
|
||||
.values(List.of("option1", "option2", "option3"))
|
||||
.defaults(Property.ofValue(List.of("option1", "option2")))
|
||||
.build()))
|
||||
.build();
|
||||
|
||||
TriggerContext triggerContext = TriggerContext.builder()
|
||||
.namespace(flow.getNamespace())
|
||||
.flowId(flow.getId())
|
||||
.triggerId(trigger.getId())
|
||||
.build();
|
||||
|
||||
return ConditionContext.builder()
|
||||
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(),
|
||||
triggerContext, trigger))
|
||||
.flow(flow)
|
||||
.build();
|
||||
}
|
||||
|
||||
private ConditionContext conditionContextWithMultiselectAutoSelectFirst(AbstractTrigger trigger) {
|
||||
Flow flow = Flow.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.tests")
|
||||
.labels(
|
||||
List.of(
|
||||
new Label("flow-label-1", "flow-label-1"),
|
||||
new Label("flow-label-2", "flow-label-2")))
|
||||
.variables(Map.of("custom_var", "VARIABLE VALUE"))
|
||||
.inputs(List.of(
|
||||
MultiselectInput.builder()
|
||||
.id("multiselectAutoSelect")
|
||||
.type(Type.MULTISELECT)
|
||||
.values(List.of("first", "second", "third"))
|
||||
.autoSelectFirst(true)
|
||||
.build()))
|
||||
.build();
|
||||
|
||||
TriggerContext triggerContext = TriggerContext.builder()
|
||||
.namespace(flow.getNamespace())
|
||||
.flowId(flow.getId())
|
||||
.triggerId(trigger.getId())
|
||||
.build();
|
||||
|
||||
return ConditionContext.builder()
|
||||
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(),
|
||||
triggerContext, trigger))
|
||||
.flow(flow)
|
||||
.build();
|
||||
}
|
||||
|
||||
private ZonedDateTime dateFromVars(String date, ZonedDateTime expexted) {
|
||||
return ZonedDateTime.parse(date).withZoneSameInstant(expexted.getZone());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void shouldGetNextExecutionDateWithConditionMatchingFutureDate() throws InternalException {
|
||||
|
||||
|
||||
ZonedDateTime now = ZonedDateTime.now().withZoneSameLocal(ZoneId.of("Europe/Paris"));
|
||||
OffsetTime before = now.minusHours(1).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
|
||||
OffsetTime after = now.minusHours(4).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
|
||||
|
||||
|
||||
Schedule trigger = Schedule.builder()
|
||||
.id("schedule").type(Schedule.class.getName())
|
||||
.cron("0 * * * *") // every hour
|
||||
@@ -527,25 +667,25 @@ class ScheduleTest {
|
||||
.build()
|
||||
))
|
||||
.build();
|
||||
|
||||
|
||||
TriggerContext triggerContext = triggerContext(now, trigger).toBuilder().build();
|
||||
|
||||
|
||||
ConditionContext conditionContext = ConditionContext.builder()
|
||||
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(), triggerContext, trigger))
|
||||
.build();
|
||||
|
||||
|
||||
Optional<ZonedDateTime> result = trigger.truePreviousNextDateWithCondition(trigger.executionTime(), conditionContext, now, true);
|
||||
assertThat(result).isNotEmpty();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void shouldGetNextExecutionDateWithConditionMatchingCurrentDate() throws InternalException {
|
||||
|
||||
|
||||
ZonedDateTime now = ZonedDateTime.now().withZoneSameLocal(ZoneId.of("Europe/Paris"));
|
||||
|
||||
OffsetTime before = now.plusHours(2).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
|
||||
OffsetTime after = now.minusHours(2).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
|
||||
|
||||
|
||||
Schedule trigger = Schedule.builder()
|
||||
.id("schedule").type(Schedule.class.getName())
|
||||
.cron("*/30 * * * * *")
|
||||
@@ -558,13 +698,13 @@ class ScheduleTest {
|
||||
.build()
|
||||
))
|
||||
.build();
|
||||
|
||||
|
||||
TriggerContext triggerContext = triggerContext(now, trigger).toBuilder().build();
|
||||
|
||||
|
||||
ConditionContext conditionContext = ConditionContext.builder()
|
||||
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(), triggerContext, trigger))
|
||||
.build();
|
||||
|
||||
|
||||
Optional<ZonedDateTime> result = trigger.truePreviousNextDateWithCondition(trigger.executionTime(), conditionContext, now, true);
|
||||
assertThat(result).isNotEmpty();
|
||||
}
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
id: inputs-with-multiple-constraint-violations
|
||||
namespace: io.kestra.tests
|
||||
inputs:
|
||||
- id: multi
|
||||
type: MULTISELECT
|
||||
values:
|
||||
- A
|
||||
- B
|
||||
- C
|
||||
options:
|
||||
- X
|
||||
- Y
|
||||
- Z
|
||||
|
||||
tasks:
|
||||
- id: validMultiSelect
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "{{inputs.multi}}"
|
||||
10
core/src/test/resources/flows/valids/each-pause.yaml
Normal file
10
core/src/test/resources/flows/valids/each-pause.yaml
Normal file
@@ -0,0 +1,10 @@
|
||||
id: each-pause
|
||||
namespace: io.kestra.tests
|
||||
|
||||
tasks:
|
||||
- id: each_task
|
||||
type: io.kestra.plugin.core.flow.ForEach
|
||||
values: '["a", "b"]'
|
||||
tasks:
|
||||
- id: pause
|
||||
type: io.kestra.plugin.core.flow.Pause
|
||||
@@ -8,4 +8,4 @@ concurrency:
|
||||
tasks:
|
||||
- id: sleep
|
||||
type: io.kestra.plugin.core.flow.Sleep
|
||||
duration: PT10S
|
||||
duration: PT2S
|
||||
|
||||
@@ -402,10 +402,11 @@ public class ExecutorService {
|
||||
|
||||
if (flow.getOutputs() != null) {
|
||||
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
|
||||
var inputAndOutput = runContext.inputAndOutput();
|
||||
|
||||
try {
|
||||
Map<String, Object> outputs = FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext);
|
||||
outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
|
||||
Map<String, Object> outputs = inputAndOutput.renderOutputs(flow.getOutputs());
|
||||
outputs = inputAndOutput.typedOutputs(flow, executor.getExecution(), outputs);
|
||||
newExecution = newExecution.withOutputs(outputs);
|
||||
} catch (Exception e) {
|
||||
Logs.logExecution(
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package io.kestra.runner.h2;
|
||||
|
||||
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
|
||||
import io.kestra.jdbc.runner.JdbcConcurrencyRunnerTest;
|
||||
|
||||
public class H2RunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
|
||||
public class H2RunnerConcurrencyTest extends JdbcConcurrencyRunnerTest {
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package io.kestra.runner.mysql;
|
||||
|
||||
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
|
||||
import io.kestra.jdbc.runner.JdbcConcurrencyRunnerTest;
|
||||
|
||||
public class MysqlRunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
|
||||
public class MysqlRunnerConcurrencyTest extends JdbcConcurrencyRunnerTest {
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package io.kestra.runner.postgres;
|
||||
|
||||
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
|
||||
import io.kestra.jdbc.runner.JdbcConcurrencyRunnerTest;
|
||||
|
||||
public class PostgresRunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
|
||||
public class PostgresRunnerConcurrencyTest extends JdbcConcurrencyRunnerTest {
|
||||
}
|
||||
|
||||
@@ -44,9 +44,15 @@ public abstract class AbstractJdbcSettingRepository extends AbstractJdbcCrudRepo
|
||||
|
||||
@Override
|
||||
public Setting save(Setting setting) {
|
||||
this.eventPublisher.publishEvent(new CrudEvent<>(setting, CrudEventType.UPDATE));
|
||||
|
||||
return internalSave(setting);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Setting internalSave(Setting setting) {
|
||||
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(setting);
|
||||
this.jdbcRepository.persist(setting, fields);
|
||||
this.eventPublisher.publishEvent(new CrudEvent<>(setting, CrudEventType.UPDATE));
|
||||
|
||||
return setting;
|
||||
}
|
||||
|
||||
@@ -72,12 +72,12 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo
|
||||
|
||||
@Override
|
||||
public Optional<Trigger> findLast(TriggerContext trigger) {
|
||||
return findOne(DSL.trueCondition(), field("key").eq(trigger.uid()));
|
||||
return findByUid(trigger.uid());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Trigger> findByExecution(Execution execution) {
|
||||
return findOne(execution.getTenantId(), field("execution_id").eq(execution.getId()));
|
||||
public Optional<Trigger> findByUid(String uid) {
|
||||
return findOne(DSL.trueCondition(), field("key").eq(uid));
|
||||
}
|
||||
|
||||
public List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContextInterface) {
|
||||
|
||||
@@ -74,15 +74,19 @@ public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository
|
||||
* Decrement the concurrency limit counter.
|
||||
* Must only be called when a flow having concurrency limit ends.
|
||||
*/
|
||||
public void decrement(FlowInterface flow) {
|
||||
this.jdbcRepository
|
||||
public int decrement(FlowInterface flow) {
|
||||
return this.jdbcRepository
|
||||
.getDslContextWrapper()
|
||||
.transaction(configuration -> {
|
||||
.transactionResult(configuration -> {
|
||||
var dslContext = DSL.using(configuration);
|
||||
|
||||
fetchOne(dslContext, flow).ifPresent(
|
||||
concurrencyLimit -> update(dslContext, concurrencyLimit.withRunning(concurrencyLimit.getRunning() == 0 ? 0 : concurrencyLimit.getRunning() - 1))
|
||||
);
|
||||
return fetchOne(dslContext, flow).map(
|
||||
concurrencyLimit -> {
|
||||
int newLimit = concurrencyLimit.getRunning() == 0 ? 0 : concurrencyLimit.getRunning() - 1;
|
||||
update(dslContext, concurrencyLimit.withRunning(newLimit));
|
||||
return newLimit;
|
||||
}
|
||||
).orElse(0);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ import io.kestra.core.models.tasks.ExecutableTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.WorkerGroup;
|
||||
import io.kestra.core.models.topologies.FlowTopology;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.models.triggers.multipleflows.MultipleCondition;
|
||||
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
@@ -1138,9 +1139,7 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
execution.getTrigger().getId()
|
||||
);
|
||||
} else {
|
||||
triggerRepository
|
||||
.findByExecution(execution)
|
||||
.ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
|
||||
triggerRepository.findByUid(Trigger.uid(execution)).ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1210,24 +1209,30 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
// as we may receive multiple time killed execution (one when we kill it, then one for each running worker task), we limit to the first we receive: when the state transitionned from KILLING to KILLED
|
||||
boolean killingThenKilled = execution.getState().getCurrent().isKilled() && executor.getOriginalState() == State.Type.KILLING;
|
||||
if (!queuedThenKilled && !concurrencyShortCircuitState && (!execution.getState().getCurrent().isKilled() || killingThenKilled)) {
|
||||
// decrement execution concurrency limit and pop a new queued execution if needed
|
||||
concurrencyLimitStorage.decrement(executor.getFlow());
|
||||
int newLimit = concurrencyLimitStorage.decrement(executor.getFlow());
|
||||
|
||||
if (executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
|
||||
var finalFlow = executor.getFlow();
|
||||
executionQueuedStorage.pop(executor.getFlow().getTenantId(),
|
||||
executor.getFlow().getNamespace(),
|
||||
executor.getFlow().getId(),
|
||||
throwBiConsumer((dslContext, queued) -> {
|
||||
var newExecution = queued.withState(State.Type.RUNNING);
|
||||
concurrencyLimitStorage.increment(dslContext, finalFlow);
|
||||
executionQueue.emit(newExecution);
|
||||
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
|
||||
|
||||
// process flow triggers to allow listening on RUNNING state after a QUEUED state
|
||||
processFlowTriggers(newExecution);
|
||||
})
|
||||
);
|
||||
if (newLimit < finalFlow.getConcurrency().getLimit()) {
|
||||
executionQueuedStorage.pop(executor.getFlow().getTenantId(),
|
||||
executor.getFlow().getNamespace(),
|
||||
executor.getFlow().getId(),
|
||||
throwBiConsumer((dslContext, queued) -> {
|
||||
var newExecution = queued.withState(State.Type.RUNNING);
|
||||
concurrencyLimitStorage.increment(dslContext, finalFlow);
|
||||
executionQueue.emit(newExecution);
|
||||
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
|
||||
|
||||
// process flow triggers to allow listening on RUNNING state after a QUEUED state
|
||||
processFlowTriggers(newExecution);
|
||||
})
|
||||
);
|
||||
} else {
|
||||
log.error("Concurrency limit reached for flow {}.{} after decrementing the execution running count due to the terminated execution {}. No new executions will be dequeued.", executor.getFlow().getNamespace(), executor.getFlow().getId(), executor.getExecution().getId());
|
||||
}
|
||||
} else if (newLimit >= executor.getFlow().getConcurrency().getLimit()) {
|
||||
log.error("Concurrency limit reached for flow {}.{} after decrementing the execution running count due to the terminated execution {}. This should not happen.", executor.getFlow().getNamespace(), executor.getFlow().getId(), executor.getExecution().getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1235,11 +1240,7 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
// purge the trigger: reset scheduler trigger at end
|
||||
if (execution.getTrigger() != null) {
|
||||
FlowWithSource flow = executor.getFlow();
|
||||
triggerRepository
|
||||
.findByExecution(execution)
|
||||
.ifPresent(trigger -> {
|
||||
this.triggerState.update(executionService.resetExecution(flow, execution, trigger));
|
||||
});
|
||||
triggerRepository.findByUid(Trigger.uid(execution)).ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
|
||||
}
|
||||
|
||||
// Purge the workerTaskResultQueue and the workerJobQueue
|
||||
|
||||
@@ -0,0 +1,64 @@
|
||||
package io.kestra.jdbc.runner;
|
||||
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
|
||||
import io.kestra.core.runners.ConcurrencyLimit;
|
||||
import io.kestra.core.runners.TestRunnerUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public abstract class JdbcConcurrencyRunnerTest extends AbstractRunnerConcurrencyTest {
|
||||
public static final String NAMESPACE = "io.kestra.tests";
|
||||
|
||||
@Inject
|
||||
private AbstractJdbcConcurrencyLimitStorage concurrencyLimitStorage;
|
||||
|
||||
@Inject
|
||||
private FlowRepositoryInterface flowRepository;
|
||||
|
||||
@Inject
|
||||
private ExecutionRepositoryInterface executionRepository;
|
||||
|
||||
@Inject
|
||||
private TestRunnerUtils runnerUtils;
|
||||
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = "flow-concurrency-queued-protection")
|
||||
void flowConcurrencyQueuedProtection() throws QueueException, InterruptedException {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning("flow-concurrency-queued-protection", NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
|
||||
assertThat(execution1.getState().isRunning()).isTrue();
|
||||
|
||||
Flow flow = flowRepository
|
||||
.findById("flow-concurrency-queued-protection", NAMESPACE, "flow-concurrency-queue", Optional.empty())
|
||||
.orElseThrow();
|
||||
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(State.Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
||||
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.QUEUED);
|
||||
|
||||
// manually update the concurrency count so that queued protection kicks in and no new execution would be popped
|
||||
ConcurrencyLimit concurrencyLimit = concurrencyLimitStorage.findById("flow-concurrency-queued-protection", NAMESPACE, "flow-concurrency-queue").orElseThrow();
|
||||
concurrencyLimit = concurrencyLimit.withRunning(concurrencyLimit.getRunning() + 1);
|
||||
concurrencyLimitStorage.update(concurrencyLimit);
|
||||
|
||||
Execution executionResult1 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(State.Type.SUCCESS), execution1);
|
||||
assertThat(executionResult1.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
|
||||
// we wait for a few ms and checked that the second execution is still queued
|
||||
Thread.sleep(500);
|
||||
Execution executionResult2 = executionRepository.findById("flow-concurrency-queued-protection", execution2.getId()).orElseThrow();
|
||||
assertThat(executionResult2.getState().getCurrent()).isEqualTo(State.Type.QUEUED);
|
||||
|
||||
// we manually reset the concurrency count to avoid messing with any other tests
|
||||
concurrencyLimitStorage.update(concurrencyLimit.withRunning(concurrencyLimit.getRunning() - 1));
|
||||
}
|
||||
}
|
||||
@@ -30,14 +30,16 @@ dependencies {
|
||||
// as Jackson is in the Micronaut BOM, to force its version we need to use enforcedPlatform but it didn't really work, see later :(
|
||||
api enforcedPlatform("com.fasterxml.jackson:jackson-bom:$jacksonVersion")
|
||||
api enforcedPlatform("org.slf4j:slf4j-api:$slf4jVersion")
|
||||
api platform("io.micronaut.platform:micronaut-platform:4.9.4")
|
||||
api platform("io.qameta.allure:allure-bom:2.31.0")
|
||||
api platform("io.micronaut.platform:micronaut-platform:4.10.5")
|
||||
api platform("io.qameta.allure:allure-bom:2.32.0")
|
||||
// we define cloud bom here for GCP, Azure and AWS so they are aligned for all plugins that use them (secret, storage, oss and ee plugins)
|
||||
api platform('com.google.cloud:libraries-bom:26.72.0')
|
||||
api platform('com.google.cloud:libraries-bom:26.73.0')
|
||||
api platform("com.azure:azure-sdk-bom:1.3.3")
|
||||
api platform('software.amazon.awssdk:bom:2.40.5')
|
||||
api platform('software.amazon.awssdk:bom:2.40.10')
|
||||
api platform("dev.langchain4j:langchain4j-bom:$langchain4jVersion")
|
||||
api platform("dev.langchain4j:langchain4j-community-bom:$langchain4jCommunityVersion")
|
||||
// Micronaut 4.10 brings a Jetty version no compatible with the one from Wiremock so we bump it here
|
||||
api platform("org.eclipse.jetty.ee10:jetty-ee10-bom:12.1.2")
|
||||
|
||||
constraints {
|
||||
// downgrade to proto 1.3.2-alpha as 1.5.0 needs protobuf 4
|
||||
@@ -77,12 +79,12 @@ dependencies {
|
||||
api "org.apache.kafka:kafka-clients:$kafkaVersion"
|
||||
api "org.apache.kafka:kafka-streams:$kafkaVersion"
|
||||
// AWS CRT is not included in the AWS BOM but needed for the S3 Transfer manager
|
||||
api 'software.amazon.awssdk.crt:aws-crt:0.40.3'
|
||||
api 'software.amazon.awssdk.crt:aws-crt:0.41.0'
|
||||
|
||||
// Other libs
|
||||
api("org.projectlombok:lombok:1.18.42")
|
||||
api("org.codehaus.janino:janino:3.1.12")
|
||||
api group: 'org.apache.logging.log4j', name: 'log4j-to-slf4j', version: '2.25.2'
|
||||
api group: 'org.apache.logging.log4j', name: 'log4j-to-slf4j', version: '2.25.3'
|
||||
api group: 'org.slf4j', name: 'jul-to-slf4j', version: slf4jVersion
|
||||
api group: 'org.slf4j', name: 'jcl-over-slf4j', version: slf4jVersion
|
||||
api group: 'org.fusesource.jansi', name: 'jansi', version: '2.4.2'
|
||||
@@ -99,11 +101,11 @@ dependencies {
|
||||
api group: 'org.apache.maven.resolver', name: 'maven-resolver-transport-file', version: mavenResolverVersion
|
||||
api group: 'org.apache.maven.resolver', name: 'maven-resolver-transport-apache', version: mavenResolverVersion
|
||||
api 'com.github.oshi:oshi-core:6.9.1'
|
||||
api 'io.pebbletemplates:pebble:4.0.0'
|
||||
api 'io.pebbletemplates:pebble:4.1.0'
|
||||
api group: 'co.elastic.logging', name: 'logback-ecs-encoder', version: '1.7.0'
|
||||
api group: 'de.focus-shift', name: 'jollyday-core', version: jollydayVersion
|
||||
api group: 'de.focus-shift', name: 'jollyday-jaxb', version: jollydayVersion
|
||||
api 'nl.basjes.gitignore:gitignore-reader:1.13.0'
|
||||
api 'nl.basjes.gitignore:gitignore-reader:1.14.1'
|
||||
api group: 'dev.failsafe', name: 'failsafe', version: '3.3.2'
|
||||
api group: 'com.cronutils', name: 'cron-utils', version: '9.2.1'
|
||||
api group: 'com.github.victools', name: 'jsonschema-generator', version: jsonschemaVersion
|
||||
|
||||
@@ -288,7 +288,7 @@ public abstract class AbstractScheduler implements Scheduler {
|
||||
disableInvalidTrigger(workerTriggerResult.getTriggerContext(), e);
|
||||
return;
|
||||
}
|
||||
this.handleEvaluateWorkerTriggerResult(triggerExecution, nextExecutionDate);
|
||||
this.handleEvaluateWorkerTriggerResult(triggerExecution, nextExecutionDate, workerTriggerResult.getTrigger());
|
||||
} else {
|
||||
ZonedDateTime nextExecutionDate;
|
||||
try {
|
||||
@@ -715,7 +715,8 @@ public abstract class AbstractScheduler implements Scheduler {
|
||||
Optional<SchedulerExecutionWithTrigger> schedulerExecutionWithTrigger = evaluateScheduleTrigger(f);
|
||||
if (schedulerExecutionWithTrigger.isPresent()) {
|
||||
this.handleEvaluateSchedulingTriggerResult(schedule, schedulerExecutionWithTrigger.get(), f.getConditionContext(), scheduleContext);
|
||||
} else {
|
||||
}
|
||||
else{
|
||||
// compute next date and save the trigger to avoid evaluating it each second
|
||||
Trigger trigger = Trigger.fromEvaluateFailed(
|
||||
f.getTriggerContext(),
|
||||
@@ -750,26 +751,7 @@ public abstract class AbstractScheduler implements Scheduler {
|
||||
// validate schedule condition can fail to render variables
|
||||
// in this case, we send a failed execution so the trigger is not evaluated each second.
|
||||
logger.error("Unable to evaluate the trigger '{}'", f.getAbstractTrigger().getId(), ie);
|
||||
Execution execution = Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.tenantId(f.getTriggerContext().getTenantId())
|
||||
.namespace(f.getTriggerContext().getNamespace())
|
||||
.flowId(f.getTriggerContext().getFlowId())
|
||||
.flowRevision(f.getFlow().getRevision())
|
||||
.labels(LabelService.labelsExcludingSystem(f.getFlow()))
|
||||
.state(new State().withState(State.Type.FAILED))
|
||||
.build();
|
||||
ZonedDateTime nextExecutionDate;
|
||||
try {
|
||||
nextExecutionDate = this.nextEvaluationDate(f.getAbstractTrigger());
|
||||
} catch (InvalidTriggerConfigurationException e2) {
|
||||
logError(f, e2);
|
||||
disableInvalidTrigger(f, e2);
|
||||
return;
|
||||
}
|
||||
|
||||
var trigger = f.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
|
||||
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
|
||||
handleFailedEvaluatedTrigger(f, scheduleContext, ie);
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -786,7 +768,7 @@ public abstract class AbstractScheduler implements Scheduler {
|
||||
}
|
||||
|
||||
private void handleEvaluateWorkerTriggerResult(SchedulerExecutionWithTrigger result, ZonedDateTime
|
||||
nextExecutionDate) {
|
||||
nextExecutionDate, AbstractTrigger abstractTrigger) {
|
||||
Optional.ofNullable(result)
|
||||
.ifPresent(executionWithTrigger -> {
|
||||
log(executionWithTrigger);
|
||||
@@ -797,6 +779,12 @@ public abstract class AbstractScheduler implements Scheduler {
|
||||
nextExecutionDate
|
||||
);
|
||||
|
||||
// if the trigger is allowed to run concurrently we do not attached the executio-id to the trigger state
|
||||
// i.e., the trigger will not be locked
|
||||
if (abstractTrigger.isAllowConcurrent()) {
|
||||
trigger = trigger.toBuilder().executionId(null).build();
|
||||
}
|
||||
|
||||
// Worker triggers result is evaluated in another thread with the workerTriggerResultQueue.
|
||||
// We can then update the trigger directly.
|
||||
this.saveLastTriggerAndEmitExecution(executionWithTrigger.getExecution(), trigger, triggerToSave -> this.triggerState.update(triggerToSave));
|
||||
@@ -818,6 +806,12 @@ public abstract class AbstractScheduler implements Scheduler {
|
||||
if (result.getExecution().getState().getCurrent() == State.Type.FAILED) {
|
||||
trigger = trigger.resetExecution(State.Type.FAILED);
|
||||
}
|
||||
|
||||
// if the trigger is allowed to run concurrently we do not attached the executio-id to the trigger state
|
||||
// i.e., the trigger will not be locked
|
||||
if (((AbstractTrigger)schedule).isAllowConcurrent()) {
|
||||
trigger = trigger.toBuilder().executionId(null).build();
|
||||
}
|
||||
|
||||
// Schedule triggers are being executed directly from the handle method within the context where triggers are locked.
|
||||
// So we must save them by passing the scheduleContext.
|
||||
@@ -983,11 +977,43 @@ public abstract class AbstractScheduler implements Scheduler {
|
||||
));
|
||||
} catch (Exception e) {
|
||||
logError(flowWithTrigger, e);
|
||||
Execution failedExecution = createFailedExecution( flowWithTrigger, e);
|
||||
this.emitExecution(failedExecution, flowWithTrigger.getTriggerContext());
|
||||
return Optional.empty();
|
||||
}
|
||||
});
|
||||
}
|
||||
private Execution createFailedExecution(FlowWithWorkerTrigger flowWithTrigger, Throwable e){
|
||||
Execution execution = Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.tenantId(flowWithTrigger.getTriggerContext().getTenantId())
|
||||
.namespace(flowWithTrigger.getTriggerContext().getNamespace())
|
||||
.flowId(flowWithTrigger.getTriggerContext().getFlowId())
|
||||
.flowRevision(flowWithTrigger.getFlow().getRevision())
|
||||
.labels(LabelService.labelsExcludingSystem(flowWithTrigger.getFlow()))
|
||||
.state(new State().withState(State.Type.FAILED))
|
||||
.build();
|
||||
Logger logger = runContextFactory.of(flowWithTrigger.getFlow(), execution).logger();
|
||||
logger.error("[trigger: {}] [date: {}] Evaluate Failed with error '{}'" , flowWithTrigger.getAbstractTrigger().getId(), now(), e.getMessage());
|
||||
return execution;
|
||||
}
|
||||
private void handleFailedEvaluatedTrigger(FlowWithWorkerTrigger flowWithTrigger, ScheduleContextInterface scheduleContext, Throwable e ){
|
||||
|
||||
Execution execution = createFailedExecution(flowWithTrigger, e);
|
||||
ZonedDateTime nextExecutionDate;
|
||||
try {
|
||||
nextExecutionDate = this.nextEvaluationDate(flowWithTrigger.getAbstractTrigger());
|
||||
} catch (InvalidTriggerConfigurationException e2) {
|
||||
logError(flowWithTrigger, e2);
|
||||
disableInvalidTrigger(flowWithTrigger, e2);
|
||||
return;
|
||||
}
|
||||
|
||||
var trigger = flowWithTrigger.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
|
||||
trigger = trigger.checkBackfill();
|
||||
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
|
||||
|
||||
}
|
||||
private void logError(FlowWithWorkerTrigger flowWithWorkerTriggerNextDate, Throwable e) {
|
||||
Logger logger = flowWithWorkerTriggerNextDate.getConditionContext().getRunContext().logger();
|
||||
|
||||
|
||||
@@ -91,6 +91,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
|
||||
assertThat(queueCount.getCount()).isEqualTo(0L);
|
||||
assertThat(last.get()).isNotNull();
|
||||
assertTrue(last.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
|
||||
assertTrue(last.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,6 +137,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
|
||||
assertThat(queueCount.getCount()).isEqualTo(0L);
|
||||
assertThat(last.get()).isNotNull();
|
||||
assertTrue(last.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
|
||||
assertTrue(last.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
|
||||
|
||||
// Assert that the trigger is now disabled.
|
||||
// It needs to await on assertion as it will be disabled AFTER we receive a success execution.
|
||||
|
||||
@@ -489,9 +489,8 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
Await.until(() -> this.triggerState.findLast(trigger).map(t -> t.getDisabled()).orElse(false).booleanValue(), Duration.ofMillis(100), Duration.ofSeconds(10));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void failedEvaluationTest() {
|
||||
void failedEvaluationFromFailedCondition() {
|
||||
// mock flow listeners
|
||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||
Schedule schedule = createScheduleTrigger("Europe/Paris", "* * * * *", "failedEvaluation", false)
|
||||
@@ -527,6 +526,61 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
// wait for execution
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
assertThat(execution).isNotNull();
|
||||
assertThat(execution.getFlowId()).isEqualTo(flow.getId());
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
|
||||
|
||||
queueCount.countDown();
|
||||
});
|
||||
|
||||
scheduler.run();
|
||||
|
||||
queueCount.await(1, TimeUnit.MINUTES);
|
||||
// needed for RetryingTest to work since there is no context cleaning between method => we have to clear assertion receiver manually
|
||||
receive.blockLast();
|
||||
|
||||
assertThat(queueCount.getCount()).isEqualTo(0L);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
@Test
|
||||
void failedEvaluationFromInvalidExpression() {
|
||||
// mock flow listeners
|
||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||
Schedule schedule = createScheduleTrigger("Europe/Paris", "* * * * *", "failedEvaluation", false)
|
||||
.inputs(
|
||||
Map.of("invalidExpressionInput", Expression.builder()
|
||||
.type(Expression.class.getName())
|
||||
.expression(Property.ofExpression("{{ now().hour == 0 ? 3 : 2 }}"))
|
||||
.build()
|
||||
)
|
||||
)
|
||||
.build();
|
||||
FlowWithSource flow = createFlow(this.tenantId,Collections.singletonList(schedule));
|
||||
doReturn(List.of(flow))
|
||||
.when(flowListenersServiceSpy)
|
||||
.flows();
|
||||
|
||||
// to avoid waiting too much before a trigger execution, we add a last trigger with a date now - 1m.
|
||||
Trigger lastTrigger = Trigger
|
||||
.builder()
|
||||
.triggerId("failedEvaluation")
|
||||
.tenantId(this.tenantId)
|
||||
.flowId(flow.getId())
|
||||
.namespace(flow.getNamespace())
|
||||
.date(ZonedDateTime.now().minusMinutes(1L))
|
||||
.build();
|
||||
triggerState.create(lastTrigger);
|
||||
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
// wait for execution
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
assertThat(execution).isNotNull();
|
||||
assertThat(execution.getFlowId()).isEqualTo(flow.getId());
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
|
||||
|
||||
|
||||
@@ -104,6 +104,7 @@ public class SchedulerStreamingTest extends AbstractSchedulerTest {
|
||||
assertThat(SchedulerStreamingTest.startedEvaluate.get(false), is(1));
|
||||
assertThat(last.getTrigger().getVariables().get("startedEvaluate"), is(1));
|
||||
assertTrue(last.getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
|
||||
assertTrue(last.getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@@ -149,8 +149,7 @@ public class CommandsWrapper implements TaskCommands {
|
||||
|
||||
public <T extends TaskRunnerDetailResult> ScriptOutput run() throws Exception {
|
||||
if (this.namespaceFiles != null && !Boolean.FALSE.equals(runContext.render(this.namespaceFiles.getEnabled()).as(Boolean.class).orElse(true))) {
|
||||
NamespaceFilesUtils namespaceFilesUtils = ((DefaultRunContext) runContext).getApplicationContext().getBean(NamespaceFilesUtils.class);
|
||||
namespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
|
||||
NamespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
|
||||
}
|
||||
|
||||
TaskRunner<T> realTaskRunner = this.getTaskRunner();
|
||||
@@ -158,9 +157,7 @@ public class CommandsWrapper implements TaskCommands {
|
||||
FilesService.inputFiles(runContext, realTaskRunner.additionalVars(runContext, this), this.inputFiles);
|
||||
}
|
||||
|
||||
RunContextInitializer initializer = ((DefaultRunContext) runContext).getApplicationContext().getBean(RunContextInitializer.class);
|
||||
|
||||
RunContext taskRunnerRunContext = initializer.forPlugin(((DefaultRunContext) runContext).clone(), realTaskRunner);
|
||||
RunContext taskRunnerRunContext = runContext.cloneForPlugin(realTaskRunner);
|
||||
|
||||
List<String> renderedCommands = this.renderCommands(runContext, commands);
|
||||
List<String> renderedBeforeCommands = this.renderCommands(runContext, beforeCommands);
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
<meta http-equiv="X-UA-Compatible" content="IE=edge">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
|
||||
<link rel="icon" href="/favicon.ico">
|
||||
<link rel="icon" type="image/png" sizes="192x192" href="/favicon-192x192.png">
|
||||
<link rel="icon" type="image/png" sizes="192x192" href="/favicon-192x192.png">
|
||||
<meta name="msapplication-TileColor" content="#192A4E">
|
||||
<meta name="theme-color" content="#192A4E">
|
||||
<link rel="stylesheet" href="/loader.css" />
|
||||
@@ -26,26 +26,6 @@
|
||||
document.getElementsByTagName("html")[0].classList.add(localStorage.getItem("theme"));
|
||||
}
|
||||
</script>
|
||||
|
||||
<!-- Optional but recommended for faster connection -->
|
||||
<link rel="preconnect" href="https://fonts.googleapis.com">
|
||||
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
|
||||
|
||||
<!-- Load Google Fonts non-blocking -->
|
||||
<link
|
||||
rel="stylesheet"
|
||||
href="https://fonts.googleapis.com/css2?family=Public+Sans:wght@300;400;600;700;800&family=Source+Code+Pro:wght@400;700;800&display=swap"
|
||||
media="print"
|
||||
onload="this.media='all'"
|
||||
>
|
||||
|
||||
<!-- Fallback for when JavaScript is disabled -->
|
||||
<noscript>
|
||||
<link
|
||||
rel="stylesheet"
|
||||
href="https://fonts.googleapis.com/css2?family=Public+Sans:wght@300;400;600;700;800&family=Source+Code+Pro:wght@400;700;800&display=swap"
|
||||
>
|
||||
</noscript>
|
||||
</head>
|
||||
<body>
|
||||
<noscript>
|
||||
|
||||
1167
ui/package-lock.json
generated
1167
ui/package-lock.json
generated
File diff suppressed because it is too large
Load Diff
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user