mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 14:00:23 -05:00
Compare commits
36 Commits
docs/purge
...
feat/asset
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6c07ecd3d4 | ||
|
|
950223e0a8 | ||
|
|
d48f3b9bd9 | ||
|
|
291fba3281 | ||
|
|
db3b3236ac | ||
|
|
5a8a631b47 | ||
|
|
2da191896f | ||
|
|
111026369b | ||
|
|
e3a0e59e9c | ||
|
|
f352be5746 | ||
|
|
5fc6d0b5d7 | ||
|
|
de5750f656 | ||
|
|
fa870b8df2 | ||
|
|
fa4bf64a23 | ||
|
|
27f81b5b6d | ||
|
|
90d322cd67 | ||
|
|
04246ace13 | ||
|
|
e0e745cb91 | ||
|
|
c69ecd7200 | ||
|
|
4b3419bc15 | ||
|
|
352d4eb194 | ||
|
|
e433833e62 | ||
|
|
d16a8de90f | ||
|
|
4784e459d6 | ||
|
|
2abea0fcde | ||
|
|
5d5165b7b9 | ||
|
|
44d0c10713 | ||
|
|
167734e32a | ||
|
|
24e61c81c0 | ||
|
|
379764a033 | ||
|
|
d55dd275c3 | ||
|
|
f409657e8a | ||
|
|
22f0b3ffdf | ||
|
|
0d99dc6862 | ||
|
|
fd3adc48b8 | ||
|
|
1a8a47c8cd |
2
.github/pull_request_template.md
vendored
2
.github/pull_request_template.md
vendored
@@ -12,7 +12,7 @@ _Example: Replaces legacy scroll directive with the new API._
|
||||
### 🔗 Related Issue
|
||||
|
||||
Which issue does this PR resolve? Use [GitHub Keywords](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/using-keywords-in-issues-and-pull-requests#linking-a-pull-request-to-an-issue) to automatically link the pull request to the issue.
|
||||
_Example: Closes https://github.com/kestra-io/kestra/issues/12345._
|
||||
_Example: Closes https://github.com/kestra-io/kestra/issues/ISSUE_NUMBER._
|
||||
|
||||
### 🎨 Frontend Checklist
|
||||
|
||||
|
||||
2
.github/workflows/vulnerabilities-check.yml
vendored
2
.github/workflows/vulnerabilities-check.yml
vendored
@@ -43,7 +43,7 @@ jobs:
|
||||
|
||||
# Upload dependency check report
|
||||
- name: Upload dependency check report
|
||||
uses: actions/upload-artifact@v5
|
||||
uses: actions/upload-artifact@v6
|
||||
if: ${{ always() }}
|
||||
with:
|
||||
name: dependency-check-report
|
||||
|
||||
170
build.gradle
170
build.gradle
@@ -171,13 +171,22 @@ allprojects {
|
||||
subprojects {subProj ->
|
||||
|
||||
if (subProj.name != 'platform' && subProj.name != 'jmh-benchmarks') {
|
||||
|
||||
apply plugin: "com.adarshr.test-logger"
|
||||
apply plugin: 'jacoco'
|
||||
|
||||
java {
|
||||
sourceCompatibility = targetJavaVersion
|
||||
targetCompatibility = targetJavaVersion
|
||||
}
|
||||
|
||||
configurations {
|
||||
agent {
|
||||
canBeResolved = true
|
||||
canBeConsumed = true
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
// Platform
|
||||
testAnnotationProcessor enforcedPlatform(project(":platform"))
|
||||
@@ -204,9 +213,16 @@ subprojects {subProj ->
|
||||
|
||||
//assertj
|
||||
testImplementation 'org.assertj:assertj-core'
|
||||
|
||||
agent "org.aspectj:aspectjweaver:1.9.25.1"
|
||||
|
||||
testImplementation platform("io.qameta.allure:allure-bom")
|
||||
testImplementation "io.qameta.allure:allure-junit5"
|
||||
}
|
||||
|
||||
def commonTestConfig = { Test t ->
|
||||
t.ignoreFailures = true
|
||||
|
||||
// set Xmx for test workers
|
||||
t.maxHeapSize = '4g'
|
||||
|
||||
@@ -232,6 +248,52 @@ subprojects {subProj ->
|
||||
// }
|
||||
}
|
||||
|
||||
tasks.register('integrationTest', Test) { Test t ->
|
||||
description = 'Runs integration tests'
|
||||
group = 'verification'
|
||||
|
||||
useJUnitPlatform {
|
||||
includeTags 'integration'
|
||||
}
|
||||
|
||||
testClassesDirs = sourceSets.test.output.classesDirs
|
||||
classpath = sourceSets.test.runtimeClasspath
|
||||
|
||||
reports {
|
||||
junitXml.required = true
|
||||
junitXml.outputPerTestCase = true
|
||||
junitXml.mergeReruns = true
|
||||
junitXml.includeSystemErrLog = true
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
|
||||
}
|
||||
|
||||
// Integration tests typically not parallel (but you can enable)
|
||||
maxParallelForks = 1
|
||||
commonTestConfig(t)
|
||||
}
|
||||
|
||||
tasks.register('unitTest', Test) { Test t ->
|
||||
description = 'Runs unit tests'
|
||||
group = 'verification'
|
||||
|
||||
useJUnitPlatform {
|
||||
excludeTags 'flaky', 'integration'
|
||||
}
|
||||
|
||||
testClassesDirs = sourceSets.test.output.classesDirs
|
||||
classpath = sourceSets.test.runtimeClasspath
|
||||
|
||||
reports {
|
||||
junitXml.required = true
|
||||
junitXml.outputPerTestCase = true
|
||||
junitXml.mergeReruns = true
|
||||
junitXml.includeSystemErrLog = true
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
|
||||
}
|
||||
|
||||
commonTestConfig(t)
|
||||
}
|
||||
|
||||
tasks.register('flakyTest', Test) { Test t ->
|
||||
group = 'verification'
|
||||
description = 'Runs tests tagged @Flaky but does not fail the build.'
|
||||
@@ -239,7 +301,6 @@ subprojects {subProj ->
|
||||
useJUnitPlatform {
|
||||
includeTags 'flaky'
|
||||
}
|
||||
ignoreFailures = true
|
||||
|
||||
reports {
|
||||
junitXml.required = true
|
||||
@@ -249,10 +310,13 @@ subprojects {subProj ->
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/flakyTest")
|
||||
}
|
||||
commonTestConfig(t)
|
||||
|
||||
}
|
||||
|
||||
test {
|
||||
// test task (default)
|
||||
tasks.named('test', Test) { Test t ->
|
||||
group = 'verification'
|
||||
description = 'Runs all non-flaky tests.'
|
||||
|
||||
useJUnitPlatform {
|
||||
excludeTags 'flaky'
|
||||
}
|
||||
@@ -263,10 +327,12 @@ subprojects {subProj ->
|
||||
junitXml.includeSystemErrLog = true
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
|
||||
}
|
||||
commonTestConfig(it)
|
||||
commonTestConfig(t)
|
||||
jvmArgs = ["-javaagent:${configurations.agent.singleFile}"]
|
||||
}
|
||||
|
||||
|
||||
finalizedBy(tasks.named('flakyTest'))
|
||||
tasks.named('check') {
|
||||
dependsOn(tasks.named('test'))// default behaviour
|
||||
}
|
||||
|
||||
testlogger {
|
||||
@@ -282,83 +348,25 @@ subprojects {subProj ->
|
||||
}
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************\
|
||||
* End-to-End Tests
|
||||
**********************************************************************************************************************/
|
||||
def e2eTestsCheck = tasks.register('e2eTestsCheck') {
|
||||
group = 'verification'
|
||||
description = "Runs the 'check' task for all e2e-tests modules"
|
||||
doFirst {
|
||||
project.ext.set("e2e-tests", true)
|
||||
}
|
||||
}
|
||||
|
||||
subprojects {
|
||||
// Add e2e-tests modules check tasks to e2eTestsCheck
|
||||
if (project.name.startsWith("e2e-tests")) {
|
||||
test {
|
||||
onlyIf {
|
||||
project.hasProperty("e2e-tests")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
afterEvaluate {
|
||||
// Add e2e-tests modules check tasks to e2eTestsCheck
|
||||
if (project.name.startsWith("e2e-tests")) {
|
||||
e2eTestsCheck.configure {
|
||||
finalizedBy(check)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************\
|
||||
* Allure Reports
|
||||
**********************************************************************************************************************/
|
||||
subprojects {
|
||||
if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
|
||||
dependencies {
|
||||
testImplementation platform("io.qameta.allure:allure-bom")
|
||||
testImplementation "io.qameta.allure:allure-junit5"
|
||||
}
|
||||
|
||||
configurations {
|
||||
agent {
|
||||
canBeResolved = true
|
||||
canBeConsumed = true
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
agent "org.aspectj:aspectjweaver:1.9.25.1"
|
||||
}
|
||||
|
||||
test {
|
||||
jvmArgs = ["-javaagent:${configurations.agent.singleFile}"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************\
|
||||
* Jacoco
|
||||
**********************************************************************************************************************/
|
||||
subprojects {
|
||||
if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
|
||||
apply plugin: 'jacoco'
|
||||
|
||||
test {
|
||||
finalizedBy jacocoTestReport
|
||||
}
|
||||
|
||||
jacocoTestReport {
|
||||
dependsOn test
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tasks.named('check') {
|
||||
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
|
||||
finalizedBy jacocoTestReport
|
||||
}
|
||||
|
||||
tasks.register('unitTest') {
|
||||
// No jacocoTestReport here, because it depends by default on :test,
|
||||
// and that would make :test being run twice in our CI.
|
||||
// In practice the report will be generated later in the CI by :check.
|
||||
}
|
||||
|
||||
tasks.register('integrationTest') {
|
||||
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
|
||||
finalizedBy jacocoTestReport
|
||||
}
|
||||
|
||||
tasks.register('flakyTest') {
|
||||
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
|
||||
finalizedBy jacocoTestReport
|
||||
}
|
||||
|
||||
tasks.named('testCodeCoverageReport') {
|
||||
|
||||
@@ -42,7 +42,7 @@ import picocli.CommandLine.Option;
|
||||
@Introspected
|
||||
public abstract class AbstractCommand implements Callable<Integer> {
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
protected ApplicationContext applicationContext;
|
||||
|
||||
@Inject
|
||||
private EndpointDefaultConfiguration endpointConfiguration;
|
||||
|
||||
@@ -18,7 +18,8 @@ import picocli.CommandLine;
|
||||
FlowDotCommand.class,
|
||||
FlowExportCommand.class,
|
||||
FlowUpdateCommand.class,
|
||||
FlowUpdatesCommand.class
|
||||
FlowUpdatesCommand.class,
|
||||
FlowsSyncFromSourceCommand.class
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import jakarta.inject.Inject;
|
||||
import java.util.List;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "syncFromSource",
|
||||
description = "Update a single flow",
|
||||
mixinStandardHelpOptions = true
|
||||
)
|
||||
@Slf4j
|
||||
public class FlowsSyncFromSourceCommand extends AbstractApiCommand {
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
FlowRepositoryInterface repository = applicationContext.getBean(FlowRepositoryInterface.class);
|
||||
String tenant = tenantService.getTenantId(tenantId);
|
||||
|
||||
List<FlowWithSource> persistedFlows = repository.findAllWithSource(tenant);
|
||||
|
||||
int count = 0;
|
||||
for (FlowWithSource persistedFlow : persistedFlows) {
|
||||
// Ensure exactly one trailing newline. We need this new line
|
||||
// because when we update a flow from its source,
|
||||
// we don't update it if no change is detected.
|
||||
// The goal here is to force an update from the source for every flows
|
||||
GenericFlow flow = GenericFlow.fromYaml(tenant,persistedFlow.getSource() + System.lineSeparator());
|
||||
repository.update(flow, persistedFlow);
|
||||
stdOut("- %s.%s".formatted(flow.getNamespace(), flow.getId()));
|
||||
count++;
|
||||
}
|
||||
stdOut("%s flow(s) successfully updated!".formatted(count));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
protected boolean loadExternalPlugins() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class FlowsSyncFromSourceCommandTest {
|
||||
@Test
|
||||
void updateAllFlowsFromSource() {
|
||||
URL directory = FlowUpdatesCommandTest.class.getClassLoader().getResource("flows");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--plugins",
|
||||
"/tmp", // pass this arg because it can cause failure
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"--delete",
|
||||
directory.getPath(),
|
||||
};
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString()).contains("successfully updated !");
|
||||
out.reset();
|
||||
|
||||
FlowRepositoryInterface repository = ctx.getBean(FlowRepositoryInterface.class);
|
||||
List<Flow> flows = repository.findAll(MAIN_TENANT);
|
||||
for (Flow flow : flows) {
|
||||
assertThat(flow.getRevision()).isEqualTo(1);
|
||||
}
|
||||
|
||||
args = new String[]{
|
||||
"--plugins",
|
||||
"/tmp", // pass this arg because it can cause failure
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word"
|
||||
|
||||
};
|
||||
PicocliRunner.call(FlowsSyncFromSourceCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString()).contains("4 flow(s) successfully updated!");
|
||||
assertThat(out.toString()).contains("- io.kestra.outsider.quattro");
|
||||
assertThat(out.toString()).contains("- io.kestra.cli.second");
|
||||
assertThat(out.toString()).contains("- io.kestra.cli.third");
|
||||
assertThat(out.toString()).contains("- io.kestra.cli.first");
|
||||
|
||||
flows = repository.findAll(MAIN_TENANT);
|
||||
for (Flow flow : flows) {
|
||||
assertThat(flow.getRevision()).isEqualTo(2);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -24,6 +24,9 @@ dependencies {
|
||||
// reactor
|
||||
api "io.projectreactor:reactor-core"
|
||||
|
||||
// awaitility
|
||||
api "org.awaitility:awaitility"
|
||||
|
||||
// micronaut
|
||||
api "io.micronaut.data:micronaut-data-model"
|
||||
implementation "io.micronaut:micronaut-http-server-netty"
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.docs;
|
||||
|
||||
import com.fasterxml.classmate.ResolvedType;
|
||||
import com.fasterxml.classmate.members.HierarchicType;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
@@ -22,6 +23,8 @@ import com.github.victools.jsonschema.module.swagger2.Swagger2Module;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.assets.CustomAsset;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ScheduleCondition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
@@ -42,13 +45,12 @@ import io.kestra.core.plugins.PluginRegistry;
|
||||
import io.kestra.core.plugins.RegisteredPlugin;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import io.swagger.v3.oas.annotations.media.Content;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.*;
|
||||
import java.time.*;
|
||||
@@ -64,7 +66,7 @@ import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class JsonSchemaGenerator {
|
||||
|
||||
|
||||
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
|
||||
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
|
||||
|
||||
@@ -277,10 +279,10 @@ public class JsonSchemaGenerator {
|
||||
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
|
||||
.with(Option.PLAIN_DEFINITION_KEYS)
|
||||
.with(Option.ALLOF_CLEANUP_AT_THE_END);
|
||||
|
||||
|
||||
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
|
||||
// to be able to return an CustomDefinition with an empty node when the ResolvedType can't be found.
|
||||
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
|
||||
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider() {
|
||||
@Override
|
||||
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
|
||||
try {
|
||||
@@ -299,7 +301,9 @@ public class JsonSchemaGenerator {
|
||||
}
|
||||
|
||||
// default value
|
||||
builder.forFields().withDefaultResolver(this::defaults);
|
||||
builder.forFields()
|
||||
.withIgnoreCheck(fieldScope -> fieldScope.getAnnotation(Hidden.class) != null)
|
||||
.withDefaultResolver(this::defaults);
|
||||
|
||||
// def name
|
||||
builder.forTypesInGeneral()
|
||||
@@ -320,7 +324,7 @@ public class JsonSchemaGenerator {
|
||||
// inline some type
|
||||
builder.forTypesInGeneral()
|
||||
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
|
||||
|
||||
|
||||
@Override
|
||||
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
|
||||
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {
|
||||
@@ -588,11 +592,31 @@ public class JsonSchemaGenerator {
|
||||
// The `const` property is used by editors for auto-completion based on that schema.
|
||||
builder.forTypesInGeneral().withTypeAttributeOverride((collectedTypeAttributes, scope, context) -> {
|
||||
final Class<?> pluginType = scope.getType().getErasedType();
|
||||
if (pluginType.getAnnotation(Plugin.class) != null) {
|
||||
Plugin pluginAnnotation = pluginType.getAnnotation(Plugin.class);
|
||||
if (pluginAnnotation != null) {
|
||||
ObjectNode properties = (ObjectNode) collectedTypeAttributes.get("properties");
|
||||
if (properties != null) {
|
||||
String typeConst = pluginType.getName();
|
||||
// This is needed so that assets can have arbitrary types while still being able to be identified as assets.
|
||||
if (pluginType == CustomAsset.class) {
|
||||
properties.set("type", context.getGeneratorConfig().createObjectNode()
|
||||
.put("type", "string")
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (Asset.class.isAssignableFrom(pluginType)) {
|
||||
// For Asset types, we want to be able to use a simple-string type. Convention is that first alias is that string type.
|
||||
typeConst = pluginAnnotation.aliases().length > 0 ? pluginAnnotation.aliases()[0] : pluginType.getName();
|
||||
Arrays.stream(pluginType.getDeclaredMethods())
|
||||
.filter(m -> m.isAnnotationPresent(JsonProperty.class))
|
||||
.forEach(m -> properties.set(m.getAnnotation(JsonProperty.class).value(), context.getGeneratorConfig().createObjectNode()
|
||||
.put("type", "string")
|
||||
));
|
||||
}
|
||||
|
||||
properties.set("type", context.getGeneratorConfig().createObjectNode()
|
||||
.put("const", pluginType.getName())
|
||||
.put("const", typeConst)
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -763,6 +787,14 @@ public class JsonSchemaGenerator {
|
||||
consumer.accept(typeContext.resolve(clz));
|
||||
}
|
||||
}).toList();
|
||||
} else if (declaredType.getErasedType() == Asset.class) {
|
||||
return getRegisteredPlugins()
|
||||
.stream()
|
||||
.flatMap(registeredPlugin -> registeredPlugin.getAssets().stream())
|
||||
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
|
||||
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
|
||||
.map(typeContext::resolve)
|
||||
.toList();
|
||||
}
|
||||
|
||||
return null;
|
||||
@@ -809,9 +841,9 @@ public class JsonSchemaGenerator {
|
||||
// we don't return base properties unless specified with @PluginProperty and hidden is false
|
||||
builder
|
||||
.forFields()
|
||||
.withIgnoreCheck(fieldScope -> base != null &&
|
||||
.withIgnoreCheck(fieldScope -> (base != null &&
|
||||
(fieldScope.getAnnotation(PluginProperty.class) == null || fieldScope.getAnnotation(PluginProperty.class).hidden()) &&
|
||||
fieldScope.getDeclaringType().getTypeName().equals(base.getName())
|
||||
fieldScope.getDeclaringType().getTypeName().equals(base.getName())) || fieldScope.getAnnotation(Hidden.class) != null
|
||||
);
|
||||
|
||||
SchemaGeneratorConfig schemaGeneratorConfig = builder.build();
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
import io.kestra.core.models.flows.Data;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Exception that can be thrown when Inputs/Outputs have validation problems.
|
||||
*/
|
||||
public class InputOutputValidationException extends KestraRuntimeException {
|
||||
public InputOutputValidationException(String message) {
|
||||
super(message);
|
||||
}
|
||||
public static InputOutputValidationException of( String message, Input<?> input){
|
||||
String inputMessage = "Invalid value for input" + " `" + input.getId() + "`. Cause: " + message;
|
||||
return new InputOutputValidationException(inputMessage);
|
||||
}
|
||||
public static InputOutputValidationException of( String message, Output output){
|
||||
String outputMessage = "Invalid value for output" + " `" + output.getId() + "`. Cause: " + message;
|
||||
return new InputOutputValidationException(outputMessage);
|
||||
}
|
||||
public static InputOutputValidationException of(String message){
|
||||
return new InputOutputValidationException(message);
|
||||
}
|
||||
|
||||
public static InputOutputValidationException merge(Set<InputOutputValidationException> exceptions){
|
||||
String combinedMessage = exceptions.stream()
|
||||
.map(InputOutputValidationException::getMessage)
|
||||
.collect(Collectors.joining(System.lineSeparator()));
|
||||
throw new InputOutputValidationException(combinedMessage);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,6 +1,8 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* The top-level {@link KestraRuntimeException} for non-recoverable errors.
|
||||
|
||||
@@ -103,12 +103,48 @@ public record QueryFilter(
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS);
|
||||
}
|
||||
},
|
||||
METADATA("metadata") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS);
|
||||
}
|
||||
},
|
||||
FLOW_ID("flowId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
|
||||
}
|
||||
},
|
||||
FLOW_REVISION("flowRevision") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
ID("id") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
|
||||
}
|
||||
},
|
||||
ASSET_ID("assetId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
|
||||
}
|
||||
},
|
||||
TYPE("type") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
|
||||
}
|
||||
},
|
||||
CREATED("created") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.GREATER_THAN_OR_EQUAL_TO, Op.GREATER_THAN, Op.LESS_THAN_OR_EQUAL_TO, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
|
||||
}
|
||||
},
|
||||
UPDATED("updated") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
@@ -151,12 +187,30 @@ public record QueryFilter(
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
TRIGGER_STATE("triggerState"){
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS);
|
||||
}
|
||||
},
|
||||
EXECUTION_ID("executionId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
TASK_ID("taskId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
TASK_RUN_ID("taskRunId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
CHILD_FILTER("childFilter") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
@@ -271,7 +325,7 @@ public record QueryFilter(
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.WORKER_ID, Field.FLOW_ID,
|
||||
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID
|
||||
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID, Field.TRIGGER_STATE
|
||||
);
|
||||
}
|
||||
},
|
||||
@@ -306,6 +360,34 @@ public record QueryFilter(
|
||||
Field.UPDATED
|
||||
);
|
||||
}
|
||||
},
|
||||
ASSET {
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(
|
||||
Field.QUERY,
|
||||
Field.ID,
|
||||
Field.TYPE,
|
||||
Field.NAMESPACE,
|
||||
Field.METADATA,
|
||||
Field.UPDATED
|
||||
);
|
||||
}
|
||||
},
|
||||
ASSET_USAGE {
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(
|
||||
Field.ASSET_ID,
|
||||
Field.NAMESPACE,
|
||||
Field.FLOW_ID,
|
||||
Field.FLOW_REVISION,
|
||||
Field.EXECUTION_ID,
|
||||
Field.TASK_ID,
|
||||
Field.TASK_RUN_ID,
|
||||
Field.CREATED
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
public abstract List<Field> supportedField();
|
||||
|
||||
111
core/src/main/java/io/kestra/core/models/assets/Asset.java
Normal file
111
core/src/main/java/io/kestra/core/models/assets/Asset.java
Normal file
@@ -0,0 +1,111 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonAnySetter;
|
||||
import io.kestra.core.models.DeletedInterface;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
import jakarta.validation.constraints.Size;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public abstract class Asset implements HasUID, DeletedInterface, Plugin {
|
||||
@Hidden
|
||||
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
|
||||
protected String tenantId;
|
||||
|
||||
@Pattern(regexp = "^[a-z0-9][a-z0-9._-]*")
|
||||
@Size(min = 1, max = 150)
|
||||
protected String namespace;
|
||||
|
||||
@NotBlank
|
||||
@Pattern(regexp = "^[a-zA-Z0-9][a-zA-Z0-9._-]*")
|
||||
@Size(min = 1, max = 150)
|
||||
protected String id;
|
||||
|
||||
@NotBlank
|
||||
protected String type;
|
||||
|
||||
protected String displayName;
|
||||
|
||||
protected String description;
|
||||
|
||||
protected Map<String, Object> metadata;
|
||||
|
||||
@Nullable
|
||||
@Hidden
|
||||
private Instant created;
|
||||
|
||||
@Nullable
|
||||
@Hidden
|
||||
private Instant updated;
|
||||
|
||||
@Hidden
|
||||
private boolean deleted;
|
||||
|
||||
public Asset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String type,
|
||||
String displayName,
|
||||
String description,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
this.tenantId = tenantId;
|
||||
this.namespace = namespace;
|
||||
this.id = id;
|
||||
this.type = type;
|
||||
this.displayName = displayName;
|
||||
this.description = description;
|
||||
this.metadata = Optional.ofNullable(metadata).map(HashMap::new).orElse(new HashMap<>());
|
||||
Instant now = Instant.now();
|
||||
this.created = Optional.ofNullable(created).orElse(now);
|
||||
this.updated = Optional.ofNullable(updated).orElse(now);
|
||||
this.deleted = deleted;
|
||||
}
|
||||
|
||||
public <T extends Asset> T toUpdated() {
|
||||
if (this.created == null) {
|
||||
this.created = Instant.now();
|
||||
}
|
||||
this.updated = Instant.now();
|
||||
return (T) this;
|
||||
}
|
||||
|
||||
public Asset toDeleted() {
|
||||
this.deleted = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
@JsonAnySetter
|
||||
public void setMetadata(String name, Object value) {
|
||||
metadata.put(name, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return Asset.uid(tenantId, id);
|
||||
}
|
||||
|
||||
public static String uid(String tenantId, String id) {
|
||||
return IdUtils.fromParts(tenantId, id);
|
||||
}
|
||||
|
||||
public Asset withTenantId(String tenantId) {
|
||||
this.tenantId = tenantId;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
|
||||
public record AssetIdentifier(@Hidden String tenantId, @Hidden String namespace, String id){
|
||||
|
||||
public AssetIdentifier withTenantId(String tenantId) {
|
||||
return new AssetIdentifier(tenantId, this.namespace, this.id);
|
||||
}
|
||||
|
||||
public String uid() {
|
||||
return IdUtils.fromParts(tenantId, id);
|
||||
}
|
||||
|
||||
public static AssetIdentifier of(Asset asset) {
|
||||
return new AssetIdentifier(asset.getTenantId(), asset.getNamespace(), asset.getId());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.flows.FlowId;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
|
||||
/**
|
||||
* Represents an entity that used an asset
|
||||
*/
|
||||
public record AssetUser(String tenantId, String namespace, String flowId, Integer flowRevision, String executionId, String taskId, String taskRunId) implements HasUID {
|
||||
public String uid() {
|
||||
return IdUtils.fromParts(tenantId, namespace, flowId, String.valueOf(flowRevision), executionId, taskRunId);
|
||||
}
|
||||
|
||||
public FlowId toFlowId() {
|
||||
return FlowId.of(tenantId, namespace, flowId, flowRevision);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@Getter
|
||||
public class AssetsDeclaration extends AssetsInOut {
|
||||
private boolean enableAuto;
|
||||
|
||||
@JsonCreator
|
||||
public AssetsDeclaration(Boolean enableAuto, List<AssetIdentifier> inputs, List<Asset> outputs) {
|
||||
super(inputs, outputs);
|
||||
|
||||
this.enableAuto = Optional.ofNullable(enableAuto).orElse(false);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@Getter
|
||||
public class AssetsInOut {
|
||||
private List<AssetIdentifier> inputs;
|
||||
|
||||
private List<Asset> outputs;
|
||||
|
||||
@JsonCreator
|
||||
public AssetsInOut(List<AssetIdentifier> inputs, List<Asset> outputs) {
|
||||
this.inputs = Optional.ofNullable(inputs).orElse(Collections.emptyList());
|
||||
this.outputs = Optional.ofNullable(outputs).orElse(Collections.emptyList());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin
|
||||
public class CustomAsset extends Asset {
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public CustomAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String type,
|
||||
String displayName,
|
||||
String description,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, type, displayName, description, metadata, created, updated, deleted);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin(aliases = DatasetAsset.ASSET_TYPE)
|
||||
public class DatasetAsset extends Asset {
|
||||
public static final String ASSET_TYPE = "DATASET";
|
||||
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public DatasetAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String displayName,
|
||||
String description,
|
||||
String system,
|
||||
String location,
|
||||
String format,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
|
||||
|
||||
this.setSystem(system);
|
||||
this.setLocation(location);
|
||||
this.setFormat(format);
|
||||
}
|
||||
|
||||
@JsonProperty("system")
|
||||
public String getSystem() {
|
||||
return Optional.ofNullable(metadata.get("system")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("location")
|
||||
public String getLocation() {
|
||||
return Optional.ofNullable(metadata.get("location")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("format")
|
||||
public String getFormat() {
|
||||
return Optional.ofNullable(metadata.get("format")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
public void setSystem(String system) {
|
||||
if (system != null) {
|
||||
metadata.put("system", system);
|
||||
}
|
||||
}
|
||||
|
||||
public void setLocation(String location) {
|
||||
if (location != null) {
|
||||
metadata.put("location", location);
|
||||
}
|
||||
}
|
||||
|
||||
public void setFormat(String format) {
|
||||
if (format != null) {
|
||||
metadata.put("format", format);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin(aliases = ExternalAsset.ASSET_TYPE)
|
||||
public class ExternalAsset extends Asset {
|
||||
public static final String ASSET_TYPE = "EXTERNAL";
|
||||
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public ExternalAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String displayName,
|
||||
String description,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin(aliases = FileAsset.ASSET_TYPE)
|
||||
public class FileAsset extends Asset {
|
||||
public static final String ASSET_TYPE = "FILE";
|
||||
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public FileAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String displayName,
|
||||
String description,
|
||||
String system,
|
||||
String path,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
|
||||
|
||||
this.setSystem(system);
|
||||
this.setPath(path);
|
||||
}
|
||||
|
||||
@JsonProperty("system")
|
||||
public String getSystem() {
|
||||
return Optional.ofNullable(metadata.get("system")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("path")
|
||||
public String getPath() {
|
||||
return Optional.ofNullable(metadata.get("path")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
public void setSystem(String system) {
|
||||
if (system != null) {
|
||||
metadata.put("system", system);
|
||||
}
|
||||
}
|
||||
|
||||
public void setPath(String path) {
|
||||
if (path != null) {
|
||||
metadata.put("path", path);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,86 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin(aliases = TableAsset.ASSET_TYPE)
|
||||
public class TableAsset extends Asset {
|
||||
public static final String ASSET_TYPE = "TABLE";
|
||||
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public TableAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String displayName,
|
||||
String description,
|
||||
String system,
|
||||
String database,
|
||||
String schema,
|
||||
String name,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
|
||||
|
||||
this.setSystem(system);
|
||||
this.setDatabase(database);
|
||||
this.setSchema(schema);
|
||||
this.setName(name);
|
||||
}
|
||||
|
||||
@JsonProperty("system")
|
||||
public String getSystem() {
|
||||
return Optional.ofNullable(metadata.get("system")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("database")
|
||||
public String getDatabase() {
|
||||
return Optional.ofNullable(metadata.get("database")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("schema")
|
||||
public String getSchema() {
|
||||
return Optional.ofNullable(metadata.get("schema")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("name")
|
||||
public String getName() {
|
||||
return Optional.ofNullable(metadata.get("name")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
public void setSystem(String system) {
|
||||
if (system != null) {
|
||||
metadata.put("system", system);
|
||||
}
|
||||
}
|
||||
|
||||
public void setDatabase(String database) {
|
||||
if (database != null) {
|
||||
metadata.put("database", database);
|
||||
}
|
||||
}
|
||||
|
||||
public void setSchema(String schema) {
|
||||
if (schema != null) {
|
||||
metadata.put("schema", schema);
|
||||
}
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
if (name != null) {
|
||||
metadata.put("name", name);
|
||||
}
|
||||
}
|
||||
}
|
||||
73
core/src/main/java/io/kestra/core/models/assets/VmAsset.java
Normal file
73
core/src/main/java/io/kestra/core/models/assets/VmAsset.java
Normal file
@@ -0,0 +1,73 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin(aliases = VmAsset.ASSET_TYPE)
|
||||
public class VmAsset extends Asset {
|
||||
public static final String ASSET_TYPE = "VM";
|
||||
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public VmAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String displayName,
|
||||
String description,
|
||||
String provider,
|
||||
String region,
|
||||
String state,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
|
||||
|
||||
this.setProvider(provider);
|
||||
this.setRegion(region);
|
||||
this.setState(state);
|
||||
}
|
||||
|
||||
@JsonProperty("provider")
|
||||
public String getProvider() {
|
||||
return Optional.ofNullable(metadata.get("provider")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("region")
|
||||
public String getRegion() {
|
||||
return Optional.ofNullable(metadata.get("region")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("state")
|
||||
public String getState() {
|
||||
return Optional.ofNullable(metadata.get("state")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
public void setProvider(String provider) {
|
||||
if (provider != null) {
|
||||
metadata.put("provider", provider);
|
||||
}
|
||||
}
|
||||
|
||||
public void setRegion(String region) {
|
||||
if (region != null) {
|
||||
metadata.put("region", region);
|
||||
}
|
||||
}
|
||||
|
||||
public void setState(String state) {
|
||||
if (state != null) {
|
||||
metadata.put("state", state);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,10 +2,9 @@ package io.kestra.core.models.executions;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import io.kestra.core.models.TenantInterface;
|
||||
import io.kestra.core.models.assets.AssetsInOut;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
@@ -59,6 +58,10 @@ public class TaskRun implements TenantInterface {
|
||||
@Schema(implementation = Object.class)
|
||||
Variables outputs;
|
||||
|
||||
@With
|
||||
@Nullable
|
||||
AssetsInOut assets;
|
||||
|
||||
@NotNull
|
||||
State state;
|
||||
|
||||
@@ -89,14 +92,23 @@ public class TaskRun implements TenantInterface {
|
||||
this.value,
|
||||
this.attempts,
|
||||
this.outputs,
|
||||
this.assets,
|
||||
this.state.withState(state),
|
||||
this.iteration,
|
||||
this.dynamic,
|
||||
this.forceExecution
|
||||
);
|
||||
}
|
||||
public TaskRun withStateAndAttempt(State.Type state) {
|
||||
List<TaskRunAttempt> newAttempts = new ArrayList<>(this.attempts != null ? this.attempts : List.of());
|
||||
|
||||
if (newAttempts.isEmpty()) {
|
||||
newAttempts.add(TaskRunAttempt.builder().state(new State(state)).build());
|
||||
} else {
|
||||
TaskRunAttempt updatedLast = newAttempts.getLast().withState(state);
|
||||
newAttempts.set(newAttempts.size() - 1, updatedLast);
|
||||
}
|
||||
|
||||
public TaskRun replaceState(State newState) {
|
||||
return new TaskRun(
|
||||
this.tenantId,
|
||||
this.id,
|
||||
@@ -106,9 +118,10 @@ public class TaskRun implements TenantInterface {
|
||||
this.taskId,
|
||||
this.parentTaskRunId,
|
||||
this.value,
|
||||
this.attempts,
|
||||
newAttempts,
|
||||
this.outputs,
|
||||
newState,
|
||||
this.assets,
|
||||
this.state.withState(state),
|
||||
this.iteration,
|
||||
this.dynamic,
|
||||
this.forceExecution
|
||||
@@ -131,6 +144,7 @@ public class TaskRun implements TenantInterface {
|
||||
this.value,
|
||||
newAttempts,
|
||||
this.outputs,
|
||||
this.assets,
|
||||
this.state.withState(State.Type.FAILED),
|
||||
this.iteration,
|
||||
this.dynamic,
|
||||
@@ -150,6 +164,7 @@ public class TaskRun implements TenantInterface {
|
||||
.value(this.getValue())
|
||||
.attempts(this.getAttempts())
|
||||
.outputs(this.getOutputs())
|
||||
.assets(this.getAssets())
|
||||
.state(state == null ? this.getState() : state)
|
||||
.iteration(this.getIteration())
|
||||
.build();
|
||||
@@ -236,6 +251,7 @@ public class TaskRun implements TenantInterface {
|
||||
", parentTaskRunId=" + this.getParentTaskRunId() +
|
||||
", state=" + this.getState().getCurrent().toString() +
|
||||
", outputs=" + this.getOutputs() +
|
||||
", assets=" + this.getAssets() +
|
||||
", attempts=" + this.getAttempts() +
|
||||
")";
|
||||
}
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
/**
|
||||
* Interface for defining an identifiable and typed data.
|
||||
@@ -29,16 +27,4 @@ public interface Data {
|
||||
*/
|
||||
String getDisplayName();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
default ConstraintViolationException toConstraintViolationException(String message, Object value) {
|
||||
Class<Data> cls = (Class<Data>) this.getClass();
|
||||
|
||||
return ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid " + (this instanceof Output ? "output" : "input") + " for `" + getId() + "`, " + message + ", but received `" + value + "`",
|
||||
this,
|
||||
cls,
|
||||
this.getId(),
|
||||
value
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
package io.kestra.core.models.flows.input;
|
||||
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Represents an input along with its associated value and validation state.
|
||||
*
|
||||
@@ -12,15 +14,15 @@ import jakarta.validation.constraints.NotNull;
|
||||
* @param value The provided value for the input.
|
||||
* @param enabled {@code true} if the input is enabled; {@code false} otherwise.
|
||||
* @param isDefault {@code true} if the provided value is the default; {@code false} otherwise.
|
||||
* @param exception The validation exception, if the input value is invalid; {@code null} otherwise.
|
||||
* @param exceptions The validation exceptions, if the input value is invalid; {@code null} otherwise.
|
||||
*/
|
||||
public record InputAndValue(
|
||||
Input<?> input,
|
||||
Object value,
|
||||
boolean enabled,
|
||||
boolean isDefault,
|
||||
ConstraintViolationException exception) {
|
||||
|
||||
Set<InputOutputValidationException> exceptions) {
|
||||
|
||||
/**
|
||||
* Creates a new {@link InputAndValue} instance.
|
||||
*
|
||||
|
||||
@@ -7,6 +7,7 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import io.kestra.core.validations.Regex;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.ConstraintViolation;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Builder;
|
||||
@@ -14,10 +15,7 @@ import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
|
||||
@SuperBuilder
|
||||
@@ -77,30 +75,35 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
|
||||
|
||||
@Override
|
||||
public void validate(List<String> inputs) throws ConstraintViolationException {
|
||||
Set<ConstraintViolation<?>> violations = new HashSet<>();
|
||||
|
||||
if (values != null && options != null) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
violations.add( ManualConstraintViolation.of(
|
||||
"you can't define both `values` and `options`",
|
||||
this,
|
||||
MultiselectInput.class,
|
||||
getId(),
|
||||
""
|
||||
);
|
||||
));
|
||||
}
|
||||
|
||||
if (!this.getAllowCustomValue()) {
|
||||
for (String input : inputs) {
|
||||
List<@Regex String> finalValues = this.values != null ? this.values : this.options;
|
||||
if (!finalValues.contains(input)) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"it must match the values `" + finalValues + "`",
|
||||
violations.add(ManualConstraintViolation.of(
|
||||
"value `" + input + "` doesn't match the values `" + finalValues + "`",
|
||||
this,
|
||||
MultiselectInput.class,
|
||||
getId(),
|
||||
input
|
||||
);
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!violations.isEmpty()) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(violations);
|
||||
}
|
||||
}
|
||||
|
||||
/** {@inheritDoc} **/
|
||||
@@ -145,7 +148,7 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
|
||||
|
||||
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid expression result. Expected a list of strings, but received " + type,
|
||||
"Invalid expression result. Expected a list of strings",
|
||||
this,
|
||||
MultiselectInput.class,
|
||||
getId(),
|
||||
|
||||
@@ -125,7 +125,7 @@ public class SelectInput extends Input<String> implements RenderableInput {
|
||||
|
||||
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid expression result. Expected a list of strings, but received " + type,
|
||||
"Invalid expression result. Expected a list of strings",
|
||||
this,
|
||||
SelectInput.class,
|
||||
getId(),
|
||||
|
||||
@@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextProperty;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
@@ -156,9 +157,9 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property, then convert it to its target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#as(Class)
|
||||
* @see RunContextProperty#as(Class)
|
||||
*/
|
||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return as(property, context, clazz, Map.of());
|
||||
@@ -167,25 +168,57 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property with additional variables, then convert it to its target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
|
||||
* @see RunContextProperty#as(Class, Map)
|
||||
*/
|
||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.skipCache || property.value == null) {
|
||||
String rendered = context.render(property.expression, variables);
|
||||
property.value = MAPPER.convertValue(rendered, clazz);
|
||||
property.value = deserialize(rendered, clazz);
|
||||
}
|
||||
|
||||
return property.value;
|
||||
}
|
||||
|
||||
private static <T> T deserialize(Object rendered, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
try {
|
||||
return MAPPER.convertValue(rendered, clazz);
|
||||
} catch (IllegalArgumentException e) {
|
||||
if (rendered instanceof String str) {
|
||||
try {
|
||||
return MAPPER.readValue(str, clazz);
|
||||
} catch (JsonProcessingException ex) {
|
||||
throw new IllegalVariableEvaluationException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> T deserialize(Object rendered, JavaType type) throws IllegalVariableEvaluationException {
|
||||
try {
|
||||
return MAPPER.convertValue(rendered, type);
|
||||
} catch (IllegalArgumentException e) {
|
||||
if (rendered instanceof String str) {
|
||||
try {
|
||||
return MAPPER.readValue(str, type);
|
||||
} catch (JsonProcessingException ex) {
|
||||
throw new IllegalVariableEvaluationException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Render a property then convert it as a list of target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
|
||||
* @see RunContextProperty#asList(Class)
|
||||
*/
|
||||
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz) throws IllegalVariableEvaluationException {
|
||||
return asList(property, context, itemClazz, Map.of());
|
||||
@@ -194,37 +227,39 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property with additional variables, then convert it as a list of target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
|
||||
* @see RunContextProperty#asList(Class, Map)
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.skipCache || property.value == null) {
|
||||
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
|
||||
try {
|
||||
String trimmedExpression = property.expression.trim();
|
||||
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a list.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = MAPPER.readValue(context.render(property.expression, variables), type);
|
||||
}
|
||||
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
|
||||
else {
|
||||
List<?> asRawList = MAPPER.readValue(property.expression, List.class);
|
||||
property.value = (T) asRawList.stream()
|
||||
.map(throwFunction(item -> {
|
||||
if (item instanceof String str) {
|
||||
return MAPPER.convertValue(context.render(str, variables), itemClazz);
|
||||
} else if (item instanceof Map map) {
|
||||
return MAPPER.convertValue(context.render(map, variables), itemClazz);
|
||||
}
|
||||
return item;
|
||||
}))
|
||||
.toList();
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
String trimmedExpression = property.expression.trim();
|
||||
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a list.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = deserialize(context.render(property.expression, variables), type);
|
||||
}
|
||||
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
|
||||
else {
|
||||
List<?> asRawList = deserialize(property.expression, List.class);
|
||||
property.value = (T) asRawList.stream()
|
||||
.map(throwFunction(item -> {
|
||||
Object rendered = null;
|
||||
if (item instanceof String str) {
|
||||
rendered = context.render(str, variables);
|
||||
} else if (item instanceof Map map) {
|
||||
rendered = context.render(map, variables);
|
||||
}
|
||||
|
||||
if (rendered != null) {
|
||||
return deserialize(rendered, itemClazz);
|
||||
}
|
||||
|
||||
return item;
|
||||
}))
|
||||
.toList();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,9 +269,9 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property then convert it as a map of target types.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class)
|
||||
* @see RunContextProperty#asMap(Class, Class)
|
||||
*/
|
||||
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
|
||||
return asMap(property, runContext, keyClass, valueClass, Map.of());
|
||||
@@ -248,7 +283,7 @@ public class Property<T> {
|
||||
* This method is safe to be used as many times as you want as the rendering and conversion will be cached.
|
||||
* Warning, due to the caching mechanism, this method is not thread-safe.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class, Map)
|
||||
* @see RunContextProperty#asMap(Class, Class, Map)
|
||||
*/
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
@@ -260,12 +295,12 @@ public class Property<T> {
|
||||
// We need to detect if the expression is already a map or if it's a pebble expression (for eg. referencing a variable containing a map).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a map.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = MAPPER.readValue(runContext.render(property.expression, variables), targetMapType);
|
||||
property.value = deserialize(runContext.render(property.expression, variables), targetMapType);
|
||||
}
|
||||
// Otherwise if it's already a map we read it as a map first then render it from run context which handle map rendering by rendering each entry of the map (otherwise it will fail with nested expressions in values for eg.)
|
||||
else {
|
||||
Map asRawMap = MAPPER.readValue(property.expression, Map.class);
|
||||
property.value = MAPPER.convertValue(runContext.render(asRawMap, variables), targetMapType);
|
||||
property.value = deserialize(runContext.render(asRawMap, variables), targetMapType);
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
|
||||
@@ -5,11 +5,13 @@ import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.assets.AssetsDeclaration;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.plugin.core.flow.WorkingDirectory;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.Size;
|
||||
import lombok.Builder;
|
||||
@@ -78,6 +80,11 @@ abstract public class Task implements TaskInterface {
|
||||
@Valid
|
||||
private Cache taskCache;
|
||||
|
||||
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
|
||||
@Valid
|
||||
@Nullable
|
||||
private Property<AssetsDeclaration> assets;
|
||||
|
||||
public Optional<Task> findById(String id) {
|
||||
if (this.getId().equals(id)) {
|
||||
return Optional.of(this);
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
package io.kestra.core.models.tasks.runners;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.runners.AssetEmitter;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.event.Level;
|
||||
import org.slf4j.spi.LoggingEventBuilder;
|
||||
@@ -18,6 +21,7 @@ import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static io.kestra.core.runners.RunContextLogger.ORIGINAL_TIMESTAMP_KEY;
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
|
||||
/**
|
||||
* Service for matching and capturing structured data from task execution logs.
|
||||
@@ -76,6 +80,18 @@ public class TaskLogLineMatcher {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (match.assets() != null) {
|
||||
try {
|
||||
AssetEmitter assetEmitter = runContext.assets();
|
||||
match.assets().forEach(throwConsumer(assetEmitter::upsert));
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
logger.warn("Unable to get asset emitter for log '{}'", data, e);
|
||||
} catch (QueueException e) {
|
||||
logger.warn("Unable to emit asset for log '{}'", data, e);
|
||||
}
|
||||
}
|
||||
|
||||
return match;
|
||||
}
|
||||
|
||||
@@ -94,8 +110,9 @@ public class TaskLogLineMatcher {
|
||||
public record TaskLogMatch(
|
||||
Map<String, Object> outputs,
|
||||
List<AbstractMetricEntry<?>> metrics,
|
||||
List<LogLine> logs
|
||||
) {
|
||||
List<LogLine> logs,
|
||||
List<Asset> assets
|
||||
) {
|
||||
@Override
|
||||
public Map<String, Object> outputs() {
|
||||
return Optional.ofNullable(outputs).orElse(Map.of());
|
||||
|
||||
@@ -6,8 +6,10 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.assets.AssetsDeclaration;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.WorkerGroup;
|
||||
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
|
||||
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
|
||||
@@ -88,6 +90,9 @@ abstract public class AbstractTrigger implements TriggerInterface {
|
||||
)
|
||||
private boolean allowConcurrent = false;
|
||||
|
||||
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
|
||||
private Property<AssetsDeclaration> assets;
|
||||
|
||||
/**
|
||||
* For backward compatibility: we rename minLogLevel to logLevel.
|
||||
* @deprecated use {@link #logLevel} instead
|
||||
|
||||
@@ -67,6 +67,11 @@ public class ManualConstraintViolation<T> implements ConstraintViolation<T> {
|
||||
invalidValue
|
||||
)));
|
||||
}
|
||||
public static <T> ConstraintViolationException toConstraintViolationException(
|
||||
Set<? extends ConstraintViolation<?>> constraintViolations
|
||||
) {
|
||||
return new ConstraintViolationException(constraintViolations);
|
||||
}
|
||||
|
||||
public String getMessageTemplate() {
|
||||
return "{messageTemplate}";
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.plugins;
|
||||
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.plugins;
|
||||
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import io.kestra.core.app.AppPluginInterface;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
@@ -11,6 +12,7 @@ import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.logs.LogExporter;
|
||||
import io.kestra.core.models.tasks.runners.TaskRunner;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.plugins.serdes.AssetDeserializer;
|
||||
import io.kestra.core.plugins.serdes.PluginDeserializer;
|
||||
import io.kestra.core.secret.SecretPluginInterface;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
@@ -45,5 +47,6 @@ public class PluginModule extends SimpleModule {
|
||||
addDeserializer(SecretPluginInterface.class, new PluginDeserializer<>());
|
||||
addDeserializer(AppPluginInterface.class, new PluginDeserializer<>());
|
||||
addDeserializer(LogExporter.class, new PluginDeserializer<>());
|
||||
addDeserializer(Asset.class, new AssetDeserializer());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package io.kestra.core.plugins;
|
||||
import io.kestra.core.app.AppBlockInterface;
|
||||
import io.kestra.core.app.AppPluginInterface;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
@@ -108,6 +109,7 @@ public class PluginScanner {
|
||||
List<Class<? extends StorageInterface>> storages = new ArrayList<>();
|
||||
List<Class<? extends SecretPluginInterface>> secrets = new ArrayList<>();
|
||||
List<Class<? extends TaskRunner<?>>> taskRunners = new ArrayList<>();
|
||||
List<Class<? extends Asset>> assets = new ArrayList<>();
|
||||
List<Class<? extends AppPluginInterface>> apps = new ArrayList<>();
|
||||
List<Class<? extends AppBlockInterface>> appBlocks = new ArrayList<>();
|
||||
List<Class<? extends Chart<?>>> charts = new ArrayList<>();
|
||||
@@ -155,6 +157,10 @@ public class PluginScanner {
|
||||
//noinspection unchecked
|
||||
taskRunners.add((Class<? extends TaskRunner<?>>) runner.getClass());
|
||||
}
|
||||
case Asset asset -> {
|
||||
log.debug("Loading Asset plugin: '{}'", plugin.getClass());
|
||||
assets.add(asset.getClass());
|
||||
}
|
||||
case AppPluginInterface app -> {
|
||||
log.debug("Loading App plugin: '{}'", plugin.getClass());
|
||||
apps.add(app.getClass());
|
||||
@@ -223,6 +229,7 @@ public class PluginScanner {
|
||||
.conditions(conditions)
|
||||
.storages(storages)
|
||||
.secrets(secrets)
|
||||
.assets(assets)
|
||||
.apps(apps)
|
||||
.appBlocks(appBlocks)
|
||||
.taskRunners(taskRunners)
|
||||
|
||||
@@ -3,6 +3,7 @@ package io.kestra.core.plugins;
|
||||
import io.kestra.core.app.AppBlockInterface;
|
||||
import io.kestra.core.app.AppPluginInterface;
|
||||
import io.kestra.core.models.annotations.PluginSubGroup;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
@@ -39,6 +40,7 @@ public class RegisteredPlugin {
|
||||
public static final String STORAGES_GROUP_NAME = "storages";
|
||||
public static final String SECRETS_GROUP_NAME = "secrets";
|
||||
public static final String TASK_RUNNERS_GROUP_NAME = "task-runners";
|
||||
public static final String ASSETS_GROUP_NAME = "assets";
|
||||
public static final String APPS_GROUP_NAME = "apps";
|
||||
public static final String APP_BLOCKS_GROUP_NAME = "app-blocks";
|
||||
public static final String CHARTS_GROUP_NAME = "charts";
|
||||
@@ -56,6 +58,7 @@ public class RegisteredPlugin {
|
||||
private final List<Class<? extends StorageInterface>> storages;
|
||||
private final List<Class<? extends SecretPluginInterface>> secrets;
|
||||
private final List<Class<? extends TaskRunner<?>>> taskRunners;
|
||||
private final List<Class<? extends Asset>> assets;
|
||||
private final List<Class<? extends AppPluginInterface>> apps;
|
||||
private final List<Class<? extends AppBlockInterface>> appBlocks;
|
||||
private final List<Class<? extends Chart<?>>> charts;
|
||||
@@ -74,6 +77,7 @@ public class RegisteredPlugin {
|
||||
!storages.isEmpty() ||
|
||||
!secrets.isEmpty() ||
|
||||
!taskRunners.isEmpty() ||
|
||||
!assets.isEmpty() ||
|
||||
!apps.isEmpty() ||
|
||||
!appBlocks.isEmpty() ||
|
||||
!charts.isEmpty() ||
|
||||
@@ -145,6 +149,10 @@ public class RegisteredPlugin {
|
||||
return AppPluginInterface.class;
|
||||
}
|
||||
|
||||
if (this.getAssets().stream().anyMatch(r -> r.getName().equals(cls))) {
|
||||
return Asset.class;
|
||||
}
|
||||
|
||||
if (this.getLogExporters().stream().anyMatch(r -> r.getName().equals(cls))) {
|
||||
return LogExporter.class;
|
||||
}
|
||||
@@ -180,6 +188,7 @@ public class RegisteredPlugin {
|
||||
result.put(STORAGES_GROUP_NAME, Arrays.asList(this.getStorages().toArray(Class[]::new)));
|
||||
result.put(SECRETS_GROUP_NAME, Arrays.asList(this.getSecrets().toArray(Class[]::new)));
|
||||
result.put(TASK_RUNNERS_GROUP_NAME, Arrays.asList(this.getTaskRunners().toArray(Class[]::new)));
|
||||
result.put(ASSETS_GROUP_NAME, Arrays.asList(this.getAssets().toArray(Class[]::new)));
|
||||
result.put(APPS_GROUP_NAME, Arrays.asList(this.getApps().toArray(Class[]::new)));
|
||||
result.put(APP_BLOCKS_GROUP_NAME, Arrays.asList(this.getAppBlocks().toArray(Class[]::new)));
|
||||
result.put(CHARTS_GROUP_NAME, Arrays.asList(this.getCharts().toArray(Class[]::new)));
|
||||
@@ -359,6 +368,12 @@ public class RegisteredPlugin {
|
||||
b.append("] ");
|
||||
}
|
||||
|
||||
if (!this.getAssets().isEmpty()) {
|
||||
b.append("[Assets: ");
|
||||
b.append(this.getAssets().stream().map(Class::getName).collect(Collectors.joining(", ")));
|
||||
b.append("] ");
|
||||
}
|
||||
|
||||
if (!this.getApps().isEmpty()) {
|
||||
b.append("[Apps: ");
|
||||
b.append(this.getApps().stream().map(Class::getName).collect(Collectors.joining(", ")));
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
package io.kestra.core.plugins.notifications;
|
||||
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public interface ExecutionInterface {
|
||||
@Schema(
|
||||
title = "The execution id to use",
|
||||
description = "Default is the current execution, " +
|
||||
"change it to {{ trigger.executionId }} if you use this task with a Flow trigger to use the original execution."
|
||||
)
|
||||
Property<String> getExecutionId();
|
||||
|
||||
@Schema(
|
||||
title = "Custom fields to be added on notification"
|
||||
)
|
||||
Property<Map<String, Object>> getCustomFields();
|
||||
|
||||
@Schema(
|
||||
title = "Custom message to be added on notification"
|
||||
)
|
||||
Property<String> getCustomMessage();
|
||||
}
|
||||
@@ -0,0 +1,140 @@
|
||||
package io.kestra.core.plugins.notifications;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.retrys.Exponential;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.RetryUtils;
|
||||
import io.kestra.core.utils.UriProvider;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
|
||||
public final class ExecutionService {
|
||||
private ExecutionService() {}
|
||||
|
||||
public static Execution findExecution(RunContext runContext, Property<String> executionId) throws IllegalVariableEvaluationException, NoSuchElementException {
|
||||
ExecutionRepositoryInterface executionRepository = ((DefaultRunContext) runContext).getApplicationContext().getBean(ExecutionRepositoryInterface.class);
|
||||
|
||||
RetryUtils.Instance<Execution, NoSuchElementException> retryInstance = RetryUtils
|
||||
.of(Exponential.builder()
|
||||
.delayFactor(2.0)
|
||||
.interval(Duration.ofSeconds(1))
|
||||
.maxInterval(Duration.ofSeconds(15))
|
||||
.maxAttempts(-1)
|
||||
.maxDuration(Duration.ofMinutes(10))
|
||||
.build(),
|
||||
runContext.logger()
|
||||
);
|
||||
|
||||
var executionRendererId = runContext.render(executionId).as(String.class).orElse(null);
|
||||
var flowTriggerExecutionState = getOptionalFlowTriggerExecutionState(runContext);
|
||||
|
||||
var flowVars = (Map<String, String>) runContext.getVariables().get("flow");
|
||||
var isCurrentExecution = isCurrentExecution(runContext, executionRendererId);
|
||||
if (isCurrentExecution) {
|
||||
runContext.logger().info("Loading execution data for the current execution.");
|
||||
}
|
||||
|
||||
return retryInstance.run(
|
||||
NoSuchElementException.class,
|
||||
() -> executionRepository.findById(flowVars.get("tenantId"), executionRendererId)
|
||||
.filter(foundExecution -> isExecutionInTheWantedState(foundExecution, isCurrentExecution, flowTriggerExecutionState))
|
||||
.orElseThrow(() -> new NoSuchElementException("Unable to find execution '" + executionRendererId + "'"))
|
||||
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* ExecutionRepository can be out of sync in ElasticSearch stack, with this filter we try to mitigate that
|
||||
*
|
||||
* @param execution the Execution we fetched from ExecutionRepository
|
||||
* @param isCurrentExecution true if this *Execution Task is configured to send a notification for the current Execution
|
||||
* @param flowTriggerExecutionState the Execution State that triggered the Flow trigger, if any
|
||||
* @return true if we think we fetched the right Execution data for our usecase
|
||||
*/
|
||||
public static boolean isExecutionInTheWantedState(Execution execution, boolean isCurrentExecution, Optional<String> flowTriggerExecutionState) {
|
||||
if (isCurrentExecution) {
|
||||
// we don't wait for current execution to be terminated as it could not be possible as long as this task is running
|
||||
return true;
|
||||
}
|
||||
|
||||
if (flowTriggerExecutionState.isPresent()) {
|
||||
// we were triggered by a Flow trigger that can be, for example: PAUSED
|
||||
if (flowTriggerExecutionState.get().equals(State.Type.RUNNING.toString())) {
|
||||
// RUNNING special case: we take the first state we got
|
||||
return true;
|
||||
} else {
|
||||
// to handle the case where the ExecutionRepository is out of sync in ElasticSearch stack,
|
||||
// we try to match an Execution with the same state
|
||||
return execution.getState().getCurrent().name().equals(flowTriggerExecutionState.get());
|
||||
}
|
||||
} else {
|
||||
return execution.getState().getCurrent().isTerminated();
|
||||
}
|
||||
}
|
||||
|
||||
public static Map<String, Object> executionMap(RunContext runContext, ExecutionInterface executionInterface) throws IllegalVariableEvaluationException {
|
||||
Execution execution = findExecution(runContext, executionInterface.getExecutionId());
|
||||
UriProvider uriProvider = ((DefaultRunContext) runContext).getApplicationContext().getBean(UriProvider.class);
|
||||
|
||||
Map<String, Object> templateRenderMap = new HashMap<>();
|
||||
templateRenderMap.put("duration", execution.getState().humanDuration());
|
||||
templateRenderMap.put("startDate", execution.getState().getStartDate());
|
||||
templateRenderMap.put("link", uriProvider.executionUrl(execution));
|
||||
templateRenderMap.put("execution", JacksonMapper.toMap(execution));
|
||||
|
||||
runContext.render(executionInterface.getCustomMessage())
|
||||
.as(String.class)
|
||||
.ifPresent(s -> templateRenderMap.put("customMessage", s));
|
||||
|
||||
final Map<String, Object> renderedCustomFields = runContext.render(executionInterface.getCustomFields()).asMap(String.class, Object.class);
|
||||
if (!renderedCustomFields.isEmpty()) {
|
||||
templateRenderMap.put("customFields", renderedCustomFields);
|
||||
}
|
||||
|
||||
var isCurrentExecution = isCurrentExecution(runContext, execution.getId());
|
||||
|
||||
List<TaskRun> taskRuns;
|
||||
|
||||
if (isCurrentExecution) {
|
||||
taskRuns = execution.getTaskRunList();
|
||||
} else {
|
||||
taskRuns = execution.getTaskRunList().stream()
|
||||
.filter(t -> (execution.hasFailed() ? State.Type.FAILED : State.Type.SUCCESS).equals(t.getState().getCurrent()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
if (!ListUtils.isEmpty(taskRuns)) {
|
||||
TaskRun lastTaskRun = taskRuns.getLast();
|
||||
templateRenderMap.put("firstFailed", State.Type.FAILED.equals(lastTaskRun.getState().getCurrent()) ? lastTaskRun : false);
|
||||
templateRenderMap.put("lastTask", lastTaskRun);
|
||||
}
|
||||
|
||||
return templateRenderMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* if there is a state, we assume this is a Flow trigger with type: {@link io.kestra.plugin.core.trigger.Flow.Output}
|
||||
*
|
||||
* @return the state of the execution that triggered the Flow trigger, or empty if another usecase/trigger
|
||||
*/
|
||||
private static Optional<String> getOptionalFlowTriggerExecutionState(RunContext runContext) {
|
||||
var triggerVar = Optional.ofNullable(
|
||||
runContext.getVariables().get("trigger")
|
||||
);
|
||||
return triggerVar.map(trigger -> ((Map<String, String>) trigger).get("state"));
|
||||
}
|
||||
|
||||
private static boolean isCurrentExecution(RunContext runContext, String executionId) {
|
||||
var executionVars = (Map<String, String>) runContext.getVariables().get("execution");
|
||||
return executionId.equals(executionVars.get("id"));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
package io.kestra.core.plugins.serdes;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonDeserializer;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.assets.CustomAsset;
|
||||
|
||||
/**
|
||||
* Specific {@link JsonDeserializer} for deserializing {@link Asset}.
|
||||
*/
|
||||
public final class AssetDeserializer extends PluginDeserializer<Asset> {
|
||||
@Override
|
||||
protected Class<? extends Plugin> fallbackClass() {
|
||||
return CustomAsset.class;
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,7 @@ import io.kestra.core.models.dashboards.charts.DataChart;
|
||||
import io.kestra.core.plugins.DefaultPluginRegistry;
|
||||
import io.kestra.core.plugins.PluginRegistry;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.micronaut.context.exceptions.NoSuchBeanException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -28,7 +29,7 @@ import java.util.Optional;
|
||||
* The {@link PluginDeserializer} uses the {@link PluginRegistry} to found the plugin class corresponding to
|
||||
* a plugin type.
|
||||
*/
|
||||
public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer<T> {
|
||||
public class PluginDeserializer<T extends Plugin> extends JsonDeserializer<T> {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PluginDeserializer.class);
|
||||
|
||||
@@ -72,7 +73,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
|
||||
// By default, if no plugin-registry is configured retrieve
|
||||
// the one configured from the static Kestra's context.
|
||||
pluginRegistry = KestraContext.getContext().getPluginRegistry();
|
||||
} catch (IllegalStateException ignore) {
|
||||
} catch (IllegalStateException | NoSuchBeanException ignore) {
|
||||
// This error can only happen if the KestraContext is not initialized (i.e. in unit tests).
|
||||
log.error("No plugin registry was initialized. Use default implementation.");
|
||||
pluginRegistry = DefaultPluginRegistry.getOrCreate();
|
||||
@@ -92,6 +93,10 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
|
||||
identifier
|
||||
);
|
||||
pluginType = pluginRegistry.findClassByIdentifier(identifier);
|
||||
|
||||
if (pluginType == null) {
|
||||
pluginType = fallbackClass();
|
||||
}
|
||||
}
|
||||
|
||||
if (pluginType == null) {
|
||||
@@ -152,4 +157,8 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
|
||||
|
||||
return isVersioningSupported && version != null && !version.isEmpty() ? type + ":" + version : type;
|
||||
}
|
||||
|
||||
protected Class<? extends Plugin> fallbackClass() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
12
core/src/main/java/io/kestra/core/runners/AssetEmitter.java
Normal file
12
core/src/main/java/io/kestra/core/runners/AssetEmitter.java
Normal file
@@ -0,0 +1,12 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface AssetEmitter {
|
||||
void upsert(Asset asset) throws QueueException;
|
||||
|
||||
List<Asset> outputs();
|
||||
}
|
||||
@@ -6,11 +6,13 @@ import com.google.common.base.CaseFormat;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.assets.AssetsDeclaration;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.services.AssetManagerFactory;
|
||||
import io.kestra.core.plugins.PluginConfigurations;
|
||||
import io.kestra.core.services.KVStoreService;
|
||||
import io.kestra.core.storages.Storage;
|
||||
@@ -54,6 +56,7 @@ public class DefaultRunContext extends RunContext {
|
||||
private MetricRegistry meterRegistry;
|
||||
private VersionProvider version;
|
||||
private KVStoreService kvStoreService;
|
||||
private AssetManagerFactory assetManagerFactory;
|
||||
private Optional<String> secretKey;
|
||||
private WorkingDir workingDir;
|
||||
private Validator validator;
|
||||
@@ -73,6 +76,8 @@ public class DefaultRunContext extends RunContext {
|
||||
private Task task;
|
||||
private AbstractTrigger trigger;
|
||||
|
||||
private volatile AssetEmitter assetEmitter;
|
||||
|
||||
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
|
||||
|
||||
|
||||
@@ -161,6 +166,7 @@ public class DefaultRunContext extends RunContext {
|
||||
this.secretKey = applicationContext.getProperty("kestra.encryption.secret-key", String.class);
|
||||
this.validator = applicationContext.getBean(Validator.class);
|
||||
this.localPath = applicationContext.getBean(LocalPathFactory.class).createLocalPath(this);
|
||||
this.assetManagerFactory = applicationContext.getBean(AssetManagerFactory.class);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -537,6 +543,23 @@ public class DefaultRunContext extends RunContext {
|
||||
return flow != null ? flow.get("tenantId") : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public TaskRunInfo taskRunInfo() {
|
||||
Optional<Map<String, Object>> maybeTaskRunMap = Optional.ofNullable(this.getVariables().get("taskrun"))
|
||||
.map(Map.class::cast);
|
||||
return new TaskRunInfo(
|
||||
(String) this.getVariables().get("executionId"),
|
||||
(String) this.getVariables().get("taskId"),
|
||||
maybeTaskRunMap.map(m -> (String) m.get("id"))
|
||||
.orElse(null),
|
||||
maybeTaskRunMap.map(m -> (String) m.get("value"))
|
||||
.orElse(null)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@@ -545,12 +568,7 @@ public class DefaultRunContext extends RunContext {
|
||||
public FlowInfo flowInfo() {
|
||||
Map<String, Object> flow = (Map<String, Object>) this.getVariables().get("flow");
|
||||
// normally only tests should not have the flow variable
|
||||
return flow == null ? new FlowInfo(null, null, null, null) : new FlowInfo(
|
||||
(String) flow.get("tenantId"),
|
||||
(String) flow.get("namespace"),
|
||||
(String) flow.get("id"),
|
||||
(Integer) flow.get("revision")
|
||||
);
|
||||
return flow == null ? new FlowInfo(null, null, null, null) : FlowInfo.from(flow);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -594,6 +612,25 @@ public class DefaultRunContext extends RunContext {
|
||||
return new AclCheckerImpl(this.applicationContext, flowInfo());
|
||||
}
|
||||
|
||||
@Override
|
||||
public AssetEmitter assets() throws IllegalVariableEvaluationException {
|
||||
if (this.assetEmitter == null) {
|
||||
synchronized (this) {
|
||||
if (this.assetEmitter == null) {
|
||||
this.assetEmitter = assetManagerFactory.of(
|
||||
Optional.ofNullable(task).map(Task::getAssets)
|
||||
.or(() -> Optional.ofNullable(trigger).map(AbstractTrigger::getAssets))
|
||||
.flatMap(throwFunction(asset -> this.render(asset).as(AssetsDeclaration.class)))
|
||||
.map(AssetsDeclaration::isEnableAuto)
|
||||
.orElse(false)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return this.assetEmitter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LocalPath localPath() {
|
||||
return localPath;
|
||||
|
||||
@@ -3,6 +3,8 @@ package io.kestra.core.runners;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.core.encryption.EncryptionService;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Data;
|
||||
import io.kestra.core.models.flows.DependsOn;
|
||||
@@ -17,7 +19,6 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.models.property.URIFetcher;
|
||||
import io.kestra.core.models.tasks.common.EncryptedString;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
@@ -207,8 +208,8 @@ public class FlowInputOutput {
|
||||
.filter(InputAndValue::enabled)
|
||||
.map(it -> {
|
||||
//TODO check to return all exception at-once.
|
||||
if (it.exception() != null) {
|
||||
throw it.exception();
|
||||
if (it.exceptions() != null && !it.exceptions().isEmpty()) {
|
||||
throw InputOutputValidationException.merge(it.exceptions());
|
||||
}
|
||||
return new AbstractMap.SimpleEntry<>(it.input().getId(), it.value());
|
||||
})
|
||||
@@ -292,13 +293,9 @@ public class FlowInputOutput {
|
||||
try {
|
||||
isInputEnabled = Boolean.TRUE.equals(runContext.renderTyped(dependsOnCondition.get()));
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
resolvable.resolveWithError(ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid condition: " + e.getMessage(),
|
||||
input,
|
||||
(Class<Input>)input.getClass(),
|
||||
input.getId(),
|
||||
this
|
||||
));
|
||||
resolvable.resolveWithError(
|
||||
InputOutputValidationException.of("Invalid condition: " + e.getMessage())
|
||||
);
|
||||
isInputEnabled = false;
|
||||
}
|
||||
}
|
||||
@@ -331,7 +328,7 @@ public class FlowInputOutput {
|
||||
// validate and parse input value
|
||||
if (value == null) {
|
||||
if (input.getRequired()) {
|
||||
resolvable.resolveWithError(input.toConstraintViolationException("missing required input", null));
|
||||
resolvable.resolveWithError(InputOutputValidationException.of("Missing required input:" + input.getId()));
|
||||
} else {
|
||||
resolvable.resolveWithValue(null);
|
||||
}
|
||||
@@ -341,17 +338,18 @@ public class FlowInputOutput {
|
||||
parsedInput.ifPresent(parsed -> ((Input) resolvable.get().input()).validate(parsed.getValue()));
|
||||
parsedInput.ifPresent(typed -> resolvable.resolveWithValue(typed.getValue()));
|
||||
} catch (ConstraintViolationException e) {
|
||||
ConstraintViolationException exception = e.getConstraintViolations().size() == 1 ?
|
||||
input.toConstraintViolationException(List.copyOf(e.getConstraintViolations()).getFirst().getMessage(), value) :
|
||||
input.toConstraintViolationException(e.getMessage(), value);
|
||||
resolvable.resolveWithError(exception);
|
||||
Input<?> finalInput = input;
|
||||
Set<InputOutputValidationException> exceptions = e.getConstraintViolations().stream()
|
||||
.map(c-> InputOutputValidationException.of(c.getMessage(), finalInput))
|
||||
.collect(Collectors.toSet());
|
||||
resolvable.resolveWithError(exceptions);
|
||||
}
|
||||
}
|
||||
} catch (ConstraintViolationException e) {
|
||||
resolvable.resolveWithError(e);
|
||||
} catch (Exception e) {
|
||||
ConstraintViolationException exception = input.toConstraintViolationException(e instanceof IllegalArgumentException ? e.getMessage() : e.toString(), resolvable.get().value());
|
||||
resolvable.resolveWithError(exception);
|
||||
} catch (IllegalArgumentException e){
|
||||
resolvable.resolveWithError(InputOutputValidationException.of(e.getMessage(), input));
|
||||
}
|
||||
catch (Exception e) {
|
||||
resolvable.resolveWithError(InputOutputValidationException.of(e.getMessage()));
|
||||
}
|
||||
|
||||
return resolvable.get();
|
||||
@@ -439,8 +437,12 @@ public class FlowInputOutput {
|
||||
}
|
||||
return entry;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
throw output.toConstraintViolationException(e.getMessage(), current);
|
||||
}
|
||||
catch (IllegalArgumentException e){
|
||||
throw InputOutputValidationException.of(e.getMessage(), output);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw InputOutputValidationException.of(e.getMessage());
|
||||
}
|
||||
})
|
||||
.filter(Optional::isPresent)
|
||||
@@ -503,7 +505,7 @@ public class FlowInputOutput {
|
||||
if (matcher.matches()) {
|
||||
yield current.toString();
|
||||
} else {
|
||||
throw new IllegalArgumentException("Expected `URI` but received `" + current + "`");
|
||||
throw new IllegalArgumentException("Invalid URI format.");
|
||||
}
|
||||
}
|
||||
case ARRAY, MULTISELECT -> {
|
||||
@@ -533,7 +535,7 @@ public class FlowInputOutput {
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw e;
|
||||
} catch (Throwable e) {
|
||||
throw new Exception("Expected `" + type + "` but received `" + current + "` with errors:\n```\n" + e.getMessage() + "\n```");
|
||||
throw new Exception(" errors:\n```\n" + e.getMessage() + "\n```");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -565,27 +567,30 @@ public class FlowInputOutput {
|
||||
}
|
||||
|
||||
public void isDefault(boolean isDefault) {
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exceptions());
|
||||
}
|
||||
|
||||
public void setInput(final Input<?> input) {
|
||||
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exception());
|
||||
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exceptions());
|
||||
}
|
||||
|
||||
public void resolveWithEnabled(boolean enabled) {
|
||||
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exceptions());
|
||||
markAsResolved();
|
||||
}
|
||||
|
||||
public void resolveWithValue(@Nullable Object value) {
|
||||
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exceptions());
|
||||
markAsResolved();
|
||||
}
|
||||
|
||||
public void resolveWithError(@Nullable ConstraintViolationException exception) {
|
||||
public void resolveWithError(@Nullable Set<InputOutputValidationException> exception) {
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), this.input.isDefault(), exception);
|
||||
markAsResolved();
|
||||
}
|
||||
private void resolveWithError(@Nullable InputOutputValidationException exception){
|
||||
resolveWithError(Collections.singleton(exception));
|
||||
}
|
||||
|
||||
private void markAsResolved() {
|
||||
this.isResolved = true;
|
||||
|
||||
@@ -143,6 +143,8 @@ public abstract class RunContext implements PropertyContext {
|
||||
@Deprecated(forRemoval = true)
|
||||
public abstract String tenantId();
|
||||
|
||||
public abstract TaskRunInfo taskRunInfo();
|
||||
|
||||
public abstract FlowInfo flowInfo();
|
||||
|
||||
/**
|
||||
@@ -190,7 +192,19 @@ public abstract class RunContext implements PropertyContext {
|
||||
*/
|
||||
public abstract LocalPath localPath();
|
||||
|
||||
public record TaskRunInfo(String executionId, String taskId, String taskRunId, Object value) {
|
||||
|
||||
}
|
||||
|
||||
public record FlowInfo(String tenantId, String namespace, String id, Integer revision) {
|
||||
public static FlowInfo from(Map<String, Object> flowInfoMap) {
|
||||
return new FlowInfo(
|
||||
(String) flowInfoMap.get("tenantId"),
|
||||
(String) flowInfoMap.get("namespace"),
|
||||
(String) flowInfoMap.get("id"),
|
||||
(Integer) flowInfoMap.get("revision")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -206,6 +220,11 @@ public abstract class RunContext implements PropertyContext {
|
||||
*/
|
||||
public abstract AclChecker acl();
|
||||
|
||||
/**
|
||||
* Get access to the Assets handler.
|
||||
*/
|
||||
public abstract AssetEmitter assets() throws IllegalVariableEvaluationException;
|
||||
|
||||
/**
|
||||
* Clone this run context for a specific plugin.
|
||||
* @return a new run context with the plugin configuration of the given plugin.
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
@@ -10,6 +11,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.With;
|
||||
|
||||
@Value
|
||||
@AllArgsConstructor
|
||||
@@ -21,8 +23,7 @@ public class WorkerTaskResult implements HasUID {
|
||||
List<TaskRun> dynamicTaskRuns;
|
||||
|
||||
public WorkerTaskResult(TaskRun taskRun) {
|
||||
this.taskRun = taskRun;
|
||||
this.dynamicTaskRuns = new ArrayList<>();
|
||||
this(taskRun, new ArrayList<>());
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.micronaut.core.annotation.Nullable;
|
||||
import io.pebbletemplates.pebble.PebbleEngine;
|
||||
import io.pebbletemplates.pebble.extension.Extension;
|
||||
import io.pebbletemplates.pebble.extension.Function;
|
||||
import io.pebbletemplates.pebble.lexer.Syntax;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@@ -37,6 +38,13 @@ public class PebbleEngineFactory {
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public PebbleEngine createWithCustomSyntax(Syntax syntax, Class<? extends Extension> extension) {
|
||||
PebbleEngine.Builder builder = newPebbleEngineBuilder()
|
||||
.syntax(syntax);
|
||||
this.applicationContext.getBeansOfType(extension).forEach(builder::extension);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public PebbleEngine createWithMaskedFunctions(VariableRenderer renderer, final List<String> functionsToMask) {
|
||||
|
||||
PebbleEngine.Builder builder = newPebbleEngineBuilder();
|
||||
|
||||
@@ -35,6 +35,10 @@ public final class YamlParser {
|
||||
return read(input, cls, type(cls));
|
||||
}
|
||||
|
||||
public static <T> T parse(String input, Class<T> cls, Boolean strict) {
|
||||
return strict ? read(input, cls, type(cls)) : readNonStrict(input, cls, type(cls));
|
||||
}
|
||||
|
||||
public static <T> T parse(Map<String, Object> input, Class<T> cls, Boolean strict) {
|
||||
ObjectMapper currentMapper = strict ? STRICT_MAPPER : NON_STRICT_MAPPER;
|
||||
|
||||
@@ -81,6 +85,13 @@ public final class YamlParser {
|
||||
throw toConstraintViolationException(input, resource, e);
|
||||
}
|
||||
}
|
||||
private static <T> T readNonStrict(String input, Class<T> objectClass, String resource) {
|
||||
try {
|
||||
return NON_STRICT_MAPPER.readValue(input, objectClass);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw toConstraintViolationException(input, resource, e);
|
||||
}
|
||||
}
|
||||
private static String formatYamlErrorMessage(String originalMessage, JsonProcessingException e) {
|
||||
StringBuilder friendlyMessage = new StringBuilder();
|
||||
if (originalMessage.contains("Expected a field name")) {
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.runners.AssetEmitter;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Singleton
|
||||
public class AssetManagerFactory {
|
||||
public AssetEmitter of(boolean enabled) {
|
||||
return new AssetEmitter() {
|
||||
@Override
|
||||
public void upsert(Asset asset) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Asset> outputs() {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
32
core/src/main/java/io/kestra/core/services/AssetService.java
Normal file
32
core/src/main/java/io/kestra/core/services/AssetService.java
Normal file
@@ -0,0 +1,32 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.assets.AssetIdentifier;
|
||||
import io.kestra.core.models.assets.AssetUser;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
@Singleton
|
||||
public class AssetService implements Runnable {
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
this.run();
|
||||
}
|
||||
|
||||
public void asyncUpsert(AssetUser assetUser, Asset asset) throws QueueException {
|
||||
// No-op
|
||||
}
|
||||
|
||||
public void assetLineage(AssetUser assetUser, List<AssetIdentifier> inputs, List<AssetIdentifier> outputs) throws QueueException {
|
||||
// No-op
|
||||
}
|
||||
|
||||
public void run() {
|
||||
// No-op
|
||||
}
|
||||
}
|
||||
@@ -754,7 +754,7 @@ public class ExecutionService {
|
||||
var parentTaskRun = execution.findTaskRunByTaskRunId(taskRun.getParentTaskRunId());
|
||||
Execution newExecution = execution;
|
||||
if (parentTaskRun.getState().getCurrent() != State.Type.KILLED) {
|
||||
newExecution = newExecution.withTaskRun(parentTaskRun.withState(State.Type.KILLED));
|
||||
newExecution = newExecution.withTaskRun(parentTaskRun.withStateAndAttempt(State.Type.KILLED));
|
||||
}
|
||||
if (parentTaskRun.getParentTaskRunId() != null) {
|
||||
return killParentTaskruns(parentTaskRun, newExecution);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.test.flow;
|
||||
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
@@ -8,6 +9,7 @@ import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Getter
|
||||
@@ -25,5 +27,7 @@ public class TaskFixture {
|
||||
|
||||
private Map<String, Object> outputs;
|
||||
|
||||
private List<Asset> assets;
|
||||
|
||||
private Property<String> description;
|
||||
}
|
||||
|
||||
@@ -70,4 +70,12 @@ public class ListUtils {
|
||||
.map(Object::toString)
|
||||
.toList();
|
||||
}
|
||||
|
||||
public static <T> List<List<T>> partition(List<T> list, int size) {
|
||||
List<List<T>> parts = new ArrayList<>();
|
||||
for (int i = 0; i < list.size(); i += size) {
|
||||
parts.add(list.subList(i, Math.min(i + size, list.size())));
|
||||
}
|
||||
return parts;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,15 +19,7 @@ import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@@ -155,6 +147,8 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
|
||||
.map(task -> task.getId())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
violations.addAll(assetsViolations(allTasks));
|
||||
|
||||
if (!invalidTasks.isEmpty()) {
|
||||
violations.add("Invalid output reference: use outputs[key-name] instead of outputs.key-name — keys with dashes require bracket notation, offending tasks:" +
|
||||
" [" + String.join(", ", invalidTasks) + "]");
|
||||
@@ -181,6 +175,12 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
|
||||
}
|
||||
}
|
||||
|
||||
protected List<String> assetsViolations(List<Task> allTasks) {
|
||||
return allTasks.stream().filter(task -> task.getAssets() != null)
|
||||
.map(taskWithAssets -> "Task '" + taskWithAssets.getId() + "' can't have any `assets` because assets are only available in Enterprise Edition.")
|
||||
.toList();
|
||||
}
|
||||
|
||||
private static boolean checkObjectFieldsWithPatterns(Object object, List<Pattern> patterns) {
|
||||
if (object == null) {
|
||||
return true;
|
||||
|
||||
@@ -2,10 +2,8 @@ package io.kestra.plugin.core.namespace;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.NamespaceFile;
|
||||
import io.kestra.plugin.core.kv.Version;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@@ -26,28 +26,25 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Purge namespace files for one or multiple namespaces.",
|
||||
description = "This task purges namespace files (and their versions) stored in Kestra. You can restrict the purge to specific namespaces (or a namespace glob pattern), optionally include child namespaces, and filter files by a glob pattern. The purge strategy is controlled via `behavior` (e.g. keep the last N versions and/or delete versions older than a given date)."
|
||||
title = "Delete expired keys globally for a specific namespace.",
|
||||
description = "This task will delete expired keys from the Kestra KV store. By default, it will only delete expired keys, but you can choose to delete all keys by setting `expiredOnly` to false. You can also filter keys by a specific pattern and choose to include child namespaces."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
title = "Purge old versions of namespace files for a namespace tree.",
|
||||
title = "Delete expired keys globally for a specific namespace, with or without including child namespaces.",
|
||||
full = true,
|
||||
code = """
|
||||
id: purge_namespace_files
|
||||
id: purge_kv_store
|
||||
namespace: system
|
||||
|
||||
|
||||
tasks:
|
||||
- id: purge_files
|
||||
type: io.kestra.plugin.core.namespace.PurgeFiles
|
||||
- id: purge_kv
|
||||
type: io.kestra.plugin.core.kv.PurgeKV
|
||||
expiredOnly: true
|
||||
namespaces:
|
||||
- company
|
||||
includeChildNamespaces: true
|
||||
filePattern: "**/*.sql"
|
||||
behavior:
|
||||
type: version
|
||||
before: "2025-01-01T00:00:00Z"
|
||||
"""
|
||||
)
|
||||
}
|
||||
@@ -119,7 +116,7 @@ public class PurgeFiles extends Task implements PurgeTask<NamespaceFile>, Runnab
|
||||
@Getter
|
||||
public static class Output implements io.kestra.core.models.tasks.Output {
|
||||
@Schema(
|
||||
title = "The number of purged namespace file versions"
|
||||
title = "The number of purged KV pairs"
|
||||
)
|
||||
private Long size;
|
||||
}
|
||||
|
||||
268
core/src/test/java/io/kestra/assets/assets/AssetTest.java
Normal file
268
core/src/test/java/io/kestra/assets/assets/AssetTest.java
Normal file
@@ -0,0 +1,268 @@
|
||||
package io.kestra.assets.assets;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.assets.*;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@MicronautTest
|
||||
public class AssetTest {
|
||||
@Test
|
||||
void custom() throws JsonProcessingException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
String id = TestsUtils.randomString();
|
||||
String type = "MY_OWN_ASSET_TYPE";
|
||||
String displayName = "My own asset";
|
||||
String description = "This is my asset";
|
||||
String metadataKey = "owner";
|
||||
String metadataValue = "data-team";
|
||||
Asset asset = JacksonMapper.ofYaml().readValue("""
|
||||
namespace: %s
|
||||
id: %s
|
||||
type: %s
|
||||
displayName: %s
|
||||
description: %s
|
||||
metadata:
|
||||
%s: %s""".formatted(
|
||||
namespace,
|
||||
id,
|
||||
type,
|
||||
displayName,
|
||||
description,
|
||||
metadataKey,
|
||||
metadataValue
|
||||
), Asset.class);
|
||||
|
||||
assertThat(asset).isInstanceOf(CustomAsset.class);
|
||||
assertThat(asset.getNamespace()).isEqualTo(namespace);
|
||||
assertThat(asset.getId()).isEqualTo(id);
|
||||
assertThat(asset.getType()).isEqualTo(type);
|
||||
assertThat(asset.getDisplayName()).isEqualTo(displayName);
|
||||
assertThat(asset.getDescription()).isEqualTo(description);
|
||||
assertThat(asset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void external() throws JsonProcessingException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
String id = TestsUtils.randomString();
|
||||
String type = "EXTERNAL";
|
||||
String displayName = "External asset";
|
||||
String description = "This is an external asset";
|
||||
String metadataKey = "owner";
|
||||
String metadataValue = "external-team";
|
||||
Asset asset = JacksonMapper.ofYaml().readValue("""
|
||||
namespace: %s
|
||||
id: %s
|
||||
type: %s
|
||||
displayName: %s
|
||||
description: %s
|
||||
metadata:
|
||||
%s: %s""".formatted(
|
||||
namespace,
|
||||
id,
|
||||
type,
|
||||
displayName,
|
||||
description,
|
||||
metadataKey,
|
||||
metadataValue
|
||||
), Asset.class);
|
||||
|
||||
assertThat(asset).isInstanceOf(ExternalAsset.class);
|
||||
assertThat(asset.getNamespace()).isEqualTo(namespace);
|
||||
assertThat(asset.getId()).isEqualTo(id);
|
||||
assertThat(asset.getType()).isEqualTo(type);
|
||||
assertThat(asset.getDisplayName()).isEqualTo(displayName);
|
||||
assertThat(asset.getDescription()).isEqualTo(description);
|
||||
assertThat(asset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void dataset() throws JsonProcessingException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
String id = TestsUtils.randomString();
|
||||
String displayName = "My Dataset";
|
||||
String description = "This is my dataset";
|
||||
String system = "S3";
|
||||
String location = "s3://my-bucket/my-dataset";
|
||||
String format = "parquet";
|
||||
String metadataKey = "owner";
|
||||
String metadataValue = "data-team";
|
||||
Asset asset = JacksonMapper.ofYaml().readValue("""
|
||||
namespace: %s
|
||||
id: %s
|
||||
type: %s
|
||||
displayName: %s
|
||||
description: %s
|
||||
system: %s
|
||||
location: %s
|
||||
format: %s
|
||||
metadata:
|
||||
%s: %s""".formatted(
|
||||
namespace,
|
||||
id,
|
||||
DatasetAsset.ASSET_TYPE,
|
||||
displayName,
|
||||
description,
|
||||
system,
|
||||
location,
|
||||
format,
|
||||
metadataKey,
|
||||
metadataValue
|
||||
), Asset.class);
|
||||
|
||||
assertThat(asset).isInstanceOf(DatasetAsset.class);
|
||||
DatasetAsset datasetAsset = (DatasetAsset) asset;
|
||||
assertThat(datasetAsset.getNamespace()).isEqualTo(namespace);
|
||||
assertThat(datasetAsset.getId()).isEqualTo(id);
|
||||
assertThat(datasetAsset.getDisplayName()).isEqualTo(displayName);
|
||||
assertThat(datasetAsset.getDescription()).isEqualTo(description);
|
||||
assertThat(datasetAsset.getSystem()).isEqualTo(system);
|
||||
assertThat(datasetAsset.getLocation()).isEqualTo(location);
|
||||
assertThat(datasetAsset.getFormat()).isEqualTo(format);
|
||||
assertThat(datasetAsset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void file() throws JsonProcessingException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
String id = TestsUtils.randomString();
|
||||
String displayName = "My File";
|
||||
String description = "This is my file";
|
||||
String system = "local";
|
||||
String path = "/data/my-file.txt";
|
||||
String metadataKey = "owner";
|
||||
String metadataValue = "file-team";
|
||||
Asset asset = JacksonMapper.ofYaml().readValue("""
|
||||
namespace: %s
|
||||
id: %s
|
||||
type: %s
|
||||
displayName: %s
|
||||
description: %s
|
||||
system: %s
|
||||
path: %s
|
||||
metadata:
|
||||
%s: %s""".formatted(
|
||||
namespace,
|
||||
id,
|
||||
FileAsset.ASSET_TYPE,
|
||||
displayName,
|
||||
description,
|
||||
system,
|
||||
path,
|
||||
metadataKey,
|
||||
metadataValue
|
||||
), Asset.class);
|
||||
|
||||
assertThat(asset).isInstanceOf(FileAsset.class);
|
||||
FileAsset fileAsset = (FileAsset) asset;
|
||||
assertThat(fileAsset.getNamespace()).isEqualTo(namespace);
|
||||
assertThat(fileAsset.getId()).isEqualTo(id);
|
||||
assertThat(fileAsset.getDisplayName()).isEqualTo(displayName);
|
||||
assertThat(fileAsset.getDescription()).isEqualTo(description);
|
||||
assertThat(fileAsset.getSystem()).isEqualTo(system);
|
||||
assertThat(fileAsset.getPath()).isEqualTo(path);
|
||||
assertThat(fileAsset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void table() throws JsonProcessingException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
String id = TestsUtils.randomString();
|
||||
String displayName = "My Table";
|
||||
String description = "This is my table";
|
||||
String system = "postgres";
|
||||
String database = "mydb";
|
||||
String schema = "my_schema";
|
||||
String name = "mytable";
|
||||
String metadataKey = "owner";
|
||||
String metadataValue = "table-team";
|
||||
Asset asset = JacksonMapper.ofYaml().readValue("""
|
||||
namespace: %s
|
||||
id: %s
|
||||
type: %s
|
||||
displayName: %s
|
||||
description: %s
|
||||
system: %s
|
||||
database: %s
|
||||
schema: %s
|
||||
name: %s
|
||||
metadata:
|
||||
%s: %s""".formatted(
|
||||
namespace,
|
||||
id,
|
||||
TableAsset.ASSET_TYPE,
|
||||
displayName,
|
||||
description,
|
||||
system,
|
||||
database,
|
||||
schema,
|
||||
name,
|
||||
metadataKey,
|
||||
metadataValue
|
||||
), Asset.class);
|
||||
|
||||
assertThat(asset).isInstanceOf(TableAsset.class);
|
||||
TableAsset tableAsset = (TableAsset) asset;
|
||||
assertThat(tableAsset.getNamespace()).isEqualTo(namespace);
|
||||
assertThat(tableAsset.getId()).isEqualTo(id);
|
||||
assertThat(tableAsset.getDisplayName()).isEqualTo(displayName);
|
||||
assertThat(tableAsset.getDescription()).isEqualTo(description);
|
||||
assertThat(tableAsset.getSystem()).isEqualTo(system);
|
||||
assertThat(tableAsset.getDatabase()).isEqualTo(database);
|
||||
assertThat(tableAsset.getSchema()).isEqualTo(schema);
|
||||
assertThat(tableAsset.getName()).isEqualTo(name);
|
||||
assertThat(tableAsset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void vm() throws JsonProcessingException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
String id = TestsUtils.randomString();
|
||||
String displayName = "My VM";
|
||||
String description = "This is my vm";
|
||||
String provider = "aws";
|
||||
String region = "us-east-1";
|
||||
String state = "running";
|
||||
String metadataKey = "owner";
|
||||
String metadataValue = "vm-team";
|
||||
Asset asset = JacksonMapper.ofYaml().readValue("""
|
||||
namespace: %s
|
||||
id: %s
|
||||
type: %s
|
||||
displayName: %s
|
||||
description: %s
|
||||
provider: %s
|
||||
region: %s
|
||||
state: %s
|
||||
metadata:
|
||||
%s: %s""".formatted(
|
||||
namespace,
|
||||
id,
|
||||
VmAsset.ASSET_TYPE,
|
||||
displayName,
|
||||
description,
|
||||
provider,
|
||||
region,
|
||||
state,
|
||||
metadataKey,
|
||||
metadataValue
|
||||
), Asset.class);
|
||||
|
||||
assertThat(asset).isInstanceOf(VmAsset.class);
|
||||
VmAsset vmAsset = (VmAsset) asset;
|
||||
assertThat(vmAsset.getNamespace()).isEqualTo(namespace);
|
||||
assertThat(vmAsset.getId()).isEqualTo(id);
|
||||
assertThat(vmAsset.getDisplayName()).isEqualTo(displayName);
|
||||
assertThat(vmAsset.getDescription()).isEqualTo(description);
|
||||
assertThat(vmAsset.getProvider()).isEqualTo(provider);
|
||||
assertThat(vmAsset.getRegion()).isEqualTo(region);
|
||||
assertThat(vmAsset.getState()).isEqualTo(state);
|
||||
assertThat(vmAsset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
|
||||
}
|
||||
}
|
||||
@@ -1,16 +1,14 @@
|
||||
package io.kestra.core.contexts;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class KestraContextTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
package io.kestra.core.contexts;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -10,7 +9,7 @@ import java.util.List;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest(environments = "maven")
|
||||
@MicronautTest(environments = "maven")
|
||||
class MavenPluginRepositoryConfigTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -10,7 +10,7 @@ import io.kestra.plugin.core.debug.Return;
|
||||
import io.kestra.plugin.core.flow.Dag;
|
||||
import io.kestra.plugin.core.flow.Subflow;
|
||||
import io.kestra.plugin.core.state.Set;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -26,7 +26,7 @@ import org.junit.jupiter.api.parallel.ExecutionMode;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
@Execution(ExecutionMode.SAME_THREAD)
|
||||
class DocumentationGeneratorTest {
|
||||
@Inject
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.models;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.validations.ModelValidator;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -12,7 +13,7 @@ import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class LabelTest {
|
||||
@Inject
|
||||
private ModelValidator modelValidator;
|
||||
|
||||
@@ -1,14 +1,20 @@
|
||||
package io.kestra.core.models.property;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import io.kestra.core.context.TestRunContextFactory;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.serializers.FileSerde;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.event.Level;
|
||||
import reactor.core.publisher.Flux;
|
||||
@@ -19,13 +25,14 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static java.util.Map.entry;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class PropertyTest {
|
||||
|
||||
@Inject
|
||||
@@ -362,10 +369,43 @@ class PropertyTest {
|
||||
assertThat(output.getMessages().getFirst().getValue()).isEqualTo("value1");
|
||||
}
|
||||
|
||||
@Test
|
||||
void jsonSubtype() throws JsonProcessingException, IllegalVariableEvaluationException {
|
||||
Optional<WithSubtype> rendered = runContextFactory.of().render(
|
||||
Property.<WithSubtype>ofExpression(JacksonMapper.ofJson().writeValueAsString(new MySubtype()))
|
||||
).as(WithSubtype.class);
|
||||
|
||||
assertThat(rendered).isPresent();
|
||||
assertThat(rendered.get()).isInstanceOf(MySubtype.class);
|
||||
|
||||
List<WithSubtype> renderedList = runContextFactory.of().render(
|
||||
Property.<List<WithSubtype>>ofExpression(JacksonMapper.ofJson().writeValueAsString(List.of(new MySubtype())))
|
||||
).asList(WithSubtype.class);
|
||||
assertThat(renderedList).hasSize(1);
|
||||
assertThat(renderedList.get(0)).isInstanceOf(MySubtype.class);
|
||||
}
|
||||
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
|
||||
@JsonSubTypes({
|
||||
@JsonSubTypes.Type(value = MySubtype.class, name = "mySubtype")
|
||||
})
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Introspected
|
||||
public abstract static class WithSubtype {
|
||||
abstract public String getType();
|
||||
}
|
||||
|
||||
@Getter
|
||||
public static class MySubtype extends WithSubtype {
|
||||
private final String type = "mySubtype";
|
||||
}
|
||||
|
||||
|
||||
@Builder
|
||||
@Getter
|
||||
private static class TestObj {
|
||||
private String key;
|
||||
private String value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
package io.kestra.core.models.property;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.NamespaceFactory;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
@@ -28,7 +27,7 @@ import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class URIFetcherTest {
|
||||
@Inject
|
||||
private StorageInterface storage;
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
package io.kestra.core.models.triggers;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.TenantInterface;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -17,7 +16,7 @@ import static io.kestra.core.models.triggers.StatefulTriggerService.*;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class StatefulTriggerInterfaceTest {
|
||||
@Inject
|
||||
RunContextFactory runContextFactory;
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import io.kestra.plugin.core.condition.ExecutionFlow;
|
||||
@@ -23,7 +24,7 @@ import java.util.List;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest(transactional = false)
|
||||
public abstract class AbstractMultipleConditionStorageTest {
|
||||
private static final String NAMESPACE = "io.kestra.unit";
|
||||
|
||||
|
||||
@@ -1,17 +1,15 @@
|
||||
package io.kestra.core.plugins;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class PluginConfigurationTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -95,4 +95,4 @@ class PluginDeserializerTest {
|
||||
|
||||
public record TestPlugin(String type) implements Plugin {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.reporter.Reportable;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -11,7 +11,7 @@ import java.time.ZoneId;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractFeatureUsageReportTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.collectors.ServiceUsage;
|
||||
import io.kestra.core.reporter.Reportable;
|
||||
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
|
||||
@@ -8,6 +7,7 @@ import io.kestra.core.server.Service;
|
||||
import io.kestra.core.server.ServiceInstance;
|
||||
import io.kestra.core.server.ServiceType;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -20,7 +20,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractServiceUsageReportTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.plugin.core.http.Trigger;
|
||||
import io.kestra.plugin.core.log.Log;
|
||||
import io.kestra.plugin.core.trigger.Schedule;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -13,7 +13,7 @@ import java.time.Instant;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class PluginMetricReportTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.Setting;
|
||||
import io.kestra.core.repositories.SettingRepositoryInterface;
|
||||
import io.micronaut.test.annotation.MockBean;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
@@ -16,7 +16,7 @@ import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class SystemInformationReportTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -4,7 +4,6 @@ import com.devskiller.friendly_id.FriendlyId;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.InvalidQueryFiltersException;
|
||||
import io.kestra.core.junit.annotations.FlakyTest;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.QueryFilter.Field;
|
||||
@@ -33,6 +32,7 @@ import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.data.model.Sort;
|
||||
import io.micronaut.http.HttpStatus;
|
||||
import io.micronaut.http.exceptions.HttpStatusException;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
@@ -58,7 +58,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractExecutionRepositoryTest {
|
||||
public static final String NAMESPACE = "io.kestra.unittest";
|
||||
public static final String FLOW = "full";
|
||||
|
||||
@@ -10,7 +10,7 @@ import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.services.ExecutionService;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.event.Level;
|
||||
@@ -28,7 +28,7 @@ import java.util.Objects;
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractExecutionServiceTest {
|
||||
@Inject
|
||||
ExecutionService executionService;
|
||||
|
||||
@@ -4,7 +4,6 @@ import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.events.CrudEvent;
|
||||
import io.kestra.core.events.CrudEventType;
|
||||
import io.kestra.core.exceptions.InvalidQueryFiltersException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.QueryFilter.Field;
|
||||
@@ -26,6 +25,7 @@ import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import io.micronaut.context.event.ApplicationEventListener;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
@@ -49,7 +49,7 @@ import static io.kestra.core.utils.NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest(transactional = false)
|
||||
public abstract class AbstractFlowRepositoryTest {
|
||||
public static final String TEST_NAMESPACE = "io.kestra.unittest";
|
||||
public static final String TEST_FLOW_ID = "test";
|
||||
|
||||
@@ -3,8 +3,8 @@ package io.kestra.core.repositories;
|
||||
import io.kestra.core.models.topologies.FlowNode;
|
||||
import io.kestra.core.models.topologies.FlowRelation;
|
||||
import io.kestra.core.models.topologies.FlowTopology;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -12,7 +12,7 @@ import java.util.List;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractFlowTopologyRepositoryTest {
|
||||
@Inject
|
||||
private FlowTopologyRepositoryInterface flowTopologyRepository;
|
||||
|
||||
@@ -4,8 +4,8 @@ import io.kestra.core.models.FetchVersion;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.kv.PersistedKvMetadata;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -18,7 +18,7 @@ import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractKvMetadataRepositoryTest {
|
||||
@Inject
|
||||
protected KvMetadataRepositoryInterface kvMetadataRepositoryInterface;
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.exceptions.InvalidQueryFiltersException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.QueryFilter.Field;
|
||||
import io.kestra.core.models.QueryFilter.Op;
|
||||
@@ -14,9 +13,9 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.dashboard.data.Executions;
|
||||
import io.kestra.plugin.core.dashboard.data.Logs;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
@@ -37,7 +36,7 @@ import static io.kestra.core.models.flows.FlowScope.USER;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest(transactional = false)
|
||||
public abstract class AbstractLogRepositoryTest {
|
||||
@Inject
|
||||
protected LogRepositoryInterface logRepository;
|
||||
|
||||
@@ -10,10 +10,9 @@ import io.kestra.core.models.executions.metrics.MetricAggregations;
|
||||
import io.kestra.core.models.executions.metrics.Timer;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.ZonedDateTime;
|
||||
@@ -21,7 +20,7 @@ import java.util.List;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractMetricRepositoryTest {
|
||||
@Inject
|
||||
protected MetricRepositoryInterface metricRepository;
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.FetchVersion;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
@@ -21,7 +21,7 @@ import java.util.stream.Stream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest(transactional = false)
|
||||
public abstract class AbstractNamespaceFileMetadataRepositoryTest {
|
||||
@Inject
|
||||
protected NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepositoryInterface;
|
||||
|
||||
@@ -2,8 +2,8 @@ package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.models.Setting;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.utils.VersionProvider;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -12,7 +12,7 @@ import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractSettingRepositoryTest {
|
||||
@Inject
|
||||
protected SettingRepositoryInterface settingRepository;
|
||||
|
||||
@@ -10,7 +10,7 @@ import io.kestra.plugin.core.debug.Return;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.context.event.ApplicationEventListener;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import java.time.Duration;
|
||||
@@ -21,7 +21,6 @@ import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@@ -30,7 +29,7 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
public abstract class AbstractTemplateRepositoryTest {
|
||||
@Inject
|
||||
protected TemplateRepositoryInterface templateRepository;
|
||||
|
||||
@@ -12,6 +12,7 @@ import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.data.model.Sort;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
@@ -28,7 +29,7 @@ import static io.kestra.core.models.flows.FlowScope.USER;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest(transactional = false)
|
||||
public abstract class AbstractTriggerRepositoryTest {
|
||||
private static final String TEST_NAMESPACE = "io.kestra.unittest";
|
||||
|
||||
|
||||
@@ -78,6 +78,7 @@ public abstract class AbstractRunnerConcurrencyTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@FlakyTest(description = "Only flaky in CI")
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-killed.yml"}, tenantId = "flow-concurrency-killed")
|
||||
void flowConcurrencyKilled() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyKilled("flow-concurrency-killed");
|
||||
|
||||
@@ -81,6 +81,9 @@ public abstract class AbstractRunnerTest {
|
||||
@Inject
|
||||
private AfterExecutionTestCase afterExecutionTestCase;
|
||||
|
||||
@Inject
|
||||
private AssetTestCase assetTestCase;
|
||||
|
||||
@Test
|
||||
@ExecuteFlow("flows/valids/full.yaml")
|
||||
void full(Execution execution) {
|
||||
@@ -558,4 +561,10 @@ public abstract class AbstractRunnerTest {
|
||||
public void shouldCallTasksAfterListener(Execution execution) {
|
||||
afterExecutionTestCase.shouldCallTasksAfterListener(execution);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows(value = "flows/valids/assets.yaml", tenantId = "abstract-runner-test-assets")
|
||||
public void assets() throws QueueException, TimeoutException {
|
||||
assetTestCase.staticAndDynamicAssets("abstract-runner-test-assets");
|
||||
}
|
||||
}
|
||||
|
||||
220
core/src/test/java/io/kestra/core/runners/AssetTestCase.java
Normal file
220
core/src/test/java/io/kestra/core/runners/AssetTestCase.java
Normal file
@@ -0,0 +1,220 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.assets.*;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.services.AssetManagerFactory;
|
||||
import io.kestra.core.services.AssetService;
|
||||
import io.micronaut.context.annotation.Factory;
|
||||
import io.micronaut.context.annotation.Replaces;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@Singleton
|
||||
public class AssetTestCase {
|
||||
@Inject
|
||||
private AssetService mockedAssetService;
|
||||
@Inject
|
||||
private TestRunnerUtils testRunnerUtils;
|
||||
|
||||
private static final List<Asset> capturedAsyncCreate = new CopyOnWriteArrayList<>();
|
||||
private static final List<Pair<AssetUser, Pair<List<AssetIdentifier>, List<AssetIdentifier>>>> capturedAssetLineage = new CopyOnWriteArrayList<>();
|
||||
private static final List<Asset> capturedEnabledDynamicAssets = new CopyOnWriteArrayList<>();
|
||||
private static final List<Asset> capturedDisabledDynamicAssets = new CopyOnWriteArrayList<>();
|
||||
|
||||
public void staticAndDynamicAssets(String tenantId) throws QueueException, TimeoutException {
|
||||
Execution execution = testRunnerUtils.runOne(tenantId, "io.kestra.tests", "assets");
|
||||
|
||||
Mockito.verify(mockedAssetService, Mockito.times(1)).run();
|
||||
|
||||
// region assets-in-taskruns
|
||||
List<TaskRun> taskRuns = execution.getTaskRunList().stream().toList();
|
||||
assertThat(taskRuns).map(TaskRun::getAssets).map(AssetsInOut::getInputs).satisfiesExactlyInAnyOrder(
|
||||
assets -> assertThat(assets).isEmpty(),
|
||||
assets -> assertThat(assets).isEmpty(),
|
||||
assets -> assertThat(assets).satisfiesExactlyInAnyOrder(
|
||||
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-non-existing-input-asset-uid")
|
||||
),
|
||||
assets -> assertThat(assets).satisfiesExactlyInAnyOrder(
|
||||
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-existing-input-uid")
|
||||
)
|
||||
);
|
||||
assertThat(taskRuns).map(TaskRun::getAssets).map(AssetsInOut::getOutputs).satisfiesExactlyInAnyOrder(
|
||||
assets -> assertThat(assets).satisfiesExactlyInAnyOrder(
|
||||
AssetTestCase::assertEnabledDynamicAsset,
|
||||
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-emit-asset")
|
||||
),
|
||||
assets -> assertThat(assets).isEmpty(),
|
||||
assets -> assertThat(assets).satisfiesExactlyInAnyOrder(
|
||||
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-asset-non-existing-output-uid")
|
||||
),
|
||||
assets -> assertThat(assets).satisfiesExactlyInAnyOrder(
|
||||
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-asset-existing-output-uid")
|
||||
)
|
||||
);
|
||||
// endregion
|
||||
|
||||
// region dynamic-assets
|
||||
assertThat(capturedEnabledDynamicAssets).anySatisfy(AssetTestCase::assertEnabledDynamicAsset);
|
||||
assertThat(capturedEnabledDynamicAssets).noneMatch(asset -> asset.getId().equals("assets-flow-emit-asset-auto-false-uid"));
|
||||
|
||||
assertThat(capturedDisabledDynamicAssets).anySatisfy(AssetTestCase::assertDisabledDynamicAsset);
|
||||
assertThat(capturedDisabledDynamicAssets).noneMatch(asset -> asset.getId().equals("assets-flow-emit-asset-uid"));
|
||||
// endregion
|
||||
|
||||
// region asset-creation
|
||||
assertThat(capturedAsyncCreate).satisfiesExactlyInAnyOrder(
|
||||
AssetTestCase::assertEnabledDynamicAsset,
|
||||
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-emit-asset"),
|
||||
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-asset-non-existing-output-uid"),
|
||||
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-asset-existing-output-uid")
|
||||
);
|
||||
assertThat(capturedAsyncCreate).noneMatch(asset -> asset.getId().equals("assets-flow-emit-asset-auto-false-uid"));
|
||||
// endregion
|
||||
|
||||
// region asset-lineage
|
||||
assertThat(capturedAssetLineage).satisfiesExactlyInAnyOrder(
|
||||
assetLineage -> {
|
||||
AssetUser assetUser = assetLineage.getLeft();
|
||||
assertAssetExecution(tenantId, assetUser, execution);
|
||||
assertThat(assetUser.taskRunId()).isEqualTo(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getTaskId().equals("emit-asset"))
|
||||
.findFirst().map(TaskRun::getId).orElseThrow());
|
||||
// No input assets
|
||||
assertThat(assetLineage.getRight().getLeft()).isEmpty();
|
||||
assertThat(assetLineage.getRight().getRight()).satisfiesExactlyInAnyOrder(
|
||||
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-emit-asset-uid"),
|
||||
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-emit-asset")
|
||||
);
|
||||
},
|
||||
// No lineage for the second taskrun due to `enableAuto: false`, below is for the third one
|
||||
assetLineage -> {
|
||||
AssetUser assetUser = assetLineage.getLeft();
|
||||
assertAssetExecution(tenantId, assetUser, execution);
|
||||
assertThat(assetUser.taskRunId()).isEqualTo(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getTaskId().equals("static-asset-non-existing-input"))
|
||||
.findFirst().map(TaskRun::getId).orElseThrow());
|
||||
assertThat(assetLineage.getRight().getLeft()).satisfiesExactlyInAnyOrder(
|
||||
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-non-existing-input-asset-uid")
|
||||
);
|
||||
assertThat(assetLineage.getRight().getRight()).satisfiesExactlyInAnyOrder(
|
||||
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-non-existing-output-uid")
|
||||
);
|
||||
},
|
||||
assetLineage -> {
|
||||
AssetUser assetUser = assetLineage.getLeft();
|
||||
assertAssetExecution(tenantId, assetUser, execution);
|
||||
assertThat(assetUser.taskRunId()).isEqualTo(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getTaskId().equals("static-asset-existing-input"))
|
||||
.findFirst().map(TaskRun::getId).orElseThrow());
|
||||
assertThat(assetLineage.getRight().getLeft()).satisfiesExactlyInAnyOrder(
|
||||
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-existing-input-uid")
|
||||
);
|
||||
assertThat(assetLineage.getRight().getRight()).satisfiesExactlyInAnyOrder(
|
||||
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-existing-output-uid")
|
||||
);
|
||||
}
|
||||
);
|
||||
// endregion
|
||||
}
|
||||
|
||||
private static void assertAssetExecution(String tenantId, AssetUser assetUser, Execution execution) {
|
||||
assertThat(assetUser.tenantId()).isEqualTo(tenantId);
|
||||
assertThat(assetUser.namespace()).isEqualTo("io.kestra.tests");
|
||||
assertThat(assetUser.flowId()).isEqualTo("assets");
|
||||
assertThat(assetUser.flowRevision()).isEqualTo(execution.getFlowRevision());
|
||||
assertThat(assetUser.executionId()).isEqualTo(execution.getId());
|
||||
}
|
||||
|
||||
private static void assertEnabledDynamicAsset(Asset asset) {
|
||||
assertThat(asset).isInstanceOf(TableAsset.class);
|
||||
TableAsset tableAsset = (TableAsset) asset;
|
||||
assertThat(tableAsset.getId()).isEqualTo("assets-flow-emit-asset-uid");
|
||||
assertThat(tableAsset.getType()).isEqualTo(TableAsset.ASSET_TYPE);
|
||||
assertThat(tableAsset.getDisplayName()).isEqualTo("My Table Asset");
|
||||
assertThat(tableAsset.getDescription()).isEqualTo("This is my table asset");
|
||||
assertThat(tableAsset.getSystem()).isEqualTo("MY_DB_SYSTEM");
|
||||
assertThat(tableAsset.getDatabase()).isEqualTo("my_database");
|
||||
assertThat(tableAsset.getSchema()).isEqualTo("my_schema");
|
||||
assertThat(tableAsset.getName()).isEqualTo("my_table");
|
||||
assertThat(tableAsset.getMetadata().get("owner")).isEqualTo("data-team");
|
||||
}
|
||||
|
||||
private static void assertDisabledDynamicAsset(Asset asset) {
|
||||
assertThat(asset).isInstanceOf(TableAsset.class);
|
||||
TableAsset tableAsset = (TableAsset) asset;
|
||||
assertThat(tableAsset.getId()).isEqualTo("assets-flow-emit-asset-auto-false-uid");
|
||||
assertThat(tableAsset.getType()).isEqualTo(TableAsset.ASSET_TYPE);
|
||||
assertThat(tableAsset.getDisplayName()).isEqualTo("My Table Asset");
|
||||
assertThat(tableAsset.getDescription()).isEqualTo("This is my table asset");
|
||||
assertThat(tableAsset.getSystem()).isEqualTo("MY_DB_SYSTEM");
|
||||
assertThat(tableAsset.getDatabase()).isEqualTo("my_database");
|
||||
assertThat(tableAsset.getSchema()).isEqualTo("my_schema");
|
||||
assertThat(tableAsset.getName()).isEqualTo("my_table");
|
||||
assertThat(tableAsset.getMetadata().get("owner")).isEqualTo("data-team");
|
||||
}
|
||||
|
||||
@Factory
|
||||
static class MockFactory {
|
||||
@Singleton
|
||||
@Replaces(AssetService.class)
|
||||
public AssetService mockedAssetService() {
|
||||
return Mockito.spy(new AssetService() {
|
||||
@Override
|
||||
public void asyncUpsert(AssetUser assetUser, Asset asset) {
|
||||
capturedAsyncCreate.add(asset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assetLineage(AssetUser assetUser, List<AssetIdentifier> inputs, List<AssetIdentifier> outputs) {
|
||||
capturedAssetLineage.add(Pair.of(assetUser, Pair.of(inputs, outputs)));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Singleton
|
||||
@Replaces(AssetManagerFactory.class)
|
||||
public AssetManagerFactory mockedAssetManagerFactory() {
|
||||
return Mockito.spy(new AssetManagerFactory() {
|
||||
@Override
|
||||
public AssetEmitter of(boolean enabled) {
|
||||
if (!enabled) {
|
||||
return new AssetEmitter() {
|
||||
@Override
|
||||
public void upsert(Asset asset) {
|
||||
capturedDisabledDynamicAssets.add(asset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Asset> outputs() {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return new AssetEmitter() {
|
||||
private final List<Asset> localCapturedAssets = new CopyOnWriteArrayList<>();
|
||||
|
||||
@Override
|
||||
public void upsert(Asset asset) {
|
||||
localCapturedAssets.add(asset);
|
||||
capturedEnabledDynamicAssets.add(asset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Asset> outputs() {
|
||||
return localCapturedAssets;
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
@@ -466,4 +467,20 @@ class ExecutionServiceTest {
|
||||
assertThat(restart.getTaskRunList()).hasSize(2);
|
||||
assertThat(restart.findTaskRunsByTaskId("make_error").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/each-pause.yaml"})
|
||||
void killExecutionWithFlowableTask() throws Exception {
|
||||
Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "each-pause");
|
||||
|
||||
TaskRun childTaskRun = execution.getTaskRunList().stream().filter(tr -> tr.getTaskId().equals("pause")).toList().getFirst();
|
||||
|
||||
Execution killed = executionService.killParentTaskruns(childTaskRun,execution);
|
||||
|
||||
TaskRun parentTaskRun = killed.getTaskRunList().stream().filter(tr -> tr.getTaskId().equals("each_task")).toList().getFirst();
|
||||
|
||||
assertThat(parentTaskRun.getState().getCurrent()).isEqualTo(State.Type.KILLED);
|
||||
assertThat(parentTaskRun.getAttempts().getLast().getState().getCurrent()).isEqualTo(State.Type.KILLED);
|
||||
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,7 @@ import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.multipart.CompletedFileUpload;
|
||||
import io.micronaut.http.multipart.CompletedPart;
|
||||
import io.micronaut.test.annotation.MockBean;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
@@ -42,7 +43,7 @@ import java.util.Optional;
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class FlowInputOutputTest {
|
||||
|
||||
private static final String TEST_SECRET_VALUE = "test-secret-value";
|
||||
@@ -239,7 +240,7 @@ class FlowInputOutputTest {
|
||||
// Then
|
||||
Assertions.assertEquals(2, values.size());
|
||||
Assertions.assertFalse(values.get(1).enabled());
|
||||
Assertions.assertNotNull(values.get(1).exception());
|
||||
Assertions.assertNotNull(values.get(1).exceptions());
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -257,7 +258,7 @@ class FlowInputOutputTest {
|
||||
List<InputAndValue> values = flowInputOutput.validateExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, data).block();
|
||||
|
||||
// Then
|
||||
Assertions.assertNull(values.getFirst().exception());
|
||||
Assertions.assertNull(values.getFirst().exceptions());
|
||||
Assertions.assertFalse(storageInterface.exists(MAIN_TENANT, null, URI.create(values.getFirst().value().toString())));
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.runners;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.io.CharStreams;
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
@@ -137,8 +138,8 @@ public class InputsTest {
|
||||
void missingRequired() {
|
||||
HashMap<String, Object> inputs = new HashMap<>(InputsTest.inputs);
|
||||
inputs.put("string", null);
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(inputs, MAIN_TENANT));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `string`, missing required input, but received `null`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(inputs, MAIN_TENANT));
|
||||
assertThat(e.getMessage()).contains("Missing required input:string");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -232,9 +233,9 @@ public class InputsTest {
|
||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||
map.put("validatedString", "foo");
|
||||
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant4"));
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant4"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedString`, it must match the pattern");
|
||||
assertThat(e.getMessage()).contains( "Invalid value for input `validatedString`. Cause: it must match the pattern");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -242,15 +243,15 @@ public class InputsTest {
|
||||
void inputValidatedIntegerBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedInt", "9");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant5"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedInt`, it must be more than `10`, but received `9`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant5"));
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedInt`. Cause: it must be more than `10`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedInt", "21");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant5"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant5"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedInt`, it must be less than `20`, but received `21`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedInt`. Cause: it must be less than `20`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -258,15 +259,15 @@ public class InputsTest {
|
||||
void inputValidatedDateBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedDate", "2022-01-01");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant6"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDate`, it must be after `2023-01-01`, but received `2022-01-01`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant6"));
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDate`. Cause: it must be after `2023-01-01`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedDate", "2024-01-01");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant6"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant6"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDate`, it must be before `2023-12-31`, but received `2024-01-01`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDate`. Cause: it must be before `2023-12-31`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -274,15 +275,15 @@ public class InputsTest {
|
||||
void inputValidatedDateTimeBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedDateTime", "2022-01-01T00:00:00Z");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant7"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDateTime`, it must be after `2023-01-01T00:00:00Z`, but received `2022-01-01T00:00:00Z`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant7"));
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDateTime`. Cause: it must be after `2023-01-01T00:00:00Z`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedDateTime", "2024-01-01T00:00:00Z");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant7"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant7"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDateTime`, it must be before `2023-12-31T23:59:59Z`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDateTime`. Cause: it must be before `2023-12-31T23:59:59Z`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -290,15 +291,15 @@ public class InputsTest {
|
||||
void inputValidatedDurationBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedDuration", "PT1S");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant8"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDuration`, It must be more than `PT10S`, but received `PT1S`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant8"));
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDuration`. Cause: It must be more than `PT10S`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedDuration", "PT30S");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant8"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant8"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedDuration`, It must be less than `PT20S`, but received `PT30S`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedDuration`. Cause: It must be less than `PT20S`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -306,15 +307,15 @@ public class InputsTest {
|
||||
void inputValidatedFloatBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedFloat", "0.01");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant9"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedFloat`, it must be more than `0.1`, but received `0.01`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant9"));
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedFloat`. Cause: it must be more than `0.1`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedFloat", "1.01");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant9"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant9"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedFloat`, it must be less than `0.5`, but received `1.01`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedFloat`. Cause: it must be less than `0.5`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -322,15 +323,15 @@ public class InputsTest {
|
||||
void inputValidatedTimeBadValue() {
|
||||
HashMap<String, Object> mapMin = new HashMap<>(inputs);
|
||||
mapMin.put("validatedTime", "00:00:01");
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant10"));
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedTime`, it must be after `01:00`, but received `00:00:01`");
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant10"));
|
||||
assertThat(e.getMessage()).contains( "Invalid value for input `validatedTime`. Cause: it must be after `01:00`");
|
||||
|
||||
HashMap<String, Object> mapMax = new HashMap<>(inputs);
|
||||
mapMax.put("validatedTime", "14:00:00");
|
||||
|
||||
e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant10"));
|
||||
e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant10"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `validatedTime`, it must be before `11:59:59`, but received `14:00:00`");
|
||||
assertThat(e.getMessage()).contains("Invalid value for input `validatedTime`. Cause: it must be before `11:59:59`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -339,9 +340,9 @@ public class InputsTest {
|
||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||
map.put("uri", "http:/bla");
|
||||
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant11"));
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant11"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `uri`, Expected `URI` but received `http:/bla`, but received `http:/bla`");
|
||||
assertThat(e.getMessage()).contains( "Invalid value for input `uri`. Cause: Invalid URI format." );
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -350,9 +351,9 @@ public class InputsTest {
|
||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||
map.put("enum", "INVALID");
|
||||
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant12"));
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant12"));
|
||||
|
||||
assertThat(e.getMessage()).isEqualTo("enum: Invalid input for `enum`, it must match the values `[ENUM_VALUE, OTHER_ONE]`, but received `INVALID`");
|
||||
assertThat(e.getMessage()).isEqualTo("Invalid value for input `enum`. Cause: it must match the values `[ENUM_VALUE, OTHER_ONE]`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -361,9 +362,9 @@ public class InputsTest {
|
||||
HashMap<String, Object> map = new HashMap<>(inputs);
|
||||
map.put("array", "[\"s1\", \"s2\"]");
|
||||
|
||||
ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant13"));
|
||||
InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant13"));
|
||||
|
||||
assertThat(e.getMessage()).contains("Invalid input for `array`, Unable to parse array element as `INT` on `s1`, but received `[\"s1\", \"s2\"]`");
|
||||
assertThat(e.getMessage()).contains( "Invalid value for input `array`. Cause: Unable to parse array element as `INT` on `s1`");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -467,7 +468,20 @@ public class InputsTest {
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat((String) execution.findTaskRunsByTaskId("file").getFirst().getOutputs().get("value")).isEqualTo(file.toString());
|
||||
}
|
||||
@Test
|
||||
@LoadFlows(value = "flows/invalids/inputs-with-multiple-constraint-violations.yaml")
|
||||
void multipleConstraintViolations() {
|
||||
InputOutputValidationException ex = assertThrows(InputOutputValidationException.class, ()-> runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "inputs-with-multiple-constraint-violations", null,
|
||||
(f, e) ->flowIO.readExecutionInputs(f, e , Map.of("multi", List.of("F", "H")) )));
|
||||
|
||||
List<String> messages = Arrays.asList(ex.getMessage().split(System.lineSeparator()));
|
||||
|
||||
assertThat(messages).containsExactlyInAnyOrder(
|
||||
"Invalid value for input `multi`. Cause: you can't define both `values` and `options`",
|
||||
"Invalid value for input `multi`. Cause: value `F` doesn't match the values `[A, B, C]`",
|
||||
"Invalid value for input `multi`. Cause: value `H` doesn't match the values `[A, B, C]`"
|
||||
);
|
||||
}
|
||||
private URI createFile() throws IOException {
|
||||
File tempFile = File.createTempFile("file", ".txt");
|
||||
Files.write(tempFile.toPath(), "Hello World".getBytes());
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.junit.annotations.ExecuteFlow;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
@@ -71,6 +72,6 @@ public class NoEncryptionConfiguredTest implements TestPropertyProvider {
|
||||
.flowId(flow.getId())
|
||||
.build();
|
||||
|
||||
assertThrows(ConstraintViolationException.class, () -> flowIO.readExecutionInputs(flow, execution, InputsTest.inputs));
|
||||
assertThrows(InputOutputValidationException.class, () -> flowIO.readExecutionInputs(flow, execution, InputsTest.inputs));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.DependsOn;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
@@ -24,6 +23,7 @@ import io.kestra.core.utils.IdUtils;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.test.annotation.MockBean;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
@@ -36,7 +36,7 @@ import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class RunVariablesTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -14,7 +14,7 @@ import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class VariableRendererTest {
|
||||
|
||||
@Inject
|
||||
|
||||
@@ -6,7 +6,7 @@ import com.google.common.collect.ImmutableSet;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.utils.Rethrow;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
@@ -18,7 +18,7 @@ import jakarta.inject.Inject;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class PebbleVariableRendererTest {
|
||||
@Inject
|
||||
VariableRenderer variableRenderer;
|
||||
|
||||
@@ -6,7 +6,7 @@ import com.google.common.collect.ImmutableSet;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.micronaut.context.annotation.Property;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -15,7 +15,7 @@ import java.util.Collections;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
@Property(name = "kestra.variables.recursive-rendering", value = "true")
|
||||
class RecursivePebbleVariableRendererTest {
|
||||
@Inject
|
||||
|
||||
@@ -3,7 +3,7 @@ package io.kestra.core.runners.pebble.functions;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -13,7 +13,7 @@ import java.util.Map;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class EncryptDecryptFunctionTest {
|
||||
@Inject
|
||||
private VariableRenderer variableRenderer;
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.repositories.LogRepositoryInterface;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.micronaut.context.annotation.Property;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -18,7 +18,7 @@ import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
@Property(name = "kestra.server-type", value = "WORKER")
|
||||
@Execution(ExecutionMode.SAME_THREAD)
|
||||
class ErrorLogsFunctionTest {
|
||||
|
||||
@@ -4,7 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -13,7 +13,7 @@ import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class FetchContextFunctionTest {
|
||||
@Inject
|
||||
VariableRenderer variableRenderer;
|
||||
|
||||
@@ -1,16 +1,15 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.runners.LocalPath;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.NamespaceFactory;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.context.annotation.Property;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -30,7 +29,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
@Execution(ExecutionMode.SAME_THREAD)
|
||||
@KestraTest(rebuildContext = true)
|
||||
@MicronautTest(rebuildContext = true)
|
||||
class FileExistsFunctionTest {
|
||||
|
||||
private static final String NAMESPACE = "my.namespace";
|
||||
|
||||
@@ -1,16 +1,15 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.runners.LocalPath;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.NamespaceFactory;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.context.annotation.Property;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -27,10 +26,9 @@ import org.junit.jupiter.api.parallel.ExecutionMode;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.hibernate.validator.internal.util.Contracts.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest(rebuildContext = true)
|
||||
@MicronautTest(rebuildContext = true)
|
||||
@Execution(ExecutionMode.SAME_THREAD)
|
||||
public class FileSizeFunctionTest {
|
||||
private static final String FLOW = "flow";
|
||||
|
||||
@@ -5,14 +5,14 @@ import java.util.Map;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class FileURIFunctionTest {
|
||||
@Inject
|
||||
private VariableRenderer variableRenderer;
|
||||
|
||||
@@ -2,11 +2,11 @@ package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.serializers.FileSerde;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -21,7 +21,7 @@ import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@MicronautTest
|
||||
class FromIonFunctionTest {
|
||||
@Inject
|
||||
VariableRenderer variableRenderer;
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user