mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
66 Commits
feat/refac
...
fix/remove
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
811af5c5a1 | ||
|
|
1b73ddd097 | ||
|
|
9803ecc6d0 | ||
|
|
b4871fcb15 | ||
|
|
f352be5746 | ||
|
|
5fc6d0b5d7 | ||
|
|
de5750f656 | ||
|
|
fa870b8df2 | ||
|
|
fa4bf64a23 | ||
|
|
27f81b5b6d | ||
|
|
90d322cd67 | ||
|
|
04246ace13 | ||
|
|
e0e745cb91 | ||
|
|
c69ecd7200 | ||
|
|
4b3419bc15 | ||
|
|
352d4eb194 | ||
|
|
e433833e62 | ||
|
|
d16a8de90f | ||
|
|
4784e459d6 | ||
|
|
2abea0fcde | ||
|
|
5d5165b7b9 | ||
|
|
44d0c10713 | ||
|
|
167734e32a | ||
|
|
24e61c81c0 | ||
|
|
379764a033 | ||
|
|
d55dd275c3 | ||
|
|
f409657e8a | ||
|
|
22f0b3ffdf | ||
|
|
0d99dc6862 | ||
|
|
fd3adc48b8 | ||
|
|
1a8a47c8cd | ||
|
|
7ea95f393e | ||
|
|
6935900699 | ||
|
|
0bc8e8d74a | ||
|
|
7f77b24ae0 | ||
|
|
ec6820dc25 | ||
|
|
d94193c143 | ||
|
|
c9628047fa | ||
|
|
4cbc069af4 | ||
|
|
eabe573fe6 | ||
|
|
ecd64617c3 | ||
|
|
a5650bca0f | ||
|
|
ed59e262d4 | ||
|
|
a5f9d54f7d | ||
|
|
47f4f43198 | ||
|
|
5d31c97f7f | ||
|
|
f8107285c4 | ||
|
|
8dc8dc1796 | ||
|
|
834dfd2947 | ||
|
|
6edb88841f | ||
|
|
5653531628 | ||
|
|
ee61276106 | ||
|
|
abcf76f7b4 | ||
|
|
67ada7f61b | ||
|
|
0c13633f77 | ||
|
|
a6cf2015ff | ||
|
|
2f9216c70b | ||
|
|
1903e6fac5 | ||
|
|
2d2cb00cab | ||
|
|
01b5441d16 | ||
|
|
efc778e294 | ||
|
|
60235a4e73 | ||
|
|
b167c52e76 | ||
|
|
216b124294 | ||
|
|
b6e4df8de2 | ||
|
|
429e7c7945 |
2
.github/pull_request_template.md
vendored
2
.github/pull_request_template.md
vendored
@@ -12,7 +12,7 @@ _Example: Replaces legacy scroll directive with the new API._
|
||||
### 🔗 Related Issue
|
||||
|
||||
Which issue does this PR resolve? Use [GitHub Keywords](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/using-keywords-in-issues-and-pull-requests#linking-a-pull-request-to-an-issue) to automatically link the pull request to the issue.
|
||||
_Example: Closes https://github.com/kestra-io/kestra/issues/12345._
|
||||
_Example: Closes https://github.com/kestra-io/kestra/issues/ISSUE_NUMBER._
|
||||
|
||||
### 🎨 Frontend Checklist
|
||||
|
||||
|
||||
2
.github/workflows/vulnerabilities-check.yml
vendored
2
.github/workflows/vulnerabilities-check.yml
vendored
@@ -43,7 +43,7 @@ jobs:
|
||||
|
||||
# Upload dependency check report
|
||||
- name: Upload dependency check report
|
||||
uses: actions/upload-artifact@v5
|
||||
uses: actions/upload-artifact@v6
|
||||
if: ${{ always() }}
|
||||
with:
|
||||
name: dependency-check-report
|
||||
|
||||
@@ -21,7 +21,7 @@ plugins {
|
||||
|
||||
// test
|
||||
id "com.adarshr.test-logger" version "4.0.0"
|
||||
id "org.sonarqube" version "7.2.0.6526"
|
||||
id "org.sonarqube" version "7.2.1.6560"
|
||||
id 'jacoco-report-aggregation'
|
||||
|
||||
// helper
|
||||
@@ -331,7 +331,7 @@ subprojects {
|
||||
}
|
||||
|
||||
dependencies {
|
||||
agent "org.aspectj:aspectjweaver:1.9.25"
|
||||
agent "org.aspectj:aspectjweaver:1.9.25.1"
|
||||
}
|
||||
|
||||
test {
|
||||
|
||||
@@ -42,7 +42,7 @@ import picocli.CommandLine.Option;
|
||||
@Introspected
|
||||
public abstract class AbstractCommand implements Callable<Integer> {
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
protected ApplicationContext applicationContext;
|
||||
|
||||
@Inject
|
||||
private EndpointDefaultConfiguration endpointConfiguration;
|
||||
|
||||
@@ -18,7 +18,8 @@ import picocli.CommandLine;
|
||||
FlowDotCommand.class,
|
||||
FlowExportCommand.class,
|
||||
FlowUpdateCommand.class,
|
||||
FlowUpdatesCommand.class
|
||||
FlowUpdatesCommand.class,
|
||||
FlowsSyncFromSourceCommand.class
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import jakarta.inject.Inject;
|
||||
import java.util.List;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "syncFromSource",
|
||||
description = "Update a single flow",
|
||||
mixinStandardHelpOptions = true
|
||||
)
|
||||
@Slf4j
|
||||
public class FlowsSyncFromSourceCommand extends AbstractApiCommand {
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
FlowRepositoryInterface repository = applicationContext.getBean(FlowRepositoryInterface.class);
|
||||
String tenant = tenantService.getTenantId(tenantId);
|
||||
|
||||
List<FlowWithSource> persistedFlows = repository.findAllWithSource(tenant);
|
||||
|
||||
int count = 0;
|
||||
for (FlowWithSource persistedFlow : persistedFlows) {
|
||||
// Ensure exactly one trailing newline. We need this new line
|
||||
// because when we update a flow from its source,
|
||||
// we don't update it if no change is detected.
|
||||
// The goal here is to force an update from the source for every flows
|
||||
GenericFlow flow = GenericFlow.fromYaml(tenant,persistedFlow.getSource() + System.lineSeparator());
|
||||
repository.update(flow, persistedFlow);
|
||||
stdOut("- %s.%s".formatted(flow.getNamespace(), flow.getId()));
|
||||
count++;
|
||||
}
|
||||
stdOut("%s flow(s) successfully updated!".formatted(count));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
protected boolean loadExternalPlugins() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class FlowsSyncFromSourceCommandTest {
|
||||
@Test
|
||||
void updateAllFlowsFromSource() {
|
||||
URL directory = FlowUpdatesCommandTest.class.getClassLoader().getResource("flows");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--plugins",
|
||||
"/tmp", // pass this arg because it can cause failure
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"--delete",
|
||||
directory.getPath(),
|
||||
};
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString()).contains("successfully updated !");
|
||||
out.reset();
|
||||
|
||||
FlowRepositoryInterface repository = ctx.getBean(FlowRepositoryInterface.class);
|
||||
List<Flow> flows = repository.findAll(MAIN_TENANT);
|
||||
for (Flow flow : flows) {
|
||||
assertThat(flow.getRevision()).isEqualTo(1);
|
||||
}
|
||||
|
||||
args = new String[]{
|
||||
"--plugins",
|
||||
"/tmp", // pass this arg because it can cause failure
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word"
|
||||
|
||||
};
|
||||
PicocliRunner.call(FlowsSyncFromSourceCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString()).contains("4 flow(s) successfully updated!");
|
||||
assertThat(out.toString()).contains("- io.kestra.outsider.quattro");
|
||||
assertThat(out.toString()).contains("- io.kestra.cli.second");
|
||||
assertThat(out.toString()).contains("- io.kestra.cli.third");
|
||||
assertThat(out.toString()).contains("- io.kestra.cli.first");
|
||||
|
||||
flows = repository.findAll(MAIN_TENANT);
|
||||
for (Flow flow : flows) {
|
||||
assertThat(flow.getRevision()).isEqualTo(2);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -24,6 +24,9 @@ dependencies {
|
||||
// reactor
|
||||
api "io.projectreactor:reactor-core"
|
||||
|
||||
// awaitility
|
||||
api 'org.awaitility:awaitility'
|
||||
|
||||
// micronaut
|
||||
api "io.micronaut.data:micronaut-data-model"
|
||||
implementation "io.micronaut:micronaut-http-server-netty"
|
||||
@@ -82,8 +85,8 @@ dependencies {
|
||||
testImplementation "io.micronaut:micronaut-http-server-netty"
|
||||
testImplementation "io.micronaut:micronaut-management"
|
||||
|
||||
testImplementation "org.testcontainers:testcontainers:1.21.3"
|
||||
testImplementation "org.testcontainers:junit-jupiter:1.21.3"
|
||||
testImplementation "org.testcontainers:testcontainers:1.21.4"
|
||||
testImplementation "org.testcontainers:junit-jupiter:1.21.4"
|
||||
testImplementation "org.bouncycastle:bcpkix-jdk18on"
|
||||
|
||||
testImplementation "org.wiremock:wiremock-jetty12"
|
||||
|
||||
@@ -42,13 +42,12 @@ import io.kestra.core.plugins.PluginRegistry;
|
||||
import io.kestra.core.plugins.RegisteredPlugin;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import io.swagger.v3.oas.annotations.media.Content;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.*;
|
||||
import java.time.*;
|
||||
@@ -299,7 +298,9 @@ public class JsonSchemaGenerator {
|
||||
}
|
||||
|
||||
// default value
|
||||
builder.forFields().withDefaultResolver(this::defaults);
|
||||
builder.forFields()
|
||||
.withIgnoreCheck(fieldScope -> fieldScope.getAnnotation(Hidden.class) != null)
|
||||
.withDefaultResolver(this::defaults);
|
||||
|
||||
// def name
|
||||
builder.forTypesInGeneral()
|
||||
@@ -809,9 +810,9 @@ public class JsonSchemaGenerator {
|
||||
// we don't return base properties unless specified with @PluginProperty and hidden is false
|
||||
builder
|
||||
.forFields()
|
||||
.withIgnoreCheck(fieldScope -> base != null &&
|
||||
.withIgnoreCheck(fieldScope -> (base != null &&
|
||||
(fieldScope.getAnnotation(PluginProperty.class) == null || fieldScope.getAnnotation(PluginProperty.class).hidden()) &&
|
||||
fieldScope.getDeclaringType().getTypeName().equals(base.getName())
|
||||
fieldScope.getDeclaringType().getTypeName().equals(base.getName())) || fieldScope.getAnnotation(Hidden.class) != null
|
||||
);
|
||||
|
||||
SchemaGeneratorConfig schemaGeneratorConfig = builder.build();
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
import io.kestra.core.models.flows.Data;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Exception that can be thrown when Inputs/Outputs have validation problems.
|
||||
*/
|
||||
public class InputOutputValidationException extends KestraRuntimeException {
|
||||
public InputOutputValidationException(String message) {
|
||||
super(message);
|
||||
}
|
||||
public static InputOutputValidationException of( String message, Input<?> input){
|
||||
String inputMessage = "Invalid value for input" + " `" + input.getId() + "`. Cause: " + message;
|
||||
return new InputOutputValidationException(inputMessage);
|
||||
}
|
||||
public static InputOutputValidationException of( String message, Output output){
|
||||
String outputMessage = "Invalid value for output" + " `" + output.getId() + "`. Cause: " + message;
|
||||
return new InputOutputValidationException(outputMessage);
|
||||
}
|
||||
public static InputOutputValidationException of(String message){
|
||||
return new InputOutputValidationException(message);
|
||||
}
|
||||
|
||||
public static InputOutputValidationException merge(Set<InputOutputValidationException> exceptions){
|
||||
String combinedMessage = exceptions.stream()
|
||||
.map(InputOutputValidationException::getMessage)
|
||||
.collect(Collectors.joining(System.lineSeparator()));
|
||||
throw new InputOutputValidationException(combinedMessage);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,6 +1,8 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* The top-level {@link KestraRuntimeException} for non-recoverable errors.
|
||||
|
||||
@@ -26,6 +26,7 @@ public record Label(
|
||||
public static final String REPLAYED = SYSTEM_PREFIX + "replayed";
|
||||
public static final String SIMULATED_EXECUTION = SYSTEM_PREFIX + "simulatedExecution";
|
||||
public static final String TEST = SYSTEM_PREFIX + "test";
|
||||
public static final String FROM = SYSTEM_PREFIX + "from";
|
||||
|
||||
/**
|
||||
* Static helper method for converting a list of labels to a nested map.
|
||||
|
||||
@@ -16,6 +16,7 @@ import jakarta.validation.constraints.NotNull;
|
||||
public class Setting {
|
||||
public static final String INSTANCE_UUID = "instance.uuid";
|
||||
public static final String INSTANCE_VERSION = "instance.version";
|
||||
public static final String INSTANCE_EDITION = "instance.edition";
|
||||
|
||||
@NotNull
|
||||
private String key;
|
||||
|
||||
@@ -3,9 +3,7 @@ package io.kestra.core.models.executions;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import io.kestra.core.models.TenantInterface;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
@@ -95,8 +93,16 @@ public class TaskRun implements TenantInterface {
|
||||
this.forceExecution
|
||||
);
|
||||
}
|
||||
public TaskRun withStateAndAttempt(State.Type state) {
|
||||
List<TaskRunAttempt> newAttempts = new ArrayList<>(this.attempts != null ? this.attempts : List.of());
|
||||
|
||||
if (newAttempts.isEmpty()) {
|
||||
newAttempts.add(TaskRunAttempt.builder().state(new State(state)).build());
|
||||
} else {
|
||||
TaskRunAttempt updatedLast = newAttempts.getLast().withState(state);
|
||||
newAttempts.set(newAttempts.size() - 1, updatedLast);
|
||||
}
|
||||
|
||||
public TaskRun replaceState(State newState) {
|
||||
return new TaskRun(
|
||||
this.tenantId,
|
||||
this.id,
|
||||
@@ -106,9 +112,9 @@ public class TaskRun implements TenantInterface {
|
||||
this.taskId,
|
||||
this.parentTaskRunId,
|
||||
this.value,
|
||||
this.attempts,
|
||||
newAttempts,
|
||||
this.outputs,
|
||||
newState,
|
||||
this.state.withState(state),
|
||||
this.iteration,
|
||||
this.dynamic,
|
||||
this.forceExecution
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
/**
|
||||
* Interface for defining an identifiable and typed data.
|
||||
@@ -29,16 +27,4 @@ public interface Data {
|
||||
*/
|
||||
String getDisplayName();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
default ConstraintViolationException toConstraintViolationException(String message, Object value) {
|
||||
Class<Data> cls = (Class<Data>) this.getClass();
|
||||
|
||||
return ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid " + (this instanceof Output ? "output" : "input") + " for `" + getId() + "`, " + message + ", but received `" + value + "`",
|
||||
this,
|
||||
cls,
|
||||
this.getId(),
|
||||
value
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
@@ -130,7 +129,7 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
@Valid
|
||||
@PluginProperty
|
||||
List<SLA> sla;
|
||||
|
||||
|
||||
@Schema(
|
||||
title = "Conditions evaluated before the flow is executed.",
|
||||
description = "A list of conditions that are evaluated before the flow is executed. If no checks are defined, the flow executes normally."
|
||||
@@ -355,7 +354,7 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
* To be conservative a flow MUST not return any source.
|
||||
*/
|
||||
@Override
|
||||
@JsonIgnore
|
||||
@Schema(hidden = true)
|
||||
public String getSource() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -1,14 +1,12 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@@ -48,7 +46,7 @@ public class FlowWithSource extends Flow {
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonIgnore(value = false)
|
||||
@Schema(hidden = false)
|
||||
public String getSource() {
|
||||
return this.source;
|
||||
}
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
package io.kestra.core.models.flows.input;
|
||||
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Represents an input along with its associated value and validation state.
|
||||
*
|
||||
@@ -12,15 +14,15 @@ import jakarta.validation.constraints.NotNull;
|
||||
* @param value The provided value for the input.
|
||||
* @param enabled {@code true} if the input is enabled; {@code false} otherwise.
|
||||
* @param isDefault {@code true} if the provided value is the default; {@code false} otherwise.
|
||||
* @param exception The validation exception, if the input value is invalid; {@code null} otherwise.
|
||||
* @param exceptions The validation exceptions, if the input value is invalid; {@code null} otherwise.
|
||||
*/
|
||||
public record InputAndValue(
|
||||
Input<?> input,
|
||||
Object value,
|
||||
boolean enabled,
|
||||
boolean isDefault,
|
||||
ConstraintViolationException exception) {
|
||||
|
||||
Set<InputOutputValidationException> exceptions) {
|
||||
|
||||
/**
|
||||
* Creates a new {@link InputAndValue} instance.
|
||||
*
|
||||
|
||||
@@ -7,6 +7,7 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import io.kestra.core.validations.Regex;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.ConstraintViolation;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Builder;
|
||||
@@ -14,10 +15,7 @@ import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
|
||||
@SuperBuilder
|
||||
@@ -77,30 +75,35 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
|
||||
|
||||
@Override
|
||||
public void validate(List<String> inputs) throws ConstraintViolationException {
|
||||
Set<ConstraintViolation<?>> violations = new HashSet<>();
|
||||
|
||||
if (values != null && options != null) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
violations.add( ManualConstraintViolation.of(
|
||||
"you can't define both `values` and `options`",
|
||||
this,
|
||||
MultiselectInput.class,
|
||||
getId(),
|
||||
""
|
||||
);
|
||||
));
|
||||
}
|
||||
|
||||
if (!this.getAllowCustomValue()) {
|
||||
for (String input : inputs) {
|
||||
List<@Regex String> finalValues = this.values != null ? this.values : this.options;
|
||||
if (!finalValues.contains(input)) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"it must match the values `" + finalValues + "`",
|
||||
violations.add(ManualConstraintViolation.of(
|
||||
"value `" + input + "` doesn't match the values `" + finalValues + "`",
|
||||
this,
|
||||
MultiselectInput.class,
|
||||
getId(),
|
||||
input
|
||||
);
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!violations.isEmpty()) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(violations);
|
||||
}
|
||||
}
|
||||
|
||||
/** {@inheritDoc} **/
|
||||
@@ -145,7 +148,7 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
|
||||
|
||||
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid expression result. Expected a list of strings, but received " + type,
|
||||
"Invalid expression result. Expected a list of strings",
|
||||
this,
|
||||
MultiselectInput.class,
|
||||
getId(),
|
||||
|
||||
@@ -125,7 +125,7 @@ public class SelectInput extends Input<String> implements RenderableInput {
|
||||
|
||||
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid expression result. Expected a list of strings, but received " + type,
|
||||
"Invalid expression result. Expected a list of strings",
|
||||
this,
|
||||
SelectInput.class,
|
||||
getId(),
|
||||
|
||||
@@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextProperty;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
@@ -156,9 +157,9 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property, then convert it to its target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#as(Class)
|
||||
* @see RunContextProperty#as(Class)
|
||||
*/
|
||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return as(property, context, clazz, Map.of());
|
||||
@@ -167,25 +168,57 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property with additional variables, then convert it to its target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
|
||||
* @see RunContextProperty#as(Class, Map)
|
||||
*/
|
||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.skipCache || property.value == null) {
|
||||
String rendered = context.render(property.expression, variables);
|
||||
property.value = MAPPER.convertValue(rendered, clazz);
|
||||
property.value = deserialize(rendered, clazz);
|
||||
}
|
||||
|
||||
return property.value;
|
||||
}
|
||||
|
||||
private static <T> T deserialize(Object rendered, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
try {
|
||||
return MAPPER.convertValue(rendered, clazz);
|
||||
} catch (IllegalArgumentException e) {
|
||||
if (rendered instanceof String str) {
|
||||
try {
|
||||
return MAPPER.readValue(str, clazz);
|
||||
} catch (JsonProcessingException ex) {
|
||||
throw new IllegalVariableEvaluationException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> T deserialize(Object rendered, JavaType type) throws IllegalVariableEvaluationException {
|
||||
try {
|
||||
return MAPPER.convertValue(rendered, type);
|
||||
} catch (IllegalArgumentException e) {
|
||||
if (rendered instanceof String str) {
|
||||
try {
|
||||
return MAPPER.readValue(str, type);
|
||||
} catch (JsonProcessingException ex) {
|
||||
throw new IllegalVariableEvaluationException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Render a property then convert it as a list of target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
|
||||
* @see RunContextProperty#asList(Class)
|
||||
*/
|
||||
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz) throws IllegalVariableEvaluationException {
|
||||
return asList(property, context, itemClazz, Map.of());
|
||||
@@ -194,37 +227,39 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property with additional variables, then convert it as a list of target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
|
||||
* @see RunContextProperty#asList(Class, Map)
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.skipCache || property.value == null) {
|
||||
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
|
||||
try {
|
||||
String trimmedExpression = property.expression.trim();
|
||||
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a list.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = MAPPER.readValue(context.render(property.expression, variables), type);
|
||||
}
|
||||
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
|
||||
else {
|
||||
List<?> asRawList = MAPPER.readValue(property.expression, List.class);
|
||||
property.value = (T) asRawList.stream()
|
||||
.map(throwFunction(item -> {
|
||||
if (item instanceof String str) {
|
||||
return MAPPER.convertValue(context.render(str, variables), itemClazz);
|
||||
} else if (item instanceof Map map) {
|
||||
return MAPPER.convertValue(context.render(map, variables), itemClazz);
|
||||
}
|
||||
return item;
|
||||
}))
|
||||
.toList();
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
String trimmedExpression = property.expression.trim();
|
||||
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a list.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = deserialize(context.render(property.expression, variables), type);
|
||||
}
|
||||
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
|
||||
else {
|
||||
List<?> asRawList = deserialize(property.expression, List.class);
|
||||
property.value = (T) asRawList.stream()
|
||||
.map(throwFunction(item -> {
|
||||
Object rendered = null;
|
||||
if (item instanceof String str) {
|
||||
rendered = context.render(str, variables);
|
||||
} else if (item instanceof Map map) {
|
||||
rendered = context.render(map, variables);
|
||||
}
|
||||
|
||||
if (rendered != null) {
|
||||
return deserialize(rendered, itemClazz);
|
||||
}
|
||||
|
||||
return item;
|
||||
}))
|
||||
.toList();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,9 +269,9 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property then convert it as a map of target types.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class)
|
||||
* @see RunContextProperty#asMap(Class, Class)
|
||||
*/
|
||||
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
|
||||
return asMap(property, runContext, keyClass, valueClass, Map.of());
|
||||
@@ -248,7 +283,7 @@ public class Property<T> {
|
||||
* This method is safe to be used as many times as you want as the rendering and conversion will be cached.
|
||||
* Warning, due to the caching mechanism, this method is not thread-safe.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class, Map)
|
||||
* @see RunContextProperty#asMap(Class, Class, Map)
|
||||
*/
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
@@ -260,12 +295,12 @@ public class Property<T> {
|
||||
// We need to detect if the expression is already a map or if it's a pebble expression (for eg. referencing a variable containing a map).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a map.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = MAPPER.readValue(runContext.render(property.expression, variables), targetMapType);
|
||||
property.value = deserialize(runContext.render(property.expression, variables), targetMapType);
|
||||
}
|
||||
// Otherwise if it's already a map we read it as a map first then render it from run context which handle map rendering by rendering each entry of the map (otherwise it will fail with nested expressions in values for eg.)
|
||||
else {
|
||||
Map asRawMap = MAPPER.readValue(property.expression, Map.class);
|
||||
property.value = MAPPER.convertValue(runContext.render(asRawMap, variables), targetMapType);
|
||||
property.value = deserialize(runContext.render(asRawMap, variables), targetMapType);
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
|
||||
@@ -82,6 +82,12 @@ abstract public class AbstractTrigger implements TriggerInterface {
|
||||
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
|
||||
private boolean failOnTriggerError = false;
|
||||
|
||||
@PluginProperty(group = PluginProperty.CORE_GROUP)
|
||||
@Schema(
|
||||
title = "Specifies whether a trigger is allowed to start a new execution even if a previous run is still in progress."
|
||||
)
|
||||
private boolean allowConcurrent = false;
|
||||
|
||||
/**
|
||||
* For backward compatibility: we rename minLogLevel to logLevel.
|
||||
* @deprecated use {@link #logLevel} instead
|
||||
|
||||
@@ -1,22 +1,37 @@
|
||||
package io.kestra.core.models.triggers;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Map;
|
||||
|
||||
public interface Schedulable extends PollingTriggerInterface{
|
||||
String PLUGIN_PROPERTY_RECOVER_MISSED_SCHEDULES = "recoverMissedSchedules";
|
||||
|
||||
@Schema(
|
||||
title = "The inputs to pass to the scheduled flow"
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
Map<String, Object> getInputs();
|
||||
|
||||
@Schema(
|
||||
title = "Action to take in the case of missed schedules",
|
||||
description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
|
||||
"The default is `ALL` unless a different value is configured using the global plugin configuration."
|
||||
)
|
||||
@PluginProperty
|
||||
RecoverMissedSchedules getRecoverMissedSchedules();
|
||||
|
||||
/**
|
||||
* Compute the previous evaluation of a trigger.
|
||||
* This is used when a trigger misses some schedule to compute the next date to evaluate in the past.
|
||||
*/
|
||||
ZonedDateTime previousEvaluationDate(ConditionContext conditionContext) throws IllegalVariableEvaluationException;
|
||||
|
||||
RecoverMissedSchedules getRecoverMissedSchedules();
|
||||
|
||||
|
||||
/**
|
||||
* Load the default RecoverMissedSchedules from plugin property, or else ALL.
|
||||
*/
|
||||
|
||||
@@ -172,7 +172,7 @@ public class Trigger extends TriggerContext implements HasUID {
|
||||
|
||||
if (abstractTrigger instanceof PollingTriggerInterface pollingTriggerInterface) {
|
||||
try {
|
||||
nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, Optional.empty());
|
||||
nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, lastTrigger);
|
||||
} catch (InvalidTriggerConfigurationException e) {
|
||||
disabled = true;
|
||||
}
|
||||
|
||||
@@ -6,12 +6,9 @@ import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionTrigger;
|
||||
import io.kestra.core.models.tasks.Output;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.FlowInputOutput;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.*;
|
||||
|
||||
public abstract class TriggerService {
|
||||
@@ -51,49 +48,6 @@ public abstract class TriggerService {
|
||||
return generateExecution(IdUtils.create(), trigger, context, executionTrigger, conditionContext);
|
||||
}
|
||||
|
||||
public static Execution generateScheduledExecution(
|
||||
AbstractTrigger trigger,
|
||||
ConditionContext conditionContext,
|
||||
TriggerContext context,
|
||||
List<Label> labels,
|
||||
Map<String, Object> inputs,
|
||||
Map<String, Object> variables,
|
||||
Optional<ZonedDateTime> scheduleDate
|
||||
) {
|
||||
RunContext runContext = conditionContext.getRunContext();
|
||||
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, variables);
|
||||
|
||||
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(labels));
|
||||
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
|
||||
// add a correlation ID if none exist
|
||||
executionLabels.add(new Label(Label.CORRELATION_ID, runContext.getTriggerExecutionId()));
|
||||
}
|
||||
Execution execution = Execution.builder()
|
||||
.id(runContext.getTriggerExecutionId())
|
||||
.tenantId(context.getTenantId())
|
||||
.namespace(context.getNamespace())
|
||||
.flowId(context.getFlowId())
|
||||
.flowRevision(conditionContext.getFlow().getRevision())
|
||||
.variables(conditionContext.getFlow().getVariables())
|
||||
.labels(executionLabels)
|
||||
.state(new State())
|
||||
.trigger(executionTrigger)
|
||||
.scheduleDate(scheduleDate.map(date -> date.toInstant()).orElse(null))
|
||||
.build();
|
||||
|
||||
Map<String, Object> allInputs = new HashMap<>();
|
||||
|
||||
if (inputs != null) {
|
||||
allInputs.putAll(inputs);
|
||||
}
|
||||
|
||||
// add inputs and inject defaults (FlowInputOutput handles defaults internally)
|
||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
|
||||
execution = execution.withInputs(flowInputOutput.readExecutionInputs(conditionContext.getFlow(), execution, allInputs));
|
||||
|
||||
return execution;
|
||||
}
|
||||
|
||||
private static Execution generateExecution(
|
||||
String id,
|
||||
AbstractTrigger trigger,
|
||||
@@ -102,6 +56,7 @@ public abstract class TriggerService {
|
||||
ConditionContext conditionContext
|
||||
) {
|
||||
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(trigger.getLabels()));
|
||||
executionLabels.add(new Label(Label.FROM, "trigger"));
|
||||
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
|
||||
// add a correlation ID if none exist
|
||||
executionLabels.add(new Label(Label.CORRELATION_ID, id));
|
||||
|
||||
@@ -67,6 +67,11 @@ public class ManualConstraintViolation<T> implements ConstraintViolation<T> {
|
||||
invalidValue
|
||||
)));
|
||||
}
|
||||
public static <T> ConstraintViolationException toConstraintViolationException(
|
||||
Set<? extends ConstraintViolation<?>> constraintViolations
|
||||
) {
|
||||
return new ConstraintViolationException(constraintViolations);
|
||||
}
|
||||
|
||||
public String getMessageTemplate() {
|
||||
return "{messageTemplate}";
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.models.Setting;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
public interface SettingRepositoryInterface {
|
||||
Optional<Setting> findByKey(String key);
|
||||
@@ -13,5 +13,7 @@ public interface SettingRepositoryInterface {
|
||||
|
||||
Setting save(Setting setting) throws ConstraintViolationException;
|
||||
|
||||
Setting internalSave(Setting setting) throws ConstraintViolationException;
|
||||
|
||||
Setting delete(Setting setting);
|
||||
}
|
||||
|
||||
@@ -16,8 +16,8 @@ import java.util.function.Function;
|
||||
public interface TriggerRepositoryInterface extends QueryBuilderInterface<Triggers.Fields> {
|
||||
Optional<Trigger> findLast(TriggerContext trigger);
|
||||
|
||||
Optional<Trigger> findByExecution(Execution execution);
|
||||
|
||||
Optional<Trigger> findByUid(String uid);
|
||||
|
||||
List<Trigger> findAll(String tenantId);
|
||||
|
||||
List<Trigger> findAllForAllTenants();
|
||||
|
||||
@@ -599,6 +599,11 @@ public class DefaultRunContext extends RunContext {
|
||||
return localPath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputAndOutput inputAndOutput() {
|
||||
return new InputAndOutputImpl(this.applicationContext, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Builder class for constructing new {@link DefaultRunContext} objects.
|
||||
*/
|
||||
|
||||
@@ -189,12 +189,11 @@ public final class ExecutableUtils {
|
||||
variables.put("taskRunIteration", currentTaskRun.getIteration());
|
||||
}
|
||||
|
||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
|
||||
Instant scheduleOnDate = runContext.render(scheduleDate).as(ZonedDateTime.class).map(date -> date.toInstant()).orElse(null);
|
||||
Execution execution = Execution
|
||||
.newExecution(
|
||||
flow,
|
||||
(f, e) -> flowInputOutput.readExecutionInputs(f, e, inputs),
|
||||
(f, e) -> runContext.inputAndOutput().readInputs(f, e, inputs),
|
||||
newLabels,
|
||||
Optional.empty())
|
||||
.withTrigger(ExecutionTrigger.builder()
|
||||
|
||||
@@ -3,13 +3,13 @@ package io.kestra.core.runners;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.core.encryption.EncryptionService;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.exceptions.KestraRuntimeException;
|
||||
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Data;
|
||||
import io.kestra.core.models.flows.DependsOn;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
import io.kestra.core.models.flows.RenderableInput;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.flows.input.FileInput;
|
||||
@@ -19,7 +19,6 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.models.property.URIFetcher;
|
||||
import io.kestra.core.models.tasks.common.EncryptedString;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
@@ -209,8 +208,8 @@ public class FlowInputOutput {
|
||||
.filter(InputAndValue::enabled)
|
||||
.map(it -> {
|
||||
//TODO check to return all exception at-once.
|
||||
if (it.exception() != null) {
|
||||
throw it.exception();
|
||||
if (it.exceptions() != null && !it.exceptions().isEmpty()) {
|
||||
throw InputOutputValidationException.merge(it.exceptions());
|
||||
}
|
||||
return new AbstractMap.SimpleEntry<>(it.input().getId(), it.value());
|
||||
})
|
||||
@@ -294,13 +293,9 @@ public class FlowInputOutput {
|
||||
try {
|
||||
isInputEnabled = Boolean.TRUE.equals(runContext.renderTyped(dependsOnCondition.get()));
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
resolvable.resolveWithError(ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid condition: " + e.getMessage(),
|
||||
input,
|
||||
(Class<Input>)input.getClass(),
|
||||
input.getId(),
|
||||
this
|
||||
));
|
||||
resolvable.resolveWithError(
|
||||
InputOutputValidationException.of("Invalid condition: " + e.getMessage())
|
||||
);
|
||||
isInputEnabled = false;
|
||||
}
|
||||
}
|
||||
@@ -333,7 +328,7 @@ public class FlowInputOutput {
|
||||
// validate and parse input value
|
||||
if (value == null) {
|
||||
if (input.getRequired()) {
|
||||
resolvable.resolveWithError(input.toConstraintViolationException("missing required input", null));
|
||||
resolvable.resolveWithError(InputOutputValidationException.of("Missing required input:" + input.getId()));
|
||||
} else {
|
||||
resolvable.resolveWithValue(null);
|
||||
}
|
||||
@@ -343,17 +338,18 @@ public class FlowInputOutput {
|
||||
parsedInput.ifPresent(parsed -> ((Input) resolvable.get().input()).validate(parsed.getValue()));
|
||||
parsedInput.ifPresent(typed -> resolvable.resolveWithValue(typed.getValue()));
|
||||
} catch (ConstraintViolationException e) {
|
||||
ConstraintViolationException exception = e.getConstraintViolations().size() == 1 ?
|
||||
input.toConstraintViolationException(List.copyOf(e.getConstraintViolations()).getFirst().getMessage(), value) :
|
||||
input.toConstraintViolationException(e.getMessage(), value);
|
||||
resolvable.resolveWithError(exception);
|
||||
Input<?> finalInput = input;
|
||||
Set<InputOutputValidationException> exceptions = e.getConstraintViolations().stream()
|
||||
.map(c-> InputOutputValidationException.of(c.getMessage(), finalInput))
|
||||
.collect(Collectors.toSet());
|
||||
resolvable.resolveWithError(exceptions);
|
||||
}
|
||||
}
|
||||
} catch (ConstraintViolationException e) {
|
||||
resolvable.resolveWithError(e);
|
||||
} catch (Exception e) {
|
||||
ConstraintViolationException exception = input.toConstraintViolationException(e instanceof IllegalArgumentException ? e.getMessage() : e.toString(), resolvable.get().value());
|
||||
resolvable.resolveWithError(exception);
|
||||
} catch (IllegalArgumentException e){
|
||||
resolvable.resolveWithError(InputOutputValidationException.of(e.getMessage(), input));
|
||||
}
|
||||
catch (Exception e) {
|
||||
resolvable.resolveWithError(InputOutputValidationException.of(e.getMessage()));
|
||||
}
|
||||
|
||||
return resolvable.get();
|
||||
@@ -441,8 +437,12 @@ public class FlowInputOutput {
|
||||
}
|
||||
return entry;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
throw output.toConstraintViolationException(e.getMessage(), current);
|
||||
}
|
||||
catch (IllegalArgumentException e){
|
||||
throw InputOutputValidationException.of(e.getMessage(), output);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw InputOutputValidationException.of(e.getMessage());
|
||||
}
|
||||
})
|
||||
.filter(Optional::isPresent)
|
||||
@@ -505,7 +505,7 @@ public class FlowInputOutput {
|
||||
if (matcher.matches()) {
|
||||
yield current.toString();
|
||||
} else {
|
||||
throw new IllegalArgumentException("Expected `URI` but received `" + current + "`");
|
||||
throw new IllegalArgumentException("Invalid URI format.");
|
||||
}
|
||||
}
|
||||
case ARRAY, MULTISELECT -> {
|
||||
@@ -535,34 +535,10 @@ public class FlowInputOutput {
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw e;
|
||||
} catch (Throwable e) {
|
||||
throw new Exception("Expected `" + type + "` but received `" + current + "` with errors:\n```\n" + e.getMessage() + "\n```");
|
||||
throw new Exception(" errors:\n```\n" + e.getMessage() + "\n```");
|
||||
}
|
||||
}
|
||||
|
||||
public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
|
||||
if (outputs == null) return Map.of();
|
||||
|
||||
// render required outputs
|
||||
Map<String, Object> outputsById = outputs
|
||||
.stream()
|
||||
.filter(output -> output.getRequired() == null || output.getRequired())
|
||||
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
|
||||
outputsById = runContext.render(outputsById);
|
||||
|
||||
// render optional outputs one by one to catch, log, and skip any error.
|
||||
for (io.kestra.core.models.flows.Output output : outputs) {
|
||||
if (Boolean.FALSE.equals(output.getRequired())) {
|
||||
try {
|
||||
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
|
||||
} catch (Exception e) {
|
||||
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
|
||||
outputsById.put(output.getId(), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
return outputsById;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mutable wrapper to hold a flow's input, and it's resolved value.
|
||||
*/
|
||||
@@ -591,27 +567,30 @@ public class FlowInputOutput {
|
||||
}
|
||||
|
||||
public void isDefault(boolean isDefault) {
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exceptions());
|
||||
}
|
||||
|
||||
public void setInput(final Input<?> input) {
|
||||
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exception());
|
||||
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exceptions());
|
||||
}
|
||||
|
||||
public void resolveWithEnabled(boolean enabled) {
|
||||
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exceptions());
|
||||
markAsResolved();
|
||||
}
|
||||
|
||||
public void resolveWithValue(@Nullable Object value) {
|
||||
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exceptions());
|
||||
markAsResolved();
|
||||
}
|
||||
|
||||
public void resolveWithError(@Nullable ConstraintViolationException exception) {
|
||||
public void resolveWithError(@Nullable Set<InputOutputValidationException> exception) {
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), this.input.isDefault(), exception);
|
||||
markAsResolved();
|
||||
}
|
||||
private void resolveWithError(@Nullable InputOutputValidationException exception){
|
||||
resolveWithError(Collections.singleton(exception));
|
||||
}
|
||||
|
||||
private void markAsResolved() {
|
||||
this.isResolved = true;
|
||||
|
||||
@@ -5,6 +5,8 @@ import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.FlowWithException;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.services.PluginDefaultService;
|
||||
import io.micronaut.context.annotation.Bean;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
@@ -15,7 +17,6 @@ import io.kestra.core.services.FlowListenersInterface;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@@ -26,15 +27,14 @@ import jakarta.inject.Singleton;
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class FlowListeners implements FlowListenersInterface {
|
||||
|
||||
private final AtomicBoolean isStarted = new AtomicBoolean(false);
|
||||
private final QueueInterface<FlowInterface> flowQueue;
|
||||
private final List<FlowWithSource> flows;
|
||||
private final List<Consumer<List<FlowWithSource>>> consumers = new ArrayList<>();
|
||||
private final List<BiConsumer<FlowWithSource, FlowWithSource>> consumersEach = new ArrayList<>();
|
||||
|
||||
private final PluginDefaultService pluginDefaultService;
|
||||
|
||||
private Runnable queueListenerCancellation;
|
||||
|
||||
@Inject
|
||||
public FlowListeners(
|
||||
FlowRepositoryInterface flowRepository,
|
||||
@@ -49,8 +49,9 @@ public class FlowListeners implements FlowListenersInterface {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (this) {
|
||||
if (this.isStarted.compareAndSet(false, true)) {
|
||||
this.flowQueue.receive(either -> {
|
||||
if (queueListenerCancellation == null) {
|
||||
log.info("STARTING FLOW LISTENER: {}", this);
|
||||
queueListenerCancellation = this.flowQueue.receive(either -> {
|
||||
FlowWithSource flow;
|
||||
if (either.isRight()) {
|
||||
flow = FlowWithException.from(either.getRight().getRecord(), either.getRight(), log).orElse(null);
|
||||
@@ -154,4 +155,18 @@ public class FlowListeners implements FlowListenersInterface {
|
||||
// we forced a deep clone to avoid concurrency where instance are changed during iteration (especially scheduler).
|
||||
return new ArrayList<>(this.flows);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
synchronized (this) {
|
||||
boolean b = queueListenerCancellation != null;
|
||||
log.info("THREAD STACKTRACE: {}", (Object) Thread.currentThread().getStackTrace());
|
||||
log.info("LISTENER NOT NULL : {}", b);
|
||||
log.info("LISTENER THIS : {}", this);
|
||||
if (b) {
|
||||
queueListenerCancellation.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* InputAndOutput could be used to work with flow execution inputs and outputs.
|
||||
*/
|
||||
public interface InputAndOutput {
|
||||
/**
|
||||
* Reads the inputs of a flow execution.
|
||||
*/
|
||||
Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs);
|
||||
|
||||
/**
|
||||
* Processes the outputs of a flow execution (parse them based on their types).
|
||||
*/
|
||||
Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs);
|
||||
|
||||
/**
|
||||
* Render flow execution outputs.
|
||||
*/
|
||||
Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException;
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
class InputAndOutputImpl implements InputAndOutput {
|
||||
private final FlowInputOutput flowInputOutput;
|
||||
private final RunContext runContext;
|
||||
|
||||
InputAndOutputImpl(ApplicationContext applicationContext, RunContext runContext) {
|
||||
this.flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
|
||||
this.runContext = runContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs) {
|
||||
return flowInputOutput.readExecutionInputs(flow, execution, inputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs) {
|
||||
return flowInputOutput.typedOutputs(flow, execution, rOutputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException {
|
||||
if (outputs == null) return Map.of();
|
||||
|
||||
// render required outputs
|
||||
Map<String, Object> outputsById = outputs
|
||||
.stream()
|
||||
.filter(output -> output.getRequired() == null || output.getRequired())
|
||||
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
|
||||
outputsById = runContext.render(outputsById);
|
||||
|
||||
// render optional outputs one by one to catch, log, and skip any error.
|
||||
for (io.kestra.core.models.flows.Output output : outputs) {
|
||||
if (Boolean.FALSE.equals(output.getRequired())) {
|
||||
try {
|
||||
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
|
||||
} catch (Exception e) {
|
||||
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
|
||||
outputsById.put(output.getId(), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
return outputsById;
|
||||
}
|
||||
}
|
||||
@@ -211,4 +211,9 @@ public abstract class RunContext implements PropertyContext {
|
||||
* @return a new run context with the plugin configuration of the given plugin.
|
||||
*/
|
||||
public abstract RunContext cloneForPlugin(Plugin plugin);
|
||||
|
||||
/**
|
||||
* @return an InputAndOutput that can be used to work with inputs and outputs.
|
||||
*/
|
||||
public abstract InputAndOutput inputAndOutput();
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.micronaut.core.annotation.Nullable;
|
||||
import io.pebbletemplates.pebble.PebbleEngine;
|
||||
import io.pebbletemplates.pebble.extension.Extension;
|
||||
import io.pebbletemplates.pebble.extension.Function;
|
||||
import io.pebbletemplates.pebble.lexer.Syntax;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@@ -37,6 +38,13 @@ public class PebbleEngineFactory {
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public PebbleEngine createWithCustomSyntax(Syntax syntax, Class<? extends Extension> extension) {
|
||||
PebbleEngine.Builder builder = newPebbleEngineBuilder()
|
||||
.syntax(syntax);
|
||||
this.applicationContext.getBeansOfType(extension).forEach(builder::extension);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public PebbleEngine createWithMaskedFunctions(VariableRenderer renderer, final List<String> functionsToMask) {
|
||||
|
||||
PebbleEngine.Builder builder = newPebbleEngineBuilder();
|
||||
|
||||
@@ -35,6 +35,10 @@ public final class YamlParser {
|
||||
return read(input, cls, type(cls));
|
||||
}
|
||||
|
||||
public static <T> T parse(String input, Class<T> cls, Boolean strict) {
|
||||
return strict ? read(input, cls, type(cls)) : readNonStrict(input, cls, type(cls));
|
||||
}
|
||||
|
||||
public static <T> T parse(Map<String, Object> input, Class<T> cls, Boolean strict) {
|
||||
ObjectMapper currentMapper = strict ? STRICT_MAPPER : NON_STRICT_MAPPER;
|
||||
|
||||
@@ -81,7 +85,31 @@ public final class YamlParser {
|
||||
throw toConstraintViolationException(input, resource, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> T readNonStrict(String input, Class<T> objectClass, String resource) {
|
||||
try {
|
||||
return NON_STRICT_MAPPER.readValue(input, objectClass);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw toConstraintViolationException(input, resource, e);
|
||||
}
|
||||
}
|
||||
private static String formatYamlErrorMessage(String originalMessage, JsonProcessingException e) {
|
||||
StringBuilder friendlyMessage = new StringBuilder();
|
||||
if (originalMessage.contains("Expected a field name")) {
|
||||
friendlyMessage.append("YAML syntax error: Invalid structure. Check indentation and ensure all fields are properly formatted.");
|
||||
} else if (originalMessage.contains("MappingStartEvent")) {
|
||||
friendlyMessage.append("YAML syntax error: Unexpected mapping start. Verify that scalar values are properly quoted if needed.");
|
||||
} else if (originalMessage.contains("Scalar value")) {
|
||||
friendlyMessage.append("YAML syntax error: Expected a simple value but found complex structure. Check for unquoted special characters.");
|
||||
} else {
|
||||
friendlyMessage.append("YAML parsing error: ").append(originalMessage.replaceAll("org\\.yaml\\.snakeyaml.*", "").trim());
|
||||
}
|
||||
if (e.getLocation() != null) {
|
||||
int line = e.getLocation().getLineNr();
|
||||
friendlyMessage.append(String.format(" (at line %d)", line));
|
||||
}
|
||||
// Return a generic but cleaner message for other YAML errors
|
||||
return friendlyMessage.toString();
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> ConstraintViolationException toConstraintViolationException(T target, String resource, JsonProcessingException e) {
|
||||
if (e.getCause() instanceof ConstraintViolationException constraintViolationException) {
|
||||
@@ -121,11 +149,12 @@ public final class YamlParser {
|
||||
)
|
||||
));
|
||||
} else {
|
||||
String userFriendlyMessage = formatYamlErrorMessage(e.getMessage(), e);
|
||||
return new ConstraintViolationException(
|
||||
"Illegal " + resource + " source: " + e.getMessage(),
|
||||
"Illegal " + resource + " source: " + userFriendlyMessage,
|
||||
Collections.singleton(
|
||||
ManualConstraintViolation.of(
|
||||
e.getCause() == null ? e.getMessage() : e.getMessage() + "\nCaused by: " + e.getCause().getMessage(),
|
||||
userFriendlyMessage,
|
||||
target,
|
||||
(Class<T>) target.getClass(),
|
||||
"yaml",
|
||||
@@ -136,4 +165,3 @@ public final class YamlParser {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ import com.cronutils.utils.VisibleForTesting;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.conditions.ScheduleCondition;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
@@ -65,16 +64,6 @@ public class ConditionService {
|
||||
return this.valid(flow, conditions, conditionContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that all conditions are valid.
|
||||
* Warning, this method throws if a condition cannot be evaluated.
|
||||
*/
|
||||
public boolean isValid(List<ScheduleCondition> conditions, ConditionContext conditionContext) throws InternalException {
|
||||
return conditions
|
||||
.stream()
|
||||
.allMatch(throwPredicate(condition -> condition.test(conditionContext)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that all conditions are valid.
|
||||
* Warning, this method throws if a condition cannot be evaluated.
|
||||
|
||||
@@ -754,7 +754,7 @@ public class ExecutionService {
|
||||
var parentTaskRun = execution.findTaskRunByTaskRunId(taskRun.getParentTaskRunId());
|
||||
Execution newExecution = execution;
|
||||
if (parentTaskRun.getState().getCurrent() != State.Type.KILLED) {
|
||||
newExecution = newExecution.withTaskRun(parentTaskRun.withState(State.Type.KILLED));
|
||||
newExecution = newExecution.withTaskRun(parentTaskRun.withStateAndAttempt(State.Type.KILLED));
|
||||
}
|
||||
if (parentTaskRun.getParentTaskRunId() != null) {
|
||||
return killParentTaskruns(parentTaskRun, newExecution);
|
||||
|
||||
@@ -6,7 +6,7 @@ import java.util.List;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public interface FlowListenersInterface {
|
||||
public interface FlowListenersInterface extends AutoCloseable {
|
||||
void run();
|
||||
|
||||
void listen(Consumer<List<FlowWithSource>> consumer);
|
||||
|
||||
@@ -92,7 +92,14 @@ public class FlowService {
|
||||
return flowRepository
|
||||
.orElseThrow(() -> new IllegalStateException("Cannot perform operation on flow. Cause: No FlowRepository"));
|
||||
}
|
||||
|
||||
private static String formatValidationError(String message) {
|
||||
if (message.startsWith("Illegal flow source:")) {
|
||||
// Already formatted by YamlParser, return as-is
|
||||
return message;
|
||||
}
|
||||
// For other validation errors, provide context
|
||||
return "Validation error: " + message;
|
||||
}
|
||||
/**
|
||||
* Evaluates all checks defined in the given flow using the provided inputs.
|
||||
* <p>
|
||||
@@ -174,10 +181,12 @@ public class FlowService {
|
||||
modelValidator.validate(pluginDefaultService.injectAllDefaults(flow, false));
|
||||
|
||||
} catch (ConstraintViolationException e) {
|
||||
validateConstraintViolationBuilder.constraints(e.getMessage());
|
||||
String friendlyMessage = formatValidationError(e.getMessage());
|
||||
validateConstraintViolationBuilder.constraints(friendlyMessage);
|
||||
} catch (FlowProcessingException e) {
|
||||
if (e.getCause() instanceof ConstraintViolationException) {
|
||||
validateConstraintViolationBuilder.constraints(e.getMessage());
|
||||
if (e.getCause() instanceof ConstraintViolationException cve) {
|
||||
String friendlyMessage = formatValidationError(cve.getMessage());
|
||||
validateConstraintViolationBuilder.constraints(friendlyMessage);
|
||||
} else {
|
||||
Throwable cause = e.getCause() != null ? e.getCause() : e;
|
||||
validateConstraintViolationBuilder.constraints("Unable to validate the flow: " + cause.getMessage());
|
||||
@@ -579,4 +588,4 @@ public class FlowService {
|
||||
private IllegalStateException noRepositoryException() {
|
||||
return new IllegalStateException("No repository found. Make sure the `kestra.repository.type` property is set.");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
package io.kestra.core.storages;
|
||||
|
||||
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
|
||||
import io.kestra.core.services.NamespaceService;
|
||||
import jakarta.annotation.Nullable;
|
||||
import org.slf4j.Logger;
|
||||
@@ -272,7 +271,13 @@ public class InternalStorage implements Storage {
|
||||
return this.storage.put(context.getTenantId(), context.getNamespace(), resolve, new BufferedInputStream(inputStream));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<StorageContext.Task> getTaskStorageContext() {
|
||||
return Optional.ofNullable((context instanceof StorageContext.Task task) ? task : null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<FileAttributes> list(URI uri) throws IOException {
|
||||
return this.storage.list(context.getTenantId(), context.getNamespace(), uri);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,4 +173,6 @@ public interface Storage {
|
||||
* @return the task storage context
|
||||
*/
|
||||
Optional<StorageContext.Task> getTaskStorageContext();
|
||||
|
||||
List<FileAttributes> list(URI uri) throws IOException;
|
||||
}
|
||||
|
||||
@@ -1,13 +1,39 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import io.kestra.core.models.Setting;
|
||||
import io.kestra.core.repositories.SettingRepositoryInterface;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
@Singleton
|
||||
public class EditionProvider {
|
||||
public Edition get() {
|
||||
return Edition.OSS;
|
||||
}
|
||||
|
||||
@Inject
|
||||
private Optional<SettingRepositoryInterface> settingRepository; // repositories are not always there on unit tests
|
||||
|
||||
@PostConstruct
|
||||
void start() {
|
||||
// check the edition in the settings and update if needed, we didn't use it would allow us to detect incompatible update later if needed
|
||||
settingRepository.ifPresent(settingRepositoryInterface -> persistEdition(settingRepositoryInterface, get()));
|
||||
}
|
||||
|
||||
private void persistEdition(SettingRepositoryInterface settingRepositoryInterface, Edition edition) {
|
||||
Optional<Setting> versionSetting = settingRepositoryInterface.findByKey(Setting.INSTANCE_EDITION);
|
||||
if (versionSetting.isEmpty() || !versionSetting.get().getValue().equals(edition)) {
|
||||
settingRepositoryInterface.save(Setting.builder()
|
||||
.key(Setting.INSTANCE_EDITION)
|
||||
.value(edition)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public enum Edition {
|
||||
OSS,
|
||||
EE
|
||||
|
||||
@@ -70,4 +70,12 @@ public class ListUtils {
|
||||
.map(Object::toString)
|
||||
.toList();
|
||||
}
|
||||
|
||||
public static <T> List<List<T>> partition(List<T> list, int size) {
|
||||
List<List<T>> parts = new ArrayList<>();
|
||||
for (int i = 0; i < list.size(); i += size) {
|
||||
parts.add(list.subList(i, Math.min(i + size, list.size())));
|
||||
}
|
||||
return parts;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,21 +11,17 @@ public final class ThreadUncaughtExceptionHandler implements UncaughtExceptionHa
|
||||
|
||||
@Override
|
||||
public void uncaughtException(Thread t, Throwable e) {
|
||||
boolean isTest = KestraContext.getContext().getEnvironments().contains("test");
|
||||
|
||||
try {
|
||||
// cannot use FormattingLogger due to a dependency loop
|
||||
log.error("Caught an exception in {}. {}", t, isTest ? "Keeping it running for test." : "Shutting down.", e);
|
||||
log.error("Caught an exception in {}. Shutting down.", t, e);
|
||||
} catch (Throwable errorInLogging) {
|
||||
// If logging fails, e.g. due to missing memory, at least try to log the
|
||||
// message and the cause for the failed logging.
|
||||
System.err.println(e.getMessage());
|
||||
System.err.println(errorInLogging.getMessage());
|
||||
} finally {
|
||||
if (!isTest) {
|
||||
KestraContext.getContext().shutdown();
|
||||
Runtime.getRuntime().exit(1);
|
||||
}
|
||||
KestraContext.getContext().shutdown();
|
||||
Runtime.getRuntime().exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
|
||||
import io.kestra.core.services.StorageService;
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.storages.StorageSplitInterface;
|
||||
import io.kestra.core.utils.GraphUtils;
|
||||
import io.kestra.core.validations.NoSystemLabelValidation;
|
||||
@@ -540,7 +539,7 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
|
||||
.numberOfBatches((Integer) taskRun.getOutputs().get(ExecutableUtils.TASK_VARIABLE_NUMBER_OF_BATCHES));
|
||||
|
||||
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
|
||||
FileSerde.write(bos, FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext));
|
||||
FileSerde.write(bos, runContext.inputAndOutput().renderOutputs(flow.getOutputs()));
|
||||
URI uri = runContext.storage().putFile(
|
||||
new ByteArrayInputStream(bos.toByteArray()),
|
||||
URI.create((String) taskRun.getOutputs().get("uri"))
|
||||
@@ -602,9 +601,8 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
|
||||
String subflowOutputsBase = (String) taskOutput.get(ExecutableUtils.TASK_VARIABLE_SUBFLOW_OUTPUTS_BASE_URI);
|
||||
URI subflowOutputsBaseUri = URI.create(StorageContext.KESTRA_PROTOCOL + subflowOutputsBase + "/");
|
||||
|
||||
StorageInterface storage = ((DefaultRunContext) runContext).getApplicationContext().getBean(StorageInterface.class);
|
||||
if (storage.exists(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), subflowOutputsBaseUri)) {
|
||||
List<FileAttributes> list = storage.list(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), subflowOutputsBaseUri);
|
||||
if (runContext.storage().isFileExist(subflowOutputsBaseUri)) {
|
||||
List<FileAttributes> list = runContext.storage().list(subflowOutputsBaseUri);;
|
||||
|
||||
if (!list.isEmpty()) {
|
||||
// Merge outputs from each sub-flow into a single stored in the internal storage.
|
||||
|
||||
@@ -63,7 +63,8 @@ import java.util.*;
|
||||
|
||||
- id: run_post_approval
|
||||
type: io.kestra.plugin.scripts.shell.Commands
|
||||
runner: PROCESS
|
||||
taskRunner:
|
||||
type: io.kestra.plugin.core.runner.Process
|
||||
commands:
|
||||
- echo "Manual approval received! Continuing the execution..."
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ import io.kestra.core.models.tasks.ExecutableTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.ExecutableUtils;
|
||||
import io.kestra.core.runners.FlowInputOutput;
|
||||
import io.kestra.core.runners.FlowMetaStoreInterface;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.SubflowExecution;
|
||||
@@ -38,7 +37,6 @@ import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Collections;
|
||||
@@ -246,11 +244,11 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
|
||||
|
||||
if (subflowOutputs != null && !subflowOutputs.isEmpty()) {
|
||||
try {
|
||||
Map<String, Object> rOutputs = FlowInputOutput.renderFlowOutputs(subflowOutputs, runContext);
|
||||
var inputAndOutput = runContext.inputAndOutput();
|
||||
Map<String, Object> rOutputs = inputAndOutput.renderOutputs(subflowOutputs);
|
||||
|
||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
|
||||
if (flow.getOutputs() != null && flowInputOutput != null) {
|
||||
rOutputs = flowInputOutput.typedOutputs(flow, execution, rOutputs);
|
||||
if (flow.getOutputs() != null) {
|
||||
rOutputs = inputAndOutput.typedOutputs(flow, execution, rOutputs);
|
||||
}
|
||||
builder.outputs(rOutputs);
|
||||
} catch (Exception e) {
|
||||
|
||||
@@ -2,10 +2,8 @@ package io.kestra.plugin.core.namespace;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.NamespaceFile;
|
||||
import io.kestra.plugin.core.kv.Version;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@@ -0,0 +1,107 @@
|
||||
package io.kestra.plugin.core.trigger;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionTrigger;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.Backfill;
|
||||
import io.kestra.core.models.triggers.Schedulable;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.services.LabelService;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.chrono.ChronoZonedDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Factory class for constructing a new {@link Execution} from a {@link Schedulable} trigger.
|
||||
*
|
||||
* @see io.kestra.plugin.core.trigger.Schedule
|
||||
* @see io.kestra.plugin.core.trigger.ScheduleOnDates
|
||||
*/
|
||||
final class SchedulableExecutionFactory {
|
||||
|
||||
static Execution createFailedExecution(Schedulable trigger, ConditionContext conditionContext, TriggerContext triggerContext) throws IllegalVariableEvaluationException {
|
||||
return Execution.builder()
|
||||
.id(conditionContext.getRunContext().getTriggerExecutionId())
|
||||
.tenantId(triggerContext.getTenantId())
|
||||
.namespace(triggerContext.getNamespace())
|
||||
.flowId(triggerContext.getFlowId())
|
||||
.flowRevision(conditionContext.getFlow().getRevision())
|
||||
.labels(SchedulableExecutionFactory.getLabels(trigger, conditionContext.getRunContext(), triggerContext.getBackfill(), conditionContext.getFlow()))
|
||||
.state(new State().withState(State.Type.FAILED))
|
||||
.build();
|
||||
}
|
||||
|
||||
static Execution createExecution(Schedulable trigger, ConditionContext conditionContext, TriggerContext triggerContext, Map<String, Object> variables, ZonedDateTime scheduleDate) throws IllegalVariableEvaluationException {
|
||||
RunContext runContext = conditionContext.getRunContext();
|
||||
ExecutionTrigger executionTrigger = ExecutionTrigger.of((AbstractTrigger) trigger, variables);
|
||||
|
||||
List<Label> labels = getLabels(trigger, runContext, triggerContext.getBackfill(), conditionContext.getFlow());
|
||||
|
||||
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(labels));
|
||||
executionLabels.add(new Label(Label.FROM, "trigger"));
|
||||
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
|
||||
// add a correlation ID if none exist
|
||||
executionLabels.add(new Label(Label.CORRELATION_ID, runContext.getTriggerExecutionId()));
|
||||
}
|
||||
|
||||
Execution execution = Execution.builder()
|
||||
.id(runContext.getTriggerExecutionId())
|
||||
.tenantId(triggerContext.getTenantId())
|
||||
.namespace(triggerContext.getNamespace())
|
||||
.flowId(triggerContext.getFlowId())
|
||||
.flowRevision(conditionContext.getFlow().getRevision())
|
||||
.variables(conditionContext.getFlow().getVariables())
|
||||
.labels(executionLabels)
|
||||
.state(new State())
|
||||
.trigger(executionTrigger)
|
||||
.scheduleDate(Optional.ofNullable(scheduleDate).map(ChronoZonedDateTime::toInstant).orElse(null))
|
||||
.build();
|
||||
|
||||
Map<String, Object> allInputs = getInputs(trigger, runContext, triggerContext.getBackfill());
|
||||
|
||||
// add inputs and inject defaults (FlowInputOutput handles defaults internally)
|
||||
execution = execution.withInputs(runContext.inputAndOutput().readInputs(conditionContext.getFlow(), execution, allInputs));
|
||||
|
||||
return execution;
|
||||
}
|
||||
|
||||
private static Map<String, Object> getInputs(Schedulable trigger, RunContext runContext, Backfill backfill) throws IllegalVariableEvaluationException {
|
||||
Map<String, Object> inputs = new HashMap<>();
|
||||
|
||||
if (trigger.getInputs() != null) {
|
||||
inputs.putAll(runContext.render(trigger.getInputs()));
|
||||
}
|
||||
|
||||
if (backfill != null && backfill.getInputs() != null) {
|
||||
inputs.putAll(runContext.render(backfill.getInputs()));
|
||||
}
|
||||
|
||||
return inputs;
|
||||
}
|
||||
|
||||
private static List<Label> getLabels(Schedulable trigger, RunContext runContext, Backfill backfill, FlowInterface flow) throws IllegalVariableEvaluationException {
|
||||
List<Label> labels = LabelService.fromTrigger(runContext, flow, (AbstractTrigger) trigger);
|
||||
|
||||
if (backfill != null && backfill.getLabels() != null) {
|
||||
for (Label label : backfill.getLabels()) {
|
||||
final var value = runContext.render(label.value());
|
||||
if (value != null) {
|
||||
labels.add(new Label(label.key(), value));
|
||||
}
|
||||
}
|
||||
}
|
||||
return labels;
|
||||
}
|
||||
}
|
||||
@@ -6,9 +6,7 @@ import com.cronutils.model.time.ExecutionTime;
|
||||
import com.cronutils.parser.CronParser;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
@@ -16,12 +14,8 @@ import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.conditions.ScheduleCondition;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.triggers.*;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.services.ConditionService;
|
||||
import io.kestra.core.services.LabelService;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.validations.ScheduleValidation;
|
||||
import io.kestra.core.validations.TimezoneId;
|
||||
@@ -29,6 +23,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Null;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -40,6 +35,8 @@ import java.time.temporal.ChronoUnit;
|
||||
import java.util.*;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwPredicate;
|
||||
|
||||
@Slf4j
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@@ -224,11 +221,7 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
||||
@PluginProperty
|
||||
@Deprecated
|
||||
private List<ScheduleCondition> scheduleConditions;
|
||||
|
||||
@Schema(
|
||||
title = "The inputs to pass to the scheduled flow"
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
|
||||
private Map<String, Object> inputs;
|
||||
|
||||
@Schema(
|
||||
@@ -248,13 +241,7 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
||||
@PluginProperty
|
||||
@Deprecated
|
||||
private Map<String, Object> backfill;
|
||||
|
||||
@Schema(
|
||||
title = "Action to take in the case of missed schedules",
|
||||
description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
|
||||
"The default is `ALL` unless a different value is configured using the global plugin configuration."
|
||||
)
|
||||
@PluginProperty
|
||||
|
||||
private RecoverMissedSchedules recoverMissedSchedules;
|
||||
|
||||
@Override
|
||||
@@ -403,20 +390,11 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
||||
if (!conditionResults) {
|
||||
return Optional.empty();
|
||||
}
|
||||
} catch(InternalException ie) {
|
||||
} catch (InternalException ie) {
|
||||
// validate schedule condition can fail to render variables
|
||||
// in this case, we return a failed execution so the trigger is not evaluated each second
|
||||
runContext.logger().error("Unable to evaluate the Schedule trigger '{}'", this.getId(), ie);
|
||||
Execution execution = Execution.builder()
|
||||
.id(runContext.getTriggerExecutionId())
|
||||
.tenantId(triggerContext.getTenantId())
|
||||
.namespace(triggerContext.getNamespace())
|
||||
.flowId(triggerContext.getFlowId())
|
||||
.flowRevision(conditionContext.getFlow().getRevision())
|
||||
.labels(generateLabels(runContext, conditionContext, backfill))
|
||||
.state(new State().withState(State.Type.FAILED))
|
||||
.build();
|
||||
return Optional.of(execution);
|
||||
return Optional.of(SchedulableExecutionFactory.createFailedExecution(this, conditionContext, triggerContext));
|
||||
}
|
||||
|
||||
// recalculate true output for previous and next based on conditions
|
||||
@@ -430,14 +408,12 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
||||
variables = scheduleDates.toMap();
|
||||
}
|
||||
|
||||
Execution execution = TriggerService.generateScheduledExecution(
|
||||
Execution execution = SchedulableExecutionFactory.createExecution(
|
||||
this,
|
||||
conditionContext,
|
||||
triggerContext,
|
||||
generateLabels(runContext, conditionContext, backfill),
|
||||
generateInputs(runContext, backfill),
|
||||
variables,
|
||||
Optional.empty()
|
||||
null
|
||||
);
|
||||
|
||||
return Optional.of(execution);
|
||||
@@ -448,34 +424,6 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
||||
return parser.parse(this.cron);
|
||||
}
|
||||
|
||||
private List<Label> generateLabels(RunContext runContext, ConditionContext conditionContext, Backfill backfill) throws IllegalVariableEvaluationException {
|
||||
List<Label> labels = LabelService.fromTrigger(runContext, conditionContext.getFlow(), this);
|
||||
|
||||
if (backfill != null && backfill.getLabels() != null) {
|
||||
for (Label label : backfill.getLabels()) {
|
||||
final var value = runContext.render(label.value());
|
||||
if (value != null) {
|
||||
labels.add(new Label(label.key(), value));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return labels;
|
||||
}
|
||||
|
||||
private Map<String, Object> generateInputs(RunContext runContext, Backfill backfill) throws IllegalVariableEvaluationException {
|
||||
Map<String, Object> inputs = new HashMap<>();
|
||||
|
||||
if (this.inputs != null) {
|
||||
inputs.putAll(runContext.render(this.inputs));
|
||||
}
|
||||
|
||||
if (backfill != null && backfill.getInputs() != null) {
|
||||
inputs.putAll(runContext.render(backfill.getInputs()));
|
||||
}
|
||||
|
||||
return inputs;
|
||||
}
|
||||
private Optional<Output> scheduleDates(ExecutionTime executionTime, ZonedDateTime date) {
|
||||
Optional<ZonedDateTime> next = executionTime.nextExecution(date.minus(Duration.ofSeconds(1)));
|
||||
|
||||
@@ -549,9 +497,9 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
||||
Optional<ZonedDateTime> truePreviousNextDateWithCondition(ExecutionTime executionTime, ConditionContext conditionContext, ZonedDateTime toTestDate, boolean next) throws InternalException {
|
||||
int upperYearBound = ZonedDateTime.now().getYear() + 10;
|
||||
int lowerYearBound = ZonedDateTime.now().getYear() - 10;
|
||||
|
||||
|
||||
while ((next && toTestDate.getYear() < upperYearBound) || (!next && toTestDate.getYear() > lowerYearBound)) {
|
||||
|
||||
|
||||
Optional<ZonedDateTime> currentDate = next ?
|
||||
executionTime.nextExecution(toTestDate) :
|
||||
executionTime.lastExecution(toTestDate);
|
||||
@@ -607,11 +555,10 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
||||
|
||||
private boolean validateScheduleCondition(ConditionContext conditionContext) throws InternalException {
|
||||
if (conditions != null) {
|
||||
ConditionService conditionService = ((DefaultRunContext)conditionContext.getRunContext()).getApplicationContext().getBean(ConditionService.class);
|
||||
return conditionService.isValid(
|
||||
conditions.stream().filter(c -> c instanceof ScheduleCondition).map(c -> (ScheduleCondition) c).toList(),
|
||||
conditionContext
|
||||
);
|
||||
return conditions.stream()
|
||||
.filter(c -> c instanceof ScheduleCondition)
|
||||
.map(c -> (ScheduleCondition) c)
|
||||
.allMatch(throwPredicate(condition -> condition.test(conditionContext)));
|
||||
}
|
||||
|
||||
return true;
|
||||
|
||||
@@ -10,7 +10,6 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.VoidOutput;
|
||||
import io.kestra.core.models.triggers.*;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.services.LabelService;
|
||||
import io.kestra.core.validations.TimezoneId;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
@@ -23,7 +22,10 @@ import java.time.Duration;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.*;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
@@ -45,11 +47,7 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
|
||||
@Builder.Default
|
||||
@Null
|
||||
private final Duration interval = null;
|
||||
|
||||
@Schema(
|
||||
title = "The inputs to pass to the scheduled flow"
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
|
||||
private Map<String, Object> inputs;
|
||||
|
||||
@TimezoneId
|
||||
@@ -63,31 +61,24 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
|
||||
@NotNull
|
||||
private Property<List<ZonedDateTime>> dates;
|
||||
|
||||
@Schema(
|
||||
title = "Action to take in the case of missed schedules",
|
||||
description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
|
||||
"The default is `ALL` unless a different value is configured using the global plugin configuration."
|
||||
)
|
||||
@PluginProperty
|
||||
private RecoverMissedSchedules recoverMissedSchedules;
|
||||
|
||||
@Override
|
||||
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext triggerContext) throws Exception {
|
||||
RunContext runContext = conditionContext.getRunContext();
|
||||
|
||||
ZonedDateTime lastEvaluation = triggerContext.getDate();
|
||||
Optional<ZonedDateTime> nextDate = nextDate(runContext, date -> date.isEqual(lastEvaluation) || date.isAfter(lastEvaluation));
|
||||
|
||||
if (nextDate.isPresent()) {
|
||||
log.info("Schedule execution on {}", nextDate.get());
|
||||
|
||||
Execution execution = TriggerService.generateScheduledExecution(
|
||||
Execution execution = SchedulableExecutionFactory.createExecution(
|
||||
this,
|
||||
conditionContext,
|
||||
triggerContext,
|
||||
LabelService.fromTrigger(runContext, conditionContext.getFlow(), this),
|
||||
this.inputs != null ? runContext.render(this.inputs) : Collections.emptyMap(),
|
||||
Collections.emptyMap(),
|
||||
nextDate
|
||||
nextDate.orElse(null)
|
||||
);
|
||||
|
||||
return Optional.of(execution);
|
||||
@@ -97,29 +88,21 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
|
||||
}
|
||||
|
||||
@Override
|
||||
public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> last) {
|
||||
try {
|
||||
return last
|
||||
.map(throwFunction(context ->
|
||||
nextDate(conditionContext.getRunContext(), date -> date.isAfter(context.getDate()))
|
||||
.orElse(ZonedDateTime.now().plusYears(1))
|
||||
))
|
||||
.orElse(conditionContext.getRunContext()
|
||||
.render(dates)
|
||||
.asList(ZonedDateTime.class)
|
||||
.stream()
|
||||
.sorted()
|
||||
.findFirst()
|
||||
.orElse(ZonedDateTime.now()))
|
||||
.truncatedTo(ChronoUnit.SECONDS);
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
log.warn("Failed to evaluate schedule dates for trigger '{}': {}", this.getId(), e.getMessage());
|
||||
return ZonedDateTime.now().plusYears(1);
|
||||
}
|
||||
public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> triggerContext) {
|
||||
return triggerContext
|
||||
.map(ctx -> ctx.getBackfill() != null ? ctx.getBackfill().getCurrentDate() : ctx.getDate())
|
||||
.map(this::withTimeZone)
|
||||
.or(() -> Optional.of(ZonedDateTime.now()))
|
||||
.flatMap(dt -> {
|
||||
try {
|
||||
return nextDate(conditionContext.getRunContext(), date -> date.isAfter(dt));
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
log.warn("Failed to evaluate schedule dates for trigger '{}': {}", this.getId(), e.getMessage());
|
||||
throw new InvalidTriggerConfigurationException("Failed to evaluate schedule 'dates'. Cause: " + e.getMessage());
|
||||
}
|
||||
}).orElseGet(() -> ZonedDateTime.now().plusYears(1));
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public ZonedDateTime nextEvaluationDate() {
|
||||
// TODO this may be the next date from now?
|
||||
@@ -139,9 +122,17 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
|
||||
return previousDates.isEmpty() ? ZonedDateTime.now() : previousDates.getFirst();
|
||||
}
|
||||
|
||||
private Optional<ZonedDateTime> nextDate(RunContext runContext, Predicate<ZonedDateTime> filter) throws IllegalVariableEvaluationException {
|
||||
return runContext.render(dates).asList(ZonedDateTime.class).stream().sorted()
|
||||
.filter(date -> filter.test(date))
|
||||
private ZonedDateTime withTimeZone(ZonedDateTime date) {
|
||||
if (this.timezone == null) {
|
||||
return date;
|
||||
}
|
||||
return date.withZoneSameInstant(ZoneId.of(this.timezone));
|
||||
}
|
||||
|
||||
private Optional<ZonedDateTime> nextDate(RunContext runContext, Predicate<ZonedDateTime> predicate) throws IllegalVariableEvaluationException {
|
||||
return runContext.render(dates)
|
||||
.asList(ZonedDateTime.class).stream().sorted()
|
||||
.filter(predicate)
|
||||
.map(throwFunction(date -> timezone == null ? date : date.withZoneSameInstant(ZoneId.of(runContext.render(timezone)))))
|
||||
.findFirst()
|
||||
.map(date -> date.truncatedTo(ChronoUnit.SECONDS));
|
||||
|
||||
@@ -1,16 +1,14 @@
|
||||
package io.kestra.core.contexts;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class KestraContextTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
package io.kestra.core.contexts;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -10,7 +9,7 @@ import java.util.List;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest(environments = "maven")
|
||||
@MicronautTest(environments = "maven")
|
||||
class MavenPluginRepositoryConfigTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -10,7 +10,7 @@ import io.kestra.plugin.core.debug.Return;
|
||||
import io.kestra.plugin.core.flow.Dag;
|
||||
import io.kestra.plugin.core.flow.Subflow;
|
||||
import io.kestra.plugin.core.state.Set;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -26,7 +26,7 @@ import org.junit.jupiter.api.parallel.ExecutionMode;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
@Execution(ExecutionMode.SAME_THREAD)
|
||||
class DocumentationGeneratorTest {
|
||||
@Inject
|
||||
|
||||
@@ -170,10 +170,11 @@ class JsonSchemaGeneratorTest {
|
||||
|
||||
Map<String, Object> jsonSchema = jsonSchemaGenerator.generate(AbstractTrigger.class, AbstractTrigger.class);
|
||||
assertThat((Map<String, Object>) jsonSchema.get("properties"), allOf(
|
||||
Matchers.aMapWithSize(3),
|
||||
Matchers.aMapWithSize(4),
|
||||
hasKey("conditions"),
|
||||
hasKey("stopAfter"),
|
||||
hasKey("type")
|
||||
hasKey("type"),
|
||||
hasKey("allowConcurrent")
|
||||
));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.models;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.validations.ModelValidator;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -12,7 +13,7 @@ import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class LabelTest {
|
||||
@Inject
|
||||
private ModelValidator modelValidator;
|
||||
|
||||
@@ -1,14 +1,20 @@
|
||||
package io.kestra.core.models.property;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import io.kestra.core.context.TestRunContextFactory;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.serializers.FileSerde;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.event.Level;
|
||||
import reactor.core.publisher.Flux;
|
||||
@@ -19,13 +25,14 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static java.util.Map.entry;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class PropertyTest {
|
||||
|
||||
@Inject
|
||||
@@ -362,10 +369,43 @@ class PropertyTest {
|
||||
assertThat(output.getMessages().getFirst().getValue()).isEqualTo("value1");
|
||||
}
|
||||
|
||||
@Test
|
||||
void jsonSubtype() throws JsonProcessingException, IllegalVariableEvaluationException {
|
||||
Optional<WithSubtype> rendered = runContextFactory.of().render(
|
||||
Property.<WithSubtype>ofExpression(JacksonMapper.ofJson().writeValueAsString(new MySubtype()))
|
||||
).as(WithSubtype.class);
|
||||
|
||||
assertThat(rendered).isPresent();
|
||||
assertThat(rendered.get()).isInstanceOf(MySubtype.class);
|
||||
|
||||
List<WithSubtype> renderedList = runContextFactory.of().render(
|
||||
Property.<List<WithSubtype>>ofExpression(JacksonMapper.ofJson().writeValueAsString(List.of(new MySubtype())))
|
||||
).asList(WithSubtype.class);
|
||||
assertThat(renderedList).hasSize(1);
|
||||
assertThat(renderedList.get(0)).isInstanceOf(MySubtype.class);
|
||||
}
|
||||
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
|
||||
@JsonSubTypes({
|
||||
@JsonSubTypes.Type(value = MySubtype.class, name = "mySubtype")
|
||||
})
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Introspected
|
||||
public abstract static class WithSubtype {
|
||||
abstract public String getType();
|
||||
}
|
||||
|
||||
@Getter
|
||||
public static class MySubtype extends WithSubtype {
|
||||
private final String type = "mySubtype";
|
||||
}
|
||||
|
||||
|
||||
@Builder
|
||||
@Getter
|
||||
private static class TestObj {
|
||||
private String key;
|
||||
private String value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
package io.kestra.core.models.property;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.NamespaceFactory;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
@@ -28,7 +27,7 @@ import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class URIFetcherTest {
|
||||
@Inject
|
||||
private StorageInterface storage;
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
package io.kestra.core.models.triggers;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.TenantInterface;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -17,7 +16,7 @@ import static io.kestra.core.models.triggers.StatefulTriggerService.*;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class StatefulTriggerInterfaceTest {
|
||||
@Inject
|
||||
RunContextFactory runContextFactory;
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import io.kestra.plugin.core.condition.ExecutionFlow;
|
||||
@@ -23,7 +24,7 @@ import java.util.List;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest(transactional = false)
|
||||
public abstract class AbstractMultipleConditionStorageTest {
|
||||
private static final String NAMESPACE = "io.kestra.unit";
|
||||
|
||||
|
||||
@@ -1,17 +1,15 @@
|
||||
package io.kestra.core.plugins;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class PluginConfigurationTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.reporter.Reportable;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -11,7 +11,7 @@ import java.time.ZoneId;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractFeatureUsageReportTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.collectors.ServiceUsage;
|
||||
import io.kestra.core.reporter.Reportable;
|
||||
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
|
||||
@@ -8,6 +7,7 @@ import io.kestra.core.server.Service;
|
||||
import io.kestra.core.server.ServiceInstance;
|
||||
import io.kestra.core.server.ServiceType;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -20,7 +20,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractServiceUsageReportTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.plugin.core.http.Trigger;
|
||||
import io.kestra.plugin.core.log.Log;
|
||||
import io.kestra.plugin.core.trigger.Schedule;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -13,7 +13,7 @@ import java.time.Instant;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class PluginMetricReportTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.Setting;
|
||||
import io.kestra.core.repositories.SettingRepositoryInterface;
|
||||
import io.micronaut.test.annotation.MockBean;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
@@ -16,7 +16,7 @@ import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class SystemInformationReportTest {
|
||||
|
||||
@Inject
|
||||
@@ -60,6 +60,15 @@ class SystemInformationReportTest {
|
||||
return setting;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Setting internalSave(Setting setting) throws ConstraintViolationException {
|
||||
if (setting.getKey().equals(Setting.INSTANCE_UUID)) {
|
||||
UUID = setting.getValue();
|
||||
}
|
||||
|
||||
return setting;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Setting delete(Setting setting) {
|
||||
return setting;
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import com.devskiller.friendly_id.FriendlyId;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.InvalidQueryFiltersException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.FlakyTest;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.QueryFilter.Field;
|
||||
@@ -24,7 +23,6 @@ import io.kestra.core.models.flows.State.Type;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.NamespaceUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
@@ -34,6 +32,7 @@ import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.data.model.Sort;
|
||||
import io.micronaut.http.HttpStatus;
|
||||
import io.micronaut.http.exceptions.HttpStatusException;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
@@ -42,10 +41,9 @@ import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.*;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Function;
|
||||
@@ -60,7 +58,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractExecutionRepositoryTest {
|
||||
public static final String NAMESPACE = "io.kestra.unittest";
|
||||
public static final String FLOW = "full";
|
||||
@@ -185,6 +183,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("filterCombinations")
|
||||
@FlakyTest(description = "Filtering tests are sometimes returning 0")
|
||||
void should_find_all(QueryFilter filter, int expectedSize){
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
inject(tenant, "executionTriggerId");
|
||||
|
||||
@@ -10,7 +10,7 @@ import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.services.ExecutionService;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.event.Level;
|
||||
@@ -28,7 +28,7 @@ import java.util.Objects;
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractExecutionServiceTest {
|
||||
@Inject
|
||||
ExecutionService executionService;
|
||||
|
||||
@@ -4,7 +4,6 @@ import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.events.CrudEvent;
|
||||
import io.kestra.core.events.CrudEventType;
|
||||
import io.kestra.core.exceptions.InvalidQueryFiltersException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.QueryFilter.Field;
|
||||
@@ -26,6 +25,7 @@ import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import io.micronaut.context.event.ApplicationEventListener;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
@@ -49,7 +49,7 @@ import static io.kestra.core.utils.NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest(transactional = false)
|
||||
public abstract class AbstractFlowRepositoryTest {
|
||||
public static final String TEST_NAMESPACE = "io.kestra.unittest";
|
||||
public static final String TEST_FLOW_ID = "test";
|
||||
|
||||
@@ -3,8 +3,8 @@ package io.kestra.core.repositories;
|
||||
import io.kestra.core.models.topologies.FlowNode;
|
||||
import io.kestra.core.models.topologies.FlowRelation;
|
||||
import io.kestra.core.models.topologies.FlowTopology;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -12,7 +12,7 @@ import java.util.List;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractFlowTopologyRepositoryTest {
|
||||
@Inject
|
||||
private FlowTopologyRepositoryInterface flowTopologyRepository;
|
||||
|
||||
@@ -4,8 +4,8 @@ import io.kestra.core.models.FetchVersion;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.kv.PersistedKvMetadata;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -18,7 +18,7 @@ import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractKvMetadataRepositoryTest {
|
||||
@Inject
|
||||
protected KvMetadataRepositoryInterface kvMetadataRepositoryInterface;
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.exceptions.InvalidQueryFiltersException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.QueryFilter.Field;
|
||||
import io.kestra.core.models.QueryFilter.Op;
|
||||
@@ -14,9 +13,9 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.dashboard.data.Executions;
|
||||
import io.kestra.plugin.core.dashboard.data.Logs;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
@@ -37,7 +36,7 @@ import static io.kestra.core.models.flows.FlowScope.USER;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest(transactional = false)
|
||||
public abstract class AbstractLogRepositoryTest {
|
||||
@Inject
|
||||
protected LogRepositoryInterface logRepository;
|
||||
|
||||
@@ -10,10 +10,9 @@ import io.kestra.core.models.executions.metrics.MetricAggregations;
|
||||
import io.kestra.core.models.executions.metrics.Timer;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.ZonedDateTime;
|
||||
@@ -21,7 +20,7 @@ import java.util.List;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractMetricRepositoryTest {
|
||||
@Inject
|
||||
protected MetricRepositoryInterface metricRepository;
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.FetchVersion;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
@@ -21,7 +21,7 @@ import java.util.stream.Stream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest(transactional = false)
|
||||
public abstract class AbstractNamespaceFileMetadataRepositoryTest {
|
||||
@Inject
|
||||
protected NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepositoryInterface;
|
||||
|
||||
@@ -2,8 +2,8 @@ package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.models.Setting;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.utils.VersionProvider;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -12,7 +12,7 @@ import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractSettingRepositoryTest {
|
||||
@Inject
|
||||
protected SettingRepositoryInterface settingRepository;
|
||||
|
||||
@@ -10,7 +10,7 @@ import io.kestra.plugin.core.debug.Return;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.context.event.ApplicationEventListener;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import java.time.Duration;
|
||||
@@ -21,7 +21,6 @@ import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@@ -30,7 +29,7 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractTemplateRepositoryTest {
|
||||
@Inject
|
||||
protected TemplateRepositoryInterface templateRepository;
|
||||
|
||||
@@ -12,6 +12,7 @@ import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.data.model.Sort;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
@@ -28,7 +29,7 @@ import static io.kestra.core.models.flows.FlowScope.USER;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest(transactional = false)
|
||||
public abstract class AbstractTriggerRepositoryTest {
|
||||
private static final String TEST_NAMESPACE = "io.kestra.unittest";
|
||||
|
||||
|
||||
@@ -78,6 +78,7 @@ public abstract class AbstractRunnerConcurrencyTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@FlakyTest(description = "Only flaky in CI")
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-killed.yml"}, tenantId = "flow-concurrency-killed")
|
||||
void flowConcurrencyKilled() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyKilled("flow-concurrency-killed");
|
||||
|
||||
@@ -6,6 +6,7 @@ import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
@@ -466,4 +467,20 @@ class ExecutionServiceTest {
|
||||
assertThat(restart.getTaskRunList()).hasSize(2);
|
||||
assertThat(restart.findTaskRunsByTaskId("make_error").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/each-pause.yaml"})
|
||||
void killExecutionWithFlowableTask() throws Exception {
|
||||
Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "each-pause");
|
||||
|
||||
TaskRun childTaskRun = execution.getTaskRunList().stream().filter(tr -> tr.getTaskId().equals("pause")).toList().getFirst();
|
||||
|
||||
Execution killed = executionService.killParentTaskruns(childTaskRun,execution);
|
||||
|
||||
TaskRun parentTaskRun = killed.getTaskRunList().stream().filter(tr -> tr.getTaskId().equals("each_task")).toList().getFirst();
|
||||
|
||||
assertThat(parentTaskRun.getState().getCurrent()).isEqualTo(State.Type.KILLED);
|
||||
assertThat(parentTaskRun.getAttempts().getLast().getState().getCurrent()).isEqualTo(State.Type.KILLED);
|
||||
|
||||
}
|
||||
}
|
||||
@@ -178,7 +178,10 @@ public class FlowConcurrencyCaseTest {
|
||||
// we restart the first one, it should be queued then fail again.
|
||||
Execution failedExecution = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution1);
|
||||
Execution restarted = executionService.restart(failedExecution, null);
|
||||
Execution executionResult1 = runnerUtils.restartExecution(e -> e.getState().getCurrent().equals(Type.FAILED), restarted);
|
||||
Execution executionResult1 = runnerUtils.restartExecution(
|
||||
e -> e.getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED) && e.getState().getCurrent().equals(Type.FAILED),
|
||||
restarted
|
||||
);
|
||||
Execution executionResult2 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution2);
|
||||
|
||||
assertThat(executionResult1.getState().getCurrent()).isEqualTo(Type.FAILED);
|
||||
@@ -278,7 +281,6 @@ public class FlowConcurrencyCaseTest {
|
||||
assertThat(queued.getState().getCurrent()).isEqualTo(Type.QUEUED);
|
||||
} finally {
|
||||
// kill everything to avoid dangling executions
|
||||
runnerUtils.killExecution(execution1);
|
||||
runnerUtils.killExecution(execution2);
|
||||
runnerUtils.killExecution(execution3);
|
||||
|
||||
@@ -321,7 +323,6 @@ public class FlowConcurrencyCaseTest {
|
||||
} finally {
|
||||
// kill everything to avoid dangling executions
|
||||
runnerUtils.killExecution(execution1);
|
||||
runnerUtils.killExecution(execution2);
|
||||
runnerUtils.killExecution(execution3);
|
||||
|
||||
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
|
||||
|
||||
@@ -22,6 +22,7 @@ import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.multipart.CompletedFileUpload;
|
||||
import io.micronaut.http.multipart.CompletedPart;
|
||||
import io.micronaut.test.annotation.MockBean;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
@@ -42,7 +43,7 @@ import java.util.Optional;
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class FlowInputOutputTest {
|
||||
|
||||
private static final String TEST_SECRET_VALUE = "test-secret-value";
|
||||
@@ -239,7 +240,7 @@ class FlowInputOutputTest {
|
||||
// Then
|
||||
Assertions.assertEquals(2, values.size());
|
||||
Assertions.assertFalse(values.get(1).enabled());
|
||||
Assertions.assertNotNull(values.get(1).exception());
|
||||
Assertions.assertNotNull(values.get(1).exceptions());
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -257,7 +258,7 @@ class FlowInputOutputTest {
|
||||
List<InputAndValue> values = flowInputOutput.validateExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, data).block();
|
||||
|
||||
// Then
|
||||
Assertions.assertNull(values.getFirst().exception());
|
||||
Assertions.assertNull(values.getFirst().exceptions());
|
||||
Assertions.assertFalse(storageInterface.exists(MAIN_TENANT, null, URI.create(values.getFirst().value().toString())));
|
||||
}
|
||||
|
||||
|
||||
@@ -1,25 +1,25 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.services.FlowListenersInterface;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@@ -27,6 +27,9 @@ abstract public class FlowListenersTest {
|
||||
@Inject
|
||||
protected FlowRepositoryInterface flowRepository;
|
||||
|
||||
@Inject
|
||||
protected FlowListenersInterface flowListenersService;
|
||||
|
||||
protected static FlowWithSource create(String tenantId, String flowId, String taskId) {
|
||||
FlowWithSource flow = FlowWithSource.builder()
|
||||
.id(flowId)
|
||||
@@ -44,60 +47,56 @@ abstract public class FlowListenersTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(FlowListenersTest.class);
|
||||
|
||||
public void suite(FlowListenersInterface flowListenersService) throws TimeoutException {
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
flowListenersService.run();
|
||||
@Test
|
||||
public void all() throws Exception {
|
||||
FlowListenersInterface finalFlowListenersService = flowListenersService;
|
||||
try (finalFlowListenersService) {
|
||||
finalFlowListenersService.run();
|
||||
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
|
||||
AtomicInteger count = new AtomicInteger();
|
||||
AtomicInteger count = new AtomicInteger();
|
||||
|
||||
flowListenersService.listen(flows -> count.set(getFlowsForTenant(flowListenersService, tenant).size()));
|
||||
flowListenersService.listen(flows -> count.set(getFlowsForTenant(flowListenersService, tenant).size()));
|
||||
|
||||
// initial state
|
||||
LOG.info("-----------> wait for zero");
|
||||
Await.until(() -> count.get() == 0, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isZero();
|
||||
|
||||
// resend on startup done for kafka
|
||||
LOG.info("-----------> wait for zero kafka");
|
||||
if (flowListenersService.getClass().getName().equals("io.kestra.ee.runner.kafka.KafkaFlowListeners")) {
|
||||
// initial state
|
||||
LOG.info("-----------> wait for zero");
|
||||
Await.until(() -> count.get() == 0, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isZero();
|
||||
|
||||
// create first
|
||||
LOG.info("-----------> create first flow");
|
||||
FlowWithSource first = create(tenant, "first_" + IdUtils.create(), "test");
|
||||
FlowWithSource firstUpdated = create(tenant, first.getId(), "test2");
|
||||
|
||||
|
||||
flowRepository.create(GenericFlow.of(first));
|
||||
Await.until(() -> "Expected to have 1 flow but got " + count.get(), () -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
|
||||
|
||||
// create the same id than first, no additional flows
|
||||
first = flowRepository.update(GenericFlow.of(firstUpdated), first);
|
||||
Await.until(() -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
|
||||
|
||||
FlowWithSource second = create(tenant, "second_" + IdUtils.create(), "test");
|
||||
// create a new one
|
||||
flowRepository.create(GenericFlow.of(second));
|
||||
Await.until(() -> count.get() == 2, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(2);
|
||||
|
||||
// delete first
|
||||
FlowWithSource deleted = flowRepository.delete(first);
|
||||
Await.until(() -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
|
||||
|
||||
// restore must works
|
||||
flowRepository.create(GenericFlow.of(first));
|
||||
Await.until(() -> count.get() == 2, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(2);
|
||||
}
|
||||
|
||||
// create first
|
||||
LOG.info("-----------> create fist flow");
|
||||
FlowWithSource first = create(tenant, "first_" + IdUtils.create(), "test");
|
||||
FlowWithSource firstUpdated = create(tenant, first.getId(), "test2");
|
||||
|
||||
|
||||
flowRepository.create(GenericFlow.of(first));
|
||||
Await.until(() -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
|
||||
|
||||
// create the same id than first, no additional flows
|
||||
first = flowRepository.update(GenericFlow.of(firstUpdated), first);
|
||||
Await.until(() -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
|
||||
|
||||
FlowWithSource second = create(tenant, "second_" + IdUtils.create(), "test");
|
||||
// create a new one
|
||||
flowRepository.create(GenericFlow.of(second));
|
||||
Await.until(() -> count.get() == 2, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(2);
|
||||
|
||||
// delete first
|
||||
FlowWithSource deleted = flowRepository.delete(first);
|
||||
Await.until(() -> count.get() == 1, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(1);
|
||||
|
||||
// restore must works
|
||||
flowRepository.create(GenericFlow.of(first));
|
||||
Await.until(() -> count.get() == 2, Duration.ofMillis(10), Duration.ofSeconds(5));
|
||||
assertThat(getFlowsForTenant(flowListenersService, tenant).size()).isEqualTo(2);
|
||||
|
||||
}
|
||||
|
||||
public List<FlowWithSource> getFlowsForTenant(FlowListenersInterface flowListenersService, String tenantId){
|
||||
public List<FlowWithSource> getFlowsForTenant(FlowListenersInterface flowListenersService, String tenantId) {
|
||||
return flowListenersService.flows().stream()
|
||||
.filter(f -> tenantId.equals(f.getTenantId()))
|
||||
.toList();
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.runners;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.io.CharStreams;
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
@@ -137,8 +138,8 @@ public class InputsTest {
|
||||
void missingRequired() {
|
||||
HashMap<String, Object> inputs = new HashMap<>(InputsTest.inputs);
|
||||
inputs.put("string", null);
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(inputs, MAIN_TENANT));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `string`, missing required input, but received `null`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(inputs, MAIN_TENANT));
|
||||
assertThat(e.getMessage()).contains("Missing required input:string");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -232,9 +233,9 @@ public class InputsTest {
|
||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||
map.put("validatedString", "foo");
|
||||
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant4"));
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant4"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedString`, it must match the pattern");
|
||||
assertThat(e.getMessage()).contains( "Invalid value for input `validatedString`. Cause: it must match the pattern");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -242,15 +243,15 @@ public class InputsTest {
|
||||
void inputValidatedIntegerBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedInt", "9");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant5"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedInt`, it must be more than `10`, but received `9`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant5"));
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedInt`. Cause: it must be more than `10`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedInt", "21");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant5"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant5"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedInt`, it must be less than `20`, but received `21`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedInt`. Cause: it must be less than `20`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -258,15 +259,15 @@ public class InputsTest {
|
||||
void inputValidatedDateBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedDate", "2022-01-01");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant6"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDate`, it must be after `2023-01-01`, but received `2022-01-01`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant6"));
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDate`. Cause: it must be after `2023-01-01`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedDate", "2024-01-01");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant6"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant6"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDate`, it must be before `2023-12-31`, but received `2024-01-01`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDate`. Cause: it must be before `2023-12-31`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -274,15 +275,15 @@ public class InputsTest {
|
||||
void inputValidatedDateTimeBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedDateTime", "2022-01-01T00:00:00Z");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant7"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDateTime`, it must be after `2023-01-01T00:00:00Z`, but received `2022-01-01T00:00:00Z`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant7"));
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDateTime`. Cause: it must be after `2023-01-01T00:00:00Z`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedDateTime", "2024-01-01T00:00:00Z");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant7"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant7"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDateTime`, it must be before `2023-12-31T23:59:59Z`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDateTime`. Cause: it must be before `2023-12-31T23:59:59Z`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -290,15 +291,15 @@ public class InputsTest {
|
||||
void inputValidatedDurationBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedDuration", "PT1S");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant8"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDuration`, It must be more than `PT10S`, but received `PT1S`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant8"));
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDuration`. Cause: It must be more than `PT10S`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedDuration", "PT30S");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant8"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant8"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDuration`, It must be less than `PT20S`, but received `PT30S`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDuration`. Cause: It must be less than `PT20S`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -306,15 +307,15 @@ public class InputsTest {
|
||||
void inputValidatedFloatBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedFloat", "0.01");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant9"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedFloat`, it must be more than `0.1`, but received `0.01`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant9"));
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedFloat`. Cause: it must be more than `0.1`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedFloat", "1.01");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant9"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant9"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedFloat`, it must be less than `0.5`, but received `1.01`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedFloat`. Cause: it must be less than `0.5`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -322,15 +323,15 @@ public class InputsTest {
|
||||
void inputValidatedTimeBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedTime", "00:00:01");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant10"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedTime`, it must be after `01:00`, but received `00:00:01`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant10"));
|
||||
assertThat(e.getMessage()).contains( "Invalid value for input `validatedTime`. Cause: it must be after `01:00`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedTime", "14:00:00");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant10"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant10"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedTime`, it must be before `11:59:59`, but received `14:00:00`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedTime`. Cause: it must be before `11:59:59`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -339,9 +340,9 @@ public class InputsTest {
|
||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||
map.put("uri", "http:/bla");
|
||||
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant11"));
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant11"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `uri`, Expected `URI` but received `http:/bla`, but received `http:/bla`");
|
||||
assertThat(e.getMessage()).contains( "Invalid value for input `uri`. Cause: Invalid URI format." );
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -350,9 +351,9 @@ public class InputsTest {
|
||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||
map.put("enum", "INVALID");
|
||||
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant12"));
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant12"));
|
||||
|
||||
assertThat(e.getMessage()).isEqualTo("enum: Invalid input for `enum`, it must match the values `[ENUM_VALUE, OTHER_ONE]`, but received `INVALID`");
|
||||
assertThat(e.getMessage()).isEqualTo("Invalid value for input `enum`. Cause: it must match the values `[ENUM_VALUE, OTHER_ONE]`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -361,9 +362,9 @@ public class InputsTest {
|
||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||
map.put("array", "[\"s1\", \"s2\"]");
|
||||
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant13"));
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant13"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `array`, Unable to parse array element as `INT` on `s1`, but received `[\"s1\", \"s2\"]`");
|
||||
assertThat(e.getMessage()).contains( "Invalid value for input `array`. Cause: Unable to parse array element as `INT` on `s1`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -467,7 +468,20 @@ public class InputsTest {
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat((String) execution.findTaskRunsByTaskId("file").getFirst().getOutputs().get("value")).isEqualTo(file.toString());
|
||||
}
|
||||
@Test
|
||||
@LoadFlows(value = "flows/invalids/inputs-with-multiple-constraint-violations.yaml")
|
||||
void multipleConstraintViolations() {
|
||||
InputOutputValidationException ex = assertThrows(InputOutputValidationException.class, ()-> runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "inputs-with-multiple-constraint-violations", null,
|
||||
(f, e) ->flowIO.readExecutionInputs(f, e , Map.of("multi", List.of("F", "H")) )));
|
||||
|
||||
List<String> messages = Arrays.asList(ex.getMessage().split(System.lineSeparator()));
|
||||
|
||||
assertThat(messages).containsExactlyInAnyOrder(
|
||||
"Invalid value for input `multi`. Cause: you can't define both `values` and `options`",
|
||||
"Invalid value for input `multi`. Cause: value `F` doesn't match the values `[A, B, C]`",
|
||||
"Invalid value for input `multi`. Cause: value `H` doesn't match the values `[A, B, C]`"
|
||||
);
|
||||
}
|
||||
private URI createFile() throws IOException {
|
||||
File tempFile = File.createTempFile("file", ".txt");
|
||||
Files.write(tempFile.toPath(), "Hello World".getBytes());
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.junit.annotations.ExecuteFlow;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
@@ -71,6 +72,6 @@ public class NoEncryptionConfiguredTest implements TestPropertyProvider {
|
||||
.flowId(flow.getId())
|
||||
.build();
|
||||
|
||||
assertThrows(ConstraintViolationException.class, () -> flowIO.readExecutionInputs(flow, execution, InputsTest.inputs));
|
||||
assertThrows(InputOutputValidationException.class, () -> flowIO.readExecutionInputs(flow, execution, InputsTest.inputs));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.DependsOn;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
@@ -24,6 +23,7 @@ import io.kestra.core.utils.IdUtils;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.test.annotation.MockBean;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
@@ -36,7 +36,7 @@ import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class RunVariablesTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -14,7 +14,7 @@ import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class VariableRendererTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -6,7 +6,7 @@ import com.google.common.collect.ImmutableSet;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.utils.Rethrow;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
@@ -18,7 +18,7 @@ import jakarta.inject.Inject;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class PebbleVariableRendererTest {
|
||||
@Inject
|
||||
VariableRenderer variableRenderer;
|
||||
|
||||
@@ -6,7 +6,7 @@ import com.google.common.collect.ImmutableSet;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.micronaut.context.annotation.Property;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -15,7 +15,7 @@ import java.util.Collections;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
@Property(name = "kestra.variables.recursive-rendering", value = "true")
|
||||
class RecursivePebbleVariableRendererTest {
|
||||
@Inject
|
||||
|
||||
@@ -3,7 +3,7 @@ package io.kestra.core.runners.pebble.functions;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -13,7 +13,7 @@ import java.util.Map;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class EncryptDecryptFunctionTest {
|
||||
@Inject
|
||||
private VariableRenderer variableRenderer;
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.repositories.LogRepositoryInterface;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.micronaut.context.annotation.Property;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -18,7 +18,7 @@ import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
@Property(name = "kestra.server-type", value = "WORKER")
|
||||
@Execution(ExecutionMode.SAME_THREAD)
|
||||
class ErrorLogsFunctionTest {
|
||||
|
||||
@@ -4,7 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -13,7 +13,7 @@ import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class FetchContextFunctionTest {
|
||||
@Inject
|
||||
VariableRenderer variableRenderer;
|
||||
|
||||
@@ -1,16 +1,15 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.runners.LocalPath;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.NamespaceFactory;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.context.annotation.Property;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -30,7 +29,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
@Execution(ExecutionMode.SAME_THREAD)
|
||||
@KestraTest(rebuildContext = true)
|
||||
@MicronautTest(rebuildContext = true)
|
||||
class FileExistsFunctionTest {
|
||||
|
||||
private static final String NAMESPACE = "my.namespace";
|
||||
|
||||
@@ -1,16 +1,15 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.runners.LocalPath;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.NamespaceFactory;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.context.annotation.Property;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -27,10 +26,9 @@ import org.junit.jupiter.api.parallel.ExecutionMode;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.hibernate.validator.internal.util.Contracts.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest(rebuildContext = true)
|
||||
@MicronautTest(rebuildContext = true)
|
||||
@Execution(ExecutionMode.SAME_THREAD)
|
||||
public class FileSizeFunctionTest {
|
||||
private static final String FLOW = "flow";
|
||||
|
||||
@@ -5,14 +5,14 @@ import java.util.Map;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class FileURIFunctionTest {
|
||||
@Inject
|
||||
private VariableRenderer variableRenderer;
|
||||
|
||||
@@ -2,11 +2,11 @@ package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.serializers.FileSerde;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -21,7 +21,7 @@ import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class FromIonFunctionTest {
|
||||
@Inject
|
||||
VariableRenderer variableRenderer;
|
||||
|
||||
@@ -2,7 +2,7 @@ package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Map;
|
||||
@@ -11,7 +11,7 @@ import jakarta.inject.Inject;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class FromJsonFunctionTest {
|
||||
@Inject
|
||||
VariableRenderer variableRenderer;
|
||||
|
||||
@@ -6,8 +6,8 @@ import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
|
||||
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import io.pebbletemplates.pebble.error.PebbleException;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.hc.client5.http.utils.Base64;
|
||||
@@ -25,7 +25,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
@WireMockTest(httpPort = 28182)
|
||||
@Execution(ExecutionMode.SAME_THREAD)
|
||||
class HttpFunctionTest {
|
||||
|
||||
@@ -3,13 +3,13 @@ package io.kestra.core.runners.pebble.functions;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import java.util.Collections;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class IDFunctionTest {
|
||||
@Inject VariableRenderer variableRenderer;
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.runners.LocalPath;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
@@ -9,6 +8,7 @@ import io.kestra.core.storages.NamespaceFactory;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.context.annotation.Property;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -27,7 +27,7 @@ import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
@KestraTest(rebuildContext = true)
|
||||
@MicronautTest(rebuildContext = true)
|
||||
@Execution(ExecutionMode.SAME_THREAD)
|
||||
class IsFileEmptyFunctionTest {
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user