mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 14:00:23 -05:00
Compare commits
94 Commits
plugin/tem
...
feat/entit
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
63e11c7d94 | ||
|
|
11e199da33 | ||
|
|
5d5165b7b9 | ||
|
|
44d0c10713 | ||
|
|
167734e32a | ||
|
|
24e61c81c0 | ||
|
|
379764a033 | ||
|
|
d55dd275c3 | ||
|
|
f409657e8a | ||
|
|
22f0b3ffdf | ||
|
|
0d99dc6862 | ||
|
|
fd3adc48b8 | ||
|
|
1a8a47c8cd | ||
|
|
7ea95f393e | ||
|
|
6935900699 | ||
|
|
0bc8e8d74a | ||
|
|
7f77b24ae0 | ||
|
|
ec6820dc25 | ||
|
|
d94193c143 | ||
|
|
c9628047fa | ||
|
|
4cbc069af4 | ||
|
|
eabe573fe6 | ||
|
|
ecd64617c3 | ||
|
|
a5650bca0f | ||
|
|
ed59e262d4 | ||
|
|
a5f9d54f7d | ||
|
|
47f4f43198 | ||
|
|
5d31c97f7f | ||
|
|
f8107285c4 | ||
|
|
8dc8dc1796 | ||
|
|
834dfd2947 | ||
|
|
6edb88841f | ||
|
|
5653531628 | ||
|
|
ee61276106 | ||
|
|
abcf76f7b4 | ||
|
|
67ada7f61b | ||
|
|
0c13633f77 | ||
|
|
a6cf2015ff | ||
|
|
2f9216c70b | ||
|
|
1903e6fac5 | ||
|
|
2d2cb00cab | ||
|
|
01b5441d16 | ||
|
|
efc778e294 | ||
|
|
60235a4e73 | ||
|
|
b167c52e76 | ||
|
|
216b124294 | ||
|
|
b6e4df8de2 | ||
|
|
429e7c7945 | ||
|
|
e302b4be4a | ||
|
|
8e7ad9ae25 | ||
|
|
41a11abf16 | ||
|
|
1be16d5e9d | ||
|
|
e263224d7b | ||
|
|
12b89588a6 | ||
|
|
eae5eb80cb | ||
|
|
c0f6298484 | ||
|
|
ba1d6b2232 | ||
|
|
048dcb80cc | ||
|
|
a81de811d7 | ||
|
|
a960a9f982 | ||
|
|
c4d4fd935f | ||
|
|
f063a5a2d9 | ||
|
|
ac91d5605f | ||
|
|
e3d3c3651b | ||
|
|
5b6836237e | ||
|
|
2f8284b133 | ||
|
|
42992fd7c3 | ||
|
|
3a481f93d3 | ||
|
|
7e964ae563 | ||
|
|
25e54edbc9 | ||
|
|
e88dc7af76 | ||
|
|
b7a027f0dc | ||
|
|
98141d6010 | ||
|
|
bf119ab6df | ||
|
|
9bd6353b77 | ||
|
|
c0ab581cf1 | ||
|
|
0f38e19663 | ||
|
|
0c14ea621c | ||
|
|
fb14e57a7c | ||
|
|
09c707d865 | ||
|
|
86e08d71dd | ||
|
|
94c00cedeb | ||
|
|
eb12832b1e | ||
|
|
687cefdfb9 | ||
|
|
8eae8aba72 | ||
|
|
abdbb8d364 | ||
|
|
8a55ab3af6 | ||
|
|
b7cb933e1e | ||
|
|
3af003e5e4 | ||
|
|
c3861a5532 | ||
|
|
ae1f10f45a | ||
|
|
612dccfb8c | ||
|
|
2ae8df2f5f | ||
|
|
1abfa74a16 |
6
.github/dependabot.yml
vendored
6
.github/dependabot.yml
vendored
@@ -51,7 +51,7 @@ updates:
|
|||||||
|
|
||||||
storybook:
|
storybook:
|
||||||
applies-to: version-updates
|
applies-to: version-updates
|
||||||
patterns: ["storybook*", "@storybook/*"]
|
patterns: ["storybook*", "@storybook/*", "eslint-plugin-storybook"]
|
||||||
|
|
||||||
vitest:
|
vitest:
|
||||||
applies-to: version-updates
|
applies-to: version-updates
|
||||||
@@ -67,10 +67,10 @@ updates:
|
|||||||
"@types/*",
|
"@types/*",
|
||||||
"storybook*",
|
"storybook*",
|
||||||
"@storybook/*",
|
"@storybook/*",
|
||||||
|
"eslint-plugin-storybook",
|
||||||
"vitest",
|
"vitest",
|
||||||
"@vitest/*",
|
"@vitest/*",
|
||||||
# Temporary exclusion of these packages from major updates
|
# Temporary exclusion of these packages from major updates
|
||||||
"eslint-plugin-storybook",
|
|
||||||
"eslint-plugin-vue",
|
"eslint-plugin-vue",
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -84,6 +84,7 @@ updates:
|
|||||||
"@types/*",
|
"@types/*",
|
||||||
"storybook*",
|
"storybook*",
|
||||||
"@storybook/*",
|
"@storybook/*",
|
||||||
|
"eslint-plugin-storybook",
|
||||||
"vitest",
|
"vitest",
|
||||||
"@vitest/*",
|
"@vitest/*",
|
||||||
# Temporary exclusion of these packages from minor updates
|
# Temporary exclusion of these packages from minor updates
|
||||||
@@ -102,6 +103,7 @@ updates:
|
|||||||
"@types/*",
|
"@types/*",
|
||||||
"storybook*",
|
"storybook*",
|
||||||
"@storybook/*",
|
"@storybook/*",
|
||||||
|
"eslint-plugin-storybook",
|
||||||
"vitest",
|
"vitest",
|
||||||
"@vitest/*",
|
"@vitest/*",
|
||||||
]
|
]
|
||||||
|
|||||||
2
.github/workflows/vulnerabilities-check.yml
vendored
2
.github/workflows/vulnerabilities-check.yml
vendored
@@ -43,7 +43,7 @@ jobs:
|
|||||||
|
|
||||||
# Upload dependency check report
|
# Upload dependency check report
|
||||||
- name: Upload dependency check report
|
- name: Upload dependency check report
|
||||||
uses: actions/upload-artifact@v5
|
uses: actions/upload-artifact@v6
|
||||||
if: ${{ always() }}
|
if: ${{ always() }}
|
||||||
with:
|
with:
|
||||||
name: dependency-check-report
|
name: dependency-check-report
|
||||||
|
|||||||
@@ -29,8 +29,8 @@ start_time2=$(date +%s)
|
|||||||
|
|
||||||
echo "cd ./ui"
|
echo "cd ./ui"
|
||||||
cd ./ui
|
cd ./ui
|
||||||
echo "npm i"
|
echo "npm ci"
|
||||||
npm i
|
npm ci
|
||||||
|
|
||||||
echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
|
echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
|
||||||
./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
|
./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ plugins {
|
|||||||
|
|
||||||
// test
|
// test
|
||||||
id "com.adarshr.test-logger" version "4.0.0"
|
id "com.adarshr.test-logger" version "4.0.0"
|
||||||
id "org.sonarqube" version "7.2.0.6526"
|
id "org.sonarqube" version "7.2.1.6560"
|
||||||
id 'jacoco-report-aggregation'
|
id 'jacoco-report-aggregation'
|
||||||
|
|
||||||
// helper
|
// helper
|
||||||
@@ -331,7 +331,7 @@ subprojects {
|
|||||||
}
|
}
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
agent "org.aspectj:aspectjweaver:1.9.25"
|
agent "org.aspectj:aspectjweaver:1.9.25.1"
|
||||||
}
|
}
|
||||||
|
|
||||||
test {
|
test {
|
||||||
|
|||||||
@@ -137,6 +137,11 @@ flyway:
|
|||||||
# We must ignore missing migrations as we delete some wrong or not used anymore migrations
|
# We must ignore missing migrations as we delete some wrong or not used anymore migrations
|
||||||
ignore-migration-patterns: "*:missing,*:future"
|
ignore-migration-patterns: "*:missing,*:future"
|
||||||
out-of-order: true
|
out-of-order: true
|
||||||
|
properties:
|
||||||
|
flyway:
|
||||||
|
postgresql:
|
||||||
|
transactional:
|
||||||
|
lock: false
|
||||||
mysql:
|
mysql:
|
||||||
enabled: true
|
enabled: true
|
||||||
locations:
|
locations:
|
||||||
|
|||||||
@@ -82,8 +82,8 @@ dependencies {
|
|||||||
testImplementation "io.micronaut:micronaut-http-server-netty"
|
testImplementation "io.micronaut:micronaut-http-server-netty"
|
||||||
testImplementation "io.micronaut:micronaut-management"
|
testImplementation "io.micronaut:micronaut-management"
|
||||||
|
|
||||||
testImplementation "org.testcontainers:testcontainers:1.21.3"
|
testImplementation "org.testcontainers:testcontainers:1.21.4"
|
||||||
testImplementation "org.testcontainers:junit-jupiter:1.21.3"
|
testImplementation "org.testcontainers:junit-jupiter:1.21.4"
|
||||||
testImplementation "org.bouncycastle:bcpkix-jdk18on"
|
testImplementation "org.bouncycastle:bcpkix-jdk18on"
|
||||||
|
|
||||||
testImplementation "org.wiremock:wiremock-jetty12"
|
testImplementation "org.wiremock:wiremock-jetty12"
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package io.kestra.core.docs;
|
|||||||
import io.kestra.core.models.annotations.PluginSubGroup;
|
import io.kestra.core.models.annotations.PluginSubGroup;
|
||||||
import io.kestra.core.plugins.RegisteredPlugin;
|
import io.kestra.core.plugins.RegisteredPlugin;
|
||||||
import io.micronaut.core.annotation.Nullable;
|
import io.micronaut.core.annotation.Nullable;
|
||||||
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
@@ -117,10 +118,17 @@ public class Plugin {
|
|||||||
.filter(not(io.kestra.core.models.Plugin::isInternal))
|
.filter(not(io.kestra.core.models.Plugin::isInternal))
|
||||||
.filter(clazzFilter)
|
.filter(clazzFilter)
|
||||||
.filter(c -> !c.getName().startsWith("org.kestra."))
|
.filter(c -> !c.getName().startsWith("org.kestra."))
|
||||||
.map(c -> new PluginElementMetadata(c.getName(), io.kestra.core.models.Plugin.isDeprecated(c) ? true : null))
|
.map(c -> {
|
||||||
|
Schema schema = c.getAnnotation(Schema.class);
|
||||||
|
|
||||||
|
var title = Optional.ofNullable(schema).map(Schema::title).filter(t -> !t.isEmpty()).orElse(null);
|
||||||
|
var description = Optional.ofNullable(schema).map(Schema::description).filter(d -> !d.isEmpty()).orElse(null);
|
||||||
|
var deprecated = io.kestra.core.models.Plugin.isDeprecated(c) ? true : null;
|
||||||
|
|
||||||
|
return new PluginElementMetadata(c.getName(), deprecated, title, description);
|
||||||
|
})
|
||||||
.toList();
|
.toList();
|
||||||
}
|
}
|
||||||
|
|
||||||
public record PluginElementMetadata(String cls, Boolean deprecated) {
|
public record PluginElementMetadata(String cls, Boolean deprecated, String title, String description) {}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,37 @@
|
|||||||
|
package io.kestra.core.exceptions;
|
||||||
|
|
||||||
|
import io.kestra.core.models.flows.Data;
|
||||||
|
import io.kestra.core.models.flows.Input;
|
||||||
|
import io.kestra.core.models.flows.Output;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exception that can be thrown when Inputs/Outputs have validation problems.
|
||||||
|
*/
|
||||||
|
public class InputOutputValidationException extends KestraRuntimeException {
|
||||||
|
public InputOutputValidationException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
public static InputOutputValidationException of( String message, Input<?> input){
|
||||||
|
String inputMessage = "Invalid value for input" + " `" + input.getId() + "`. Cause: " + message;
|
||||||
|
return new InputOutputValidationException(inputMessage);
|
||||||
|
}
|
||||||
|
public static InputOutputValidationException of( String message, Output output){
|
||||||
|
String outputMessage = "Invalid value for output" + " `" + output.getId() + "`. Cause: " + message;
|
||||||
|
return new InputOutputValidationException(outputMessage);
|
||||||
|
}
|
||||||
|
public static InputOutputValidationException of(String message){
|
||||||
|
return new InputOutputValidationException(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static InputOutputValidationException merge(Set<InputOutputValidationException> exceptions){
|
||||||
|
String combinedMessage = exceptions.stream()
|
||||||
|
.map(InputOutputValidationException::getMessage)
|
||||||
|
.collect(Collectors.joining(System.lineSeparator()));
|
||||||
|
throw new InputOutputValidationException(combinedMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -1,6 +1,8 @@
|
|||||||
package io.kestra.core.exceptions;
|
package io.kestra.core.exceptions;
|
||||||
|
|
||||||
import java.io.Serial;
|
import java.io.Serial;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The top-level {@link KestraRuntimeException} for non-recoverable errors.
|
* The top-level {@link KestraRuntimeException} for non-recoverable errors.
|
||||||
|
|||||||
@@ -1,5 +0,0 @@
|
|||||||
package io.kestra.core.models;
|
|
||||||
|
|
||||||
public interface DeletedInterface {
|
|
||||||
boolean isDeleted();
|
|
||||||
}
|
|
||||||
@@ -26,6 +26,7 @@ public record Label(
|
|||||||
public static final String REPLAYED = SYSTEM_PREFIX + "replayed";
|
public static final String REPLAYED = SYSTEM_PREFIX + "replayed";
|
||||||
public static final String SIMULATED_EXECUTION = SYSTEM_PREFIX + "simulatedExecution";
|
public static final String SIMULATED_EXECUTION = SYSTEM_PREFIX + "simulatedExecution";
|
||||||
public static final String TEST = SYSTEM_PREFIX + "test";
|
public static final String TEST = SYSTEM_PREFIX + "test";
|
||||||
|
public static final String FROM = SYSTEM_PREFIX + "from";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Static helper method for converting a list of labels to a nested map.
|
* Static helper method for converting a list of labels to a nested map.
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import jakarta.validation.constraints.NotNull;
|
|||||||
public class Setting {
|
public class Setting {
|
||||||
public static final String INSTANCE_UUID = "instance.uuid";
|
public static final String INSTANCE_UUID = "instance.uuid";
|
||||||
public static final String INSTANCE_VERSION = "instance.version";
|
public static final String INSTANCE_VERSION = "instance.version";
|
||||||
|
public static final String INSTANCE_EDITION = "instance.edition";
|
||||||
|
|
||||||
@NotNull
|
@NotNull
|
||||||
private String key;
|
private String key;
|
||||||
|
|||||||
18
core/src/main/java/io/kestra/core/models/SoftDeletable.java
Normal file
18
core/src/main/java/io/kestra/core/models/SoftDeletable.java
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
package io.kestra.core.models;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This interface marks entities that implement a soft deletion mechanism.
|
||||||
|
* Soft deletion is based on a <code>deleted</code> field that is set to <code>true</code> when the entity is deleted.
|
||||||
|
* Physical deletion either never occurs or occurs in a dedicated purge mechanism.
|
||||||
|
*/
|
||||||
|
public interface SoftDeletable<T> {
|
||||||
|
/**
|
||||||
|
* Whether en entity is deleted or not.
|
||||||
|
*/
|
||||||
|
boolean isDeleted();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete the current entity: set its <code>deleted</code> field to <code>true</code>.
|
||||||
|
*/
|
||||||
|
T toDeleted();
|
||||||
|
}
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
package io.kestra.core.models.dashboards;
|
package io.kestra.core.models.dashboards;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
import io.kestra.core.models.DeletedInterface;
|
import io.kestra.core.models.SoftDeletable;
|
||||||
import io.kestra.core.models.HasUID;
|
import io.kestra.core.models.HasUID;
|
||||||
import io.kestra.core.models.dashboards.charts.Chart;
|
import io.kestra.core.models.dashboards.charts.Chart;
|
||||||
import io.kestra.core.utils.IdUtils;
|
import io.kestra.core.utils.IdUtils;
|
||||||
@@ -26,7 +26,7 @@ import java.util.Objects;
|
|||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@Introspected
|
@Introspected
|
||||||
@ToString
|
@ToString
|
||||||
public class Dashboard implements HasUID, DeletedInterface {
|
public class Dashboard implements HasUID, SoftDeletable<Dashboard> {
|
||||||
@Hidden
|
@Hidden
|
||||||
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
|
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
|
||||||
private String tenantId;
|
private String tenantId;
|
||||||
@@ -71,6 +71,7 @@ public class Dashboard implements HasUID, DeletedInterface {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Dashboard toDeleted() {
|
public Dashboard toDeleted() {
|
||||||
return this.toBuilder()
|
return this.toBuilder()
|
||||||
.deleted(true)
|
.deleted(true)
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ import com.google.common.collect.ImmutableMap;
|
|||||||
import com.google.common.collect.Streams;
|
import com.google.common.collect.Streams;
|
||||||
import io.kestra.core.debug.Breakpoint;
|
import io.kestra.core.debug.Breakpoint;
|
||||||
import io.kestra.core.exceptions.InternalException;
|
import io.kestra.core.exceptions.InternalException;
|
||||||
import io.kestra.core.models.DeletedInterface;
|
import io.kestra.core.models.SoftDeletable;
|
||||||
import io.kestra.core.models.Label;
|
import io.kestra.core.models.Label;
|
||||||
import io.kestra.core.models.TenantInterface;
|
import io.kestra.core.models.TenantInterface;
|
||||||
import io.kestra.core.models.flows.Flow;
|
import io.kestra.core.models.flows.Flow;
|
||||||
@@ -53,7 +53,7 @@ import java.util.zip.CRC32;
|
|||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
@ToString
|
@ToString
|
||||||
@EqualsAndHashCode
|
@EqualsAndHashCode
|
||||||
public class Execution implements DeletedInterface, TenantInterface {
|
public class Execution implements SoftDeletable<Execution>, TenantInterface {
|
||||||
|
|
||||||
@With
|
@With
|
||||||
@Hidden
|
@Hidden
|
||||||
@@ -1111,7 +1111,7 @@ public class Execution implements DeletedInterface, TenantInterface {
|
|||||||
.toList();
|
.toList();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Execution toDeleted() {
|
public Execution toDeleted() {
|
||||||
return this.toBuilder()
|
return this.toBuilder()
|
||||||
.deleted(true)
|
.deleted(true)
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package io.kestra.core.models.executions;
|
package io.kestra.core.models.executions;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
import io.kestra.core.models.DeletedInterface;
|
|
||||||
import io.kestra.core.models.TenantInterface;
|
import io.kestra.core.models.TenantInterface;
|
||||||
import io.kestra.core.models.flows.FlowInterface;
|
import io.kestra.core.models.flows.FlowInterface;
|
||||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||||
@@ -22,7 +21,7 @@ import java.util.stream.Stream;
|
|||||||
|
|
||||||
@Value
|
@Value
|
||||||
@Builder(toBuilder = true)
|
@Builder(toBuilder = true)
|
||||||
public class LogEntry implements DeletedInterface, TenantInterface {
|
public class LogEntry implements TenantInterface {
|
||||||
@Hidden
|
@Hidden
|
||||||
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
|
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
|
||||||
String tenantId;
|
String tenantId;
|
||||||
@@ -57,10 +56,6 @@ public class LogEntry implements DeletedInterface, TenantInterface {
|
|||||||
|
|
||||||
String message;
|
String message;
|
||||||
|
|
||||||
@NotNull
|
|
||||||
@Builder.Default
|
|
||||||
boolean deleted = false;
|
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
ExecutionKind executionKind;
|
ExecutionKind executionKind;
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package io.kestra.core.models.executions;
|
package io.kestra.core.models.executions;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
import io.kestra.core.models.DeletedInterface;
|
|
||||||
import io.kestra.core.models.TenantInterface;
|
import io.kestra.core.models.TenantInterface;
|
||||||
import io.kestra.core.models.executions.metrics.Counter;
|
import io.kestra.core.models.executions.metrics.Counter;
|
||||||
import io.kestra.core.models.executions.metrics.Gauge;
|
import io.kestra.core.models.executions.metrics.Gauge;
|
||||||
@@ -18,7 +17,7 @@ import jakarta.validation.constraints.Pattern;
|
|||||||
|
|
||||||
@Value
|
@Value
|
||||||
@Builder(toBuilder = true)
|
@Builder(toBuilder = true)
|
||||||
public class MetricEntry implements DeletedInterface, TenantInterface {
|
public class MetricEntry implements TenantInterface {
|
||||||
@Hidden
|
@Hidden
|
||||||
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
|
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
|
||||||
String tenantId;
|
String tenantId;
|
||||||
@@ -54,10 +53,6 @@ public class MetricEntry implements DeletedInterface, TenantInterface {
|
|||||||
@Nullable
|
@Nullable
|
||||||
Map<String, String> tags;
|
Map<String, String> tags;
|
||||||
|
|
||||||
@NotNull
|
|
||||||
@Builder.Default
|
|
||||||
boolean deleted = false;
|
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
ExecutionKind executionKind;
|
ExecutionKind executionKind;
|
||||||
|
|
||||||
|
|||||||
@@ -3,9 +3,7 @@ package io.kestra.core.models.executions;
|
|||||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
import io.kestra.core.models.TenantInterface;
|
import io.kestra.core.models.TenantInterface;
|
||||||
import io.kestra.core.models.flows.State;
|
import io.kestra.core.models.flows.State;
|
||||||
import io.kestra.core.models.tasks.FlowableTask;
|
|
||||||
import io.kestra.core.models.tasks.ResolvedTask;
|
import io.kestra.core.models.tasks.ResolvedTask;
|
||||||
import io.kestra.core.models.tasks.Task;
|
|
||||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||||
import io.kestra.core.utils.IdUtils;
|
import io.kestra.core.utils.IdUtils;
|
||||||
import io.swagger.v3.oas.annotations.Hidden;
|
import io.swagger.v3.oas.annotations.Hidden;
|
||||||
@@ -95,8 +93,16 @@ public class TaskRun implements TenantInterface {
|
|||||||
this.forceExecution
|
this.forceExecution
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
public TaskRun withStateAndAttempt(State.Type state) {
|
||||||
|
List<TaskRunAttempt> newAttempts = new ArrayList<>(this.attempts != null ? this.attempts : List.of());
|
||||||
|
|
||||||
|
if (newAttempts.isEmpty()) {
|
||||||
|
newAttempts.add(TaskRunAttempt.builder().state(new State(state)).build());
|
||||||
|
} else {
|
||||||
|
TaskRunAttempt updatedLast = newAttempts.getLast().withState(state);
|
||||||
|
newAttempts.set(newAttempts.size() - 1, updatedLast);
|
||||||
|
}
|
||||||
|
|
||||||
public TaskRun replaceState(State newState) {
|
|
||||||
return new TaskRun(
|
return new TaskRun(
|
||||||
this.tenantId,
|
this.tenantId,
|
||||||
this.id,
|
this.id,
|
||||||
@@ -106,9 +112,9 @@ public class TaskRun implements TenantInterface {
|
|||||||
this.taskId,
|
this.taskId,
|
||||||
this.parentTaskRunId,
|
this.parentTaskRunId,
|
||||||
this.value,
|
this.value,
|
||||||
this.attempts,
|
newAttempts,
|
||||||
this.outputs,
|
this.outputs,
|
||||||
newState,
|
this.state.withState(state),
|
||||||
this.iteration,
|
this.iteration,
|
||||||
this.dynamic,
|
this.dynamic,
|
||||||
this.forceExecution
|
this.forceExecution
|
||||||
|
|||||||
@@ -1,7 +1,5 @@
|
|||||||
package io.kestra.core.models.flows;
|
package io.kestra.core.models.flows;
|
||||||
|
|
||||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
|
||||||
import jakarta.validation.ConstraintViolationException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interface for defining an identifiable and typed data.
|
* Interface for defining an identifiable and typed data.
|
||||||
@@ -29,16 +27,4 @@ public interface Data {
|
|||||||
*/
|
*/
|
||||||
String getDisplayName();
|
String getDisplayName();
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
default ConstraintViolationException toConstraintViolationException(String message, Object value) {
|
|
||||||
Class<Data> cls = (Class<Data>) this.getClass();
|
|
||||||
|
|
||||||
return ManualConstraintViolation.toConstraintViolationException(
|
|
||||||
"Invalid " + (this instanceof Output ? "output" : "input") + " for `" + getId() + "`, " + message + ", but received `" + value + "`",
|
|
||||||
this,
|
|
||||||
cls,
|
|
||||||
this.getId(),
|
|
||||||
value
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
package io.kestra.core.models.flows;
|
package io.kestra.core.models.flows;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
@@ -130,7 +129,7 @@ public class Flow extends AbstractFlow implements HasUID {
|
|||||||
@Valid
|
@Valid
|
||||||
@PluginProperty
|
@PluginProperty
|
||||||
List<SLA> sla;
|
List<SLA> sla;
|
||||||
|
|
||||||
@Schema(
|
@Schema(
|
||||||
title = "Conditions evaluated before the flow is executed.",
|
title = "Conditions evaluated before the flow is executed.",
|
||||||
description = "A list of conditions that are evaluated before the flow is executed. If no checks are defined, the flow executes normally."
|
description = "A list of conditions that are evaluated before the flow is executed. If no checks are defined, the flow executes normally."
|
||||||
@@ -343,6 +342,7 @@ public class Flow extends AbstractFlow implements HasUID {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Flow toDeleted() {
|
public Flow toDeleted() {
|
||||||
return this.toBuilder()
|
return this.toBuilder()
|
||||||
.revision(this.revision + 1)
|
.revision(this.revision + 1)
|
||||||
@@ -355,7 +355,7 @@ public class Flow extends AbstractFlow implements HasUID {
|
|||||||
* To be conservative a flow MUST not return any source.
|
* To be conservative a flow MUST not return any source.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
@JsonIgnore
|
@Schema(hidden = true)
|
||||||
public String getSource() {
|
public String getSource() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -58,4 +58,9 @@ public class FlowForExecution extends AbstractFlow {
|
|||||||
public String getSource() {
|
public String getSource() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FlowForExecution toDeleted() {
|
||||||
|
throw new UnsupportedOperationException("Can't delete a FlowForExecution");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
|
|||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
|
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
|
||||||
import io.kestra.core.models.DeletedInterface;
|
import io.kestra.core.models.SoftDeletable;
|
||||||
import io.kestra.core.models.HasSource;
|
import io.kestra.core.models.HasSource;
|
||||||
import io.kestra.core.models.HasUID;
|
import io.kestra.core.models.HasUID;
|
||||||
import io.kestra.core.models.Label;
|
import io.kestra.core.models.Label;
|
||||||
@@ -27,7 +27,7 @@ import java.util.stream.Collectors;
|
|||||||
* The base interface for FLow.
|
* The base interface for FLow.
|
||||||
*/
|
*/
|
||||||
@JsonDeserialize(as = GenericFlow.class)
|
@JsonDeserialize(as = GenericFlow.class)
|
||||||
public interface FlowInterface extends FlowId, DeletedInterface, TenantInterface, HasUID, HasSource {
|
public interface FlowInterface extends FlowId, SoftDeletable<FlowInterface>, TenantInterface, HasUID, HasSource {
|
||||||
|
|
||||||
Pattern YAML_REVISION_MATCHER = Pattern.compile("(?m)^revision: \\d+\n?");
|
Pattern YAML_REVISION_MATCHER = Pattern.compile("(?m)^revision: \\d+\n?");
|
||||||
|
|
||||||
|
|||||||
@@ -1,14 +1,12 @@
|
|||||||
package io.kestra.core.models.flows;
|
package io.kestra.core.models.flows;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
|
||||||
import io.micronaut.core.annotation.Introspected;
|
import io.micronaut.core.annotation.Introspected;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import lombok.ToString;
|
import lombok.ToString;
|
||||||
import lombok.experimental.SuperBuilder;
|
import lombok.experimental.SuperBuilder;
|
||||||
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.regex.Pattern;
|
|
||||||
|
|
||||||
@SuperBuilder(toBuilder = true)
|
@SuperBuilder(toBuilder = true)
|
||||||
@Getter
|
@Getter
|
||||||
@@ -48,7 +46,7 @@ public class FlowWithSource extends Flow {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@JsonIgnore(value = false)
|
@Schema(hidden = false)
|
||||||
public String getSource() {
|
public String getSource() {
|
||||||
return this.source;
|
return this.source;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -96,4 +96,9 @@ public class GenericFlow extends AbstractFlow implements HasUID {
|
|||||||
public List<GenericTrigger> getTriggers() {
|
public List<GenericTrigger> getTriggers() {
|
||||||
return Optional.ofNullable(triggers).orElse(List.of());
|
return Optional.ofNullable(triggers).orElse(List.of());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FlowInterface toDeleted() {
|
||||||
|
throw new UnsupportedOperationException("Can't delete a GenericFlow");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,10 +1,12 @@
|
|||||||
package io.kestra.core.models.flows.input;
|
package io.kestra.core.models.flows.input;
|
||||||
|
|
||||||
|
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||||
import io.kestra.core.models.flows.Input;
|
import io.kestra.core.models.flows.Input;
|
||||||
import jakarta.annotation.Nullable;
|
import jakarta.annotation.Nullable;
|
||||||
import jakarta.validation.ConstraintViolationException;
|
|
||||||
import jakarta.validation.constraints.NotNull;
|
import jakarta.validation.constraints.NotNull;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents an input along with its associated value and validation state.
|
* Represents an input along with its associated value and validation state.
|
||||||
*
|
*
|
||||||
@@ -12,15 +14,15 @@ import jakarta.validation.constraints.NotNull;
|
|||||||
* @param value The provided value for the input.
|
* @param value The provided value for the input.
|
||||||
* @param enabled {@code true} if the input is enabled; {@code false} otherwise.
|
* @param enabled {@code true} if the input is enabled; {@code false} otherwise.
|
||||||
* @param isDefault {@code true} if the provided value is the default; {@code false} otherwise.
|
* @param isDefault {@code true} if the provided value is the default; {@code false} otherwise.
|
||||||
* @param exception The validation exception, if the input value is invalid; {@code null} otherwise.
|
* @param exceptions The validation exceptions, if the input value is invalid; {@code null} otherwise.
|
||||||
*/
|
*/
|
||||||
public record InputAndValue(
|
public record InputAndValue(
|
||||||
Input<?> input,
|
Input<?> input,
|
||||||
Object value,
|
Object value,
|
||||||
boolean enabled,
|
boolean enabled,
|
||||||
boolean isDefault,
|
boolean isDefault,
|
||||||
ConstraintViolationException exception) {
|
Set<InputOutputValidationException> exceptions) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new {@link InputAndValue} instance.
|
* Creates a new {@link InputAndValue} instance.
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import io.kestra.core.models.property.Property;
|
|||||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||||
import io.kestra.core.validations.Regex;
|
import io.kestra.core.validations.Regex;
|
||||||
import io.swagger.v3.oas.annotations.media.Schema;
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
|
import jakarta.validation.ConstraintViolation;
|
||||||
import jakarta.validation.ConstraintViolationException;
|
import jakarta.validation.ConstraintViolationException;
|
||||||
import jakarta.validation.constraints.NotNull;
|
import jakarta.validation.constraints.NotNull;
|
||||||
import lombok.Builder;
|
import lombok.Builder;
|
||||||
@@ -14,10 +15,7 @@ import lombok.Getter;
|
|||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import lombok.experimental.SuperBuilder;
|
import lombok.experimental.SuperBuilder;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.*;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
@SuperBuilder
|
@SuperBuilder
|
||||||
@@ -77,30 +75,35 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void validate(List<String> inputs) throws ConstraintViolationException {
|
public void validate(List<String> inputs) throws ConstraintViolationException {
|
||||||
|
Set<ConstraintViolation<?>> violations = new HashSet<>();
|
||||||
|
|
||||||
if (values != null && options != null) {
|
if (values != null && options != null) {
|
||||||
throw ManualConstraintViolation.toConstraintViolationException(
|
violations.add( ManualConstraintViolation.of(
|
||||||
"you can't define both `values` and `options`",
|
"you can't define both `values` and `options`",
|
||||||
this,
|
this,
|
||||||
MultiselectInput.class,
|
MultiselectInput.class,
|
||||||
getId(),
|
getId(),
|
||||||
""
|
""
|
||||||
);
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!this.getAllowCustomValue()) {
|
if (!this.getAllowCustomValue()) {
|
||||||
for (String input : inputs) {
|
for (String input : inputs) {
|
||||||
List<@Regex String> finalValues = this.values != null ? this.values : this.options;
|
List<@Regex String> finalValues = this.values != null ? this.values : this.options;
|
||||||
if (!finalValues.contains(input)) {
|
if (!finalValues.contains(input)) {
|
||||||
throw ManualConstraintViolation.toConstraintViolationException(
|
violations.add(ManualConstraintViolation.of(
|
||||||
"it must match the values `" + finalValues + "`",
|
"value `" + input + "` doesn't match the values `" + finalValues + "`",
|
||||||
this,
|
this,
|
||||||
MultiselectInput.class,
|
MultiselectInput.class,
|
||||||
getId(),
|
getId(),
|
||||||
input
|
input
|
||||||
);
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (!violations.isEmpty()) {
|
||||||
|
throw ManualConstraintViolation.toConstraintViolationException(violations);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** {@inheritDoc} **/
|
/** {@inheritDoc} **/
|
||||||
@@ -145,7 +148,7 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
|
|||||||
|
|
||||||
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
|
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
|
||||||
throw ManualConstraintViolation.toConstraintViolationException(
|
throw ManualConstraintViolation.toConstraintViolationException(
|
||||||
"Invalid expression result. Expected a list of strings, but received " + type,
|
"Invalid expression result. Expected a list of strings",
|
||||||
this,
|
this,
|
||||||
MultiselectInput.class,
|
MultiselectInput.class,
|
||||||
getId(),
|
getId(),
|
||||||
|
|||||||
@@ -125,7 +125,7 @@ public class SelectInput extends Input<String> implements RenderableInput {
|
|||||||
|
|
||||||
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
|
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
|
||||||
throw ManualConstraintViolation.toConstraintViolationException(
|
throw ManualConstraintViolation.toConstraintViolationException(
|
||||||
"Invalid expression result. Expected a list of strings, but received " + type,
|
"Invalid expression result. Expected a list of strings",
|
||||||
this,
|
this,
|
||||||
SelectInput.class,
|
SelectInput.class,
|
||||||
getId(),
|
getId(),
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
package io.kestra.core.models.kv;
|
package io.kestra.core.models.kv;
|
||||||
|
|
||||||
import io.kestra.core.models.DeletedInterface;
|
import io.kestra.core.models.SoftDeletable;
|
||||||
import io.kestra.core.models.HasUID;
|
import io.kestra.core.models.HasUID;
|
||||||
import io.kestra.core.models.TenantInterface;
|
import io.kestra.core.models.TenantInterface;
|
||||||
import io.kestra.core.storages.kv.KVEntry;
|
import io.kestra.core.storages.kv.KVEntry;
|
||||||
@@ -22,7 +22,7 @@ import java.util.Optional;
|
|||||||
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
|
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
|
||||||
@ToString
|
@ToString
|
||||||
@EqualsAndHashCode
|
@EqualsAndHashCode
|
||||||
public class PersistedKvMetadata implements DeletedInterface, TenantInterface, HasUID {
|
public class PersistedKvMetadata implements SoftDeletable<PersistedKvMetadata>, TenantInterface, HasUID {
|
||||||
@With
|
@With
|
||||||
@Hidden
|
@Hidden
|
||||||
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
|
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
|
||||||
@@ -83,6 +83,7 @@ public class PersistedKvMetadata implements DeletedInterface, TenantInterface, H
|
|||||||
return this.toBuilder().updated(Instant.now()).last(true).build();
|
return this.toBuilder().updated(Instant.now()).last(true).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public PersistedKvMetadata toDeleted() {
|
public PersistedKvMetadata toDeleted() {
|
||||||
return this.toBuilder().updated(Instant.now()).deleted(true).build();
|
return this.toBuilder().updated(Instant.now()).deleted(true).build();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,8 +17,4 @@ public class Namespace implements NamespaceInterface {
|
|||||||
@NotNull
|
@NotNull
|
||||||
@Pattern(regexp="^[a-z0-9][a-z0-9._-]*")
|
@Pattern(regexp="^[a-z0-9][a-z0-9._-]*")
|
||||||
protected String id;
|
protected String id;
|
||||||
|
|
||||||
@NotNull
|
|
||||||
@Builder.Default
|
|
||||||
boolean deleted = false;
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,9 +1,8 @@
|
|||||||
package io.kestra.core.models.namespaces;
|
package io.kestra.core.models.namespaces;
|
||||||
|
|
||||||
import io.kestra.core.models.DeletedInterface;
|
|
||||||
import io.kestra.core.models.HasUID;
|
import io.kestra.core.models.HasUID;
|
||||||
|
|
||||||
public interface NamespaceInterface extends DeletedInterface, HasUID {
|
public interface NamespaceInterface extends HasUID {
|
||||||
String getId();
|
String getId();
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -2,8 +2,8 @@ package io.kestra.core.models.namespaces.files;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
import io.kestra.core.models.DeletedInterface;
|
|
||||||
import io.kestra.core.models.HasUID;
|
import io.kestra.core.models.HasUID;
|
||||||
|
import io.kestra.core.models.SoftDeletable;
|
||||||
import io.kestra.core.models.TenantInterface;
|
import io.kestra.core.models.TenantInterface;
|
||||||
import io.kestra.core.storages.FileAttributes;
|
import io.kestra.core.storages.FileAttributes;
|
||||||
import io.kestra.core.storages.NamespaceFile;
|
import io.kestra.core.storages.NamespaceFile;
|
||||||
@@ -24,7 +24,7 @@ import java.time.Instant;
|
|||||||
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
|
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
|
||||||
@ToString
|
@ToString
|
||||||
@EqualsAndHashCode
|
@EqualsAndHashCode
|
||||||
public class NamespaceFileMetadata implements DeletedInterface, TenantInterface, HasUID {
|
public class NamespaceFileMetadata implements SoftDeletable<NamespaceFileMetadata>, TenantInterface, HasUID {
|
||||||
@With
|
@With
|
||||||
@Hidden
|
@Hidden
|
||||||
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
|
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
|
||||||
@@ -116,6 +116,7 @@ public class NamespaceFileMetadata implements DeletedInterface, TenantInterface,
|
|||||||
return this.toBuilder().updated(saveDate).last(true).build();
|
return this.toBuilder().updated(saveDate).last(true).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public NamespaceFileMetadata toDeleted() {
|
public NamespaceFileMetadata toDeleted() {
|
||||||
return this.toBuilder().deleted(true).updated(Instant.now()).build();
|
return this.toBuilder().deleted(true).updated(Instant.now()).build();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer;
|
|||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||||
import io.kestra.core.runners.RunContext;
|
import io.kestra.core.runners.RunContext;
|
||||||
|
import io.kestra.core.runners.RunContextProperty;
|
||||||
import io.kestra.core.serializers.JacksonMapper;
|
import io.kestra.core.serializers.JacksonMapper;
|
||||||
import io.swagger.v3.oas.annotations.media.Schema;
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
import jakarta.validation.constraints.NotNull;
|
import jakarta.validation.constraints.NotNull;
|
||||||
@@ -156,9 +157,9 @@ public class Property<T> {
|
|||||||
/**
|
/**
|
||||||
* Render a property, then convert it to its target type.<br>
|
* Render a property, then convert it to its target type.<br>
|
||||||
* <p>
|
* <p>
|
||||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||||
*
|
*
|
||||||
* @see io.kestra.core.runners.RunContextProperty#as(Class)
|
* @see RunContextProperty#as(Class)
|
||||||
*/
|
*/
|
||||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz) throws IllegalVariableEvaluationException {
|
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||||
return as(property, context, clazz, Map.of());
|
return as(property, context, clazz, Map.of());
|
||||||
@@ -167,25 +168,57 @@ public class Property<T> {
|
|||||||
/**
|
/**
|
||||||
* Render a property with additional variables, then convert it to its target type.<br>
|
* Render a property with additional variables, then convert it to its target type.<br>
|
||||||
* <p>
|
* <p>
|
||||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||||
*
|
*
|
||||||
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
|
* @see RunContextProperty#as(Class, Map)
|
||||||
*/
|
*/
|
||||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||||
if (property.skipCache || property.value == null) {
|
if (property.skipCache || property.value == null) {
|
||||||
String rendered = context.render(property.expression, variables);
|
String rendered = context.render(property.expression, variables);
|
||||||
property.value = MAPPER.convertValue(rendered, clazz);
|
property.value = deserialize(rendered, clazz);
|
||||||
}
|
}
|
||||||
|
|
||||||
return property.value;
|
return property.value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static <T> T deserialize(Object rendered, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||||
|
try {
|
||||||
|
return MAPPER.convertValue(rendered, clazz);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
if (rendered instanceof String str) {
|
||||||
|
try {
|
||||||
|
return MAPPER.readValue(str, clazz);
|
||||||
|
} catch (JsonProcessingException ex) {
|
||||||
|
throw new IllegalVariableEvaluationException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new IllegalVariableEvaluationException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <T> T deserialize(Object rendered, JavaType type) throws IllegalVariableEvaluationException {
|
||||||
|
try {
|
||||||
|
return MAPPER.convertValue(rendered, type);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
if (rendered instanceof String str) {
|
||||||
|
try {
|
||||||
|
return MAPPER.readValue(str, type);
|
||||||
|
} catch (JsonProcessingException ex) {
|
||||||
|
throw new IllegalVariableEvaluationException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new IllegalVariableEvaluationException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Render a property then convert it as a list of target type.<br>
|
* Render a property then convert it as a list of target type.<br>
|
||||||
* <p>
|
* <p>
|
||||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||||
*
|
*
|
||||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
|
* @see RunContextProperty#asList(Class)
|
||||||
*/
|
*/
|
||||||
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz) throws IllegalVariableEvaluationException {
|
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz) throws IllegalVariableEvaluationException {
|
||||||
return asList(property, context, itemClazz, Map.of());
|
return asList(property, context, itemClazz, Map.of());
|
||||||
@@ -194,37 +227,39 @@ public class Property<T> {
|
|||||||
/**
|
/**
|
||||||
* Render a property with additional variables, then convert it as a list of target type.<br>
|
* Render a property with additional variables, then convert it as a list of target type.<br>
|
||||||
* <p>
|
* <p>
|
||||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||||
*
|
*
|
||||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
|
* @see RunContextProperty#asList(Class, Map)
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||||
if (property.skipCache || property.value == null) {
|
if (property.skipCache || property.value == null) {
|
||||||
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
|
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
|
||||||
try {
|
String trimmedExpression = property.expression.trim();
|
||||||
String trimmedExpression = property.expression.trim();
|
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
|
||||||
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
|
// Doing that allows us to, if it's an expression, first render then read it as a list.
|
||||||
// Doing that allows us to, if it's an expression, first render then read it as a list.
|
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
property.value = deserialize(context.render(property.expression, variables), type);
|
||||||
property.value = MAPPER.readValue(context.render(property.expression, variables), type);
|
}
|
||||||
}
|
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
|
||||||
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
|
else {
|
||||||
else {
|
List<?> asRawList = deserialize(property.expression, List.class);
|
||||||
List<?> asRawList = MAPPER.readValue(property.expression, List.class);
|
property.value = (T) asRawList.stream()
|
||||||
property.value = (T) asRawList.stream()
|
.map(throwFunction(item -> {
|
||||||
.map(throwFunction(item -> {
|
Object rendered = null;
|
||||||
if (item instanceof String str) {
|
if (item instanceof String str) {
|
||||||
return MAPPER.convertValue(context.render(str, variables), itemClazz);
|
rendered = context.render(str, variables);
|
||||||
} else if (item instanceof Map map) {
|
} else if (item instanceof Map map) {
|
||||||
return MAPPER.convertValue(context.render(map, variables), itemClazz);
|
rendered = context.render(map, variables);
|
||||||
}
|
}
|
||||||
return item;
|
|
||||||
}))
|
if (rendered != null) {
|
||||||
.toList();
|
return deserialize(rendered, itemClazz);
|
||||||
}
|
}
|
||||||
} catch (JsonProcessingException e) {
|
|
||||||
throw new IllegalVariableEvaluationException(e);
|
return item;
|
||||||
|
}))
|
||||||
|
.toList();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -234,9 +269,9 @@ public class Property<T> {
|
|||||||
/**
|
/**
|
||||||
* Render a property then convert it as a map of target types.<br>
|
* Render a property then convert it as a map of target types.<br>
|
||||||
* <p>
|
* <p>
|
||||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||||
*
|
*
|
||||||
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class)
|
* @see RunContextProperty#asMap(Class, Class)
|
||||||
*/
|
*/
|
||||||
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
|
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
|
||||||
return asMap(property, runContext, keyClass, valueClass, Map.of());
|
return asMap(property, runContext, keyClass, valueClass, Map.of());
|
||||||
@@ -248,7 +283,7 @@ public class Property<T> {
|
|||||||
* This method is safe to be used as many times as you want as the rendering and conversion will be cached.
|
* This method is safe to be used as many times as you want as the rendering and conversion will be cached.
|
||||||
* Warning, due to the caching mechanism, this method is not thread-safe.
|
* Warning, due to the caching mechanism, this method is not thread-safe.
|
||||||
*
|
*
|
||||||
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class, Map)
|
* @see RunContextProperty#asMap(Class, Class, Map)
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||||
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||||
@@ -260,12 +295,12 @@ public class Property<T> {
|
|||||||
// We need to detect if the expression is already a map or if it's a pebble expression (for eg. referencing a variable containing a map).
|
// We need to detect if the expression is already a map or if it's a pebble expression (for eg. referencing a variable containing a map).
|
||||||
// Doing that allows us to, if it's an expression, first render then read it as a map.
|
// Doing that allows us to, if it's an expression, first render then read it as a map.
|
||||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||||
property.value = MAPPER.readValue(runContext.render(property.expression, variables), targetMapType);
|
property.value = deserialize(runContext.render(property.expression, variables), targetMapType);
|
||||||
}
|
}
|
||||||
// Otherwise if it's already a map we read it as a map first then render it from run context which handle map rendering by rendering each entry of the map (otherwise it will fail with nested expressions in values for eg.)
|
// Otherwise if it's already a map we read it as a map first then render it from run context which handle map rendering by rendering each entry of the map (otherwise it will fail with nested expressions in values for eg.)
|
||||||
else {
|
else {
|
||||||
Map asRawMap = MAPPER.readValue(property.expression, Map.class);
|
Map asRawMap = MAPPER.readValue(property.expression, Map.class);
|
||||||
property.value = MAPPER.convertValue(runContext.render(asRawMap, variables), targetMapType);
|
property.value = deserialize(runContext.render(asRawMap, variables), targetMapType);
|
||||||
}
|
}
|
||||||
} catch (JsonProcessingException e) {
|
} catch (JsonProcessingException e) {
|
||||||
throw new IllegalVariableEvaluationException(e);
|
throw new IllegalVariableEvaluationException(e);
|
||||||
|
|||||||
@@ -7,8 +7,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
|||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
|
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
|
||||||
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
|
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
|
||||||
import io.kestra.core.models.DeletedInterface;
|
|
||||||
import io.kestra.core.models.HasUID;
|
import io.kestra.core.models.HasUID;
|
||||||
|
import io.kestra.core.models.SoftDeletable;
|
||||||
import io.kestra.core.models.TenantInterface;
|
import io.kestra.core.models.TenantInterface;
|
||||||
import io.kestra.core.models.tasks.Task;
|
import io.kestra.core.models.tasks.Task;
|
||||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||||
@@ -35,7 +35,7 @@ import jakarta.validation.constraints.Pattern;
|
|||||||
@Introspected
|
@Introspected
|
||||||
@ToString
|
@ToString
|
||||||
@EqualsAndHashCode
|
@EqualsAndHashCode
|
||||||
public class Template implements DeletedInterface, TenantInterface, HasUID {
|
public class Template implements SoftDeletable<Template>, TenantInterface, HasUID {
|
||||||
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml().copy()
|
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml().copy()
|
||||||
.setAnnotationIntrospector(new JacksonAnnotationIntrospector() {
|
.setAnnotationIntrospector(new JacksonAnnotationIntrospector() {
|
||||||
@Override
|
@Override
|
||||||
@@ -141,6 +141,7 @@ public class Template implements DeletedInterface, TenantInterface, HasUID {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Template toDeleted() {
|
public Template toDeleted() {
|
||||||
return new Template(
|
return new Template(
|
||||||
this.tenantId,
|
this.tenantId,
|
||||||
|
|||||||
@@ -82,6 +82,12 @@ abstract public class AbstractTrigger implements TriggerInterface {
|
|||||||
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
|
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
|
||||||
private boolean failOnTriggerError = false;
|
private boolean failOnTriggerError = false;
|
||||||
|
|
||||||
|
@PluginProperty(group = PluginProperty.CORE_GROUP)
|
||||||
|
@Schema(
|
||||||
|
title = "Specifies whether a trigger is allowed to start a new execution even if a previous run is still in progress."
|
||||||
|
)
|
||||||
|
private boolean allowConcurrent = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For backward compatibility: we rename minLogLevel to logLevel.
|
* For backward compatibility: we rename minLogLevel to logLevel.
|
||||||
* @deprecated use {@link #logLevel} instead
|
* @deprecated use {@link #logLevel} instead
|
||||||
|
|||||||
@@ -1,22 +1,37 @@
|
|||||||
package io.kestra.core.models.triggers;
|
package io.kestra.core.models.triggers;
|
||||||
|
|
||||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||||
|
import io.kestra.core.models.annotations.PluginProperty;
|
||||||
import io.kestra.core.models.conditions.ConditionContext;
|
import io.kestra.core.models.conditions.ConditionContext;
|
||||||
import io.kestra.core.runners.RunContext;
|
import io.kestra.core.runners.RunContext;
|
||||||
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
|
|
||||||
import java.time.ZonedDateTime;
|
import java.time.ZonedDateTime;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
public interface Schedulable extends PollingTriggerInterface{
|
public interface Schedulable extends PollingTriggerInterface{
|
||||||
String PLUGIN_PROPERTY_RECOVER_MISSED_SCHEDULES = "recoverMissedSchedules";
|
String PLUGIN_PROPERTY_RECOVER_MISSED_SCHEDULES = "recoverMissedSchedules";
|
||||||
|
|
||||||
|
@Schema(
|
||||||
|
title = "The inputs to pass to the scheduled flow"
|
||||||
|
)
|
||||||
|
@PluginProperty(dynamic = true)
|
||||||
|
Map<String, Object> getInputs();
|
||||||
|
|
||||||
|
@Schema(
|
||||||
|
title = "Action to take in the case of missed schedules",
|
||||||
|
description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
|
||||||
|
"The default is `ALL` unless a different value is configured using the global plugin configuration."
|
||||||
|
)
|
||||||
|
@PluginProperty
|
||||||
|
RecoverMissedSchedules getRecoverMissedSchedules();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compute the previous evaluation of a trigger.
|
* Compute the previous evaluation of a trigger.
|
||||||
* This is used when a trigger misses some schedule to compute the next date to evaluate in the past.
|
* This is used when a trigger misses some schedule to compute the next date to evaluate in the past.
|
||||||
*/
|
*/
|
||||||
ZonedDateTime previousEvaluationDate(ConditionContext conditionContext) throws IllegalVariableEvaluationException;
|
ZonedDateTime previousEvaluationDate(ConditionContext conditionContext) throws IllegalVariableEvaluationException;
|
||||||
|
|
||||||
RecoverMissedSchedules getRecoverMissedSchedules();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load the default RecoverMissedSchedules from plugin property, or else ALL.
|
* Load the default RecoverMissedSchedules from plugin property, or else ALL.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -172,7 +172,7 @@ public class Trigger extends TriggerContext implements HasUID {
|
|||||||
|
|
||||||
if (abstractTrigger instanceof PollingTriggerInterface pollingTriggerInterface) {
|
if (abstractTrigger instanceof PollingTriggerInterface pollingTriggerInterface) {
|
||||||
try {
|
try {
|
||||||
nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, Optional.empty());
|
nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, lastTrigger);
|
||||||
} catch (InvalidTriggerConfigurationException e) {
|
} catch (InvalidTriggerConfigurationException e) {
|
||||||
disabled = true;
|
disabled = true;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,12 +6,9 @@ import io.kestra.core.models.executions.Execution;
|
|||||||
import io.kestra.core.models.executions.ExecutionTrigger;
|
import io.kestra.core.models.executions.ExecutionTrigger;
|
||||||
import io.kestra.core.models.tasks.Output;
|
import io.kestra.core.models.tasks.Output;
|
||||||
import io.kestra.core.models.flows.State;
|
import io.kestra.core.models.flows.State;
|
||||||
import io.kestra.core.runners.DefaultRunContext;
|
|
||||||
import io.kestra.core.runners.FlowInputOutput;
|
|
||||||
import io.kestra.core.runners.RunContext;
|
import io.kestra.core.runners.RunContext;
|
||||||
import io.kestra.core.utils.IdUtils;
|
import io.kestra.core.utils.IdUtils;
|
||||||
import io.kestra.core.utils.ListUtils;
|
import io.kestra.core.utils.ListUtils;
|
||||||
import java.time.ZonedDateTime;
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
||||||
public abstract class TriggerService {
|
public abstract class TriggerService {
|
||||||
@@ -51,58 +48,6 @@ public abstract class TriggerService {
|
|||||||
return generateExecution(IdUtils.create(), trigger, context, executionTrigger, conditionContext);
|
return generateExecution(IdUtils.create(), trigger, context, executionTrigger, conditionContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Execution generateScheduledExecution(
|
|
||||||
AbstractTrigger trigger,
|
|
||||||
ConditionContext conditionContext,
|
|
||||||
TriggerContext context,
|
|
||||||
List<Label> labels,
|
|
||||||
Map<String, Object> inputs,
|
|
||||||
Map<String, Object> variables,
|
|
||||||
Optional<ZonedDateTime> scheduleDate
|
|
||||||
) {
|
|
||||||
RunContext runContext = conditionContext.getRunContext();
|
|
||||||
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, variables);
|
|
||||||
|
|
||||||
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(labels));
|
|
||||||
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
|
|
||||||
// add a correlation ID if none exist
|
|
||||||
executionLabels.add(new Label(Label.CORRELATION_ID, runContext.getTriggerExecutionId()));
|
|
||||||
}
|
|
||||||
Execution execution = Execution.builder()
|
|
||||||
.id(runContext.getTriggerExecutionId())
|
|
||||||
.tenantId(context.getTenantId())
|
|
||||||
.namespace(context.getNamespace())
|
|
||||||
.flowId(context.getFlowId())
|
|
||||||
.flowRevision(conditionContext.getFlow().getRevision())
|
|
||||||
.variables(conditionContext.getFlow().getVariables())
|
|
||||||
.labels(executionLabels)
|
|
||||||
.state(new State())
|
|
||||||
.trigger(executionTrigger)
|
|
||||||
.scheduleDate(scheduleDate.map(date -> date.toInstant()).orElse(null))
|
|
||||||
.build();
|
|
||||||
|
|
||||||
Map<String, Object> allInputs = new HashMap<>();
|
|
||||||
// add flow inputs with default value
|
|
||||||
var flow = conditionContext.getFlow();
|
|
||||||
if (flow.getInputs() != null) {
|
|
||||||
flow.getInputs().stream()
|
|
||||||
.filter(input -> input.getDefaults() != null)
|
|
||||||
.forEach(input -> allInputs.put(input.getId(), input.getDefaults()));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (inputs != null) {
|
|
||||||
allInputs.putAll(inputs);
|
|
||||||
}
|
|
||||||
|
|
||||||
// add inputs and inject defaults
|
|
||||||
if (!allInputs.isEmpty()) {
|
|
||||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
|
|
||||||
execution = execution.withInputs(flowInputOutput.readExecutionInputs(conditionContext.getFlow(), execution, allInputs));
|
|
||||||
}
|
|
||||||
|
|
||||||
return execution;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Execution generateExecution(
|
private static Execution generateExecution(
|
||||||
String id,
|
String id,
|
||||||
AbstractTrigger trigger,
|
AbstractTrigger trigger,
|
||||||
@@ -111,6 +56,7 @@ public abstract class TriggerService {
|
|||||||
ConditionContext conditionContext
|
ConditionContext conditionContext
|
||||||
) {
|
) {
|
||||||
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(trigger.getLabels()));
|
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(trigger.getLabels()));
|
||||||
|
executionLabels.add(new Label(Label.FROM, "trigger"));
|
||||||
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
|
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
|
||||||
// add a correlation ID if none exist
|
// add a correlation ID if none exist
|
||||||
executionLabels.add(new Label(Label.CORRELATION_ID, id));
|
executionLabels.add(new Label(Label.CORRELATION_ID, id));
|
||||||
|
|||||||
@@ -67,6 +67,11 @@ public class ManualConstraintViolation<T> implements ConstraintViolation<T> {
|
|||||||
invalidValue
|
invalidValue
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
public static <T> ConstraintViolationException toConstraintViolationException(
|
||||||
|
Set<? extends ConstraintViolation<?>> constraintViolations
|
||||||
|
) {
|
||||||
|
return new ConstraintViolationException(constraintViolations);
|
||||||
|
}
|
||||||
|
|
||||||
public String getMessageTemplate() {
|
public String getMessageTemplate() {
|
||||||
return "{messageTemplate}";
|
return "{messageTemplate}";
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ public interface KvMetadataRepositoryInterface extends SaveRepositoryInterface<P
|
|||||||
);
|
);
|
||||||
|
|
||||||
default PersistedKvMetadata delete(PersistedKvMetadata persistedKvMetadata) throws IOException {
|
default PersistedKvMetadata delete(PersistedKvMetadata persistedKvMetadata) throws IOException {
|
||||||
return this.save(persistedKvMetadata.toBuilder().deleted(true).build());
|
return this.save(persistedKvMetadata.toDeleted());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -1,10 +1,10 @@
|
|||||||
package io.kestra.core.repositories;
|
package io.kestra.core.repositories;
|
||||||
|
|
||||||
import io.kestra.core.models.Setting;
|
import io.kestra.core.models.Setting;
|
||||||
|
import jakarta.validation.ConstraintViolationException;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import jakarta.validation.ConstraintViolationException;
|
|
||||||
|
|
||||||
public interface SettingRepositoryInterface {
|
public interface SettingRepositoryInterface {
|
||||||
Optional<Setting> findByKey(String key);
|
Optional<Setting> findByKey(String key);
|
||||||
@@ -13,5 +13,7 @@ public interface SettingRepositoryInterface {
|
|||||||
|
|
||||||
Setting save(Setting setting) throws ConstraintViolationException;
|
Setting save(Setting setting) throws ConstraintViolationException;
|
||||||
|
|
||||||
|
Setting internalSave(Setting setting) throws ConstraintViolationException;
|
||||||
|
|
||||||
Setting delete(Setting setting);
|
Setting delete(Setting setting);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,8 +16,8 @@ import java.util.function.Function;
|
|||||||
public interface TriggerRepositoryInterface extends QueryBuilderInterface<Triggers.Fields> {
|
public interface TriggerRepositoryInterface extends QueryBuilderInterface<Triggers.Fields> {
|
||||||
Optional<Trigger> findLast(TriggerContext trigger);
|
Optional<Trigger> findLast(TriggerContext trigger);
|
||||||
|
|
||||||
Optional<Trigger> findByExecution(Execution execution);
|
Optional<Trigger> findByUid(String uid);
|
||||||
|
|
||||||
List<Trigger> findAll(String tenantId);
|
List<Trigger> findAll(String tenantId);
|
||||||
|
|
||||||
List<Trigger> findAllForAllTenants();
|
List<Trigger> findAllForAllTenants();
|
||||||
|
|||||||
@@ -6,10 +6,12 @@ import com.google.common.base.CaseFormat;
|
|||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||||
import io.kestra.core.metrics.MetricRegistry;
|
import io.kestra.core.metrics.MetricRegistry;
|
||||||
|
import io.kestra.core.models.Plugin;
|
||||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||||
import io.kestra.core.models.property.Property;
|
import io.kestra.core.models.property.Property;
|
||||||
import io.kestra.core.models.tasks.Task;
|
import io.kestra.core.models.tasks.Task;
|
||||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||||
|
import io.kestra.core.plugins.PluginConfigurations;
|
||||||
import io.kestra.core.services.KVStoreService;
|
import io.kestra.core.services.KVStoreService;
|
||||||
import io.kestra.core.storages.Storage;
|
import io.kestra.core.storages.Storage;
|
||||||
import io.kestra.core.storages.StorageInterface;
|
import io.kestra.core.storages.StorageInterface;
|
||||||
@@ -235,6 +237,14 @@ public class DefaultRunContext extends RunContext {
|
|||||||
return runContext;
|
return runContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RunContext cloneForPlugin(Plugin plugin) {
|
||||||
|
PluginConfigurations pluginConfigurations = applicationContext.getBean(PluginConfigurations.class);
|
||||||
|
DefaultRunContext runContext = clone();
|
||||||
|
runContext.pluginConfiguration = pluginConfigurations.getConfigurationByPluginTypeOrAliases(plugin.getType(), plugin.getClass());
|
||||||
|
return runContext;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
@@ -589,6 +599,11 @@ public class DefaultRunContext extends RunContext {
|
|||||||
return localPath;
|
return localPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InputAndOutput inputAndOutput() {
|
||||||
|
return new InputAndOutputImpl(this.applicationContext, this);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Builder class for constructing new {@link DefaultRunContext} objects.
|
* Builder class for constructing new {@link DefaultRunContext} objects.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -189,12 +189,11 @@ public final class ExecutableUtils {
|
|||||||
variables.put("taskRunIteration", currentTaskRun.getIteration());
|
variables.put("taskRunIteration", currentTaskRun.getIteration());
|
||||||
}
|
}
|
||||||
|
|
||||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
|
|
||||||
Instant scheduleOnDate = runContext.render(scheduleDate).as(ZonedDateTime.class).map(date -> date.toInstant()).orElse(null);
|
Instant scheduleOnDate = runContext.render(scheduleDate).as(ZonedDateTime.class).map(date -> date.toInstant()).orElse(null);
|
||||||
Execution execution = Execution
|
Execution execution = Execution
|
||||||
.newExecution(
|
.newExecution(
|
||||||
flow,
|
flow,
|
||||||
(f, e) -> flowInputOutput.readExecutionInputs(f, e, inputs),
|
(f, e) -> runContext.inputAndOutput().readInputs(f, e, inputs),
|
||||||
newLabels,
|
newLabels,
|
||||||
Optional.empty())
|
Optional.empty())
|
||||||
.withTrigger(ExecutionTrigger.builder()
|
.withTrigger(ExecutionTrigger.builder()
|
||||||
|
|||||||
@@ -3,13 +3,13 @@ package io.kestra.core.runners;
|
|||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import io.kestra.core.encryption.EncryptionService;
|
import io.kestra.core.encryption.EncryptionService;
|
||||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||||
import io.kestra.core.exceptions.KestraRuntimeException;
|
|
||||||
|
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||||
import io.kestra.core.models.executions.Execution;
|
import io.kestra.core.models.executions.Execution;
|
||||||
import io.kestra.core.models.flows.Data;
|
import io.kestra.core.models.flows.Data;
|
||||||
import io.kestra.core.models.flows.DependsOn;
|
import io.kestra.core.models.flows.DependsOn;
|
||||||
import io.kestra.core.models.flows.FlowInterface;
|
import io.kestra.core.models.flows.FlowInterface;
|
||||||
import io.kestra.core.models.flows.Input;
|
import io.kestra.core.models.flows.Input;
|
||||||
import io.kestra.core.models.flows.Output;
|
|
||||||
import io.kestra.core.models.flows.RenderableInput;
|
import io.kestra.core.models.flows.RenderableInput;
|
||||||
import io.kestra.core.models.flows.Type;
|
import io.kestra.core.models.flows.Type;
|
||||||
import io.kestra.core.models.flows.input.FileInput;
|
import io.kestra.core.models.flows.input.FileInput;
|
||||||
@@ -19,7 +19,6 @@ import io.kestra.core.models.property.Property;
|
|||||||
import io.kestra.core.models.property.PropertyContext;
|
import io.kestra.core.models.property.PropertyContext;
|
||||||
import io.kestra.core.models.property.URIFetcher;
|
import io.kestra.core.models.property.URIFetcher;
|
||||||
import io.kestra.core.models.tasks.common.EncryptedString;
|
import io.kestra.core.models.tasks.common.EncryptedString;
|
||||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
|
||||||
import io.kestra.core.serializers.JacksonMapper;
|
import io.kestra.core.serializers.JacksonMapper;
|
||||||
import io.kestra.core.storages.StorageContext;
|
import io.kestra.core.storages.StorageContext;
|
||||||
import io.kestra.core.storages.StorageInterface;
|
import io.kestra.core.storages.StorageInterface;
|
||||||
@@ -209,8 +208,8 @@ public class FlowInputOutput {
|
|||||||
.filter(InputAndValue::enabled)
|
.filter(InputAndValue::enabled)
|
||||||
.map(it -> {
|
.map(it -> {
|
||||||
//TODO check to return all exception at-once.
|
//TODO check to return all exception at-once.
|
||||||
if (it.exception() != null) {
|
if (it.exceptions() != null && !it.exceptions().isEmpty()) {
|
||||||
throw it.exception();
|
throw InputOutputValidationException.merge(it.exceptions());
|
||||||
}
|
}
|
||||||
return new AbstractMap.SimpleEntry<>(it.input().getId(), it.value());
|
return new AbstractMap.SimpleEntry<>(it.input().getId(), it.value());
|
||||||
})
|
})
|
||||||
@@ -294,13 +293,9 @@ public class FlowInputOutput {
|
|||||||
try {
|
try {
|
||||||
isInputEnabled = Boolean.TRUE.equals(runContext.renderTyped(dependsOnCondition.get()));
|
isInputEnabled = Boolean.TRUE.equals(runContext.renderTyped(dependsOnCondition.get()));
|
||||||
} catch (IllegalVariableEvaluationException e) {
|
} catch (IllegalVariableEvaluationException e) {
|
||||||
resolvable.resolveWithError(ManualConstraintViolation.toConstraintViolationException(
|
resolvable.resolveWithError(
|
||||||
"Invalid condition: " + e.getMessage(),
|
InputOutputValidationException.of("Invalid condition: " + e.getMessage())
|
||||||
input,
|
);
|
||||||
(Class<Input>)input.getClass(),
|
|
||||||
input.getId(),
|
|
||||||
this
|
|
||||||
));
|
|
||||||
isInputEnabled = false;
|
isInputEnabled = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -333,7 +328,7 @@ public class FlowInputOutput {
|
|||||||
// validate and parse input value
|
// validate and parse input value
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
if (input.getRequired()) {
|
if (input.getRequired()) {
|
||||||
resolvable.resolveWithError(input.toConstraintViolationException("missing required input", null));
|
resolvable.resolveWithError(InputOutputValidationException.of("Missing required input:" + input.getId()));
|
||||||
} else {
|
} else {
|
||||||
resolvable.resolveWithValue(null);
|
resolvable.resolveWithValue(null);
|
||||||
}
|
}
|
||||||
@@ -343,17 +338,18 @@ public class FlowInputOutput {
|
|||||||
parsedInput.ifPresent(parsed -> ((Input) resolvable.get().input()).validate(parsed.getValue()));
|
parsedInput.ifPresent(parsed -> ((Input) resolvable.get().input()).validate(parsed.getValue()));
|
||||||
parsedInput.ifPresent(typed -> resolvable.resolveWithValue(typed.getValue()));
|
parsedInput.ifPresent(typed -> resolvable.resolveWithValue(typed.getValue()));
|
||||||
} catch (ConstraintViolationException e) {
|
} catch (ConstraintViolationException e) {
|
||||||
ConstraintViolationException exception = e.getConstraintViolations().size() == 1 ?
|
Input<?> finalInput = input;
|
||||||
input.toConstraintViolationException(List.copyOf(e.getConstraintViolations()).getFirst().getMessage(), value) :
|
Set<InputOutputValidationException> exceptions = e.getConstraintViolations().stream()
|
||||||
input.toConstraintViolationException(e.getMessage(), value);
|
.map(c-> InputOutputValidationException.of(c.getMessage(), finalInput))
|
||||||
resolvable.resolveWithError(exception);
|
.collect(Collectors.toSet());
|
||||||
|
resolvable.resolveWithError(exceptions);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (ConstraintViolationException e) {
|
} catch (IllegalArgumentException e){
|
||||||
resolvable.resolveWithError(e);
|
resolvable.resolveWithError(InputOutputValidationException.of(e.getMessage(), input));
|
||||||
} catch (Exception e) {
|
}
|
||||||
ConstraintViolationException exception = input.toConstraintViolationException(e instanceof IllegalArgumentException ? e.getMessage() : e.toString(), resolvable.get().value());
|
catch (Exception e) {
|
||||||
resolvable.resolveWithError(exception);
|
resolvable.resolveWithError(InputOutputValidationException.of(e.getMessage()));
|
||||||
}
|
}
|
||||||
|
|
||||||
return resolvable.get();
|
return resolvable.get();
|
||||||
@@ -441,8 +437,12 @@ public class FlowInputOutput {
|
|||||||
}
|
}
|
||||||
return entry;
|
return entry;
|
||||||
});
|
});
|
||||||
} catch (Exception e) {
|
}
|
||||||
throw output.toConstraintViolationException(e.getMessage(), current);
|
catch (IllegalArgumentException e){
|
||||||
|
throw InputOutputValidationException.of(e.getMessage(), output);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw InputOutputValidationException.of(e.getMessage());
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.filter(Optional::isPresent)
|
.filter(Optional::isPresent)
|
||||||
@@ -505,7 +505,7 @@ public class FlowInputOutput {
|
|||||||
if (matcher.matches()) {
|
if (matcher.matches()) {
|
||||||
yield current.toString();
|
yield current.toString();
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException("Expected `URI` but received `" + current + "`");
|
throw new IllegalArgumentException("Invalid URI format.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case ARRAY, MULTISELECT -> {
|
case ARRAY, MULTISELECT -> {
|
||||||
@@ -535,34 +535,10 @@ public class FlowInputOutput {
|
|||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
throw e;
|
throw e;
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
throw new Exception("Expected `" + type + "` but received `" + current + "` with errors:\n```\n" + e.getMessage() + "\n```");
|
throw new Exception(" errors:\n```\n" + e.getMessage() + "\n```");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
|
|
||||||
if (outputs == null) return Map.of();
|
|
||||||
|
|
||||||
// render required outputs
|
|
||||||
Map<String, Object> outputsById = outputs
|
|
||||||
.stream()
|
|
||||||
.filter(output -> output.getRequired() == null || output.getRequired())
|
|
||||||
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
|
|
||||||
outputsById = runContext.render(outputsById);
|
|
||||||
|
|
||||||
// render optional outputs one by one to catch, log, and skip any error.
|
|
||||||
for (io.kestra.core.models.flows.Output output : outputs) {
|
|
||||||
if (Boolean.FALSE.equals(output.getRequired())) {
|
|
||||||
try {
|
|
||||||
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
|
|
||||||
} catch (Exception e) {
|
|
||||||
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
|
|
||||||
outputsById.put(output.getId(), null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return outputsById;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mutable wrapper to hold a flow's input, and it's resolved value.
|
* Mutable wrapper to hold a flow's input, and it's resolved value.
|
||||||
*/
|
*/
|
||||||
@@ -591,27 +567,30 @@ public class FlowInputOutput {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void isDefault(boolean isDefault) {
|
public void isDefault(boolean isDefault) {
|
||||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
|
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exceptions());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setInput(final Input<?> input) {
|
public void setInput(final Input<?> input) {
|
||||||
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exception());
|
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exceptions());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void resolveWithEnabled(boolean enabled) {
|
public void resolveWithEnabled(boolean enabled) {
|
||||||
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exception());
|
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exceptions());
|
||||||
markAsResolved();
|
markAsResolved();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void resolveWithValue(@Nullable Object value) {
|
public void resolveWithValue(@Nullable Object value) {
|
||||||
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exception());
|
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exceptions());
|
||||||
markAsResolved();
|
markAsResolved();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void resolveWithError(@Nullable ConstraintViolationException exception) {
|
public void resolveWithError(@Nullable Set<InputOutputValidationException> exception) {
|
||||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), this.input.isDefault(), exception);
|
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), this.input.isDefault(), exception);
|
||||||
markAsResolved();
|
markAsResolved();
|
||||||
}
|
}
|
||||||
|
private void resolveWithError(@Nullable InputOutputValidationException exception){
|
||||||
|
resolveWithError(Collections.singleton(exception));
|
||||||
|
}
|
||||||
|
|
||||||
private void markAsResolved() {
|
private void markAsResolved() {
|
||||||
this.isResolved = true;
|
this.isResolved = true;
|
||||||
|
|||||||
@@ -0,0 +1,29 @@
|
|||||||
|
package io.kestra.core.runners;
|
||||||
|
|
||||||
|
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||||
|
import io.kestra.core.models.executions.Execution;
|
||||||
|
import io.kestra.core.models.flows.FlowInterface;
|
||||||
|
import io.kestra.core.models.flows.Output;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* InputAndOutput could be used to work with flow execution inputs and outputs.
|
||||||
|
*/
|
||||||
|
public interface InputAndOutput {
|
||||||
|
/**
|
||||||
|
* Reads the inputs of a flow execution.
|
||||||
|
*/
|
||||||
|
Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Processes the outputs of a flow execution (parse them based on their types).
|
||||||
|
*/
|
||||||
|
Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Render flow execution outputs.
|
||||||
|
*/
|
||||||
|
Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException;
|
||||||
|
}
|
||||||
@@ -0,0 +1,56 @@
|
|||||||
|
package io.kestra.core.runners;
|
||||||
|
|
||||||
|
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||||
|
import io.kestra.core.models.executions.Execution;
|
||||||
|
import io.kestra.core.models.flows.FlowInterface;
|
||||||
|
import io.kestra.core.models.flows.Output;
|
||||||
|
import io.micronaut.context.ApplicationContext;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
class InputAndOutputImpl implements InputAndOutput {
|
||||||
|
private final FlowInputOutput flowInputOutput;
|
||||||
|
private final RunContext runContext;
|
||||||
|
|
||||||
|
InputAndOutputImpl(ApplicationContext applicationContext, RunContext runContext) {
|
||||||
|
this.flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
|
||||||
|
this.runContext = runContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs) {
|
||||||
|
return flowInputOutput.readExecutionInputs(flow, execution, inputs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs) {
|
||||||
|
return flowInputOutput.typedOutputs(flow, execution, rOutputs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException {
|
||||||
|
if (outputs == null) return Map.of();
|
||||||
|
|
||||||
|
// render required outputs
|
||||||
|
Map<String, Object> outputsById = outputs
|
||||||
|
.stream()
|
||||||
|
.filter(output -> output.getRequired() == null || output.getRequired())
|
||||||
|
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
|
||||||
|
outputsById = runContext.render(outputsById);
|
||||||
|
|
||||||
|
// render optional outputs one by one to catch, log, and skip any error.
|
||||||
|
for (io.kestra.core.models.flows.Output output : outputs) {
|
||||||
|
if (Boolean.FALSE.equals(output.getRequired())) {
|
||||||
|
try {
|
||||||
|
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
|
||||||
|
} catch (Exception e) {
|
||||||
|
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
|
||||||
|
outputsById.put(output.getId(), null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return outputsById;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
|
|||||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
import io.kestra.core.encryption.EncryptionService;
|
import io.kestra.core.encryption.EncryptionService;
|
||||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||||
|
import io.kestra.core.models.Plugin;
|
||||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||||
import io.kestra.core.models.property.Property;
|
import io.kestra.core.models.property.Property;
|
||||||
import io.kestra.core.models.property.PropertyContext;
|
import io.kestra.core.models.property.PropertyContext;
|
||||||
@@ -204,4 +205,15 @@ public abstract class RunContext implements PropertyContext {
|
|||||||
* when Namespace ACLs are used (EE).
|
* when Namespace ACLs are used (EE).
|
||||||
*/
|
*/
|
||||||
public abstract AclChecker acl();
|
public abstract AclChecker acl();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clone this run context for a specific plugin.
|
||||||
|
* @return a new run context with the plugin configuration of the given plugin.
|
||||||
|
*/
|
||||||
|
public abstract RunContext cloneForPlugin(Plugin plugin);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return an InputAndOutput that can be used to work with inputs and outputs.
|
||||||
|
*/
|
||||||
|
public abstract InputAndOutput inputAndOutput();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,10 +1,8 @@
|
|||||||
package io.kestra.core.runners;
|
package io.kestra.core.runners;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import io.kestra.core.models.Plugin;
|
|
||||||
import io.kestra.core.models.executions.TaskRun;
|
import io.kestra.core.models.executions.TaskRun;
|
||||||
import io.kestra.core.models.tasks.Task;
|
import io.kestra.core.models.tasks.Task;
|
||||||
import io.kestra.core.models.tasks.runners.TaskRunner;
|
|
||||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||||
import io.kestra.core.models.triggers.TriggerContext;
|
import io.kestra.core.models.triggers.TriggerContext;
|
||||||
import io.kestra.core.plugins.PluginConfigurations;
|
import io.kestra.core.plugins.PluginConfigurations;
|
||||||
@@ -53,20 +51,6 @@ public class RunContextInitializer {
|
|||||||
@Value("${kestra.encryption.secret-key}")
|
@Value("${kestra.encryption.secret-key}")
|
||||||
protected Optional<String> secretKey;
|
protected Optional<String> secretKey;
|
||||||
|
|
||||||
/**
|
|
||||||
* Initializes the given {@link RunContext} for the given {@link Plugin}.
|
|
||||||
*
|
|
||||||
* @param runContext The {@link RunContext} to initialize.
|
|
||||||
* @param plugin The {@link TaskRunner} used for initialization.
|
|
||||||
* @return The {@link RunContext} to initialize
|
|
||||||
*/
|
|
||||||
public DefaultRunContext forPlugin(final DefaultRunContext runContext,
|
|
||||||
final Plugin plugin) {
|
|
||||||
runContext.init(applicationContext);
|
|
||||||
runContext.setPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(plugin.getType(), plugin.getClass()));
|
|
||||||
return runContext;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initializes the given {@link RunContext} for the given {@link WorkerTask} for executor.
|
* Initializes the given {@link RunContext} for the given {@link WorkerTask} for executor.
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -55,11 +55,11 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
|
|||||||
|
|
||||||
public RunContextLogger(QueueInterface<LogEntry> logQueue, LogEntry logEntry, org.slf4j.event.Level loglevel, boolean logToFile) {
|
public RunContextLogger(QueueInterface<LogEntry> logQueue, LogEntry logEntry, org.slf4j.event.Level loglevel, boolean logToFile) {
|
||||||
if (logEntry.getTaskId() != null) {
|
if (logEntry.getTaskId() != null) {
|
||||||
this.loggerName = "flow." + logEntry.getFlowId() + "." + logEntry.getTaskId();
|
this.loggerName = baseLoggerName(logEntry) + "." + logEntry.getTaskId();
|
||||||
} else if (logEntry.getTriggerId() != null) {
|
} else if (logEntry.getTriggerId() != null) {
|
||||||
this.loggerName = "flow." + logEntry.getFlowId() + "." + logEntry.getTriggerId();
|
this.loggerName = baseLoggerName(logEntry) + "." + logEntry.getTriggerId();
|
||||||
} else {
|
} else {
|
||||||
this.loggerName = "flow." + logEntry.getFlowId();
|
this.loggerName = baseLoggerName(logEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logQueue = logQueue;
|
this.logQueue = logQueue;
|
||||||
@@ -68,6 +68,10 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
|
|||||||
this.logToFile = logToFile;
|
this.logToFile = logToFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String baseLoggerName(LogEntry logEntry) {
|
||||||
|
return "flow." + logEntry.getTenantId() + "." + logEntry.getNamespace() + "." + logEntry.getFlowId();
|
||||||
|
}
|
||||||
|
|
||||||
private static List<LogEntry> logEntry(ILoggingEvent event, String message, org.slf4j.event.Level level, LogEntry logEntry) {
|
private static List<LogEntry> logEntry(ILoggingEvent event, String message, org.slf4j.event.Level level, LogEntry logEntry) {
|
||||||
Iterable<String> split;
|
Iterable<String> split;
|
||||||
|
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import io.micronaut.core.annotation.Nullable;
|
|||||||
import io.pebbletemplates.pebble.PebbleEngine;
|
import io.pebbletemplates.pebble.PebbleEngine;
|
||||||
import io.pebbletemplates.pebble.extension.Extension;
|
import io.pebbletemplates.pebble.extension.Extension;
|
||||||
import io.pebbletemplates.pebble.extension.Function;
|
import io.pebbletemplates.pebble.extension.Function;
|
||||||
|
import io.pebbletemplates.pebble.lexer.Syntax;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
import jakarta.inject.Singleton;
|
import jakarta.inject.Singleton;
|
||||||
|
|
||||||
@@ -37,6 +38,13 @@ public class PebbleEngineFactory {
|
|||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public PebbleEngine createWithCustomSyntax(Syntax syntax, Class<? extends Extension> extension) {
|
||||||
|
PebbleEngine.Builder builder = newPebbleEngineBuilder()
|
||||||
|
.syntax(syntax);
|
||||||
|
this.applicationContext.getBeansOfType(extension).forEach(builder::extension);
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
public PebbleEngine createWithMaskedFunctions(VariableRenderer renderer, final List<String> functionsToMask) {
|
public PebbleEngine createWithMaskedFunctions(VariableRenderer renderer, final List<String> functionsToMask) {
|
||||||
|
|
||||||
PebbleEngine.Builder builder = newPebbleEngineBuilder();
|
PebbleEngine.Builder builder = newPebbleEngineBuilder();
|
||||||
|
|||||||
@@ -35,6 +35,10 @@ public final class YamlParser {
|
|||||||
return read(input, cls, type(cls));
|
return read(input, cls, type(cls));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static <T> T parse(String input, Class<T> cls, Boolean strict) {
|
||||||
|
return strict ? read(input, cls, type(cls)) : readNonStrict(input, cls, type(cls));
|
||||||
|
}
|
||||||
|
|
||||||
public static <T> T parse(Map<String, Object> input, Class<T> cls, Boolean strict) {
|
public static <T> T parse(Map<String, Object> input, Class<T> cls, Boolean strict) {
|
||||||
ObjectMapper currentMapper = strict ? STRICT_MAPPER : NON_STRICT_MAPPER;
|
ObjectMapper currentMapper = strict ? STRICT_MAPPER : NON_STRICT_MAPPER;
|
||||||
|
|
||||||
@@ -81,7 +85,31 @@ public final class YamlParser {
|
|||||||
throw toConstraintViolationException(input, resource, e);
|
throw toConstraintViolationException(input, resource, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
private static <T> T readNonStrict(String input, Class<T> objectClass, String resource) {
|
||||||
|
try {
|
||||||
|
return NON_STRICT_MAPPER.readValue(input, objectClass);
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
throw toConstraintViolationException(input, resource, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
private static String formatYamlErrorMessage(String originalMessage, JsonProcessingException e) {
|
||||||
|
StringBuilder friendlyMessage = new StringBuilder();
|
||||||
|
if (originalMessage.contains("Expected a field name")) {
|
||||||
|
friendlyMessage.append("YAML syntax error: Invalid structure. Check indentation and ensure all fields are properly formatted.");
|
||||||
|
} else if (originalMessage.contains("MappingStartEvent")) {
|
||||||
|
friendlyMessage.append("YAML syntax error: Unexpected mapping start. Verify that scalar values are properly quoted if needed.");
|
||||||
|
} else if (originalMessage.contains("Scalar value")) {
|
||||||
|
friendlyMessage.append("YAML syntax error: Expected a simple value but found complex structure. Check for unquoted special characters.");
|
||||||
|
} else {
|
||||||
|
friendlyMessage.append("YAML parsing error: ").append(originalMessage.replaceAll("org\\.yaml\\.snakeyaml.*", "").trim());
|
||||||
|
}
|
||||||
|
if (e.getLocation() != null) {
|
||||||
|
int line = e.getLocation().getLineNr();
|
||||||
|
friendlyMessage.append(String.format(" (at line %d)", line));
|
||||||
|
}
|
||||||
|
// Return a generic but cleaner message for other YAML errors
|
||||||
|
return friendlyMessage.toString();
|
||||||
|
}
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public static <T> ConstraintViolationException toConstraintViolationException(T target, String resource, JsonProcessingException e) {
|
public static <T> ConstraintViolationException toConstraintViolationException(T target, String resource, JsonProcessingException e) {
|
||||||
if (e.getCause() instanceof ConstraintViolationException constraintViolationException) {
|
if (e.getCause() instanceof ConstraintViolationException constraintViolationException) {
|
||||||
@@ -121,11 +149,12 @@ public final class YamlParser {
|
|||||||
)
|
)
|
||||||
));
|
));
|
||||||
} else {
|
} else {
|
||||||
|
String userFriendlyMessage = formatYamlErrorMessage(e.getMessage(), e);
|
||||||
return new ConstraintViolationException(
|
return new ConstraintViolationException(
|
||||||
"Illegal " + resource + " source: " + e.getMessage(),
|
"Illegal " + resource + " source: " + userFriendlyMessage,
|
||||||
Collections.singleton(
|
Collections.singleton(
|
||||||
ManualConstraintViolation.of(
|
ManualConstraintViolation.of(
|
||||||
e.getCause() == null ? e.getMessage() : e.getMessage() + "\nCaused by: " + e.getCause().getMessage(),
|
userFriendlyMessage,
|
||||||
target,
|
target,
|
||||||
(Class<T>) target.getClass(),
|
(Class<T>) target.getClass(),
|
||||||
"yaml",
|
"yaml",
|
||||||
@@ -136,4 +165,3 @@ public final class YamlParser {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import com.cronutils.utils.VisibleForTesting;
|
|||||||
import io.kestra.core.exceptions.InternalException;
|
import io.kestra.core.exceptions.InternalException;
|
||||||
import io.kestra.core.models.conditions.Condition;
|
import io.kestra.core.models.conditions.Condition;
|
||||||
import io.kestra.core.models.conditions.ConditionContext;
|
import io.kestra.core.models.conditions.ConditionContext;
|
||||||
import io.kestra.core.models.conditions.ScheduleCondition;
|
|
||||||
import io.kestra.core.models.executions.Execution;
|
import io.kestra.core.models.executions.Execution;
|
||||||
import io.kestra.core.models.flows.Flow;
|
import io.kestra.core.models.flows.Flow;
|
||||||
import io.kestra.core.models.flows.FlowInterface;
|
import io.kestra.core.models.flows.FlowInterface;
|
||||||
@@ -65,16 +64,6 @@ public class ConditionService {
|
|||||||
return this.valid(flow, conditions, conditionContext);
|
return this.valid(flow, conditions, conditionContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Check that all conditions are valid.
|
|
||||||
* Warning, this method throws if a condition cannot be evaluated.
|
|
||||||
*/
|
|
||||||
public boolean isValid(List<ScheduleCondition> conditions, ConditionContext conditionContext) throws InternalException {
|
|
||||||
return conditions
|
|
||||||
.stream()
|
|
||||||
.allMatch(throwPredicate(condition -> condition.test(conditionContext)));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check that all conditions are valid.
|
* Check that all conditions are valid.
|
||||||
* Warning, this method throws if a condition cannot be evaluated.
|
* Warning, this method throws if a condition cannot be evaluated.
|
||||||
|
|||||||
@@ -754,7 +754,7 @@ public class ExecutionService {
|
|||||||
var parentTaskRun = execution.findTaskRunByTaskRunId(taskRun.getParentTaskRunId());
|
var parentTaskRun = execution.findTaskRunByTaskRunId(taskRun.getParentTaskRunId());
|
||||||
Execution newExecution = execution;
|
Execution newExecution = execution;
|
||||||
if (parentTaskRun.getState().getCurrent() != State.Type.KILLED) {
|
if (parentTaskRun.getState().getCurrent() != State.Type.KILLED) {
|
||||||
newExecution = newExecution.withTaskRun(parentTaskRun.withState(State.Type.KILLED));
|
newExecution = newExecution.withTaskRun(parentTaskRun.withStateAndAttempt(State.Type.KILLED));
|
||||||
}
|
}
|
||||||
if (parentTaskRun.getParentTaskRunId() != null) {
|
if (parentTaskRun.getParentTaskRunId() != null) {
|
||||||
return killParentTaskruns(parentTaskRun, newExecution);
|
return killParentTaskruns(parentTaskRun, newExecution);
|
||||||
|
|||||||
@@ -92,7 +92,14 @@ public class FlowService {
|
|||||||
return flowRepository
|
return flowRepository
|
||||||
.orElseThrow(() -> new IllegalStateException("Cannot perform operation on flow. Cause: No FlowRepository"));
|
.orElseThrow(() -> new IllegalStateException("Cannot perform operation on flow. Cause: No FlowRepository"));
|
||||||
}
|
}
|
||||||
|
private static String formatValidationError(String message) {
|
||||||
|
if (message.startsWith("Illegal flow source:")) {
|
||||||
|
// Already formatted by YamlParser, return as-is
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
// For other validation errors, provide context
|
||||||
|
return "Validation error: " + message;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Evaluates all checks defined in the given flow using the provided inputs.
|
* Evaluates all checks defined in the given flow using the provided inputs.
|
||||||
* <p>
|
* <p>
|
||||||
@@ -174,10 +181,12 @@ public class FlowService {
|
|||||||
modelValidator.validate(pluginDefaultService.injectAllDefaults(flow, false));
|
modelValidator.validate(pluginDefaultService.injectAllDefaults(flow, false));
|
||||||
|
|
||||||
} catch (ConstraintViolationException e) {
|
} catch (ConstraintViolationException e) {
|
||||||
validateConstraintViolationBuilder.constraints(e.getMessage());
|
String friendlyMessage = formatValidationError(e.getMessage());
|
||||||
|
validateConstraintViolationBuilder.constraints(friendlyMessage);
|
||||||
} catch (FlowProcessingException e) {
|
} catch (FlowProcessingException e) {
|
||||||
if (e.getCause() instanceof ConstraintViolationException) {
|
if (e.getCause() instanceof ConstraintViolationException cve) {
|
||||||
validateConstraintViolationBuilder.constraints(e.getMessage());
|
String friendlyMessage = formatValidationError(cve.getMessage());
|
||||||
|
validateConstraintViolationBuilder.constraints(friendlyMessage);
|
||||||
} else {
|
} else {
|
||||||
Throwable cause = e.getCause() != null ? e.getCause() : e;
|
Throwable cause = e.getCause() != null ? e.getCause() : e;
|
||||||
validateConstraintViolationBuilder.constraints("Unable to validate the flow: " + cause.getMessage());
|
validateConstraintViolationBuilder.constraints("Unable to validate the flow: " + cause.getMessage());
|
||||||
@@ -579,4 +588,4 @@ public class FlowService {
|
|||||||
private IllegalStateException noRepositoryException() {
|
private IllegalStateException noRepositoryException() {
|
||||||
return new IllegalStateException("No repository found. Make sure the `kestra.repository.type` property is set.");
|
return new IllegalStateException("No repository found. Make sure the `kestra.repository.type` property is set.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1,6 +1,5 @@
|
|||||||
package io.kestra.core.storages;
|
package io.kestra.core.storages;
|
||||||
|
|
||||||
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
|
|
||||||
import io.kestra.core.services.NamespaceService;
|
import io.kestra.core.services.NamespaceService;
|
||||||
import jakarta.annotation.Nullable;
|
import jakarta.annotation.Nullable;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@@ -272,7 +271,13 @@ public class InternalStorage implements Storage {
|
|||||||
return this.storage.put(context.getTenantId(), context.getNamespace(), resolve, new BufferedInputStream(inputStream));
|
return this.storage.put(context.getTenantId(), context.getNamespace(), resolve, new BufferedInputStream(inputStream));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Optional<StorageContext.Task> getTaskStorageContext() {
|
public Optional<StorageContext.Task> getTaskStorageContext() {
|
||||||
return Optional.ofNullable((context instanceof StorageContext.Task task) ? task : null);
|
return Optional.ofNullable((context instanceof StorageContext.Task task) ? task : null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<FileAttributes> list(URI uri) throws IOException {
|
||||||
|
return this.storage.list(context.getTenantId(), context.getNamespace(), uri);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -173,4 +173,6 @@ public interface Storage {
|
|||||||
* @return the task storage context
|
* @return the task storage context
|
||||||
*/
|
*/
|
||||||
Optional<StorageContext.Task> getTaskStorageContext();
|
Optional<StorageContext.Task> getTaskStorageContext();
|
||||||
|
|
||||||
|
List<FileAttributes> list(URI uri) throws IOException;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
package io.kestra.core.test;
|
package io.kestra.core.test;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
import io.kestra.core.models.DeletedInterface;
|
import io.kestra.core.models.SoftDeletable;
|
||||||
import io.kestra.core.models.HasSource;
|
import io.kestra.core.models.HasSource;
|
||||||
import io.kestra.core.models.HasUID;
|
import io.kestra.core.models.HasUID;
|
||||||
import io.kestra.core.models.TenantInterface;
|
import io.kestra.core.models.TenantInterface;
|
||||||
@@ -25,7 +25,7 @@ import java.util.List;
|
|||||||
@ToString
|
@ToString
|
||||||
@EqualsAndHashCode
|
@EqualsAndHashCode
|
||||||
@TestSuiteValidation
|
@TestSuiteValidation
|
||||||
public class TestSuite implements HasUID, TenantInterface, DeletedInterface, HasSource {
|
public class TestSuite implements HasUID, TenantInterface, SoftDeletable<TestSuite>, HasSource {
|
||||||
|
|
||||||
@NotNull
|
@NotNull
|
||||||
@NotBlank
|
@NotBlank
|
||||||
@@ -85,10 +85,6 @@ public class TestSuite implements HasUID, TenantInterface, DeletedInterface, Has
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TestSuite delete() {
|
|
||||||
return this.toBuilder().deleted(true).build();
|
|
||||||
}
|
|
||||||
|
|
||||||
public TestSuite disable() {
|
public TestSuite disable() {
|
||||||
var disabled = true;
|
var disabled = true;
|
||||||
return this.toBuilder()
|
return this.toBuilder()
|
||||||
@@ -120,4 +116,9 @@ public class TestSuite implements HasUID, TenantInterface, DeletedInterface, Has
|
|||||||
|
|
||||||
return yamlSource + String.format("\ndisabled: %s", disabled);
|
return yamlSource + String.format("\ndisabled: %s", disabled);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestSuite toDeleted() {
|
||||||
|
return toBuilder().deleted(true).build();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
package io.kestra.core.test;
|
package io.kestra.core.test;
|
||||||
|
|
||||||
import io.kestra.core.models.DeletedInterface;
|
import io.kestra.core.models.SoftDeletable;
|
||||||
import io.kestra.core.models.HasUID;
|
import io.kestra.core.models.HasUID;
|
||||||
import io.kestra.core.models.TenantInterface;
|
import io.kestra.core.models.TenantInterface;
|
||||||
import io.kestra.core.test.flow.UnitTestResult;
|
import io.kestra.core.test.flow.UnitTestResult;
|
||||||
@@ -24,7 +24,7 @@ public record TestSuiteRunEntity(
|
|||||||
String flowId,
|
String flowId,
|
||||||
TestState state,
|
TestState state,
|
||||||
List<UnitTestResult> results
|
List<UnitTestResult> results
|
||||||
) implements DeletedInterface, TenantInterface, HasUID {
|
) implements SoftDeletable<TestSuiteRunEntity>, TenantInterface, HasUID {
|
||||||
|
|
||||||
public static TestSuiteRunEntity create(String tenantId, TestSuiteUid testSuiteUid, TestSuiteRunResult testSuiteRunResult) {
|
public static TestSuiteRunEntity create(String tenantId, TestSuiteUid testSuiteUid, TestSuiteRunResult testSuiteRunResult) {
|
||||||
return new TestSuiteRunEntity(
|
return new TestSuiteRunEntity(
|
||||||
@@ -43,23 +43,6 @@ public record TestSuiteRunEntity(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TestSuiteRunEntity delete() {
|
|
||||||
return new TestSuiteRunEntity(
|
|
||||||
this.uid,
|
|
||||||
this.id,
|
|
||||||
this.tenantId,
|
|
||||||
true,
|
|
||||||
this.startDate,
|
|
||||||
this.endDate,
|
|
||||||
this.testSuiteId,
|
|
||||||
this.testSuiteUid,
|
|
||||||
this.namespace,
|
|
||||||
this.flowId,
|
|
||||||
this.state,
|
|
||||||
this.results
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* only used for backup
|
* only used for backup
|
||||||
* @param newTenantId the tenant to migrate to
|
* @param newTenantId the tenant to migrate to
|
||||||
@@ -86,6 +69,24 @@ public record TestSuiteRunEntity(
|
|||||||
return this.deleted;
|
return this.deleted;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestSuiteRunEntity toDeleted() {
|
||||||
|
return new TestSuiteRunEntity(
|
||||||
|
this.uid,
|
||||||
|
this.id,
|
||||||
|
this.tenantId,
|
||||||
|
true,
|
||||||
|
this.startDate,
|
||||||
|
this.endDate,
|
||||||
|
this.testSuiteId,
|
||||||
|
this.testSuiteUid,
|
||||||
|
this.namespace,
|
||||||
|
this.flowId,
|
||||||
|
this.state,
|
||||||
|
this.results
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getTenantId() {
|
public String getTenantId() {
|
||||||
return this.tenantId;
|
return this.tenantId;
|
||||||
|
|||||||
@@ -1,13 +1,39 @@
|
|||||||
package io.kestra.core.utils;
|
package io.kestra.core.utils;
|
||||||
|
|
||||||
|
import io.kestra.core.models.Setting;
|
||||||
|
import io.kestra.core.repositories.SettingRepositoryInterface;
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
import jakarta.inject.Singleton;
|
import jakarta.inject.Singleton;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
public class EditionProvider {
|
public class EditionProvider {
|
||||||
public Edition get() {
|
public Edition get() {
|
||||||
return Edition.OSS;
|
return Edition.OSS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private Optional<SettingRepositoryInterface> settingRepository; // repositories are not always there on unit tests
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
void start() {
|
||||||
|
// check the edition in the settings and update if needed, we didn't use it would allow us to detect incompatible update later if needed
|
||||||
|
settingRepository.ifPresent(settingRepositoryInterface -> persistEdition(settingRepositoryInterface, get()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void persistEdition(SettingRepositoryInterface settingRepositoryInterface, Edition edition) {
|
||||||
|
Optional<Setting> versionSetting = settingRepositoryInterface.findByKey(Setting.INSTANCE_EDITION);
|
||||||
|
if (versionSetting.isEmpty() || !versionSetting.get().getValue().equals(edition)) {
|
||||||
|
settingRepositoryInterface.save(Setting.builder()
|
||||||
|
.key(Setting.INSTANCE_EDITION)
|
||||||
|
.value(edition)
|
||||||
|
.build()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public enum Edition {
|
public enum Edition {
|
||||||
OSS,
|
OSS,
|
||||||
EE
|
EE
|
||||||
|
|||||||
@@ -11,6 +11,11 @@ import jakarta.inject.Inject;
|
|||||||
import jakarta.inject.Singleton;
|
import jakarta.inject.Singleton;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility class to create {@link java.util.concurrent.ExecutorService} with {@link java.util.concurrent.ExecutorService} instances.
|
||||||
|
* WARNING: those instances will use the {@link ThreadUncaughtExceptionHandler} which terminates Kestra if an error occurs in any thread,
|
||||||
|
* so it should not be used inside plugins.
|
||||||
|
*/
|
||||||
@Singleton
|
@Singleton
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class ExecutorsUtils {
|
public class ExecutorsUtils {
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import org.slf4j.LoggerFactory;
|
|||||||
import org.slf4j.event.Level;
|
import org.slf4j.event.Level;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility class for logging
|
* Utility class for server logging
|
||||||
*/
|
*/
|
||||||
public final class Logs {
|
public final class Logs {
|
||||||
|
|
||||||
@@ -18,7 +18,7 @@ public final class Logs {
|
|||||||
private static final String EXECUTION_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[execution: {}] ";
|
private static final String EXECUTION_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[execution: {}] ";
|
||||||
private static final String TRIGGER_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[trigger: {}] ";
|
private static final String TRIGGER_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[trigger: {}] ";
|
||||||
private static final String TASKRUN_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[task: {}] [execution: {}] [taskrun: {}] ";
|
private static final String TASKRUN_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[task: {}] [execution: {}] [taskrun: {}] ";
|
||||||
|
|
||||||
private Logs() {}
|
private Logs() {}
|
||||||
|
|
||||||
public static void logExecution(FlowId flow, Logger logger, Level level, String message, Object... args) {
|
public static void logExecution(FlowId flow, Logger logger, Level level, String message, Object... args) {
|
||||||
@@ -29,7 +29,7 @@ public final class Logs {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Log an {@link Execution} via the execution logger named: 'execution.{flowId}'.
|
* Log an {@link Execution} via the executor logger named: 'executor.{tenantId}.{namespace}.{flowId}'.
|
||||||
*/
|
*/
|
||||||
public static void logExecution(Execution execution, Level level, String message, Object... args) {
|
public static void logExecution(Execution execution, Level level, String message, Object... args) {
|
||||||
Logger logger = logger(execution);
|
Logger logger = logger(execution);
|
||||||
@@ -43,7 +43,7 @@ public final class Logs {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Log a {@link TriggerContext} via the trigger logger named: 'trigger.{flowId}.{triggereId}'.
|
* Log a {@link TriggerContext} via the scheduler logger named: 'trigger.{tenantId}.{namespace}.{flowId}.{triggerId}'.
|
||||||
*/
|
*/
|
||||||
public static void logTrigger(TriggerContext triggerContext, Level level, String message, Object... args) {
|
public static void logTrigger(TriggerContext triggerContext, Level level, String message, Object... args) {
|
||||||
Logger logger = logger(triggerContext);
|
Logger logger = logger(triggerContext);
|
||||||
@@ -57,7 +57,7 @@ public final class Logs {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Log a {@link TaskRun} via the taskRun logger named: 'task.{flowId}.{taskId}'.
|
* Log a {@link TaskRun} via the worker logger named: 'worker.{tenantId}.{namespace}.{flowId}.{taskId}'.
|
||||||
*/
|
*/
|
||||||
public static void logTaskRun(TaskRun taskRun, Level level, String message, Object... args) {
|
public static void logTaskRun(TaskRun taskRun, Level level, String message, Object... args) {
|
||||||
String prefix = TASKRUN_PREFIX_WITH_TENANT;
|
String prefix = TASKRUN_PREFIX_WITH_TENANT;
|
||||||
@@ -73,19 +73,19 @@ public final class Logs {
|
|||||||
|
|
||||||
private static Logger logger(TaskRun taskRun) {
|
private static Logger logger(TaskRun taskRun) {
|
||||||
return LoggerFactory.getLogger(
|
return LoggerFactory.getLogger(
|
||||||
"task." + taskRun.getFlowId() + "." + taskRun.getTaskId()
|
"worker." + taskRun.getTenantId() + "." + taskRun.getNamespace() + "." + taskRun.getFlowId() + "." + taskRun.getTaskId()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Logger logger(TriggerContext triggerContext) {
|
private static Logger logger(TriggerContext triggerContext) {
|
||||||
return LoggerFactory.getLogger(
|
return LoggerFactory.getLogger(
|
||||||
"trigger." + triggerContext.getFlowId() + "." + triggerContext.getTriggerId()
|
"scheduler." + triggerContext.getTenantId() + "." + triggerContext.getNamespace() + "." + triggerContext.getFlowId() + "." + triggerContext.getTriggerId()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Logger logger(Execution execution) {
|
private static Logger logger(Execution execution) {
|
||||||
return LoggerFactory.getLogger(
|
return LoggerFactory.getLogger(
|
||||||
"execution." + execution.getFlowId()
|
"executor." + execution.getTenantId() + "." + execution.getNamespace() + "." + execution.getFlowId()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -120,7 +120,10 @@ public class MapUtils {
|
|||||||
private static Collection<?> mergeCollections(Collection<?> colA, Collection<?> colB) {
|
private static Collection<?> mergeCollections(Collection<?> colA, Collection<?> colB) {
|
||||||
List<Object> merged = new ArrayList<>(colA.size() + colB.size());
|
List<Object> merged = new ArrayList<>(colA.size() + colB.size());
|
||||||
merged.addAll(colA);
|
merged.addAll(colA);
|
||||||
merged.addAll(colB);
|
if (!colB.isEmpty()) {
|
||||||
|
List<?> filtered = colB.stream().filter(it -> !colA.contains(it)).toList();
|
||||||
|
merged.addAll(filtered);
|
||||||
|
}
|
||||||
return merged;
|
return merged;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,14 +1,12 @@
|
|||||||
package io.kestra.core.utils;
|
package io.kestra.core.utils;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import io.kestra.core.models.executions.metrics.Counter;
|
import io.kestra.core.models.executions.metrics.Counter;
|
||||||
import io.kestra.core.models.executions.metrics.Timer;
|
import io.kestra.core.models.executions.metrics.Timer;
|
||||||
import io.kestra.core.models.tasks.FileExistComportment;
|
import io.kestra.core.models.tasks.FileExistComportment;
|
||||||
import io.kestra.core.models.tasks.NamespaceFiles;
|
import io.kestra.core.models.tasks.NamespaceFiles;
|
||||||
import io.kestra.core.runners.RunContext;
|
import io.kestra.core.runners.RunContext;
|
||||||
import io.kestra.core.storages.NamespaceFile;
|
import io.kestra.core.storages.NamespaceFile;
|
||||||
import jakarta.annotation.PostConstruct;
|
|
||||||
import jakarta.inject.Inject;
|
|
||||||
import jakarta.inject.Singleton;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.time.DurationFormatUtils;
|
import org.apache.commons.lang3.time.DurationFormatUtils;
|
||||||
import org.apache.commons.lang3.time.StopWatch;
|
import org.apache.commons.lang3.time.StopWatch;
|
||||||
@@ -19,28 +17,27 @@ import java.io.InputStream;
|
|||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
|
|
||||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||||
|
|
||||||
@Singleton
|
public final class NamespaceFilesUtils {
|
||||||
public class NamespaceFilesUtils {
|
private static final int maxThreads = Math.max(Runtime.getRuntime().availableProcessors() * 4, 32);
|
||||||
@Inject
|
private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
|
||||||
private ExecutorsUtils executorsUtils;
|
0,
|
||||||
|
maxThreads,
|
||||||
|
60L,
|
||||||
|
TimeUnit.SECONDS,
|
||||||
|
new SynchronousQueue<>(),
|
||||||
|
new ThreadFactoryBuilder().setNameFormat("namespace-files").build()
|
||||||
|
);;
|
||||||
|
|
||||||
private ExecutorService executorService;
|
private NamespaceFilesUtils() {
|
||||||
private int maxThreads;
|
// utility class pattern
|
||||||
|
|
||||||
@PostConstruct
|
|
||||||
public void postConstruct() {
|
|
||||||
this.maxThreads = Math.max(Runtime.getRuntime().availableProcessors() * 4, 32);
|
|
||||||
this.executorService = executorsUtils.maxCachedThreadPool(maxThreads, "namespace-file");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void loadNamespaceFiles(
|
public static void loadNamespaceFiles(
|
||||||
RunContext runContext,
|
RunContext runContext,
|
||||||
NamespaceFiles namespaceFiles
|
NamespaceFiles namespaceFiles
|
||||||
)
|
)
|
||||||
@@ -69,7 +66,7 @@ public class NamespaceFilesUtils {
|
|||||||
int parallelism = maxThreads / 2;
|
int parallelism = maxThreads / 2;
|
||||||
Flux.fromIterable(matchedNamespaceFiles)
|
Flux.fromIterable(matchedNamespaceFiles)
|
||||||
.parallel(parallelism)
|
.parallel(parallelism)
|
||||||
.runOn(Schedulers.fromExecutorService(executorService))
|
.runOn(Schedulers.fromExecutorService(EXECUTOR_SERVICE))
|
||||||
.doOnNext(throwConsumer(nsFile -> {
|
.doOnNext(throwConsumer(nsFile -> {
|
||||||
InputStream content = runContext.storage().getFile(nsFile.uri());
|
InputStream content = runContext.storage().getFile(nsFile.uri());
|
||||||
Path path = folderPerNamespace ?
|
Path path = folderPerNamespace ?
|
||||||
|
|||||||
@@ -23,7 +23,6 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
|
|||||||
import io.kestra.core.services.StorageService;
|
import io.kestra.core.services.StorageService;
|
||||||
import io.kestra.core.storages.FileAttributes;
|
import io.kestra.core.storages.FileAttributes;
|
||||||
import io.kestra.core.storages.StorageContext;
|
import io.kestra.core.storages.StorageContext;
|
||||||
import io.kestra.core.storages.StorageInterface;
|
|
||||||
import io.kestra.core.storages.StorageSplitInterface;
|
import io.kestra.core.storages.StorageSplitInterface;
|
||||||
import io.kestra.core.utils.GraphUtils;
|
import io.kestra.core.utils.GraphUtils;
|
||||||
import io.kestra.core.validations.NoSystemLabelValidation;
|
import io.kestra.core.validations.NoSystemLabelValidation;
|
||||||
@@ -540,7 +539,7 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
|
|||||||
.numberOfBatches((Integer) taskRun.getOutputs().get(ExecutableUtils.TASK_VARIABLE_NUMBER_OF_BATCHES));
|
.numberOfBatches((Integer) taskRun.getOutputs().get(ExecutableUtils.TASK_VARIABLE_NUMBER_OF_BATCHES));
|
||||||
|
|
||||||
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
|
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
|
||||||
FileSerde.write(bos, FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext));
|
FileSerde.write(bos, runContext.inputAndOutput().renderOutputs(flow.getOutputs()));
|
||||||
URI uri = runContext.storage().putFile(
|
URI uri = runContext.storage().putFile(
|
||||||
new ByteArrayInputStream(bos.toByteArray()),
|
new ByteArrayInputStream(bos.toByteArray()),
|
||||||
URI.create((String) taskRun.getOutputs().get("uri"))
|
URI.create((String) taskRun.getOutputs().get("uri"))
|
||||||
@@ -602,9 +601,8 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
|
|||||||
String subflowOutputsBase = (String) taskOutput.get(ExecutableUtils.TASK_VARIABLE_SUBFLOW_OUTPUTS_BASE_URI);
|
String subflowOutputsBase = (String) taskOutput.get(ExecutableUtils.TASK_VARIABLE_SUBFLOW_OUTPUTS_BASE_URI);
|
||||||
URI subflowOutputsBaseUri = URI.create(StorageContext.KESTRA_PROTOCOL + subflowOutputsBase + "/");
|
URI subflowOutputsBaseUri = URI.create(StorageContext.KESTRA_PROTOCOL + subflowOutputsBase + "/");
|
||||||
|
|
||||||
StorageInterface storage = ((DefaultRunContext) runContext).getApplicationContext().getBean(StorageInterface.class);
|
if (runContext.storage().isFileExist(subflowOutputsBaseUri)) {
|
||||||
if (storage.exists(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), subflowOutputsBaseUri)) {
|
List<FileAttributes> list = runContext.storage().list(subflowOutputsBaseUri);;
|
||||||
List<FileAttributes> list = storage.list(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), subflowOutputsBaseUri);
|
|
||||||
|
|
||||||
if (!list.isEmpty()) {
|
if (!list.isEmpty()) {
|
||||||
// Merge outputs from each sub-flow into a single stored in the internal storage.
|
// Merge outputs from each sub-flow into a single stored in the internal storage.
|
||||||
|
|||||||
@@ -63,7 +63,8 @@ import java.util.*;
|
|||||||
|
|
||||||
- id: run_post_approval
|
- id: run_post_approval
|
||||||
type: io.kestra.plugin.scripts.shell.Commands
|
type: io.kestra.plugin.scripts.shell.Commands
|
||||||
runner: PROCESS
|
taskRunner:
|
||||||
|
type: io.kestra.plugin.core.runner.Process
|
||||||
commands:
|
commands:
|
||||||
- echo "Manual approval received! Continuing the execution..."
|
- echo "Manual approval received! Continuing the execution..."
|
||||||
|
|
||||||
|
|||||||
@@ -18,7 +18,6 @@ import io.kestra.core.models.tasks.ExecutableTask;
|
|||||||
import io.kestra.core.models.tasks.Task;
|
import io.kestra.core.models.tasks.Task;
|
||||||
import io.kestra.core.runners.DefaultRunContext;
|
import io.kestra.core.runners.DefaultRunContext;
|
||||||
import io.kestra.core.runners.ExecutableUtils;
|
import io.kestra.core.runners.ExecutableUtils;
|
||||||
import io.kestra.core.runners.FlowInputOutput;
|
|
||||||
import io.kestra.core.runners.FlowMetaStoreInterface;
|
import io.kestra.core.runners.FlowMetaStoreInterface;
|
||||||
import io.kestra.core.runners.RunContext;
|
import io.kestra.core.runners.RunContext;
|
||||||
import io.kestra.core.runners.SubflowExecution;
|
import io.kestra.core.runners.SubflowExecution;
|
||||||
@@ -38,7 +37,6 @@ import lombok.Getter;
|
|||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import lombok.ToString;
|
import lombok.ToString;
|
||||||
import lombok.experimental.SuperBuilder;
|
import lombok.experimental.SuperBuilder;
|
||||||
import org.slf4j.event.Level;
|
|
||||||
|
|
||||||
import java.time.ZonedDateTime;
|
import java.time.ZonedDateTime;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@@ -246,11 +244,11 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
|
|||||||
|
|
||||||
if (subflowOutputs != null && !subflowOutputs.isEmpty()) {
|
if (subflowOutputs != null && !subflowOutputs.isEmpty()) {
|
||||||
try {
|
try {
|
||||||
Map<String, Object> rOutputs = FlowInputOutput.renderFlowOutputs(subflowOutputs, runContext);
|
var inputAndOutput = runContext.inputAndOutput();
|
||||||
|
Map<String, Object> rOutputs = inputAndOutput.renderOutputs(subflowOutputs);
|
||||||
|
|
||||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
|
if (flow.getOutputs() != null) {
|
||||||
if (flow.getOutputs() != null && flowInputOutput != null) {
|
rOutputs = inputAndOutput.typedOutputs(flow, execution, rOutputs);
|
||||||
rOutputs = flowInputOutput.typedOutputs(flow, execution, rOutputs);
|
|
||||||
}
|
}
|
||||||
builder.outputs(rOutputs);
|
builder.outputs(rOutputs);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|||||||
@@ -260,8 +260,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (this.namespaceFiles != null && !Boolean.FALSE.equals(runContext.render(this.namespaceFiles.getEnabled()).as(Boolean.class).orElse(true))) {
|
if (this.namespaceFiles != null && !Boolean.FALSE.equals(runContext.render(this.namespaceFiles.getEnabled()).as(Boolean.class).orElse(true))) {
|
||||||
NamespaceFilesUtils namespaceFilesUtils = ((DefaultRunContext) runContext).getApplicationContext().getBean(NamespaceFilesUtils.class);
|
NamespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
|
||||||
namespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.inputFiles != null) {
|
if (this.inputFiles != null) {
|
||||||
|
|||||||
@@ -2,10 +2,8 @@ package io.kestra.plugin.core.namespace;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||||
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
|
|
||||||
import io.kestra.core.storages.Namespace;
|
import io.kestra.core.storages.Namespace;
|
||||||
import io.kestra.core.storages.NamespaceFile;
|
import io.kestra.core.storages.NamespaceFile;
|
||||||
import io.kestra.plugin.core.kv.Version;
|
|
||||||
import io.micronaut.core.annotation.Introspected;
|
import io.micronaut.core.annotation.Introspected;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
|||||||
@@ -0,0 +1,107 @@
|
|||||||
|
package io.kestra.plugin.core.trigger;
|
||||||
|
|
||||||
|
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||||
|
import io.kestra.core.models.Label;
|
||||||
|
import io.kestra.core.models.conditions.ConditionContext;
|
||||||
|
import io.kestra.core.models.executions.Execution;
|
||||||
|
import io.kestra.core.models.executions.ExecutionTrigger;
|
||||||
|
import io.kestra.core.models.flows.FlowInterface;
|
||||||
|
import io.kestra.core.models.flows.State;
|
||||||
|
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||||
|
import io.kestra.core.models.triggers.Backfill;
|
||||||
|
import io.kestra.core.models.triggers.Schedulable;
|
||||||
|
import io.kestra.core.models.triggers.TriggerContext;
|
||||||
|
import io.kestra.core.runners.RunContext;
|
||||||
|
import io.kestra.core.services.LabelService;
|
||||||
|
import io.kestra.core.utils.ListUtils;
|
||||||
|
|
||||||
|
import java.time.ZonedDateTime;
|
||||||
|
import java.time.chrono.ChronoZonedDateTime;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Factory class for constructing a new {@link Execution} from a {@link Schedulable} trigger.
|
||||||
|
*
|
||||||
|
* @see io.kestra.plugin.core.trigger.Schedule
|
||||||
|
* @see io.kestra.plugin.core.trigger.ScheduleOnDates
|
||||||
|
*/
|
||||||
|
final class SchedulableExecutionFactory {
|
||||||
|
|
||||||
|
static Execution createFailedExecution(Schedulable trigger, ConditionContext conditionContext, TriggerContext triggerContext) throws IllegalVariableEvaluationException {
|
||||||
|
return Execution.builder()
|
||||||
|
.id(conditionContext.getRunContext().getTriggerExecutionId())
|
||||||
|
.tenantId(triggerContext.getTenantId())
|
||||||
|
.namespace(triggerContext.getNamespace())
|
||||||
|
.flowId(triggerContext.getFlowId())
|
||||||
|
.flowRevision(conditionContext.getFlow().getRevision())
|
||||||
|
.labels(SchedulableExecutionFactory.getLabels(trigger, conditionContext.getRunContext(), triggerContext.getBackfill(), conditionContext.getFlow()))
|
||||||
|
.state(new State().withState(State.Type.FAILED))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
static Execution createExecution(Schedulable trigger, ConditionContext conditionContext, TriggerContext triggerContext, Map<String, Object> variables, ZonedDateTime scheduleDate) throws IllegalVariableEvaluationException {
|
||||||
|
RunContext runContext = conditionContext.getRunContext();
|
||||||
|
ExecutionTrigger executionTrigger = ExecutionTrigger.of((AbstractTrigger) trigger, variables);
|
||||||
|
|
||||||
|
List<Label> labels = getLabels(trigger, runContext, triggerContext.getBackfill(), conditionContext.getFlow());
|
||||||
|
|
||||||
|
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(labels));
|
||||||
|
executionLabels.add(new Label(Label.FROM, "trigger"));
|
||||||
|
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
|
||||||
|
// add a correlation ID if none exist
|
||||||
|
executionLabels.add(new Label(Label.CORRELATION_ID, runContext.getTriggerExecutionId()));
|
||||||
|
}
|
||||||
|
|
||||||
|
Execution execution = Execution.builder()
|
||||||
|
.id(runContext.getTriggerExecutionId())
|
||||||
|
.tenantId(triggerContext.getTenantId())
|
||||||
|
.namespace(triggerContext.getNamespace())
|
||||||
|
.flowId(triggerContext.getFlowId())
|
||||||
|
.flowRevision(conditionContext.getFlow().getRevision())
|
||||||
|
.variables(conditionContext.getFlow().getVariables())
|
||||||
|
.labels(executionLabels)
|
||||||
|
.state(new State())
|
||||||
|
.trigger(executionTrigger)
|
||||||
|
.scheduleDate(Optional.ofNullable(scheduleDate).map(ChronoZonedDateTime::toInstant).orElse(null))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Map<String, Object> allInputs = getInputs(trigger, runContext, triggerContext.getBackfill());
|
||||||
|
|
||||||
|
// add inputs and inject defaults (FlowInputOutput handles defaults internally)
|
||||||
|
execution = execution.withInputs(runContext.inputAndOutput().readInputs(conditionContext.getFlow(), execution, allInputs));
|
||||||
|
|
||||||
|
return execution;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Map<String, Object> getInputs(Schedulable trigger, RunContext runContext, Backfill backfill) throws IllegalVariableEvaluationException {
|
||||||
|
Map<String, Object> inputs = new HashMap<>();
|
||||||
|
|
||||||
|
if (trigger.getInputs() != null) {
|
||||||
|
inputs.putAll(runContext.render(trigger.getInputs()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (backfill != null && backfill.getInputs() != null) {
|
||||||
|
inputs.putAll(runContext.render(backfill.getInputs()));
|
||||||
|
}
|
||||||
|
|
||||||
|
return inputs;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<Label> getLabels(Schedulable trigger, RunContext runContext, Backfill backfill, FlowInterface flow) throws IllegalVariableEvaluationException {
|
||||||
|
List<Label> labels = LabelService.fromTrigger(runContext, flow, (AbstractTrigger) trigger);
|
||||||
|
|
||||||
|
if (backfill != null && backfill.getLabels() != null) {
|
||||||
|
for (Label label : backfill.getLabels()) {
|
||||||
|
final var value = runContext.render(label.value());
|
||||||
|
if (value != null) {
|
||||||
|
labels.add(new Label(label.key(), value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return labels;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -6,9 +6,7 @@ import com.cronutils.model.time.ExecutionTime;
|
|||||||
import com.cronutils.parser.CronParser;
|
import com.cronutils.parser.CronParser;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
|
||||||
import io.kestra.core.exceptions.InternalException;
|
import io.kestra.core.exceptions.InternalException;
|
||||||
import io.kestra.core.models.Label;
|
|
||||||
import io.kestra.core.models.annotations.Example;
|
import io.kestra.core.models.annotations.Example;
|
||||||
import io.kestra.core.models.annotations.Plugin;
|
import io.kestra.core.models.annotations.Plugin;
|
||||||
import io.kestra.core.models.annotations.PluginProperty;
|
import io.kestra.core.models.annotations.PluginProperty;
|
||||||
@@ -16,12 +14,8 @@ import io.kestra.core.models.conditions.Condition;
|
|||||||
import io.kestra.core.models.conditions.ConditionContext;
|
import io.kestra.core.models.conditions.ConditionContext;
|
||||||
import io.kestra.core.models.conditions.ScheduleCondition;
|
import io.kestra.core.models.conditions.ScheduleCondition;
|
||||||
import io.kestra.core.models.executions.Execution;
|
import io.kestra.core.models.executions.Execution;
|
||||||
import io.kestra.core.models.flows.State;
|
|
||||||
import io.kestra.core.models.triggers.*;
|
import io.kestra.core.models.triggers.*;
|
||||||
import io.kestra.core.runners.DefaultRunContext;
|
|
||||||
import io.kestra.core.runners.RunContext;
|
import io.kestra.core.runners.RunContext;
|
||||||
import io.kestra.core.services.ConditionService;
|
|
||||||
import io.kestra.core.services.LabelService;
|
|
||||||
import io.kestra.core.utils.ListUtils;
|
import io.kestra.core.utils.ListUtils;
|
||||||
import io.kestra.core.validations.ScheduleValidation;
|
import io.kestra.core.validations.ScheduleValidation;
|
||||||
import io.kestra.core.validations.TimezoneId;
|
import io.kestra.core.validations.TimezoneId;
|
||||||
@@ -29,6 +23,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
|
|||||||
import jakarta.validation.Valid;
|
import jakarta.validation.Valid;
|
||||||
import jakarta.validation.constraints.NotNull;
|
import jakarta.validation.constraints.NotNull;
|
||||||
import jakarta.validation.constraints.Null;
|
import jakarta.validation.constraints.Null;
|
||||||
|
import lombok.AccessLevel;
|
||||||
import lombok.*;
|
import lombok.*;
|
||||||
import lombok.experimental.SuperBuilder;
|
import lombok.experimental.SuperBuilder;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@@ -40,6 +35,8 @@ import java.time.temporal.ChronoUnit;
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static io.kestra.core.utils.Rethrow.throwPredicate;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@SuperBuilder
|
@SuperBuilder
|
||||||
@ToString
|
@ToString
|
||||||
@@ -224,11 +221,7 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
|||||||
@PluginProperty
|
@PluginProperty
|
||||||
@Deprecated
|
@Deprecated
|
||||||
private List<ScheduleCondition> scheduleConditions;
|
private List<ScheduleCondition> scheduleConditions;
|
||||||
|
|
||||||
@Schema(
|
|
||||||
title = "The inputs to pass to the scheduled flow"
|
|
||||||
)
|
|
||||||
@PluginProperty(dynamic = true)
|
|
||||||
private Map<String, Object> inputs;
|
private Map<String, Object> inputs;
|
||||||
|
|
||||||
@Schema(
|
@Schema(
|
||||||
@@ -248,13 +241,7 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
|||||||
@PluginProperty
|
@PluginProperty
|
||||||
@Deprecated
|
@Deprecated
|
||||||
private Map<String, Object> backfill;
|
private Map<String, Object> backfill;
|
||||||
|
|
||||||
@Schema(
|
|
||||||
title = "Action to take in the case of missed schedules",
|
|
||||||
description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
|
|
||||||
"The default is `ALL` unless a different value is configured using the global plugin configuration."
|
|
||||||
)
|
|
||||||
@PluginProperty
|
|
||||||
private RecoverMissedSchedules recoverMissedSchedules;
|
private RecoverMissedSchedules recoverMissedSchedules;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -403,20 +390,11 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
|||||||
if (!conditionResults) {
|
if (!conditionResults) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
} catch(InternalException ie) {
|
} catch (InternalException ie) {
|
||||||
// validate schedule condition can fail to render variables
|
// validate schedule condition can fail to render variables
|
||||||
// in this case, we return a failed execution so the trigger is not evaluated each second
|
// in this case, we return a failed execution so the trigger is not evaluated each second
|
||||||
runContext.logger().error("Unable to evaluate the Schedule trigger '{}'", this.getId(), ie);
|
runContext.logger().error("Unable to evaluate the Schedule trigger '{}'", this.getId(), ie);
|
||||||
Execution execution = Execution.builder()
|
return Optional.of(SchedulableExecutionFactory.createFailedExecution(this, conditionContext, triggerContext));
|
||||||
.id(runContext.getTriggerExecutionId())
|
|
||||||
.tenantId(triggerContext.getTenantId())
|
|
||||||
.namespace(triggerContext.getNamespace())
|
|
||||||
.flowId(triggerContext.getFlowId())
|
|
||||||
.flowRevision(conditionContext.getFlow().getRevision())
|
|
||||||
.labels(generateLabels(runContext, conditionContext, backfill))
|
|
||||||
.state(new State().withState(State.Type.FAILED))
|
|
||||||
.build();
|
|
||||||
return Optional.of(execution);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// recalculate true output for previous and next based on conditions
|
// recalculate true output for previous and next based on conditions
|
||||||
@@ -430,14 +408,12 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
|||||||
variables = scheduleDates.toMap();
|
variables = scheduleDates.toMap();
|
||||||
}
|
}
|
||||||
|
|
||||||
Execution execution = TriggerService.generateScheduledExecution(
|
Execution execution = SchedulableExecutionFactory.createExecution(
|
||||||
this,
|
this,
|
||||||
conditionContext,
|
conditionContext,
|
||||||
triggerContext,
|
triggerContext,
|
||||||
generateLabels(runContext, conditionContext, backfill),
|
|
||||||
generateInputs(runContext, backfill),
|
|
||||||
variables,
|
variables,
|
||||||
Optional.empty()
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
return Optional.of(execution);
|
return Optional.of(execution);
|
||||||
@@ -448,34 +424,6 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
|||||||
return parser.parse(this.cron);
|
return parser.parse(this.cron);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<Label> generateLabels(RunContext runContext, ConditionContext conditionContext, Backfill backfill) throws IllegalVariableEvaluationException {
|
|
||||||
List<Label> labels = LabelService.fromTrigger(runContext, conditionContext.getFlow(), this);
|
|
||||||
|
|
||||||
if (backfill != null && backfill.getLabels() != null) {
|
|
||||||
for (Label label : backfill.getLabels()) {
|
|
||||||
final var value = runContext.render(label.value());
|
|
||||||
if (value != null) {
|
|
||||||
labels.add(new Label(label.key(), value));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return labels;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Map<String, Object> generateInputs(RunContext runContext, Backfill backfill) throws IllegalVariableEvaluationException {
|
|
||||||
Map<String, Object> inputs = new HashMap<>();
|
|
||||||
|
|
||||||
if (this.inputs != null) {
|
|
||||||
inputs.putAll(runContext.render(this.inputs));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (backfill != null && backfill.getInputs() != null) {
|
|
||||||
inputs.putAll(runContext.render(backfill.getInputs()));
|
|
||||||
}
|
|
||||||
|
|
||||||
return inputs;
|
|
||||||
}
|
|
||||||
private Optional<Output> scheduleDates(ExecutionTime executionTime, ZonedDateTime date) {
|
private Optional<Output> scheduleDates(ExecutionTime executionTime, ZonedDateTime date) {
|
||||||
Optional<ZonedDateTime> next = executionTime.nextExecution(date.minus(Duration.ofSeconds(1)));
|
Optional<ZonedDateTime> next = executionTime.nextExecution(date.minus(Duration.ofSeconds(1)));
|
||||||
|
|
||||||
@@ -549,9 +497,9 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
|||||||
Optional<ZonedDateTime> truePreviousNextDateWithCondition(ExecutionTime executionTime, ConditionContext conditionContext, ZonedDateTime toTestDate, boolean next) throws InternalException {
|
Optional<ZonedDateTime> truePreviousNextDateWithCondition(ExecutionTime executionTime, ConditionContext conditionContext, ZonedDateTime toTestDate, boolean next) throws InternalException {
|
||||||
int upperYearBound = ZonedDateTime.now().getYear() + 10;
|
int upperYearBound = ZonedDateTime.now().getYear() + 10;
|
||||||
int lowerYearBound = ZonedDateTime.now().getYear() - 10;
|
int lowerYearBound = ZonedDateTime.now().getYear() - 10;
|
||||||
|
|
||||||
while ((next && toTestDate.getYear() < upperYearBound) || (!next && toTestDate.getYear() > lowerYearBound)) {
|
while ((next && toTestDate.getYear() < upperYearBound) || (!next && toTestDate.getYear() > lowerYearBound)) {
|
||||||
|
|
||||||
Optional<ZonedDateTime> currentDate = next ?
|
Optional<ZonedDateTime> currentDate = next ?
|
||||||
executionTime.nextExecution(toTestDate) :
|
executionTime.nextExecution(toTestDate) :
|
||||||
executionTime.lastExecution(toTestDate);
|
executionTime.lastExecution(toTestDate);
|
||||||
@@ -607,11 +555,10 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
|
|||||||
|
|
||||||
private boolean validateScheduleCondition(ConditionContext conditionContext) throws InternalException {
|
private boolean validateScheduleCondition(ConditionContext conditionContext) throws InternalException {
|
||||||
if (conditions != null) {
|
if (conditions != null) {
|
||||||
ConditionService conditionService = ((DefaultRunContext)conditionContext.getRunContext()).getApplicationContext().getBean(ConditionService.class);
|
return conditions.stream()
|
||||||
return conditionService.isValid(
|
.filter(c -> c instanceof ScheduleCondition)
|
||||||
conditions.stream().filter(c -> c instanceof ScheduleCondition).map(c -> (ScheduleCondition) c).toList(),
|
.map(c -> (ScheduleCondition) c)
|
||||||
conditionContext
|
.allMatch(throwPredicate(condition -> condition.test(conditionContext)));
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ import io.kestra.core.models.property.Property;
|
|||||||
import io.kestra.core.models.tasks.VoidOutput;
|
import io.kestra.core.models.tasks.VoidOutput;
|
||||||
import io.kestra.core.models.triggers.*;
|
import io.kestra.core.models.triggers.*;
|
||||||
import io.kestra.core.runners.RunContext;
|
import io.kestra.core.runners.RunContext;
|
||||||
import io.kestra.core.services.LabelService;
|
|
||||||
import io.kestra.core.validations.TimezoneId;
|
import io.kestra.core.validations.TimezoneId;
|
||||||
import io.swagger.v3.oas.annotations.media.Schema;
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
import jakarta.validation.constraints.NotNull;
|
import jakarta.validation.constraints.NotNull;
|
||||||
@@ -23,7 +22,10 @@ import java.time.Duration;
|
|||||||
import java.time.ZoneId;
|
import java.time.ZoneId;
|
||||||
import java.time.ZonedDateTime;
|
import java.time.ZonedDateTime;
|
||||||
import java.time.temporal.ChronoUnit;
|
import java.time.temporal.ChronoUnit;
|
||||||
import java.util.*;
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||||
@@ -45,11 +47,7 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
|
|||||||
@Builder.Default
|
@Builder.Default
|
||||||
@Null
|
@Null
|
||||||
private final Duration interval = null;
|
private final Duration interval = null;
|
||||||
|
|
||||||
@Schema(
|
|
||||||
title = "The inputs to pass to the scheduled flow"
|
|
||||||
)
|
|
||||||
@PluginProperty(dynamic = true)
|
|
||||||
private Map<String, Object> inputs;
|
private Map<String, Object> inputs;
|
||||||
|
|
||||||
@TimezoneId
|
@TimezoneId
|
||||||
@@ -63,31 +61,24 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
|
|||||||
@NotNull
|
@NotNull
|
||||||
private Property<List<ZonedDateTime>> dates;
|
private Property<List<ZonedDateTime>> dates;
|
||||||
|
|
||||||
@Schema(
|
|
||||||
title = "Action to take in the case of missed schedules",
|
|
||||||
description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
|
|
||||||
"The default is `ALL` unless a different value is configured using the global plugin configuration."
|
|
||||||
)
|
|
||||||
@PluginProperty
|
|
||||||
private RecoverMissedSchedules recoverMissedSchedules;
|
private RecoverMissedSchedules recoverMissedSchedules;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext triggerContext) throws Exception {
|
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext triggerContext) throws Exception {
|
||||||
RunContext runContext = conditionContext.getRunContext();
|
RunContext runContext = conditionContext.getRunContext();
|
||||||
|
|
||||||
ZonedDateTime lastEvaluation = triggerContext.getDate();
|
ZonedDateTime lastEvaluation = triggerContext.getDate();
|
||||||
Optional<ZonedDateTime> nextDate = nextDate(runContext, date -> date.isEqual(lastEvaluation) || date.isAfter(lastEvaluation));
|
Optional<ZonedDateTime> nextDate = nextDate(runContext, date -> date.isEqual(lastEvaluation) || date.isAfter(lastEvaluation));
|
||||||
|
|
||||||
if (nextDate.isPresent()) {
|
if (nextDate.isPresent()) {
|
||||||
log.info("Schedule execution on {}", nextDate.get());
|
log.info("Schedule execution on {}", nextDate.get());
|
||||||
|
|
||||||
Execution execution = TriggerService.generateScheduledExecution(
|
Execution execution = SchedulableExecutionFactory.createExecution(
|
||||||
this,
|
this,
|
||||||
conditionContext,
|
conditionContext,
|
||||||
triggerContext,
|
triggerContext,
|
||||||
LabelService.fromTrigger(runContext, conditionContext.getFlow(), this),
|
|
||||||
this.inputs != null ? runContext.render(this.inputs) : Collections.emptyMap(),
|
|
||||||
Collections.emptyMap(),
|
Collections.emptyMap(),
|
||||||
nextDate
|
nextDate.orElse(null)
|
||||||
);
|
);
|
||||||
|
|
||||||
return Optional.of(execution);
|
return Optional.of(execution);
|
||||||
@@ -97,29 +88,21 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> last) {
|
public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> triggerContext) {
|
||||||
try {
|
return triggerContext
|
||||||
return last
|
.map(ctx -> ctx.getBackfill() != null ? ctx.getBackfill().getCurrentDate() : ctx.getDate())
|
||||||
.map(throwFunction(context ->
|
.map(this::withTimeZone)
|
||||||
nextDate(conditionContext.getRunContext(), date -> date.isAfter(context.getDate()))
|
.or(() -> Optional.of(ZonedDateTime.now()))
|
||||||
.orElse(ZonedDateTime.now().plusYears(1))
|
.flatMap(dt -> {
|
||||||
))
|
try {
|
||||||
.orElse(conditionContext.getRunContext()
|
return nextDate(conditionContext.getRunContext(), date -> date.isAfter(dt));
|
||||||
.render(dates)
|
} catch (IllegalVariableEvaluationException e) {
|
||||||
.asList(ZonedDateTime.class)
|
log.warn("Failed to evaluate schedule dates for trigger '{}': {}", this.getId(), e.getMessage());
|
||||||
.stream()
|
throw new InvalidTriggerConfigurationException("Failed to evaluate schedule 'dates'. Cause: " + e.getMessage());
|
||||||
.sorted()
|
}
|
||||||
.findFirst()
|
}).orElseGet(() -> ZonedDateTime.now().plusYears(1));
|
||||||
.orElse(ZonedDateTime.now()))
|
|
||||||
.truncatedTo(ChronoUnit.SECONDS);
|
|
||||||
} catch (IllegalVariableEvaluationException e) {
|
|
||||||
log.warn("Failed to evaluate schedule dates for trigger '{}': {}", this.getId(), e.getMessage());
|
|
||||||
return ZonedDateTime.now().plusYears(1);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ZonedDateTime nextEvaluationDate() {
|
public ZonedDateTime nextEvaluationDate() {
|
||||||
// TODO this may be the next date from now?
|
// TODO this may be the next date from now?
|
||||||
@@ -139,9 +122,17 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
|
|||||||
return previousDates.isEmpty() ? ZonedDateTime.now() : previousDates.getFirst();
|
return previousDates.isEmpty() ? ZonedDateTime.now() : previousDates.getFirst();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Optional<ZonedDateTime> nextDate(RunContext runContext, Predicate<ZonedDateTime> filter) throws IllegalVariableEvaluationException {
|
private ZonedDateTime withTimeZone(ZonedDateTime date) {
|
||||||
return runContext.render(dates).asList(ZonedDateTime.class).stream().sorted()
|
if (this.timezone == null) {
|
||||||
.filter(date -> filter.test(date))
|
return date;
|
||||||
|
}
|
||||||
|
return date.withZoneSameInstant(ZoneId.of(this.timezone));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Optional<ZonedDateTime> nextDate(RunContext runContext, Predicate<ZonedDateTime> predicate) throws IllegalVariableEvaluationException {
|
||||||
|
return runContext.render(dates)
|
||||||
|
.asList(ZonedDateTime.class).stream().sorted()
|
||||||
|
.filter(predicate)
|
||||||
.map(throwFunction(date -> timezone == null ? date : date.withZoneSameInstant(ZoneId.of(runContext.render(timezone)))))
|
.map(throwFunction(date -> timezone == null ? date : date.withZoneSameInstant(ZoneId.of(runContext.render(timezone)))))
|
||||||
.findFirst()
|
.findFirst()
|
||||||
.map(date -> date.truncatedTo(ChronoUnit.SECONDS));
|
.map(date -> date.truncatedTo(ChronoUnit.SECONDS));
|
||||||
|
|||||||
@@ -9,10 +9,14 @@
|
|||||||
<property name="pattern" value="%date{HH:mm:ss}.%ms %highlight(%-5.5level) %magenta(%-12.36thread) %cyan(%-12.36logger{36}) %msg%n" />
|
<property name="pattern" value="%date{HH:mm:ss}.%ms %highlight(%-5.5level) %magenta(%-12.36thread) %cyan(%-12.36logger{36}) %msg%n" />
|
||||||
|
|
||||||
<logger name="io.kestra" level="INFO" />
|
<logger name="io.kestra" level="INFO" />
|
||||||
<logger name="flow" level="INFO" />
|
|
||||||
<logger name="task" level="INFO" />
|
<!-- Flow execution logs - disabled by default -->
|
||||||
<logger name="execution" level="INFO" />
|
<logger name="flow" level="OFF" />
|
||||||
<logger name="trigger" level="INFO" />
|
|
||||||
|
<!-- Server loggers -->
|
||||||
|
<logger name="worker" level="INFO" />
|
||||||
|
<logger name="executor" level="INFO" />
|
||||||
|
<logger name="scheduler" level="INFO" />
|
||||||
|
|
||||||
<logger name="io.kestra.ee.runner.kafka.services.KafkaConsumerService" level="WARN" />
|
<logger name="io.kestra.ee.runner.kafka.services.KafkaConsumerService" level="WARN" />
|
||||||
<logger name="io.kestra.ee.runner.kafka.services.KafkaProducerService" level="WARN" />
|
<logger name="io.kestra.ee.runner.kafka.services.KafkaProducerService" level="WARN" />
|
||||||
|
|||||||
@@ -170,10 +170,11 @@ class JsonSchemaGeneratorTest {
|
|||||||
|
|
||||||
Map<String, Object> jsonSchema = jsonSchemaGenerator.generate(AbstractTrigger.class, AbstractTrigger.class);
|
Map<String, Object> jsonSchema = jsonSchemaGenerator.generate(AbstractTrigger.class, AbstractTrigger.class);
|
||||||
assertThat((Map<String, Object>) jsonSchema.get("properties"), allOf(
|
assertThat((Map<String, Object>) jsonSchema.get("properties"), allOf(
|
||||||
Matchers.aMapWithSize(3),
|
Matchers.aMapWithSize(4),
|
||||||
hasKey("conditions"),
|
hasKey("conditions"),
|
||||||
hasKey("stopAfter"),
|
hasKey("stopAfter"),
|
||||||
hasKey("type")
|
hasKey("type"),
|
||||||
|
hasKey("allowConcurrent")
|
||||||
));
|
));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,24 +1,36 @@
|
|||||||
package io.kestra.core.models.property;
|
package io.kestra.core.models.property;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import io.kestra.core.context.TestRunContextFactory;
|
import io.kestra.core.context.TestRunContextFactory;
|
||||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||||
import io.kestra.core.junit.annotations.KestraTest;
|
import io.kestra.core.junit.annotations.KestraTest;
|
||||||
import io.kestra.core.serializers.FileSerde;
|
import io.kestra.core.serializers.FileSerde;
|
||||||
|
import io.kestra.core.serializers.JacksonMapper;
|
||||||
|
import io.kestra.core.storages.Namespace;
|
||||||
|
import io.kestra.core.storages.NamespaceFile;
|
||||||
import io.kestra.core.storages.StorageInterface;
|
import io.kestra.core.storages.StorageInterface;
|
||||||
|
import io.kestra.plugin.core.namespace.Version;
|
||||||
|
import io.micronaut.core.annotation.Introspected;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
import jakarta.validation.ConstraintViolationException;
|
import jakarta.validation.ConstraintViolationException;
|
||||||
import lombok.Builder;
|
import lombok.Builder;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
import lombok.experimental.SuperBuilder;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.slf4j.event.Level;
|
import org.slf4j.event.Level;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||||
import static java.util.Map.entry;
|
import static java.util.Map.entry;
|
||||||
@@ -362,10 +374,43 @@ class PropertyTest {
|
|||||||
assertThat(output.getMessages().getFirst().getValue()).isEqualTo("value1");
|
assertThat(output.getMessages().getFirst().getValue()).isEqualTo("value1");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void jsonSubtype() throws JsonProcessingException, IllegalVariableEvaluationException {
|
||||||
|
Optional<WithSubtype> rendered = runContextFactory.of().render(
|
||||||
|
Property.<WithSubtype>ofExpression(JacksonMapper.ofJson().writeValueAsString(new MySubtype()))
|
||||||
|
).as(WithSubtype.class);
|
||||||
|
|
||||||
|
assertThat(rendered).isPresent();
|
||||||
|
assertThat(rendered.get()).isInstanceOf(MySubtype.class);
|
||||||
|
|
||||||
|
List<WithSubtype> renderedList = runContextFactory.of().render(
|
||||||
|
Property.<List<WithSubtype>>ofExpression(JacksonMapper.ofJson().writeValueAsString(List.of(new MySubtype())))
|
||||||
|
).asList(WithSubtype.class);
|
||||||
|
assertThat(renderedList).hasSize(1);
|
||||||
|
assertThat(renderedList.get(0)).isInstanceOf(MySubtype.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
|
||||||
|
@JsonSubTypes({
|
||||||
|
@JsonSubTypes.Type(value = MySubtype.class, name = "mySubtype")
|
||||||
|
})
|
||||||
|
@Getter
|
||||||
|
@NoArgsConstructor
|
||||||
|
@Introspected
|
||||||
|
public abstract static class WithSubtype {
|
||||||
|
abstract public String getType();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
public static class MySubtype extends WithSubtype {
|
||||||
|
private final String type = "mySubtype";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Builder
|
@Builder
|
||||||
@Getter
|
@Getter
|
||||||
private static class TestObj {
|
private static class TestObj {
|
||||||
private String key;
|
private String key;
|
||||||
private String value;
|
private String value;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -60,6 +60,15 @@ class SystemInformationReportTest {
|
|||||||
return setting;
|
return setting;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Setting internalSave(Setting setting) throws ConstraintViolationException {
|
||||||
|
if (setting.getKey().equals(Setting.INSTANCE_UUID)) {
|
||||||
|
UUID = setting.getValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
return setting;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Setting delete(Setting setting) {
|
public Setting delete(Setting setting) {
|
||||||
return setting;
|
return setting;
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
package io.kestra.core.repositories;
|
package io.kestra.core.repositories;
|
||||||
|
|
||||||
import com.devskiller.friendly_id.FriendlyId;
|
import com.devskiller.friendly_id.FriendlyId;
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import io.kestra.core.exceptions.InvalidQueryFiltersException;
|
import io.kestra.core.exceptions.InvalidQueryFiltersException;
|
||||||
|
import io.kestra.core.junit.annotations.FlakyTest;
|
||||||
import io.kestra.core.junit.annotations.KestraTest;
|
import io.kestra.core.junit.annotations.KestraTest;
|
||||||
import io.kestra.core.models.Label;
|
import io.kestra.core.models.Label;
|
||||||
import io.kestra.core.models.QueryFilter;
|
import io.kestra.core.models.QueryFilter;
|
||||||
@@ -24,7 +24,6 @@ import io.kestra.core.models.flows.State.Type;
|
|||||||
import io.kestra.core.models.property.Property;
|
import io.kestra.core.models.property.Property;
|
||||||
import io.kestra.core.models.tasks.ResolvedTask;
|
import io.kestra.core.models.tasks.ResolvedTask;
|
||||||
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
|
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
|
||||||
import io.kestra.core.serializers.JacksonMapper;
|
|
||||||
import io.kestra.core.utils.IdUtils;
|
import io.kestra.core.utils.IdUtils;
|
||||||
import io.kestra.core.utils.NamespaceUtils;
|
import io.kestra.core.utils.NamespaceUtils;
|
||||||
import io.kestra.core.utils.TestsUtils;
|
import io.kestra.core.utils.TestsUtils;
|
||||||
@@ -42,10 +41,9 @@ import org.junit.jupiter.params.provider.MethodSource;
|
|||||||
import org.slf4j.event.Level;
|
import org.slf4j.event.Level;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.sql.Timestamp;
|
import java.time.Duration;
|
||||||
import java.time.*;
|
import java.time.Instant;
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.ZonedDateTime;
|
||||||
import java.time.temporal.ChronoUnit;
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
@@ -185,6 +183,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
|||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource("filterCombinations")
|
@MethodSource("filterCombinations")
|
||||||
|
@FlakyTest(description = "Filtering tests are sometimes returning 0")
|
||||||
void should_find_all(QueryFilter filter, int expectedSize){
|
void should_find_all(QueryFilter filter, int expectedSize){
|
||||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||||
inject(tenant, "executionTriggerId");
|
inject(tenant, "executionTriggerId");
|
||||||
|
|||||||
@@ -10,82 +10,84 @@ import org.junit.jupiter.api.TestInstance;
|
|||||||
@KestraTest(startRunner = true)
|
@KestraTest(startRunner = true)
|
||||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||||
public abstract class AbstractRunnerConcurrencyTest {
|
public abstract class AbstractRunnerConcurrencyTest {
|
||||||
public static final String TENANT_1 = "tenant1";
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
protected FlowConcurrencyCaseTest flowConcurrencyCaseTest;
|
protected FlowConcurrencyCaseTest flowConcurrencyCaseTest;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
|
@LoadFlows(value = {"flows/valids/flow-concurrency-cancel.yml"}, tenantId = "concurrency-cancel")
|
||||||
void concurrencyCancel() throws Exception {
|
void concurrencyCancel() throws Exception {
|
||||||
flowConcurrencyCaseTest.flowConcurrencyCancel();
|
flowConcurrencyCaseTest.flowConcurrencyCancel("concurrency-cancel");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@LoadFlows({"flows/valids/flow-concurrency-fail.yml"})
|
@LoadFlows(value = {"flows/valids/flow-concurrency-fail.yml"}, tenantId = "concurrency-fail")
|
||||||
void concurrencyFail() throws Exception {
|
void concurrencyFail() throws Exception {
|
||||||
flowConcurrencyCaseTest.flowConcurrencyFail();
|
flowConcurrencyCaseTest.flowConcurrencyFail("concurrency-fail");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@LoadFlows({"flows/valids/flow-concurrency-queue.yml"})
|
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = "concurrency-queue")
|
||||||
void concurrencyQueue() throws Exception {
|
void concurrencyQueue() throws Exception {
|
||||||
flowConcurrencyCaseTest.flowConcurrencyQueue();
|
flowConcurrencyCaseTest.flowConcurrencyQueue("concurrency-queue");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@LoadFlows({"flows/valids/flow-concurrency-queue-pause.yml"})
|
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-pause.yml"}, tenantId = "concurrency-queue-pause")
|
||||||
protected void concurrencyQueuePause() throws Exception {
|
protected void concurrencyQueuePause() throws Exception {
|
||||||
flowConcurrencyCaseTest.flowConcurrencyQueuePause();
|
flowConcurrencyCaseTest.flowConcurrencyQueuePause("concurrency-queue-pause");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@LoadFlows({"flows/valids/flow-concurrency-cancel-pause.yml"})
|
@LoadFlows(value = {"flows/valids/flow-concurrency-cancel-pause.yml"}, tenantId = "concurrency-cancel-pause")
|
||||||
protected void concurrencyCancelPause() throws Exception {
|
protected void concurrencyCancelPause() throws Exception {
|
||||||
flowConcurrencyCaseTest.flowConcurrencyCancelPause();
|
flowConcurrencyCaseTest.flowConcurrencyCancelPause("concurrency-cancel-pause");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = TENANT_1)
|
@LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = "flow-concurrency-with-for-each-item")
|
||||||
protected void flowConcurrencyWithForEachItem() throws Exception {
|
protected void flowConcurrencyWithForEachItem() throws Exception {
|
||||||
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem(TENANT_1);
|
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem("flow-concurrency-with-for-each-item");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
|
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-fail.yml"}, tenantId = "concurrency-queue-restarted")
|
||||||
protected void concurrencyQueueRestarted() throws Exception {
|
protected void concurrencyQueueRestarted() throws Exception {
|
||||||
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
|
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted("concurrency-queue-restarted");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@LoadFlows({"flows/valids/flow-concurrency-queue-after-execution.yml"})
|
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-after-execution.yml"}, tenantId = "concurrency-queue-after-execution")
|
||||||
void concurrencyQueueAfterExecution() throws Exception {
|
void concurrencyQueueAfterExecution() throws Exception {
|
||||||
flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution();
|
flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution("concurrency-queue-after-execution");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@LoadFlows(value = {"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"}, tenantId = TENANT_1)
|
@LoadFlows(value = {"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"}, tenantId = "flow-concurrency-subflow")
|
||||||
void flowConcurrencySubflow() throws Exception {
|
void flowConcurrencySubflow() throws Exception {
|
||||||
flowConcurrencyCaseTest.flowConcurrencySubflow(TENANT_1);
|
flowConcurrencyCaseTest.flowConcurrencySubflow("flow-concurrency-subflow");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@FlakyTest(description = "Only flaky in CI")
|
@FlakyTest(description = "Only flaky in CI")
|
||||||
@LoadFlows({"flows/valids/flow-concurrency-parallel-subflow-kill.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-child.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-grandchild.yaml"})
|
@LoadFlows(
|
||||||
|
value = {"flows/valids/flow-concurrency-parallel-subflow-kill.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-child.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-grandchild.yaml"},
|
||||||
|
tenantId = "flow-concurrency-parallel-subflow-kill"
|
||||||
|
)
|
||||||
protected void flowConcurrencyParallelSubflowKill() throws Exception {
|
protected void flowConcurrencyParallelSubflowKill() throws Exception {
|
||||||
flowConcurrencyCaseTest.flowConcurrencyParallelSubflowKill();
|
flowConcurrencyCaseTest.flowConcurrencyParallelSubflowKill("flow-concurrency-parallel-subflow-kill");
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
@LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"})
|
|
||||||
void flowConcurrencyKilled() throws Exception {
|
|
||||||
flowConcurrencyCaseTest.flowConcurrencyKilled();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@FlakyTest(description = "Only flaky in CI")
|
@FlakyTest(description = "Only flaky in CI")
|
||||||
@LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"})
|
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-killed.yml"}, tenantId = "flow-concurrency-killed")
|
||||||
|
void flowConcurrencyKilled() throws Exception {
|
||||||
|
flowConcurrencyCaseTest.flowConcurrencyKilled("flow-concurrency-killed");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@FlakyTest(description = "Only flaky in CI")
|
||||||
|
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-killed.yml"}, tenantId = "flow-concurrency-queue-killed")
|
||||||
void flowConcurrencyQueueKilled() throws Exception {
|
void flowConcurrencyQueueKilled() throws Exception {
|
||||||
flowConcurrencyCaseTest.flowConcurrencyQueueKilled();
|
flowConcurrencyCaseTest.flowConcurrencyQueueKilled("flow-concurrency-queue-killed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import io.kestra.core.junit.annotations.KestraTest;
|
|||||||
import io.kestra.core.junit.annotations.LoadFlows;
|
import io.kestra.core.junit.annotations.LoadFlows;
|
||||||
import io.kestra.core.models.Label;
|
import io.kestra.core.models.Label;
|
||||||
import io.kestra.core.models.executions.Execution;
|
import io.kestra.core.models.executions.Execution;
|
||||||
|
import io.kestra.core.models.executions.TaskRun;
|
||||||
import io.kestra.core.models.flows.Flow;
|
import io.kestra.core.models.flows.Flow;
|
||||||
import io.kestra.core.models.flows.FlowWithSource;
|
import io.kestra.core.models.flows.FlowWithSource;
|
||||||
import io.kestra.core.models.flows.GenericFlow;
|
import io.kestra.core.models.flows.GenericFlow;
|
||||||
@@ -466,4 +467,20 @@ class ExecutionServiceTest {
|
|||||||
assertThat(restart.getTaskRunList()).hasSize(2);
|
assertThat(restart.getTaskRunList()).hasSize(2);
|
||||||
assertThat(restart.findTaskRunsByTaskId("make_error").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
assertThat(restart.findTaskRunsByTaskId("make_error").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@LoadFlows({"flows/valids/each-pause.yaml"})
|
||||||
|
void killExecutionWithFlowableTask() throws Exception {
|
||||||
|
Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "each-pause");
|
||||||
|
|
||||||
|
TaskRun childTaskRun = execution.getTaskRunList().stream().filter(tr -> tr.getTaskId().equals("pause")).toList().getFirst();
|
||||||
|
|
||||||
|
Execution killed = executionService.killParentTaskruns(childTaskRun,execution);
|
||||||
|
|
||||||
|
TaskRun parentTaskRun = killed.getTaskRunList().stream().filter(tr -> tr.getTaskId().equals("each_task")).toList().getFirst();
|
||||||
|
|
||||||
|
assertThat(parentTaskRun.getState().getCurrent()).isEqualTo(State.Type.KILLED);
|
||||||
|
assertThat(parentTaskRun.getAttempts().getLast().getState().getCurrent()).isEqualTo(State.Type.KILLED);
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -31,7 +31,6 @@ import java.util.Optional;
|
|||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
@@ -57,12 +56,12 @@ public class FlowConcurrencyCaseTest {
|
|||||||
@Named(QueueFactoryInterface.KILL_NAMED)
|
@Named(QueueFactoryInterface.KILL_NAMED)
|
||||||
protected QueueInterface<ExecutionKilled> killQueue;
|
protected QueueInterface<ExecutionKilled> killQueue;
|
||||||
|
|
||||||
public void flowConcurrencyCancel() throws TimeoutException, QueueException {
|
public void flowConcurrencyCancel(String tenantId) throws TimeoutException, QueueException {
|
||||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
|
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
|
||||||
try {
|
try {
|
||||||
List<Execution> shouldFailExecutions = List.of(
|
List<Execution> shouldFailExecutions = List.of(
|
||||||
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel"),
|
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-cancel"),
|
||||||
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel")
|
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-cancel")
|
||||||
);
|
);
|
||||||
assertThat(execution1.getState().isRunning()).isTrue();
|
assertThat(execution1.getState().isRunning()).isTrue();
|
||||||
|
|
||||||
@@ -73,12 +72,12 @@ public class FlowConcurrencyCaseTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void flowConcurrencyFail() throws TimeoutException, QueueException {
|
public void flowConcurrencyFail(String tenantId) throws TimeoutException, QueueException {
|
||||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
|
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
|
||||||
try {
|
try {
|
||||||
List<Execution> shouldFailExecutions = List.of(
|
List<Execution> shouldFailExecutions = List.of(
|
||||||
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail"),
|
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-fail"),
|
||||||
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail")
|
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-fail")
|
||||||
);
|
);
|
||||||
|
|
||||||
assertThat(execution1.getState().isRunning()).isTrue();
|
assertThat(execution1.getState().isRunning()).isTrue();
|
||||||
@@ -89,10 +88,10 @@ public class FlowConcurrencyCaseTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void flowConcurrencyQueue() throws QueueException {
|
public void flowConcurrencyQueue(String tenantId) throws QueueException {
|
||||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
|
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
|
||||||
Flow flow = flowRepository
|
Flow flow = flowRepository
|
||||||
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue", Optional.empty())
|
.findById(tenantId, NAMESPACE, "flow-concurrency-queue", Optional.empty())
|
||||||
.orElseThrow();
|
.orElseThrow();
|
||||||
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
||||||
Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
|
Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
|
||||||
@@ -108,10 +107,10 @@ public class FlowConcurrencyCaseTest {
|
|||||||
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
|
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void flowConcurrencyQueuePause() throws QueueException {
|
public void flowConcurrencyQueuePause(String tenantId) throws QueueException {
|
||||||
Execution execution1 = runnerUtils.runOneUntilPaused(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-pause");
|
Execution execution1 = runnerUtils.runOneUntilPaused(tenantId, NAMESPACE, "flow-concurrency-queue-pause");
|
||||||
Flow flow = flowRepository
|
Flow flow = flowRepository
|
||||||
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-pause", Optional.empty())
|
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-pause", Optional.empty())
|
||||||
.orElseThrow();
|
.orElseThrow();
|
||||||
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
||||||
Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
|
Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
|
||||||
@@ -127,10 +126,10 @@ public class FlowConcurrencyCaseTest {
|
|||||||
assertThat(secondExecutionResult.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
|
assertThat(secondExecutionResult.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void flowConcurrencyCancelPause() throws QueueException {
|
public void flowConcurrencyCancelPause(String tenantId) throws QueueException {
|
||||||
Execution execution1 = runnerUtils.runOneUntilPaused(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel-pause");
|
Execution execution1 = runnerUtils.runOneUntilPaused(tenantId, NAMESPACE, "flow-concurrency-cancel-pause");
|
||||||
Flow flow = flowRepository
|
Flow flow = flowRepository
|
||||||
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel-pause", Optional.empty())
|
.findById(tenantId, NAMESPACE, "flow-concurrency-cancel-pause", Optional.empty())
|
||||||
.orElseThrow();
|
.orElseThrow();
|
||||||
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
||||||
Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.CANCELLED), execution2);
|
Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.CANCELLED), execution2);
|
||||||
@@ -166,11 +165,11 @@ public class FlowConcurrencyCaseTest {
|
|||||||
.toList()).contains(Type.QUEUED);
|
.toList()).contains(Type.QUEUED);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void flowConcurrencyQueueRestarted() throws Exception {
|
public void flowConcurrencyQueueRestarted(String tenantId) throws Exception {
|
||||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE,
|
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE,
|
||||||
"flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
|
"flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
|
||||||
Flow flow = flowRepository
|
Flow flow = flowRepository
|
||||||
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-fail", Optional.empty())
|
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-fail", Optional.empty())
|
||||||
.orElseThrow();
|
.orElseThrow();
|
||||||
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
||||||
runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.RUNNING), execution2);
|
runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.RUNNING), execution2);
|
||||||
@@ -179,7 +178,10 @@ public class FlowConcurrencyCaseTest {
|
|||||||
// we restart the first one, it should be queued then fail again.
|
// we restart the first one, it should be queued then fail again.
|
||||||
Execution failedExecution = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution1);
|
Execution failedExecution = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution1);
|
||||||
Execution restarted = executionService.restart(failedExecution, null);
|
Execution restarted = executionService.restart(failedExecution, null);
|
||||||
Execution executionResult1 = runnerUtils.restartExecution(e -> e.getState().getCurrent().equals(Type.FAILED), restarted);
|
Execution executionResult1 = runnerUtils.restartExecution(
|
||||||
|
e -> e.getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED) && e.getState().getCurrent().equals(Type.FAILED),
|
||||||
|
restarted
|
||||||
|
);
|
||||||
Execution executionResult2 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution2);
|
Execution executionResult2 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution2);
|
||||||
|
|
||||||
assertThat(executionResult1.getState().getCurrent()).isEqualTo(Type.FAILED);
|
assertThat(executionResult1.getState().getCurrent()).isEqualTo(Type.FAILED);
|
||||||
@@ -193,10 +195,10 @@ public class FlowConcurrencyCaseTest {
|
|||||||
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
|
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void flowConcurrencyQueueAfterExecution() throws QueueException {
|
public void flowConcurrencyQueueAfterExecution(String tenantId) throws QueueException {
|
||||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
|
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
|
||||||
Flow flow = flowRepository
|
Flow flow = flowRepository
|
||||||
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-after-execution", Optional.empty())
|
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-after-execution", Optional.empty())
|
||||||
.orElseThrow();
|
.orElseThrow();
|
||||||
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
||||||
Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
|
Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
|
||||||
@@ -216,15 +218,15 @@ public class FlowConcurrencyCaseTest {
|
|||||||
List<Execution> subFlowExecs = runnerUtils.awaitFlowExecutionNumber(2, tenantId, NAMESPACE, "flow-concurrency-cancel");
|
List<Execution> subFlowExecs = runnerUtils.awaitFlowExecutionNumber(2, tenantId, NAMESPACE, "flow-concurrency-cancel");
|
||||||
assertThat(subFlowExecs).extracting(e -> e.getState().getCurrent()).containsExactlyInAnyOrder(Type.SUCCESS, Type.CANCELLED);
|
assertThat(subFlowExecs).extracting(e -> e.getState().getCurrent()).containsExactlyInAnyOrder(Type.SUCCESS, Type.CANCELLED);
|
||||||
|
|
||||||
// run another execution to be sure that everything work (purge is correctly done)
|
// run another execution to be sure that everything works (purge is correctly done)
|
||||||
Execution execution3 = runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-subflow");
|
Execution execution3 = runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-subflow");
|
||||||
assertThat(execution3.getState().getCurrent()).isEqualTo(Type.SUCCESS);
|
assertThat(execution3.getState().getCurrent()).isEqualTo(Type.SUCCESS);
|
||||||
runnerUtils.awaitFlowExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), tenantId, NAMESPACE, "flow-concurrency-cancel");
|
runnerUtils.awaitFlowExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), tenantId, NAMESPACE, "flow-concurrency-cancel");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void flowConcurrencyParallelSubflowKill() throws QueueException {
|
public void flowConcurrencyParallelSubflowKill(String tenantId) throws QueueException {
|
||||||
Execution parent = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-parallel-subflow-kill", null, null, Duration.ofSeconds(30));
|
Execution parent = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-parallel-subflow-kill", null, null, Duration.ofSeconds(30));
|
||||||
Execution queued = runnerUtils.awaitFlowExecution(e -> e.getState().isQueued(), MAIN_TENANT, NAMESPACE, "flow-concurrency-parallel-subflow-kill-child");
|
Execution queued = runnerUtils.awaitFlowExecution(e -> e.getState().isQueued(), tenantId, NAMESPACE, "flow-concurrency-parallel-subflow-kill-child");
|
||||||
|
|
||||||
// Kill the parent
|
// Kill the parent
|
||||||
killQueue.emit(ExecutionKilledExecution
|
killQueue.emit(ExecutionKilledExecution
|
||||||
@@ -232,7 +234,7 @@ public class FlowConcurrencyCaseTest {
|
|||||||
.state(ExecutionKilled.State.REQUESTED)
|
.state(ExecutionKilled.State.REQUESTED)
|
||||||
.executionId(parent.getId())
|
.executionId(parent.getId())
|
||||||
.isOnKillCascade(true)
|
.isOnKillCascade(true)
|
||||||
.tenantId(MAIN_TENANT)
|
.tenantId(tenantId)
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -242,11 +244,11 @@ public class FlowConcurrencyCaseTest {
|
|||||||
assertThat(terminated.getTaskRunList()).isNull();
|
assertThat(terminated.getTaskRunList()).isNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void flowConcurrencyKilled() throws QueueException, InterruptedException {
|
public void flowConcurrencyKilled(String tenantId) throws QueueException, InterruptedException {
|
||||||
Flow flow = flowRepository
|
Flow flow = flowRepository
|
||||||
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
|
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
|
||||||
.orElseThrow();
|
.orElseThrow();
|
||||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
|
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
|
||||||
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
||||||
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
||||||
|
|
||||||
@@ -261,7 +263,7 @@ public class FlowConcurrencyCaseTest {
|
|||||||
.state(ExecutionKilled.State.REQUESTED)
|
.state(ExecutionKilled.State.REQUESTED)
|
||||||
.executionId(execution1.getId())
|
.executionId(execution1.getId())
|
||||||
.isOnKillCascade(true)
|
.isOnKillCascade(true)
|
||||||
.tenantId(MAIN_TENANT)
|
.tenantId(tenantId)
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -279,20 +281,19 @@ public class FlowConcurrencyCaseTest {
|
|||||||
assertThat(queued.getState().getCurrent()).isEqualTo(Type.QUEUED);
|
assertThat(queued.getState().getCurrent()).isEqualTo(Type.QUEUED);
|
||||||
} finally {
|
} finally {
|
||||||
// kill everything to avoid dangling executions
|
// kill everything to avoid dangling executions
|
||||||
runnerUtils.killExecution(execution1);
|
|
||||||
runnerUtils.killExecution(execution2);
|
runnerUtils.killExecution(execution2);
|
||||||
runnerUtils.killExecution(execution3);
|
runnerUtils.killExecution(execution3);
|
||||||
|
|
||||||
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
|
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
|
||||||
runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed");
|
runnerUtils.awaitFlowExecutionNumber(3, tenantId, NAMESPACE, "flow-concurrency-queue-killed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void flowConcurrencyQueueKilled() throws QueueException, InterruptedException {
|
public void flowConcurrencyQueueKilled(String tenantId) throws QueueException, InterruptedException {
|
||||||
Flow flow = flowRepository
|
Flow flow = flowRepository
|
||||||
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
|
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
|
||||||
.orElseThrow();
|
.orElseThrow();
|
||||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
|
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
|
||||||
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
||||||
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
||||||
|
|
||||||
@@ -307,7 +308,7 @@ public class FlowConcurrencyCaseTest {
|
|||||||
.state(ExecutionKilled.State.REQUESTED)
|
.state(ExecutionKilled.State.REQUESTED)
|
||||||
.executionId(execution2.getId())
|
.executionId(execution2.getId())
|
||||||
.isOnKillCascade(true)
|
.isOnKillCascade(true)
|
||||||
.tenantId(MAIN_TENANT)
|
.tenantId(tenantId)
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -322,11 +323,10 @@ public class FlowConcurrencyCaseTest {
|
|||||||
} finally {
|
} finally {
|
||||||
// kill everything to avoid dangling executions
|
// kill everything to avoid dangling executions
|
||||||
runnerUtils.killExecution(execution1);
|
runnerUtils.killExecution(execution1);
|
||||||
runnerUtils.killExecution(execution2);
|
|
||||||
runnerUtils.killExecution(execution3);
|
runnerUtils.killExecution(execution3);
|
||||||
|
|
||||||
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
|
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
|
||||||
runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed");
|
runnerUtils.awaitFlowExecutionNumber(3, tenantId, NAMESPACE, "flow-concurrency-queue-killed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -239,7 +239,7 @@ class FlowInputOutputTest {
|
|||||||
// Then
|
// Then
|
||||||
Assertions.assertEquals(2, values.size());
|
Assertions.assertEquals(2, values.size());
|
||||||
Assertions.assertFalse(values.get(1).enabled());
|
Assertions.assertFalse(values.get(1).enabled());
|
||||||
Assertions.assertNotNull(values.get(1).exception());
|
Assertions.assertNotNull(values.get(1).exceptions());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -257,7 +257,7 @@ class FlowInputOutputTest {
|
|||||||
List<InputAndValue> values = flowInputOutput.validateExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, data).block();
|
List<InputAndValue> values = flowInputOutput.validateExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, data).block();
|
||||||
|
|
||||||
// Then
|
// Then
|
||||||
Assertions.assertNull(values.getFirst().exception());
|
Assertions.assertNull(values.getFirst().exceptions());
|
||||||
Assertions.assertFalse(storageInterface.exists(MAIN_TENANT, null, URI.create(values.getFirst().value().toString())));
|
Assertions.assertFalse(storageInterface.exists(MAIN_TENANT, null, URI.create(values.getFirst().value().toString())));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package io.kestra.core.runners;
|
|||||||
|
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.io.CharStreams;
|
import com.google.common.io.CharStreams;
|
||||||
|
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||||
import io.kestra.core.junit.annotations.KestraTest;
|
import io.kestra.core.junit.annotations.KestraTest;
|
||||||
import io.kestra.core.junit.annotations.LoadFlows;
|
import io.kestra.core.junit.annotations.LoadFlows;
|
||||||
import io.kestra.core.models.executions.Execution;
|
import io.kestra.core.models.executions.Execution;
|
||||||
@@ -137,8 +138,8 @@ public class InputsTest {
|
|||||||
void missingRequired() {
|
void missingRequired() {
|
||||||
HashMap<String, Object> inputs = new HashMap<>(InputsTest.inputs);
|
HashMap<String, Object> inputs = new HashMap<>(InputsTest.inputs);
|
||||||
inputs.put("string", null);
|
inputs.put("string", null);
|
||||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(inputs, MAIN_TENANT));
|
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(inputs, MAIN_TENANT));
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `string`, missing required input, but received `null`");
|
assertThat(e.getMessage()).contains("Missing required input:string");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -232,9 +233,9 @@ public class InputsTest {
|
|||||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||||
map.put("validatedString", "foo");
|
map.put("validatedString", "foo");
|
||||||
|
|
||||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant4"));
|
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant4"));
|
||||||
|
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `validatedString`, it must match the pattern");
|
assertThat(e.getMessage()).contains( "Invalid value for input `validatedString`. Cause: it must match the pattern");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -242,15 +243,15 @@ public class InputsTest {
|
|||||||
void inputValidatedIntegerBadValue() {
|
void inputValidatedIntegerBadValue() {
|
||||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||||
mapMin.put("validatedInt", "9");
|
mapMin.put("validatedInt", "9");
|
||||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant5"));
|
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant5"));
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `validatedInt`, it must be more than `10`, but received `9`");
|
assertThat(e.getMessage()).contains("Invalid value for input `validatedInt`. Cause: it must be more than `10`");
|
||||||
|
|
||||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||||
mapMax.put("validatedInt", "21");
|
mapMax.put("validatedInt", "21");
|
||||||
|
|
||||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant5"));
|
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant5"));
|
||||||
|
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `validatedInt`, it must be less than `20`, but received `21`");
|
assertThat(e.getMessage()).contains("Invalid value for input `validatedInt`. Cause: it must be less than `20`");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -258,15 +259,15 @@ public class InputsTest {
|
|||||||
void inputValidatedDateBadValue() {
|
void inputValidatedDateBadValue() {
|
||||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||||
mapMin.put("validatedDate", "2022-01-01");
|
mapMin.put("validatedDate", "2022-01-01");
|
||||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant6"));
|
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant6"));
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDate`, it must be after `2023-01-01`, but received `2022-01-01`");
|
assertThat(e.getMessage()).contains("Invalid value for input `validatedDate`. Cause: it must be after `2023-01-01`");
|
||||||
|
|
||||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||||
mapMax.put("validatedDate", "2024-01-01");
|
mapMax.put("validatedDate", "2024-01-01");
|
||||||
|
|
||||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant6"));
|
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant6"));
|
||||||
|
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDate`, it must be before `2023-12-31`, but received `2024-01-01`");
|
assertThat(e.getMessage()).contains("Invalid value for input `validatedDate`. Cause: it must be before `2023-12-31`");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -274,15 +275,15 @@ public class InputsTest {
|
|||||||
void inputValidatedDateTimeBadValue() {
|
void inputValidatedDateTimeBadValue() {
|
||||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||||
mapMin.put("validatedDateTime", "2022-01-01T00:00:00Z");
|
mapMin.put("validatedDateTime", "2022-01-01T00:00:00Z");
|
||||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant7"));
|
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant7"));
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDateTime`, it must be after `2023-01-01T00:00:00Z`, but received `2022-01-01T00:00:00Z`");
|
assertThat(e.getMessage()).contains("Invalid value for input `validatedDateTime`. Cause: it must be after `2023-01-01T00:00:00Z`");
|
||||||
|
|
||||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||||
mapMax.put("validatedDateTime", "2024-01-01T00:00:00Z");
|
mapMax.put("validatedDateTime", "2024-01-01T00:00:00Z");
|
||||||
|
|
||||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant7"));
|
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant7"));
|
||||||
|
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDateTime`, it must be before `2023-12-31T23:59:59Z`");
|
assertThat(e.getMessage()).contains("Invalid value for input `validatedDateTime`. Cause: it must be before `2023-12-31T23:59:59Z`");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -290,15 +291,15 @@ public class InputsTest {
|
|||||||
void inputValidatedDurationBadValue() {
|
void inputValidatedDurationBadValue() {
|
||||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||||
mapMin.put("validatedDuration", "PT1S");
|
mapMin.put("validatedDuration", "PT1S");
|
||||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant8"));
|
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant8"));
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDuration`, It must be more than `PT10S`, but received `PT1S`");
|
assertThat(e.getMessage()).contains("Invalid value for input `validatedDuration`. Cause: It must be more than `PT10S`");
|
||||||
|
|
||||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||||
mapMax.put("validatedDuration", "PT30S");
|
mapMax.put("validatedDuration", "PT30S");
|
||||||
|
|
||||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant8"));
|
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant8"));
|
||||||
|
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDuration`, It must be less than `PT20S`, but received `PT30S`");
|
assertThat(e.getMessage()).contains("Invalid value for input `validatedDuration`. Cause: It must be less than `PT20S`");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -306,15 +307,15 @@ public class InputsTest {
|
|||||||
void inputValidatedFloatBadValue() {
|
void inputValidatedFloatBadValue() {
|
||||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||||
mapMin.put("validatedFloat", "0.01");
|
mapMin.put("validatedFloat", "0.01");
|
||||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant9"));
|
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant9"));
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `validatedFloat`, it must be more than `0.1`, but received `0.01`");
|
assertThat(e.getMessage()).contains("Invalid value for input `validatedFloat`. Cause: it must be more than `0.1`");
|
||||||
|
|
||||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||||
mapMax.put("validatedFloat", "1.01");
|
mapMax.put("validatedFloat", "1.01");
|
||||||
|
|
||||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant9"));
|
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant9"));
|
||||||
|
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `validatedFloat`, it must be less than `0.5`, but received `1.01`");
|
assertThat(e.getMessage()).contains("Invalid value for input `validatedFloat`. Cause: it must be less than `0.5`");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -322,15 +323,15 @@ public class InputsTest {
|
|||||||
void inputValidatedTimeBadValue() {
|
void inputValidatedTimeBadValue() {
|
||||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||||
mapMin.put("validatedTime", "00:00:01");
|
mapMin.put("validatedTime", "00:00:01");
|
||||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant10"));
|
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant10"));
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `validatedTime`, it must be after `01:00`, but received `00:00:01`");
|
assertThat(e.getMessage()).contains( "Invalid value for input `validatedTime`. Cause: it must be after `01:00`");
|
||||||
|
|
||||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||||
mapMax.put("validatedTime", "14:00:00");
|
mapMax.put("validatedTime", "14:00:00");
|
||||||
|
|
||||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant10"));
|
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant10"));
|
||||||
|
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `validatedTime`, it must be before `11:59:59`, but received `14:00:00`");
|
assertThat(e.getMessage()).contains("Invalid value for input `validatedTime`. Cause: it must be before `11:59:59`");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -339,9 +340,9 @@ public class InputsTest {
|
|||||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||||
map.put("uri", "http:/bla");
|
map.put("uri", "http:/bla");
|
||||||
|
|
||||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant11"));
|
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant11"));
|
||||||
|
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `uri`, Expected `URI` but received `http:/bla`, but received `http:/bla`");
|
assertThat(e.getMessage()).contains( "Invalid value for input `uri`. Cause: Invalid URI format." );
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -350,9 +351,9 @@ public class InputsTest {
|
|||||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||||
map.put("enum", "INVALID");
|
map.put("enum", "INVALID");
|
||||||
|
|
||||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant12"));
|
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant12"));
|
||||||
|
|
||||||
assertThat(e.getMessage()).isEqualTo("enum: Invalid input for `enum`, it must match the values `[ENUM_VALUE, OTHER_ONE]`, but received `INVALID`");
|
assertThat(e.getMessage()).isEqualTo("Invalid value for input `enum`. Cause: it must match the values `[ENUM_VALUE, OTHER_ONE]`");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -361,9 +362,9 @@ public class InputsTest {
|
|||||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||||
map.put("array", "[\"s1\", \"s2\"]");
|
map.put("array", "[\"s1\", \"s2\"]");
|
||||||
|
|
||||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant13"));
|
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant13"));
|
||||||
|
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `array`, Unable to parse array element as `INT` on `s1`, but received `[\"s1\", \"s2\"]`");
|
assertThat(e.getMessage()).contains( "Invalid value for input `array`. Cause: Unable to parse array element as `INT` on `s1`");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -467,7 +468,20 @@ public class InputsTest {
|
|||||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||||
assertThat((String) execution.findTaskRunsByTaskId("file").getFirst().getOutputs().get("value")).isEqualTo(file.toString());
|
assertThat((String) execution.findTaskRunsByTaskId("file").getFirst().getOutputs().get("value")).isEqualTo(file.toString());
|
||||||
}
|
}
|
||||||
|
@Test
|
||||||
|
@LoadFlows(value = "flows/invalids/inputs-with-multiple-constraint-violations.yaml")
|
||||||
|
void multipleConstraintViolations() {
|
||||||
|
InputOutputValidationException ex = assertThrows(InputOutputValidationException.class, ()-> runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "inputs-with-multiple-constraint-violations", null,
|
||||||
|
(f, e) ->flowIO.readExecutionInputs(f, e , Map.of("multi", List.of("F", "H")) )));
|
||||||
|
|
||||||
|
List<String> messages = Arrays.asList(ex.getMessage().split(System.lineSeparator()));
|
||||||
|
|
||||||
|
assertThat(messages).containsExactlyInAnyOrder(
|
||||||
|
"Invalid value for input `multi`. Cause: you can't define both `values` and `options`",
|
||||||
|
"Invalid value for input `multi`. Cause: value `F` doesn't match the values `[A, B, C]`",
|
||||||
|
"Invalid value for input `multi`. Cause: value `H` doesn't match the values `[A, B, C]`"
|
||||||
|
);
|
||||||
|
}
|
||||||
private URI createFile() throws IOException {
|
private URI createFile() throws IOException {
|
||||||
File tempFile = File.createTempFile("file", ".txt");
|
File tempFile = File.createTempFile("file", ".txt");
|
||||||
Files.write(tempFile.toPath(), "Hello World".getBytes());
|
Files.write(tempFile.toPath(), "Hello World".getBytes());
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
package io.kestra.core.runners;
|
package io.kestra.core.runners;
|
||||||
|
|
||||||
|
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||||
import io.kestra.core.junit.annotations.ExecuteFlow;
|
import io.kestra.core.junit.annotations.ExecuteFlow;
|
||||||
import io.kestra.core.junit.annotations.LoadFlows;
|
import io.kestra.core.junit.annotations.LoadFlows;
|
||||||
import io.kestra.core.models.executions.Execution;
|
import io.kestra.core.models.executions.Execution;
|
||||||
@@ -71,6 +72,6 @@ public class NoEncryptionConfiguredTest implements TestPropertyProvider {
|
|||||||
.flowId(flow.getId())
|
.flowId(flow.getId())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
assertThrows(ConstraintViolationException.class, () -> flowIO.readExecutionInputs(flow, execution, InputsTest.inputs));
|
assertThrows(InputOutputValidationException.class, () -> flowIO.readExecutionInputs(flow, execution, InputsTest.inputs));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,15 +1,24 @@
|
|||||||
package io.kestra.core.utils;
|
package io.kestra.core.utils;
|
||||||
|
|
||||||
import io.kestra.core.junit.annotations.KestraTest;
|
import io.kestra.core.models.Setting;
|
||||||
|
import io.kestra.core.repositories.SettingRepositoryInterface;
|
||||||
|
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
import org.junit.jupiter.api.Assertions;
|
import org.junit.jupiter.api.Assertions;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
@KestraTest
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
@MicronautTest
|
||||||
public class EditionProviderTest {
|
public class EditionProviderTest {
|
||||||
@Inject
|
@Inject
|
||||||
private EditionProvider editionProvider;
|
private EditionProvider editionProvider;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private SettingRepositoryInterface settingRepository;
|
||||||
|
|
||||||
protected EditionProvider.Edition expectedEdition() {
|
protected EditionProvider.Edition expectedEdition() {
|
||||||
return EditionProvider.Edition.OSS;
|
return EditionProvider.Edition.OSS;
|
||||||
}
|
}
|
||||||
@@ -17,5 +26,10 @@ public class EditionProviderTest {
|
|||||||
@Test
|
@Test
|
||||||
void shouldReturnCurrentEdition() {
|
void shouldReturnCurrentEdition() {
|
||||||
Assertions.assertEquals(expectedEdition(), editionProvider.get());
|
Assertions.assertEquals(expectedEdition(), editionProvider.get());
|
||||||
|
|
||||||
|
// check that the edition is persisted in settings
|
||||||
|
Optional<Setting> editionSettings = settingRepository.findByKey(Setting.INSTANCE_EDITION);
|
||||||
|
assertThat(editionSettings).isPresent();
|
||||||
|
assertThat(editionSettings.get().getValue()).isEqualTo(expectedEdition().name());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,48 +1,107 @@
|
|||||||
package io.kestra.core.utils;
|
package io.kestra.core.utils;
|
||||||
|
|
||||||
|
import ch.qos.logback.classic.Logger;
|
||||||
|
import ch.qos.logback.classic.LoggerContext;
|
||||||
|
import ch.qos.logback.classic.spi.ILoggingEvent;
|
||||||
|
import ch.qos.logback.core.AppenderBase;
|
||||||
import io.kestra.core.models.executions.Execution;
|
import io.kestra.core.models.executions.Execution;
|
||||||
import io.kestra.core.models.executions.TaskRun;
|
import io.kestra.core.models.executions.TaskRun;
|
||||||
import io.kestra.core.models.flows.Flow;
|
import io.kestra.core.models.flows.Flow;
|
||||||
import io.kestra.core.models.triggers.TriggerContext;
|
import io.kestra.core.models.triggers.TriggerContext;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import org.slf4j.event.Level;
|
import org.slf4j.event.Level;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
class LogsTest {
|
class LogsTest {
|
||||||
|
|
||||||
|
private static final InMemoryAppender MEMORY_APPENDER = new InMemoryAppender();
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
static void setupLogger() {
|
||||||
|
Logger logger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
|
||||||
|
MEMORY_APPENDER.setContext((LoggerContext) LoggerFactory.getILoggerFactory());
|
||||||
|
MEMORY_APPENDER.start();
|
||||||
|
logger.addAppender(MEMORY_APPENDER);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
void clearLogs() {
|
||||||
|
MEMORY_APPENDER.clear();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void logFlow() {
|
void logFlow() {
|
||||||
var flow = Flow.builder().namespace("namespace").id("flow").build();
|
var flow = Flow.builder().tenantId("tenant").namespace("namespace").id("flow").build();
|
||||||
Logs.logExecution(flow, log, Level.INFO, "Some log");
|
Logs.logExecution(flow, log, Level.INFO, "Some log");
|
||||||
Logs.logExecution(flow, log, Level.INFO, "Some log with an {}", "attribute");
|
Logs.logExecution(flow, log, Level.INFO, "Some log with an {}", "attribute");
|
||||||
Logs.logExecution(flow, log, Level.ERROR, "Some log with an {} and an error", "attribute", new RuntimeException("Test Exception"));
|
Logs.logExecution(flow, log, Level.ERROR, "Some log with an {} and an error", "attribute", new RuntimeException("Test Exception"));
|
||||||
|
|
||||||
|
List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
|
||||||
|
assertThat(logs).hasSize(3);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void logExecution() {
|
void logExecution() {
|
||||||
var execution = Execution.builder().namespace("namespace").flowId("flow").id("execution").build();
|
var execution = Execution.builder().tenantId("tenant").namespace("namespace").flowId("flow").id("execution").build();
|
||||||
Logs.logExecution(execution, log, Level.INFO, "Some log");
|
|
||||||
Logs.logExecution(execution, log, Level.INFO, "Some log with an {}", "attribute");
|
|
||||||
Logs.logExecution(execution, Level.INFO, "Some log");
|
Logs.logExecution(execution, Level.INFO, "Some log");
|
||||||
|
Logs.logExecution(execution, Level.INFO, "Some log with an {}", "attribute");
|
||||||
|
Logs.logExecution(execution, Level.INFO, "Some log");
|
||||||
|
|
||||||
|
List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
|
||||||
|
assertThat(logs).hasSize(3);
|
||||||
|
assertThat(logs.getFirst().getLoggerName()).isEqualTo("executor.tenant.namespace.flow");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void logTrigger() {
|
void logTrigger() {
|
||||||
var trigger = TriggerContext.builder().namespace("namespace").flowId("flow").triggerId("trigger").build();
|
var trigger = TriggerContext.builder().tenantId("tenant").namespace("namespace").flowId("flow").triggerId("trigger").build();
|
||||||
Logs.logTrigger(trigger, log, Level.INFO, "Some log");
|
|
||||||
Logs.logTrigger(trigger, log, Level.INFO, "Some log with an {}", "attribute");
|
|
||||||
Logs.logTrigger(trigger, Level.INFO, "Some log");
|
Logs.logTrigger(trigger, Level.INFO, "Some log");
|
||||||
|
Logs.logTrigger(trigger, Level.INFO, "Some log with an {}", "attribute");
|
||||||
|
Logs.logTrigger(trigger, Level.INFO, "Some log");
|
||||||
|
|
||||||
|
List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
|
||||||
|
assertThat(logs).hasSize(3);
|
||||||
|
assertThat(logs.getFirst().getLoggerName()).isEqualTo("scheduler.tenant.namespace.flow.trigger");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void logTaskRun() {
|
void logTaskRun() {
|
||||||
var taskRun = TaskRun.builder().namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").build();
|
var taskRun = TaskRun.builder().tenantId("tenant").namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").build();
|
||||||
Logs.logTaskRun(taskRun, Level.INFO, "Some log");
|
Logs.logTaskRun(taskRun, Level.INFO, "Some log");
|
||||||
Logs.logTaskRun(taskRun, Level.INFO, "Some log with an {}", "attribute");
|
Logs.logTaskRun(taskRun, Level.INFO, "Some log with an {}", "attribute");
|
||||||
|
|
||||||
taskRun = TaskRun.builder().namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").value("value").build();
|
taskRun = TaskRun.builder().namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").value("value").build();
|
||||||
Logs.logTaskRun(taskRun, Level.INFO, "Some log");
|
Logs.logTaskRun(taskRun, Level.INFO, "Some log");
|
||||||
Logs.logTaskRun(taskRun, Level.INFO, "Some log with an {}", "attribute");
|
Logs.logTaskRun(taskRun, Level.INFO, "Some log with an {}", "attribute");
|
||||||
|
|
||||||
|
List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
|
||||||
|
assertThat(logs).hasSize(4);
|
||||||
|
assertThat(logs.getFirst().getLoggerName()).isEqualTo("worker.tenant.namespace.flow.task");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class InMemoryAppender extends AppenderBase<ILoggingEvent> {
|
||||||
|
private final List<ILoggingEvent> logs = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void append(ILoggingEvent event) {
|
||||||
|
logs.add(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<ILoggingEvent> getLogs() {
|
||||||
|
return logs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void clear() {
|
||||||
|
logs.clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -216,4 +216,23 @@ class MapUtilsTest {
|
|||||||
"k1.k4", "v2"
|
"k1.k4", "v2"
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
void mergeShouldNotDuplicateListElements() {
|
||||||
|
Map<String, Object> first = Map.of(
|
||||||
|
"key1", "value1",
|
||||||
|
"key2", List.of("something", "else")
|
||||||
|
);
|
||||||
|
Map<String, Object> second = Map.of(
|
||||||
|
"key2", List.of("something", "other"),
|
||||||
|
"key3", "value3"
|
||||||
|
);
|
||||||
|
|
||||||
|
Map<String, Object> results = MapUtils.merge(first, second);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(3);
|
||||||
|
List<String> list = (List<String>) results.get("key2");
|
||||||
|
assertThat(list).hasSize(3);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -20,7 +20,6 @@ import org.junit.jupiter.api.parallel.ExecutionMode;
|
|||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.net.URI;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
@@ -45,9 +44,6 @@ class NamespaceFilesUtilsTest {
|
|||||||
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
|
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
|
||||||
QueueInterface<LogEntry> workerTaskLogQueue;
|
QueueInterface<LogEntry> workerTaskLogQueue;
|
||||||
|
|
||||||
@Inject
|
|
||||||
NamespaceFilesUtils namespaceFilesUtils;
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
NamespaceFactory namespaceFactory;
|
NamespaceFactory namespaceFactory;
|
||||||
|
|
||||||
@@ -66,7 +62,7 @@ class NamespaceFilesUtilsTest {
|
|||||||
namespaceStorage.putFile(Path.of("/" + i + ".txt"), data);
|
namespaceStorage.putFile(Path.of("/" + i + ".txt"), data);
|
||||||
}
|
}
|
||||||
|
|
||||||
namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().build());
|
NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().build());
|
||||||
|
|
||||||
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
|
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
|
||||||
receive.blockLast();
|
receive.blockLast();
|
||||||
@@ -91,7 +87,7 @@ class NamespaceFilesUtilsTest {
|
|||||||
namespaceStorage.putFile(Path.of("/" + i + ".txt"), data);
|
namespaceStorage.putFile(Path.of("/" + i + ".txt"), data);
|
||||||
}
|
}
|
||||||
|
|
||||||
namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
|
NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
|
||||||
|
|
||||||
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
|
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
|
||||||
receive.blockLast();
|
receive.blockLast();
|
||||||
@@ -116,7 +112,7 @@ class NamespaceFilesUtilsTest {
|
|||||||
namespaceStorage.putFile(Path.of("/folder2/test.txt"), data);
|
namespaceStorage.putFile(Path.of("/folder2/test.txt"), data);
|
||||||
namespaceStorage.putFile(Path.of("/test.txt"), data);
|
namespaceStorage.putFile(Path.of("/test.txt"), data);
|
||||||
|
|
||||||
namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
|
NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
|
||||||
|
|
||||||
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
|
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
|
||||||
receive.blockLast();
|
receive.blockLast();
|
||||||
@@ -141,7 +137,7 @@ class NamespaceFilesUtilsTest {
|
|||||||
namespaceFactory.of(MAIN_TENANT, ns1, storageInterface).putFile(Path.of("/test.txt"), data);
|
namespaceFactory.of(MAIN_TENANT, ns1, storageInterface).putFile(Path.of("/test.txt"), data);
|
||||||
namespaceFactory.of(MAIN_TENANT, ns2, storageInterface).putFile(Path.of("/test.txt"), data);
|
namespaceFactory.of(MAIN_TENANT, ns2, storageInterface).putFile(Path.of("/test.txt"), data);
|
||||||
|
|
||||||
namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder()
|
NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder()
|
||||||
.namespaces(Property.ofValue(List.of(ns1, ns2)))
|
.namespaces(Property.ofValue(List.of(ns1, ns2)))
|
||||||
.folderPerNamespace(Property.ofValue(true))
|
.folderPerNamespace(Property.ofValue(true))
|
||||||
.build());
|
.build());
|
||||||
|
|||||||
@@ -0,0 +1,30 @@
|
|||||||
|
package io.kestra.core.utils;
|
||||||
|
|
||||||
|
import io.kestra.core.models.Setting;
|
||||||
|
import io.kestra.core.repositories.SettingRepositoryInterface;
|
||||||
|
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
@MicronautTest
|
||||||
|
class VersionProviderTest {
|
||||||
|
@Inject
|
||||||
|
private VersionProvider versionProvider;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private SettingRepositoryInterface settingRepository;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void shouldResolveVersion() {
|
||||||
|
assertThat(versionProvider.getVersion()).endsWith("-SNAPSHOT");
|
||||||
|
|
||||||
|
// check that the version is persisted in settings
|
||||||
|
Optional<Setting> versionSettings = settingRepository.findByKey(Setting.INSTANCE_VERSION);
|
||||||
|
assertThat(versionSettings).isPresent();
|
||||||
|
assertThat(versionSettings.get().getValue()).isEqualTo(versionProvider.getVersion());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -9,9 +9,15 @@ import io.kestra.core.utils.TestsUtils;
|
|||||||
import io.kestra.core.junit.annotations.KestraTest;
|
import io.kestra.core.junit.annotations.KestraTest;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import io.kestra.core.models.validations.ValidateConstraintViolation;
|
||||||
|
import io.kestra.core.services.FlowService;
|
||||||
import jakarta.validation.ConstraintViolationException;
|
import jakarta.validation.ConstraintViolationException;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.core.JsonLocation;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
@@ -23,6 +29,107 @@ class FlowValidationTest {
|
|||||||
@Inject
|
@Inject
|
||||||
private ModelValidator modelValidator;
|
private ModelValidator modelValidator;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private FlowService flowService;
|
||||||
|
|
||||||
|
private static final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
|
||||||
|
// Helper class to create JsonProcessingException with location
|
||||||
|
private static class TestJsonProcessingException extends JsonProcessingException {
|
||||||
|
public TestJsonProcessingException(String msg, JsonLocation location) {
|
||||||
|
super(msg, location);
|
||||||
|
}
|
||||||
|
public TestJsonProcessingException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testFormatYamlErrorMessage_WithExpectedFieldName() throws JsonProcessingException {
|
||||||
|
JsonProcessingException e = new TestJsonProcessingException("Expected a field name", new JsonLocation(null, 100, 5, 10));
|
||||||
|
Object dummyTarget = new Object(); // Dummy target for toConstraintViolationException
|
||||||
|
|
||||||
|
ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
|
||||||
|
|
||||||
|
assertThat(result.getMessage()).contains("YAML syntax error: Invalid structure").contains("(at line 5)");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testFormatYamlErrorMessage_WithMappingStartEvent() throws JsonProcessingException {
|
||||||
|
JsonProcessingException e = new TestJsonProcessingException("MappingStartEvent", new JsonLocation(null, 200, 3, 5));
|
||||||
|
Object dummyTarget = new Object();
|
||||||
|
|
||||||
|
ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
|
||||||
|
|
||||||
|
assertThat(result.getMessage()).contains("YAML syntax error: Unexpected mapping start").contains("(at line 3)");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testFormatYamlErrorMessage_WithScalarValue() throws JsonProcessingException {
|
||||||
|
JsonProcessingException e = new TestJsonProcessingException("Scalar value", new JsonLocation(null, 150, 7, 12));
|
||||||
|
Object dummyTarget = new Object();
|
||||||
|
|
||||||
|
ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
|
||||||
|
|
||||||
|
assertThat(result.getMessage()).contains("YAML syntax error: Expected a simple value").contains("(at line 7)");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testFormatYamlErrorMessage_GenericError() throws JsonProcessingException {
|
||||||
|
JsonProcessingException e = new TestJsonProcessingException("Some other error", new JsonLocation(null, 50, 2, 8));
|
||||||
|
Object dummyTarget = new Object();
|
||||||
|
|
||||||
|
ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
|
||||||
|
|
||||||
|
assertThat(result.getMessage()).contains("YAML parsing error: Some other error").contains("(at line 2)");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testFormatYamlErrorMessage_NoLocation() throws JsonProcessingException {
|
||||||
|
JsonProcessingException e = new TestJsonProcessingException("Expected a field name");
|
||||||
|
Object dummyTarget = new Object();
|
||||||
|
|
||||||
|
ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
|
||||||
|
|
||||||
|
assertThat(result.getMessage()).contains("YAML syntax error: Invalid structure").doesNotContain("at line");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testValidateFlowWithYamlSyntaxError() {
|
||||||
|
String invalidYaml = """
|
||||||
|
id: test-flow
|
||||||
|
namespace: io.kestra.unittest
|
||||||
|
tasks:
|
||||||
|
- id:hello
|
||||||
|
type: io.kestra.plugin.core.log.Log
|
||||||
|
message: {{ abc }}
|
||||||
|
|
||||||
|
""";
|
||||||
|
List<ValidateConstraintViolation> results = flowService.validate("my-tenant", invalidYaml);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.getFirst().getConstraints()).contains("YAML parsing error").contains("at line");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testValidateFlowWithUndefinedVariable() {
|
||||||
|
String yamlWithUndefinedVar = """
|
||||||
|
id: test-flow
|
||||||
|
namespace: io.kestra.unittest
|
||||||
|
tasks:
|
||||||
|
- id: hello
|
||||||
|
type: io.kestra.plugin.core.log.Log
|
||||||
|
message: {{ undefinedVar }}
|
||||||
|
""";
|
||||||
|
|
||||||
|
List<ValidateConstraintViolation> results = flowService.validate("my-tenant", yamlWithUndefinedVar);
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.getFirst().getConstraints()).contains("Validation error");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void invalidRecursiveFlow() {
|
void invalidRecursiveFlow() {
|
||||||
Flow flow = this.parse("flows/invalids/recursive-flow.yaml");
|
Flow flow = this.parse("flows/invalids/recursive-flow.yaml");
|
||||||
@@ -130,4 +237,4 @@ class FlowValidationTest {
|
|||||||
|
|
||||||
return YamlParser.parse(file, Flow.class);
|
return YamlParser.parse(file, Flow.class);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
package io.kestra.plugin.core.flow;
|
package io.kestra.plugin.core.flow;
|
||||||
|
|
||||||
import com.google.common.io.CharStreams;
|
import com.google.common.io.CharStreams;
|
||||||
|
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||||
import io.kestra.core.junit.annotations.ExecuteFlow;
|
import io.kestra.core.junit.annotations.ExecuteFlow;
|
||||||
import io.kestra.core.junit.annotations.FlakyTest;
|
import io.kestra.core.junit.annotations.FlakyTest;
|
||||||
import io.kestra.core.junit.annotations.KestraTest;
|
import io.kestra.core.junit.annotations.KestraTest;
|
||||||
@@ -328,12 +329,12 @@ public class PauseTest {
|
|||||||
|
|
||||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.PAUSED);
|
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.PAUSED);
|
||||||
|
|
||||||
ConstraintViolationException e = assertThrows(
|
InputOutputValidationException e = assertThrows(
|
||||||
ConstraintViolationException.class,
|
InputOutputValidationException.class,
|
||||||
() -> executionService.resume(execution, flow, State.Type.RUNNING, Mono.empty(), Pause.Resumed.now()).block()
|
() -> executionService.resume(execution, flow, State.Type.RUNNING, Mono.empty(), Pause.Resumed.now()).block()
|
||||||
);
|
);
|
||||||
|
|
||||||
assertThat(e.getMessage()).contains("Invalid input for `asked`, missing required input, but received `null`");
|
assertThat(e.getMessage()).contains( "Missing required input:asked");
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import io.kestra.core.models.flows.Output;
|
|||||||
import io.kestra.core.models.flows.State;
|
import io.kestra.core.models.flows.State;
|
||||||
import io.kestra.core.models.flows.State.History;
|
import io.kestra.core.models.flows.State.History;
|
||||||
import io.kestra.core.runners.DefaultRunContext;
|
import io.kestra.core.runners.DefaultRunContext;
|
||||||
|
import io.kestra.core.runners.InputAndOutput;
|
||||||
import io.kestra.core.runners.SubflowExecutionResult;
|
import io.kestra.core.runners.SubflowExecutionResult;
|
||||||
import io.kestra.core.services.VariablesService;
|
import io.kestra.core.services.VariablesService;
|
||||||
import io.micronaut.context.ApplicationContext;
|
import io.micronaut.context.ApplicationContext;
|
||||||
@@ -46,11 +47,15 @@ class SubflowTest {
|
|||||||
@Mock
|
@Mock
|
||||||
private ApplicationContext applicationContext;
|
private ApplicationContext applicationContext;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private InputAndOutput inputAndOutput;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void beforeEach() {
|
void beforeEach() {
|
||||||
Mockito.when(applicationContext.getBean(VariablesService.class)).thenReturn(new VariablesService());
|
Mockito.when(applicationContext.getBean(VariablesService.class)).thenReturn(new VariablesService());
|
||||||
Mockito.when(runContext.logger()).thenReturn(LOG);
|
Mockito.when(runContext.logger()).thenReturn(LOG);
|
||||||
Mockito.when(runContext.getApplicationContext()).thenReturn(applicationContext);
|
Mockito.when(runContext.getApplicationContext()).thenReturn(applicationContext);
|
||||||
|
Mockito.when(runContext.inputAndOutput()).thenReturn(inputAndOutput);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -118,7 +123,7 @@ class SubflowTest {
|
|||||||
|
|
||||||
Map<String, Object> outputs = Map.of("key", "value");
|
Map<String, Object> outputs = Map.of("key", "value");
|
||||||
Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(outputs);
|
Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(outputs);
|
||||||
|
Mockito.when(inputAndOutput.renderOutputs(Mockito.anyList())).thenReturn(Map.of("key", "value"));
|
||||||
|
|
||||||
Subflow subflow = Subflow.builder()
|
Subflow subflow = Subflow.builder()
|
||||||
.outputs(outputs)
|
.outputs(outputs)
|
||||||
@@ -159,6 +164,7 @@ class SubflowTest {
|
|||||||
|
|
||||||
Output output = Output.builder().id("key").value("value").build();
|
Output output = Output.builder().id("key").value("value").build();
|
||||||
Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(Map.of(output.getId(), output.getValue()));
|
Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(Map.of(output.getId(), output.getValue()));
|
||||||
|
Mockito.when(inputAndOutput.typedOutputs(Mockito.any(), Mockito.any(), Mockito.anyMap())).thenReturn(Map.of("key", "value"));
|
||||||
Flow flow = Flow.builder()
|
Flow flow = Flow.builder()
|
||||||
.outputs(List.of(output))
|
.outputs(List.of(output))
|
||||||
.build();
|
.build();
|
||||||
|
|||||||
@@ -57,7 +57,7 @@ class ScheduleOnDatesTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldReturnFirstDateWhenNextEvaluationDateAndNoExistingTriggerDate() throws Exception {
|
public void shouldReturnFirstDateWhenNextEvaluationDateAndNoExistingTriggerDate() {
|
||||||
// given
|
// given
|
||||||
var now = ZonedDateTime.now();
|
var now = ZonedDateTime.now();
|
||||||
var before = now.minusMinutes(1).truncatedTo(ChronoUnit.SECONDS);
|
var before = now.minusMinutes(1).truncatedTo(ChronoUnit.SECONDS);
|
||||||
@@ -75,7 +75,7 @@ class ScheduleOnDatesTest {
|
|||||||
ZonedDateTime nextDate = scheduleOnDates.nextEvaluationDate(conditionContext, Optional.empty());
|
ZonedDateTime nextDate = scheduleOnDates.nextEvaluationDate(conditionContext, Optional.empty());
|
||||||
|
|
||||||
// then
|
// then
|
||||||
assertThat(nextDate).isEqualTo(before);
|
assertThat(nextDate).isEqualTo(after);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ import io.kestra.core.models.executions.Execution;
|
|||||||
import io.kestra.core.models.flows.Flow;
|
import io.kestra.core.models.flows.Flow;
|
||||||
import io.kestra.core.models.flows.Type;
|
import io.kestra.core.models.flows.Type;
|
||||||
import io.kestra.core.models.flows.input.StringInput;
|
import io.kestra.core.models.flows.input.StringInput;
|
||||||
|
import io.kestra.core.models.flows.input.MultiselectInput;
|
||||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||||
import io.kestra.core.models.triggers.TriggerContext;
|
import io.kestra.core.models.triggers.TriggerContext;
|
||||||
import io.kestra.core.runners.RunContextFactory;
|
import io.kestra.core.runners.RunContextFactory;
|
||||||
@@ -103,8 +104,9 @@ class ScheduleTest {
|
|||||||
);
|
);
|
||||||
|
|
||||||
assertThat(evaluate.isPresent()).isTrue();
|
assertThat(evaluate.isPresent()).isTrue();
|
||||||
assertThat(evaluate.get().getLabels()).hasSize(3);
|
assertThat(evaluate.get().getLabels()).hasSize(4);
|
||||||
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
|
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
|
||||||
|
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
|
||||||
assertThat(evaluate.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
|
assertThat(evaluate.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
|
||||||
var vars = evaluate.get().getTrigger().getVariables();
|
var vars = evaluate.get().getTrigger().getVariables();
|
||||||
var inputs = evaluate.get().getInputs();
|
var inputs = evaluate.get().getInputs();
|
||||||
@@ -137,8 +139,9 @@ class ScheduleTest {
|
|||||||
);
|
);
|
||||||
|
|
||||||
assertThat(evaluate.isPresent()).isTrue();
|
assertThat(evaluate.isPresent()).isTrue();
|
||||||
assertThat(evaluate.get().getLabels()).hasSize(3);
|
assertThat(evaluate.get().getLabels()).hasSize(4);
|
||||||
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
|
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
|
||||||
|
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
|
||||||
assertThat(evaluate.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
|
assertThat(evaluate.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
|
||||||
var inputs = evaluate.get().getInputs();
|
var inputs = evaluate.get().getInputs();
|
||||||
|
|
||||||
@@ -475,6 +478,81 @@ class ScheduleTest {
|
|||||||
assertThat(result.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
|
assertThat(result.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void successWithMultiselectInputDefaults() throws Exception {
|
||||||
|
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("0 0 1 * *").build();
|
||||||
|
|
||||||
|
ZonedDateTime date = ZonedDateTime.now()
|
||||||
|
.withDayOfMonth(1)
|
||||||
|
.withHour(0)
|
||||||
|
.withMinute(0)
|
||||||
|
.withSecond(0)
|
||||||
|
.truncatedTo(ChronoUnit.SECONDS)
|
||||||
|
.minusMonths(1);
|
||||||
|
|
||||||
|
Optional<Execution> evaluate = trigger.evaluate(
|
||||||
|
conditionContextWithMultiselectInput(trigger),
|
||||||
|
triggerContext(date, trigger));
|
||||||
|
|
||||||
|
assertThat(evaluate.isPresent()).isTrue();
|
||||||
|
var inputs = evaluate.get().getInputs();
|
||||||
|
|
||||||
|
// Verify MULTISELECT input with explicit defaults works correctly
|
||||||
|
assertThat(inputs.get("multiselectInput")).isEqualTo(List.of("option1", "option2"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void successWithMultiselectInputAutoSelectFirst() throws Exception {
|
||||||
|
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("0 0 1 * *").build();
|
||||||
|
|
||||||
|
ZonedDateTime date = ZonedDateTime.now()
|
||||||
|
.withDayOfMonth(1)
|
||||||
|
.withHour(0)
|
||||||
|
.withMinute(0)
|
||||||
|
.withSecond(0)
|
||||||
|
.truncatedTo(ChronoUnit.SECONDS)
|
||||||
|
.minusMonths(1);
|
||||||
|
|
||||||
|
Optional<Execution> evaluate = trigger.evaluate(
|
||||||
|
conditionContextWithMultiselectAutoSelectFirst(trigger),
|
||||||
|
triggerContext(date, trigger));
|
||||||
|
|
||||||
|
assertThat(evaluate.isPresent()).isTrue();
|
||||||
|
var inputs = evaluate.get().getInputs();
|
||||||
|
|
||||||
|
// Verify MULTISELECT input with autoSelectFirst defaults to first option
|
||||||
|
assertThat(inputs.get("multiselectAutoSelect")).isEqualTo(List.of("first"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void successWithMultiselectInputProvidedValue() throws Exception {
|
||||||
|
// Test that provided values override defaults for MULTISELECT
|
||||||
|
Schedule trigger = Schedule.builder()
|
||||||
|
.id("schedule")
|
||||||
|
.type(Schedule.class.getName())
|
||||||
|
.cron("0 0 1 * *")
|
||||||
|
.inputs(Map.of("multiselectInput", List.of("option3")))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
ZonedDateTime date = ZonedDateTime.now()
|
||||||
|
.withDayOfMonth(1)
|
||||||
|
.withHour(0)
|
||||||
|
.withMinute(0)
|
||||||
|
.withSecond(0)
|
||||||
|
.truncatedTo(ChronoUnit.SECONDS)
|
||||||
|
.minusMonths(1);
|
||||||
|
|
||||||
|
Optional<Execution> evaluate = trigger.evaluate(
|
||||||
|
conditionContextWithMultiselectInput(trigger),
|
||||||
|
triggerContext(date, trigger));
|
||||||
|
|
||||||
|
assertThat(evaluate.isPresent()).isTrue();
|
||||||
|
var inputs = evaluate.get().getInputs();
|
||||||
|
|
||||||
|
// Verify provided value overrides defaults
|
||||||
|
assertThat(inputs.get("multiselectInput")).isEqualTo(List.of("option3"));
|
||||||
|
}
|
||||||
|
|
||||||
private ConditionContext conditionContext(AbstractTrigger trigger) {
|
private ConditionContext conditionContext(AbstractTrigger trigger) {
|
||||||
Flow flow = Flow.builder()
|
Flow flow = Flow.builder()
|
||||||
.id(IdUtils.create())
|
.id(IdUtils.create())
|
||||||
@@ -504,17 +582,79 @@ class ScheduleTest {
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ConditionContext conditionContextWithMultiselectInput(AbstractTrigger trigger) {
|
||||||
|
Flow flow = Flow.builder()
|
||||||
|
.id(IdUtils.create())
|
||||||
|
.namespace("io.kestra.tests")
|
||||||
|
.labels(
|
||||||
|
List.of(
|
||||||
|
new Label("flow-label-1", "flow-label-1"),
|
||||||
|
new Label("flow-label-2", "flow-label-2")))
|
||||||
|
.variables(Map.of("custom_var", "VARIABLE VALUE"))
|
||||||
|
.inputs(List.of(
|
||||||
|
MultiselectInput.builder()
|
||||||
|
.id("multiselectInput")
|
||||||
|
.type(Type.MULTISELECT)
|
||||||
|
.values(List.of("option1", "option2", "option3"))
|
||||||
|
.defaults(Property.ofValue(List.of("option1", "option2")))
|
||||||
|
.build()))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
TriggerContext triggerContext = TriggerContext.builder()
|
||||||
|
.namespace(flow.getNamespace())
|
||||||
|
.flowId(flow.getId())
|
||||||
|
.triggerId(trigger.getId())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
return ConditionContext.builder()
|
||||||
|
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(),
|
||||||
|
triggerContext, trigger))
|
||||||
|
.flow(flow)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private ConditionContext conditionContextWithMultiselectAutoSelectFirst(AbstractTrigger trigger) {
|
||||||
|
Flow flow = Flow.builder()
|
||||||
|
.id(IdUtils.create())
|
||||||
|
.namespace("io.kestra.tests")
|
||||||
|
.labels(
|
||||||
|
List.of(
|
||||||
|
new Label("flow-label-1", "flow-label-1"),
|
||||||
|
new Label("flow-label-2", "flow-label-2")))
|
||||||
|
.variables(Map.of("custom_var", "VARIABLE VALUE"))
|
||||||
|
.inputs(List.of(
|
||||||
|
MultiselectInput.builder()
|
||||||
|
.id("multiselectAutoSelect")
|
||||||
|
.type(Type.MULTISELECT)
|
||||||
|
.values(List.of("first", "second", "third"))
|
||||||
|
.autoSelectFirst(true)
|
||||||
|
.build()))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
TriggerContext triggerContext = TriggerContext.builder()
|
||||||
|
.namespace(flow.getNamespace())
|
||||||
|
.flowId(flow.getId())
|
||||||
|
.triggerId(trigger.getId())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
return ConditionContext.builder()
|
||||||
|
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(),
|
||||||
|
triggerContext, trigger))
|
||||||
|
.flow(flow)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
private ZonedDateTime dateFromVars(String date, ZonedDateTime expexted) {
|
private ZonedDateTime dateFromVars(String date, ZonedDateTime expexted) {
|
||||||
return ZonedDateTime.parse(date).withZoneSameInstant(expexted.getZone());
|
return ZonedDateTime.parse(date).withZoneSameInstant(expexted.getZone());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void shouldGetNextExecutionDateWithConditionMatchingFutureDate() throws InternalException {
|
void shouldGetNextExecutionDateWithConditionMatchingFutureDate() throws InternalException {
|
||||||
|
|
||||||
ZonedDateTime now = ZonedDateTime.now().withZoneSameLocal(ZoneId.of("Europe/Paris"));
|
ZonedDateTime now = ZonedDateTime.now().withZoneSameLocal(ZoneId.of("Europe/Paris"));
|
||||||
OffsetTime before = now.minusHours(1).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
|
OffsetTime before = now.minusHours(1).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
|
||||||
OffsetTime after = now.minusHours(4).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
|
OffsetTime after = now.minusHours(4).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
|
||||||
|
|
||||||
Schedule trigger = Schedule.builder()
|
Schedule trigger = Schedule.builder()
|
||||||
.id("schedule").type(Schedule.class.getName())
|
.id("schedule").type(Schedule.class.getName())
|
||||||
.cron("0 * * * *") // every hour
|
.cron("0 * * * *") // every hour
|
||||||
@@ -527,25 +667,25 @@ class ScheduleTest {
|
|||||||
.build()
|
.build()
|
||||||
))
|
))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
TriggerContext triggerContext = triggerContext(now, trigger).toBuilder().build();
|
TriggerContext triggerContext = triggerContext(now, trigger).toBuilder().build();
|
||||||
|
|
||||||
ConditionContext conditionContext = ConditionContext.builder()
|
ConditionContext conditionContext = ConditionContext.builder()
|
||||||
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(), triggerContext, trigger))
|
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(), triggerContext, trigger))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
Optional<ZonedDateTime> result = trigger.truePreviousNextDateWithCondition(trigger.executionTime(), conditionContext, now, true);
|
Optional<ZonedDateTime> result = trigger.truePreviousNextDateWithCondition(trigger.executionTime(), conditionContext, now, true);
|
||||||
assertThat(result).isNotEmpty();
|
assertThat(result).isNotEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void shouldGetNextExecutionDateWithConditionMatchingCurrentDate() throws InternalException {
|
void shouldGetNextExecutionDateWithConditionMatchingCurrentDate() throws InternalException {
|
||||||
|
|
||||||
ZonedDateTime now = ZonedDateTime.now().withZoneSameLocal(ZoneId.of("Europe/Paris"));
|
ZonedDateTime now = ZonedDateTime.now().withZoneSameLocal(ZoneId.of("Europe/Paris"));
|
||||||
|
|
||||||
OffsetTime before = now.plusHours(2).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
|
OffsetTime before = now.plusHours(2).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
|
||||||
OffsetTime after = now.minusHours(2).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
|
OffsetTime after = now.minusHours(2).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
|
||||||
|
|
||||||
Schedule trigger = Schedule.builder()
|
Schedule trigger = Schedule.builder()
|
||||||
.id("schedule").type(Schedule.class.getName())
|
.id("schedule").type(Schedule.class.getName())
|
||||||
.cron("*/30 * * * * *")
|
.cron("*/30 * * * * *")
|
||||||
@@ -558,13 +698,13 @@ class ScheduleTest {
|
|||||||
.build()
|
.build()
|
||||||
))
|
))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
TriggerContext triggerContext = triggerContext(now, trigger).toBuilder().build();
|
TriggerContext triggerContext = triggerContext(now, trigger).toBuilder().build();
|
||||||
|
|
||||||
ConditionContext conditionContext = ConditionContext.builder()
|
ConditionContext conditionContext = ConditionContext.builder()
|
||||||
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(), triggerContext, trigger))
|
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(), triggerContext, trigger))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
Optional<ZonedDateTime> result = trigger.truePreviousNextDateWithCondition(trigger.executionTime(), conditionContext, now, true);
|
Optional<ZonedDateTime> result = trigger.truePreviousNextDateWithCondition(trigger.executionTime(), conditionContext, now, true);
|
||||||
assertThat(result).isNotEmpty();
|
assertThat(result).isNotEmpty();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,18 @@
|
|||||||
|
id: inputs-with-multiple-constraint-violations
|
||||||
|
namespace: io.kestra.tests
|
||||||
|
inputs:
|
||||||
|
- id: multi
|
||||||
|
type: MULTISELECT
|
||||||
|
values:
|
||||||
|
- A
|
||||||
|
- B
|
||||||
|
- C
|
||||||
|
options:
|
||||||
|
- X
|
||||||
|
- Y
|
||||||
|
- Z
|
||||||
|
|
||||||
|
tasks:
|
||||||
|
- id: validMultiSelect
|
||||||
|
type: io.kestra.plugin.core.debug.Return
|
||||||
|
format: "{{inputs.multi}}"
|
||||||
10
core/src/test/resources/flows/valids/each-pause.yaml
Normal file
10
core/src/test/resources/flows/valids/each-pause.yaml
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
id: each-pause
|
||||||
|
namespace: io.kestra.tests
|
||||||
|
|
||||||
|
tasks:
|
||||||
|
- id: each_task
|
||||||
|
type: io.kestra.plugin.core.flow.ForEach
|
||||||
|
values: '["a", "b"]'
|
||||||
|
tasks:
|
||||||
|
- id: pause
|
||||||
|
type: io.kestra.plugin.core.flow.Pause
|
||||||
@@ -8,4 +8,4 @@ concurrency:
|
|||||||
tasks:
|
tasks:
|
||||||
- id: sleep
|
- id: sleep
|
||||||
type: io.kestra.plugin.core.flow.Sleep
|
type: io.kestra.plugin.core.flow.Sleep
|
||||||
duration: PT10S
|
duration: PT2S
|
||||||
|
|||||||
@@ -402,10 +402,11 @@ public class ExecutorService {
|
|||||||
|
|
||||||
if (flow.getOutputs() != null) {
|
if (flow.getOutputs() != null) {
|
||||||
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
|
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
|
||||||
|
var inputAndOutput = runContext.inputAndOutput();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Map<String, Object> outputs = FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext);
|
Map<String, Object> outputs = inputAndOutput.renderOutputs(flow.getOutputs());
|
||||||
outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
|
outputs = inputAndOutput.typedOutputs(flow, executor.getExecution(), outputs);
|
||||||
newExecution = newExecution.withOutputs(outputs);
|
newExecution = newExecution.withOutputs(outputs);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Logs.logExecution(
|
Logs.logExecution(
|
||||||
|
|||||||
@@ -0,0 +1,22 @@
|
|||||||
|
DROP INDEX logs_execution_id;
|
||||||
|
DROP INDEX logs_execution_id__task_id;
|
||||||
|
DROP INDEX logs_execution_id__taskrun_id;
|
||||||
|
DROP INDEX logs_namespace_flow;
|
||||||
|
|
||||||
|
ALTER table logs drop column "deleted";
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS logs_execution_id ON logs ("execution_id");
|
||||||
|
CREATE INDEX IF NOT EXISTS logs_execution_id__task_id ON logs ("execution_id", "task_id");
|
||||||
|
CREATE INDEX IF NOT EXISTS logs_execution_id__taskrun_id ON logs ("execution_id", "taskrun_id");
|
||||||
|
CREATE INDEX IF NOT EXISTS logs_namespace_flow ON logs ("tenant_id", "timestamp", "level", "namespace", "flow_id");
|
||||||
|
|
||||||
|
|
||||||
|
DROP INDEX IF EXISTS metrics_flow_id;
|
||||||
|
DROP INDEX IF EXISTS metrics_execution_id;
|
||||||
|
DROP INDEX IF EXISTS metrics_timestamp;
|
||||||
|
|
||||||
|
ALTER TABLE metrics drop column "deleted";
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS metrics_flow_id ON metrics ("tenant_id", "namespace", "flow_id");
|
||||||
|
CREATE INDEX IF NOT EXISTS metrics_execution_id ON metrics ("execution_id");
|
||||||
|
CREATE INDEX IF NOT EXISTS metrics_timestamp ON metrics ("tenant_id", "timestamp");
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
package io.kestra.runner.h2;
|
package io.kestra.runner.h2;
|
||||||
|
|
||||||
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
|
import io.kestra.jdbc.runner.JdbcConcurrencyRunnerTest;
|
||||||
|
|
||||||
public class H2RunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
|
public class H2RunnerConcurrencyTest extends JdbcConcurrencyRunnerTest {
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,22 @@
|
|||||||
|
ALTER TABLE logs DROP INDEX ix_execution_id;
|
||||||
|
ALTER TABLE logs DROP INDEX ix_execution_id__task_id;
|
||||||
|
ALTER TABLE logs DROP INDEX ix_execution_id__taskrun_id;
|
||||||
|
ALTER TABLE logs DROP INDEX ix_namespace_flow;
|
||||||
|
|
||||||
|
ALTER table logs drop column `deleted`;
|
||||||
|
|
||||||
|
ALTER TABLE logs ADD INDEX ix_execution_id (`execution_id`), ALGORITHM=INPLACE, LOCK=NONE;
|
||||||
|
ALTER TABLE logs ADD INDEX ix_execution_id__task_id (`execution_id`, `task_id`), ALGORITHM=INPLACE, LOCK=NONE;
|
||||||
|
ALTER TABLE logs ADD INDEX ix_execution_id__taskrun_id (`execution_id`, `taskrun_id`), ALGORITHM=INPLACE, LOCK=NONE;
|
||||||
|
ALTER TABLE logs ADD INDEX ix_namespace_flow (`tenant_id`, `timestamp`, `level`, `namespace`, `flow_id`), ALGORITHM=INPLACE, LOCK=NONE;
|
||||||
|
|
||||||
|
|
||||||
|
ALTER TABLE metrics DROP INDEX metrics_flow_id;
|
||||||
|
ALTER TABLE metrics DROP INDEX ix_metrics_execution_id;
|
||||||
|
ALTER TABLE metrics DROP INDEX metrics_timestamp;
|
||||||
|
|
||||||
|
ALTER TABLE metrics drop column `deleted`;
|
||||||
|
|
||||||
|
ALTER TABLE metrics ADD INDEX ix_metrics_flow_id (`tenant_id`, `namespace`, `flow_id`), ALGORITHM=INPLACE, LOCK=NONE;
|
||||||
|
ALTER TABLE metrics ADD INDEX ix_metrics_execution_id (`execution_id`), ALGORITHM=INPLACE, LOCK=NONE;
|
||||||
|
ALTER TABLE metrics ADD INDEX ix_metrics_timestamp (`tenant_id`, `timestamp`), ALGORITHM=INPLACE, LOCK=NONE;
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
package io.kestra.runner.mysql;
|
package io.kestra.runner.mysql;
|
||||||
|
|
||||||
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
|
import io.kestra.jdbc.runner.JdbcConcurrencyRunnerTest;
|
||||||
|
|
||||||
public class MysqlRunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
|
public class MysqlRunnerConcurrencyTest extends JdbcConcurrencyRunnerTest {
|
||||||
}
|
}
|
||||||
|
|||||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user