Compare commits

...

10 Commits

Author SHA1 Message Date
Loïc Mathieu
9067ec9942 feat(assets): add an AssetShipper task 2025-12-23 17:05:02 +01:00
Loïc Mathieu
6c07ecd3d4 feat(assets): continue Assets implementation 2025-12-23 13:31:46 +01:00
brian.mulier
950223e0a8 feat(assets): introduce Assets 2025-12-23 11:31:31 +01:00
Roman Acevedo
d48f3b9bd9 fix: concurrency-limit was included in Flows and Executions openapis #13801 2025-12-23 11:08:58 +01:00
Mustafa Tarek
291fba3281 feat(core): add trigger state filter (#12779)
* feat(core): add trigger state filter

* feat(ui): add translations for trigger state filter

* fix(core): default to return all triggers if no state match to either enabled or disabled

* refactor(ui): replace search text with drop down select with two states enabled and disabled with adding translations to both

* refactor(ui): remove all translations except en.json

* feat(core): add trigger state filter

* feat(ui): add translations for trigger state filter

* fix(core): default to return all triggers if no state match to either enabled or disabled

* refactor(ui): replace search text with drop down select with two states enabled and disabled with adding translations to both

* refactor(ui): remove all translations except en.json

* fix(ui): resolve merge conflict at triggerFilter.ts

* feat(core): add disabled column for triggers table

* refactor(core): change trigger state implementation to check against disabled column directly instead of json value

* chore(system): update triggers_disabled.sql migration versions

* fix(translations): update translations

* fix(core): remove duplicates

---------

Co-authored-by: brian.mulier <bmmulier@hotmail.fr>
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-12-23 11:03:15 +01:00
github-actions[bot]
db3b3236ac chore(core): localize to languages other than english (#13798)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-12-22 15:31:30 +01:00
Miloš Paunović
5a8a631b47 feat(core): improve left menu structure (#13684)
Related to https://github.com/kestra-io/kestra-ee/pull/6152.

Closes https://github.com/kestra-io/kestra-ee/issues/5415.
2025-12-22 15:27:58 +01:00
Malay Dewangan
2da191896f feat(): introduce notification utility service for plugins 2025-12-22 19:40:20 +05:30
Roman Acevedo
111026369b test: fix gradle :test task being run twice
because first :unitTest task was implicitly depending on :test because of dependency on :jacocoTestReport
2025-12-22 14:45:08 +01:00
Florian Hussonnois
e3a0e59e9c chore(test): rework and fix gradle check phase #13425
- cleanup and refactor gradle :check Task to have more control over it
- separate integration test from unit test in gradle :check
- By default, all tests annotated with KestraTest are considered to be integration-tests (for the moment).
- fix: gradle :check to handle all test failure cases (this should help for https://github.com/kestra-io/kestra-ee/issues/6066)
- fix core/src/main/java/io/kestra/core/plugins/serdes/PluginDeserializer.java checkState() to work regardless of test ordering (if a @MicronautTest ran before a Junit only test if could lead to a fail where KestraContext was already init and required a Bean)

- advance on https://www.notion.so/kestra-io/Flaky-tests-and-KestraTest-2c636907f7b580fb9e39fb0ca62eb473?source=copy_link#2c636907f7b5805b9564ef4d6ba6f80b
2025-12-22 12:44:57 +01:00
74 changed files with 2519 additions and 484 deletions

View File

@@ -171,13 +171,22 @@ allprojects {
subprojects {subProj ->
if (subProj.name != 'platform' && subProj.name != 'jmh-benchmarks') {
apply plugin: "com.adarshr.test-logger"
apply plugin: 'jacoco'
java {
sourceCompatibility = targetJavaVersion
targetCompatibility = targetJavaVersion
}
configurations {
agent {
canBeResolved = true
canBeConsumed = true
}
}
dependencies {
// Platform
testAnnotationProcessor enforcedPlatform(project(":platform"))
@@ -204,9 +213,16 @@ subprojects {subProj ->
//assertj
testImplementation 'org.assertj:assertj-core'
agent "org.aspectj:aspectjweaver:1.9.25.1"
testImplementation platform("io.qameta.allure:allure-bom")
testImplementation "io.qameta.allure:allure-junit5"
}
def commonTestConfig = { Test t ->
t.ignoreFailures = true
// set Xmx for test workers
t.maxHeapSize = '4g'
@@ -232,6 +248,52 @@ subprojects {subProj ->
// }
}
tasks.register('integrationTest', Test) { Test t ->
description = 'Runs integration tests'
group = 'verification'
useJUnitPlatform {
includeTags 'integration'
}
testClassesDirs = sourceSets.test.output.classesDirs
classpath = sourceSets.test.runtimeClasspath
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
// Integration tests typically not parallel (but you can enable)
maxParallelForks = 1
commonTestConfig(t)
}
tasks.register('unitTest', Test) { Test t ->
description = 'Runs unit tests'
group = 'verification'
useJUnitPlatform {
excludeTags 'flaky', 'integration'
}
testClassesDirs = sourceSets.test.output.classesDirs
classpath = sourceSets.test.runtimeClasspath
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
commonTestConfig(t)
}
tasks.register('flakyTest', Test) { Test t ->
group = 'verification'
description = 'Runs tests tagged @Flaky but does not fail the build.'
@@ -239,7 +301,6 @@ subprojects {subProj ->
useJUnitPlatform {
includeTags 'flaky'
}
ignoreFailures = true
reports {
junitXml.required = true
@@ -249,10 +310,13 @@ subprojects {subProj ->
junitXml.outputLocation = layout.buildDirectory.dir("test-results/flakyTest")
}
commonTestConfig(t)
}
test {
// test task (default)
tasks.named('test', Test) { Test t ->
group = 'verification'
description = 'Runs all non-flaky tests.'
useJUnitPlatform {
excludeTags 'flaky'
}
@@ -263,10 +327,12 @@ subprojects {subProj ->
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
commonTestConfig(it)
commonTestConfig(t)
jvmArgs = ["-javaagent:${configurations.agent.singleFile}"]
}
finalizedBy(tasks.named('flakyTest'))
tasks.named('check') {
dependsOn(tasks.named('test'))// default behaviour
}
testlogger {
@@ -282,83 +348,25 @@ subprojects {subProj ->
}
}
/**********************************************************************************************************************\
* End-to-End Tests
**********************************************************************************************************************/
def e2eTestsCheck = tasks.register('e2eTestsCheck') {
group = 'verification'
description = "Runs the 'check' task for all e2e-tests modules"
doFirst {
project.ext.set("e2e-tests", true)
}
}
subprojects {
// Add e2e-tests modules check tasks to e2eTestsCheck
if (project.name.startsWith("e2e-tests")) {
test {
onlyIf {
project.hasProperty("e2e-tests")
}
}
}
afterEvaluate {
// Add e2e-tests modules check tasks to e2eTestsCheck
if (project.name.startsWith("e2e-tests")) {
e2eTestsCheck.configure {
finalizedBy(check)
}
}
}
}
/**********************************************************************************************************************\
* Allure Reports
**********************************************************************************************************************/
subprojects {
if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
dependencies {
testImplementation platform("io.qameta.allure:allure-bom")
testImplementation "io.qameta.allure:allure-junit5"
}
configurations {
agent {
canBeResolved = true
canBeConsumed = true
}
}
dependencies {
agent "org.aspectj:aspectjweaver:1.9.25.1"
}
test {
jvmArgs = ["-javaagent:${configurations.agent.singleFile}"]
}
}
}
/**********************************************************************************************************************\
* Jacoco
**********************************************************************************************************************/
subprojects {
if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
apply plugin: 'jacoco'
test {
finalizedBy jacocoTestReport
}
jacocoTestReport {
dependsOn test
}
}
}
tasks.named('check') {
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
finalizedBy jacocoTestReport
}
tasks.register('unitTest') {
// No jacocoTestReport here, because it depends by default on :test,
// and that would make :test being run twice in our CI.
// In practice the report will be generated later in the CI by :check.
}
tasks.register('integrationTest') {
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
finalizedBy jacocoTestReport
}
tasks.register('flakyTest') {
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
finalizedBy jacocoTestReport
}
tasks.named('testCodeCoverageReport') {

View File

@@ -24,6 +24,9 @@ dependencies {
// reactor
api "io.projectreactor:reactor-core"
// awaitility
api "org.awaitility:awaitility"
// micronaut
api "io.micronaut.data:micronaut-data-model"
implementation "io.micronaut:micronaut-http-server-netty"

View File

@@ -2,6 +2,7 @@ package io.kestra.core.docs;
import com.fasterxml.classmate.ResolvedType;
import com.fasterxml.classmate.members.HierarchicType;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
@@ -22,6 +23,8 @@ import com.github.victools.jsonschema.module.swagger2.Swagger2Module;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.assets.CustomAsset;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.dashboards.DataFilter;
@@ -63,7 +66,7 @@ import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
@Slf4j
public class JsonSchemaGenerator {
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
@@ -276,10 +279,10 @@ public class JsonSchemaGenerator {
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
.with(Option.PLAIN_DEFINITION_KEYS)
.with(Option.ALLOF_CLEANUP_AT_THE_END);
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
// to be able to return an CustomDefinition with an empty node when the ResolvedType can't be found.
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider() {
@Override
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
try {
@@ -321,7 +324,7 @@ public class JsonSchemaGenerator {
// inline some type
builder.forTypesInGeneral()
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
@Override
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {
@@ -589,11 +592,31 @@ public class JsonSchemaGenerator {
// The `const` property is used by editors for auto-completion based on that schema.
builder.forTypesInGeneral().withTypeAttributeOverride((collectedTypeAttributes, scope, context) -> {
final Class<?> pluginType = scope.getType().getErasedType();
if (pluginType.getAnnotation(Plugin.class) != null) {
Plugin pluginAnnotation = pluginType.getAnnotation(Plugin.class);
if (pluginAnnotation != null) {
ObjectNode properties = (ObjectNode) collectedTypeAttributes.get("properties");
if (properties != null) {
String typeConst = pluginType.getName();
// This is needed so that assets can have arbitrary types while still being able to be identified as assets.
if (pluginType == CustomAsset.class) {
properties.set("type", context.getGeneratorConfig().createObjectNode()
.put("type", "string")
);
return;
}
if (Asset.class.isAssignableFrom(pluginType)) {
// For Asset types, we want to be able to use a simple-string type. Convention is that first alias is that string type.
typeConst = pluginAnnotation.aliases().length > 0 ? pluginAnnotation.aliases()[0] : pluginType.getName();
Arrays.stream(pluginType.getDeclaredMethods())
.filter(m -> m.isAnnotationPresent(JsonProperty.class))
.forEach(m -> properties.set(m.getAnnotation(JsonProperty.class).value(), context.getGeneratorConfig().createObjectNode()
.put("type", "string")
));
}
properties.set("type", context.getGeneratorConfig().createObjectNode()
.put("const", pluginType.getName())
.put("const", typeConst)
);
}
}
@@ -764,6 +787,14 @@ public class JsonSchemaGenerator {
consumer.accept(typeContext.resolve(clz));
}
}).toList();
} else if (declaredType.getErasedType() == Asset.class) {
return getRegisteredPlugins()
.stream()
.flatMap(registeredPlugin -> registeredPlugin.getAssets().stream())
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.map(typeContext::resolve)
.toList();
}
return null;

View File

@@ -1,7 +1,10 @@
package io.kestra.core.exceptions;
import io.kestra.core.utils.ListUtils;
import java.io.Serial;
import java.util.List;
import java.util.stream.Collectors;
/**
* General exception that can be throws when a Kestra entity field is query, but is not valid or existing.
@@ -9,7 +12,7 @@ import java.util.List;
public class InvalidQueryFiltersException extends KestraRuntimeException {
@Serial
private static final long serialVersionUID = 1L;
private static final String INVALID_QUERY_FILTER_MESSAGE = "Provided query filters are invalid";
private static final String INVALID_QUERY_FILTER_MESSAGE = "Provided query filters are invalid: %s";
private transient final List<String> invalids;
@@ -19,10 +22,14 @@ public class InvalidQueryFiltersException extends KestraRuntimeException {
* @param invalids the invalid filters.
*/
public InvalidQueryFiltersException(final List<String> invalids) {
super(INVALID_QUERY_FILTER_MESSAGE);
super(INVALID_QUERY_FILTER_MESSAGE.formatted(computeInvalids(invalids)));
this.invalids = invalids;
}
private static String computeInvalids(List<String> invalids) {
return String.join("\n- ", ListUtils.emptyOnNull(invalids));
}
/**
* Creates a new {@link InvalidQueryFiltersException} instance.
*

View File

@@ -103,12 +103,48 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS);
}
},
METADATA("metadata") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS);
}
},
FLOW_ID("flowId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
}
},
FLOW_REVISION("flowRevision") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN);
}
},
ID("id") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
}
},
ASSET_ID("assetId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
}
},
TYPE("type") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
}
},
CREATED("created") {
@Override
public List<Op> supportedOp() {
return List.of(Op.GREATER_THAN_OR_EQUAL_TO, Op.GREATER_THAN, Op.LESS_THAN_OR_EQUAL_TO, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
}
},
UPDATED("updated") {
@Override
public List<Op> supportedOp() {
@@ -151,12 +187,30 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
TRIGGER_STATE("triggerState"){
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
}
},
EXECUTION_ID("executionId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
TASK_ID("taskId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
TASK_RUN_ID("taskRunId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
CHILD_FILTER("childFilter") {
@Override
public List<Op> supportedOp() {
@@ -271,7 +325,7 @@ public record QueryFilter(
@Override
public List<Field> supportedField() {
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.WORKER_ID, Field.FLOW_ID,
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID, Field.TRIGGER_STATE
);
}
},
@@ -306,6 +360,48 @@ public record QueryFilter(
Field.UPDATED
);
}
},
ASSET {
@Override
public List<Field> supportedField() {
return List.of(
Field.QUERY,
Field.ID,
Field.TYPE,
Field.NAMESPACE,
Field.METADATA,
Field.UPDATED
);
}
},
ASSET_USAGE {
@Override
public List<Field> supportedField() {
return List.of(
Field.ASSET_ID,
Field.NAMESPACE,
Field.FLOW_ID,
Field.FLOW_REVISION,
Field.EXECUTION_ID,
Field.TASK_ID,
Field.TASK_RUN_ID,
Field.CREATED
);
}
},
ASSET_LINEAGE_EVENT {
@Override
public List<Field> supportedField() {
return List.of(
Field.NAMESPACE,
Field.FLOW_ID,
Field.FLOW_REVISION,
Field.EXECUTION_ID,
Field.TASK_ID,
Field.TASK_RUN_ID,
Field.CREATED
);
}
};
public abstract List<Field> supportedField();

View File

@@ -0,0 +1,111 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.Plugin;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Pattern;
import jakarta.validation.constraints.Size;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.util.*;
@Getter
@NoArgsConstructor
public abstract class Asset implements HasUID, DeletedInterface, Plugin {
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
protected String tenantId;
@Pattern(regexp = "^[a-z0-9][a-z0-9._-]*")
@Size(min = 1, max = 150)
protected String namespace;
@NotBlank
@Pattern(regexp = "^[a-zA-Z0-9][a-zA-Z0-9._-]*")
@Size(min = 1, max = 150)
protected String id;
@NotBlank
protected String type;
protected String displayName;
protected String description;
protected Map<String, Object> metadata;
@Nullable
@Hidden
private Instant created;
@Nullable
@Hidden
private Instant updated;
@Hidden
private boolean deleted;
public Asset(
String tenantId,
String namespace,
String id,
String type,
String displayName,
String description,
Map<String, Object> metadata,
Instant created,
Instant updated,
boolean deleted
) {
this.tenantId = tenantId;
this.namespace = namespace;
this.id = id;
this.type = type;
this.displayName = displayName;
this.description = description;
this.metadata = Optional.ofNullable(metadata).map(HashMap::new).orElse(new HashMap<>());
Instant now = Instant.now();
this.created = Optional.ofNullable(created).orElse(now);
this.updated = Optional.ofNullable(updated).orElse(now);
this.deleted = deleted;
}
public <T extends Asset> T toUpdated() {
if (this.created == null) {
this.created = Instant.now();
}
this.updated = Instant.now();
return (T) this;
}
public Asset toDeleted() {
this.deleted = true;
return this;
}
@JsonAnySetter
public void setMetadata(String name, Object value) {
metadata.put(name, value);
}
@Override
public String uid() {
return Asset.uid(tenantId, id);
}
public static String uid(String tenantId, String id) {
return IdUtils.fromParts(tenantId, id);
}
public Asset withTenantId(String tenantId) {
this.tenantId = tenantId;
return this;
}
}

View File

@@ -0,0 +1,19 @@
package io.kestra.core.models.assets;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
public record AssetIdentifier(@Hidden String tenantId, @Hidden String namespace, String id){
public AssetIdentifier withTenantId(String tenantId) {
return new AssetIdentifier(tenantId, this.namespace, this.id);
}
public String uid() {
return IdUtils.fromParts(tenantId, id);
}
public static AssetIdentifier of(Asset asset) {
return new AssetIdentifier(asset.getTenantId(), asset.getNamespace(), asset.getId());
}
}

View File

@@ -0,0 +1,18 @@
package io.kestra.core.models.assets;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.utils.IdUtils;
/**
* Represents an entity that used an asset
*/
public record AssetUser(String tenantId, String namespace, String flowId, Integer flowRevision, String executionId, String taskId, String taskRunId) implements HasUID {
public String uid() {
return IdUtils.fromParts(tenantId, namespace, flowId, String.valueOf(flowRevision), executionId, taskRunId);
}
public FlowId toFlowId() {
return FlowId.of(tenantId, namespace, flowId, flowRevision);
}
}

View File

@@ -0,0 +1,22 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.micronaut.core.annotation.Introspected;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.util.List;
import java.util.Optional;
@Getter
public class AssetsDeclaration extends AssetsInOut {
private boolean enableAuto;
@JsonCreator
public AssetsDeclaration(Boolean enableAuto, List<AssetIdentifier> inputs, List<Asset> outputs) {
super(inputs, outputs);
this.enableAuto = Optional.ofNullable(enableAuto).orElse(false);
}
}

View File

@@ -0,0 +1,21 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonCreator;
import lombok.Getter;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@Getter
public class AssetsInOut {
private List<AssetIdentifier> inputs;
private List<Asset> outputs;
@JsonCreator
public AssetsInOut(List<AssetIdentifier> inputs, List<Asset> outputs) {
this.inputs = Optional.ofNullable(inputs).orElse(Collections.emptyList());
this.outputs = Optional.ofNullable(outputs).orElse(Collections.emptyList());
}
}

View File

@@ -0,0 +1,30 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.kestra.core.models.annotations.Plugin;
import lombok.Builder;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.util.Map;
@NoArgsConstructor
@Plugin
public class CustomAsset extends Asset {
@Builder
@JsonCreator
public CustomAsset(
String tenantId,
String namespace,
String id,
String type,
String displayName,
String description,
Map<String, Object> metadata,
Instant created,
Instant updated,
boolean deleted
) {
super(tenantId, namespace, id, type, displayName, description, metadata, created, updated, deleted);
}
}

View File

@@ -0,0 +1,73 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.annotations.Plugin;
import lombok.Builder;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
@NoArgsConstructor
@Plugin(aliases = DatasetAsset.ASSET_TYPE)
public class DatasetAsset extends Asset {
public static final String ASSET_TYPE = "DATASET";
@Builder
@JsonCreator
public DatasetAsset(
String tenantId,
String namespace,
String id,
String displayName,
String description,
String system,
String location,
String format,
Map<String, Object> metadata,
Instant created,
Instant updated,
boolean deleted
) {
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
this.setSystem(system);
this.setLocation(location);
this.setFormat(format);
}
@JsonProperty("system")
public String getSystem() {
return Optional.ofNullable(metadata.get("system")).map(Object::toString).orElse(null);
}
@JsonProperty("location")
public String getLocation() {
return Optional.ofNullable(metadata.get("location")).map(Object::toString).orElse(null);
}
@JsonProperty("format")
public String getFormat() {
return Optional.ofNullable(metadata.get("format")).map(Object::toString).orElse(null);
}
public void setSystem(String system) {
if (system != null) {
metadata.put("system", system);
}
}
public void setLocation(String location) {
if (location != null) {
metadata.put("location", location);
}
}
public void setFormat(String format) {
if (format != null) {
metadata.put("format", format);
}
}
}

View File

@@ -0,0 +1,31 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.kestra.core.models.annotations.Plugin;
import lombok.Builder;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.util.Map;
@NoArgsConstructor
@Plugin(aliases = ExternalAsset.ASSET_TYPE)
public class ExternalAsset extends Asset {
public static final String ASSET_TYPE = "EXTERNAL";
@Builder
@JsonCreator
public ExternalAsset(
String tenantId,
String namespace,
String id,
String displayName,
String description,
Map<String, Object> metadata,
Instant created,
Instant updated,
boolean deleted
) {
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
}
}

View File

@@ -0,0 +1,60 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.annotations.Plugin;
import lombok.Builder;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
@NoArgsConstructor
@Plugin(aliases = FileAsset.ASSET_TYPE)
public class FileAsset extends Asset {
public static final String ASSET_TYPE = "FILE";
@Builder
@JsonCreator
public FileAsset(
String tenantId,
String namespace,
String id,
String displayName,
String description,
String system,
String path,
Map<String, Object> metadata,
Instant created,
Instant updated,
boolean deleted
) {
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
this.setSystem(system);
this.setPath(path);
}
@JsonProperty("system")
public String getSystem() {
return Optional.ofNullable(metadata.get("system")).map(Object::toString).orElse(null);
}
@JsonProperty("path")
public String getPath() {
return Optional.ofNullable(metadata.get("path")).map(Object::toString).orElse(null);
}
public void setSystem(String system) {
if (system != null) {
metadata.put("system", system);
}
}
public void setPath(String path) {
if (path != null) {
metadata.put("path", path);
}
}
}

View File

@@ -0,0 +1,86 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.annotations.Plugin;
import lombok.Builder;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
@NoArgsConstructor
@Plugin(aliases = TableAsset.ASSET_TYPE)
public class TableAsset extends Asset {
public static final String ASSET_TYPE = "TABLE";
@Builder
@JsonCreator
public TableAsset(
String tenantId,
String namespace,
String id,
String displayName,
String description,
String system,
String database,
String schema,
String name,
Map<String, Object> metadata,
Instant created,
Instant updated,
boolean deleted
) {
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
this.setSystem(system);
this.setDatabase(database);
this.setSchema(schema);
this.setName(name);
}
@JsonProperty("system")
public String getSystem() {
return Optional.ofNullable(metadata.get("system")).map(Object::toString).orElse(null);
}
@JsonProperty("database")
public String getDatabase() {
return Optional.ofNullable(metadata.get("database")).map(Object::toString).orElse(null);
}
@JsonProperty("schema")
public String getSchema() {
return Optional.ofNullable(metadata.get("schema")).map(Object::toString).orElse(null);
}
@JsonProperty("name")
public String getName() {
return Optional.ofNullable(metadata.get("name")).map(Object::toString).orElse(null);
}
public void setSystem(String system) {
if (system != null) {
metadata.put("system", system);
}
}
public void setDatabase(String database) {
if (database != null) {
metadata.put("database", database);
}
}
public void setSchema(String schema) {
if (schema != null) {
metadata.put("schema", schema);
}
}
public void setName(String name) {
if (name != null) {
metadata.put("name", name);
}
}
}

View File

@@ -0,0 +1,73 @@
package io.kestra.core.models.assets;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.annotations.Plugin;
import lombok.Builder;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
@NoArgsConstructor
@Plugin(aliases = VmAsset.ASSET_TYPE)
public class VmAsset extends Asset {
public static final String ASSET_TYPE = "VM";
@Builder
@JsonCreator
public VmAsset(
String tenantId,
String namespace,
String id,
String displayName,
String description,
String provider,
String region,
String state,
Map<String, Object> metadata,
Instant created,
Instant updated,
boolean deleted
) {
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
this.setProvider(provider);
this.setRegion(region);
this.setState(state);
}
@JsonProperty("provider")
public String getProvider() {
return Optional.ofNullable(metadata.get("provider")).map(Object::toString).orElse(null);
}
@JsonProperty("region")
public String getRegion() {
return Optional.ofNullable(metadata.get("region")).map(Object::toString).orElse(null);
}
@JsonProperty("state")
public String getState() {
return Optional.ofNullable(metadata.get("state")).map(Object::toString).orElse(null);
}
public void setProvider(String provider) {
if (provider != null) {
metadata.put("provider", provider);
}
}
public void setRegion(String region) {
if (region != null) {
metadata.put("region", region);
}
}
public void setState(String state) {
if (state != null) {
metadata.put("state", state);
}
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.core.models.executions;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.assets.AssetsInOut;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
@@ -57,6 +58,10 @@ public class TaskRun implements TenantInterface {
@Schema(implementation = Object.class)
Variables outputs;
@With
@Nullable
AssetsInOut assets;
@NotNull
State state;
@@ -87,6 +92,7 @@ public class TaskRun implements TenantInterface {
this.value,
this.attempts,
this.outputs,
this.assets,
this.state.withState(state),
this.iteration,
this.dynamic,
@@ -114,6 +120,7 @@ public class TaskRun implements TenantInterface {
this.value,
newAttempts,
this.outputs,
this.assets,
this.state.withState(state),
this.iteration,
this.dynamic,
@@ -137,6 +144,7 @@ public class TaskRun implements TenantInterface {
this.value,
newAttempts,
this.outputs,
this.assets,
this.state.withState(State.Type.FAILED),
this.iteration,
this.dynamic,
@@ -156,6 +164,7 @@ public class TaskRun implements TenantInterface {
.value(this.getValue())
.attempts(this.getAttempts())
.outputs(this.getOutputs())
.assets(this.getAssets())
.state(state == null ? this.getState() : state)
.iteration(this.getIteration())
.build();
@@ -242,6 +251,7 @@ public class TaskRun implements TenantInterface {
", parentTaskRunId=" + this.getParentTaskRunId() +
", state=" + this.getState().getCurrent().toString() +
", outputs=" + this.getOutputs() +
", assets=" + this.getAssets() +
", attempts=" + this.getAttempts() +
")";
}

View File

@@ -5,11 +5,13 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.assets.AssetsDeclaration;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.core.flow.WorkingDirectory;
import jakarta.annotation.Nullable;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Size;
import lombok.Builder;
@@ -78,6 +80,11 @@ abstract public class Task implements TaskInterface {
@Valid
private Cache taskCache;
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
@Valid
@Nullable
private Property<AssetsDeclaration> assets;
public Optional<Task> findById(String id) {
if (this.getId().equals(id)) {
return Optional.of(this);

View File

@@ -1,10 +1,13 @@
package io.kestra.core.models.tasks.runners;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.AssetEmitter;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.event.Level;
import org.slf4j.spi.LoggingEventBuilder;
@@ -18,6 +21,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static io.kestra.core.runners.RunContextLogger.ORIGINAL_TIMESTAMP_KEY;
import static io.kestra.core.utils.Rethrow.throwConsumer;
/**
* Service for matching and capturing structured data from task execution logs.
@@ -76,6 +80,18 @@ public class TaskLogLineMatcher {
}
});
}
if (match.assets() != null) {
try {
AssetEmitter assetEmitter = runContext.assets();
match.assets().forEach(throwConsumer(assetEmitter::upsert));
} catch (IllegalVariableEvaluationException e) {
logger.warn("Unable to get asset emitter for log '{}'", data, e);
} catch (QueueException e) {
logger.warn("Unable to emit asset for log '{}'", data, e);
}
}
return match;
}
@@ -94,8 +110,9 @@ public class TaskLogLineMatcher {
public record TaskLogMatch(
Map<String, Object> outputs,
List<AbstractMetricEntry<?>> metrics,
List<LogLine> logs
) {
List<LogLine> logs,
List<Asset> assets
) {
@Override
public Map<String, Object> outputs() {
return Optional.ofNullable(outputs).orElse(Map.of());

View File

@@ -6,8 +6,10 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.assets.AssetsDeclaration;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
@@ -88,6 +90,9 @@ abstract public class AbstractTrigger implements TriggerInterface {
)
private boolean allowConcurrent = false;
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
private Property<AssetsDeclaration> assets;
/**
* For backward compatibility: we rename minLogLevel to logLevel.
* @deprecated use {@link #logLevel} instead

View File

@@ -1,6 +1,7 @@
package io.kestra.core.plugins;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.assets.Asset;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import org.slf4j.Logger;

View File

@@ -2,6 +2,7 @@ package io.kestra.core.plugins;
import com.fasterxml.jackson.databind.module.SimpleModule;
import io.kestra.core.app.AppPluginInterface;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
@@ -11,6 +12,7 @@ import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.logs.LogExporter;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.serdes.AssetDeserializer;
import io.kestra.core.plugins.serdes.PluginDeserializer;
import io.kestra.core.secret.SecretPluginInterface;
import io.kestra.core.storages.StorageInterface;
@@ -45,5 +47,6 @@ public class PluginModule extends SimpleModule {
addDeserializer(SecretPluginInterface.class, new PluginDeserializer<>());
addDeserializer(AppPluginInterface.class, new PluginDeserializer<>());
addDeserializer(LogExporter.class, new PluginDeserializer<>());
addDeserializer(Asset.class, new AssetDeserializer());
}
}

View File

@@ -3,6 +3,7 @@ package io.kestra.core.plugins;
import io.kestra.core.app.AppBlockInterface;
import io.kestra.core.app.AppPluginInterface;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
@@ -108,6 +109,7 @@ public class PluginScanner {
List<Class<? extends StorageInterface>> storages = new ArrayList<>();
List<Class<? extends SecretPluginInterface>> secrets = new ArrayList<>();
List<Class<? extends TaskRunner<?>>> taskRunners = new ArrayList<>();
List<Class<? extends Asset>> assets = new ArrayList<>();
List<Class<? extends AppPluginInterface>> apps = new ArrayList<>();
List<Class<? extends AppBlockInterface>> appBlocks = new ArrayList<>();
List<Class<? extends Chart<?>>> charts = new ArrayList<>();
@@ -155,6 +157,10 @@ public class PluginScanner {
//noinspection unchecked
taskRunners.add((Class<? extends TaskRunner<?>>) runner.getClass());
}
case Asset asset -> {
log.debug("Loading Asset plugin: '{}'", plugin.getClass());
assets.add(asset.getClass());
}
case AppPluginInterface app -> {
log.debug("Loading App plugin: '{}'", plugin.getClass());
apps.add(app.getClass());
@@ -223,6 +229,7 @@ public class PluginScanner {
.conditions(conditions)
.storages(storages)
.secrets(secrets)
.assets(assets)
.apps(apps)
.appBlocks(appBlocks)
.taskRunners(taskRunners)

View File

@@ -3,6 +3,7 @@ package io.kestra.core.plugins;
import io.kestra.core.app.AppBlockInterface;
import io.kestra.core.app.AppPluginInterface;
import io.kestra.core.models.annotations.PluginSubGroup;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
@@ -39,6 +40,7 @@ public class RegisteredPlugin {
public static final String STORAGES_GROUP_NAME = "storages";
public static final String SECRETS_GROUP_NAME = "secrets";
public static final String TASK_RUNNERS_GROUP_NAME = "task-runners";
public static final String ASSETS_GROUP_NAME = "assets";
public static final String APPS_GROUP_NAME = "apps";
public static final String APP_BLOCKS_GROUP_NAME = "app-blocks";
public static final String CHARTS_GROUP_NAME = "charts";
@@ -56,6 +58,7 @@ public class RegisteredPlugin {
private final List<Class<? extends StorageInterface>> storages;
private final List<Class<? extends SecretPluginInterface>> secrets;
private final List<Class<? extends TaskRunner<?>>> taskRunners;
private final List<Class<? extends Asset>> assets;
private final List<Class<? extends AppPluginInterface>> apps;
private final List<Class<? extends AppBlockInterface>> appBlocks;
private final List<Class<? extends Chart<?>>> charts;
@@ -74,6 +77,7 @@ public class RegisteredPlugin {
!storages.isEmpty() ||
!secrets.isEmpty() ||
!taskRunners.isEmpty() ||
!assets.isEmpty() ||
!apps.isEmpty() ||
!appBlocks.isEmpty() ||
!charts.isEmpty() ||
@@ -145,6 +149,10 @@ public class RegisteredPlugin {
return AppPluginInterface.class;
}
if (this.getAssets().stream().anyMatch(r -> r.getName().equals(cls))) {
return Asset.class;
}
if (this.getLogExporters().stream().anyMatch(r -> r.getName().equals(cls))) {
return LogExporter.class;
}
@@ -180,6 +188,7 @@ public class RegisteredPlugin {
result.put(STORAGES_GROUP_NAME, Arrays.asList(this.getStorages().toArray(Class[]::new)));
result.put(SECRETS_GROUP_NAME, Arrays.asList(this.getSecrets().toArray(Class[]::new)));
result.put(TASK_RUNNERS_GROUP_NAME, Arrays.asList(this.getTaskRunners().toArray(Class[]::new)));
result.put(ASSETS_GROUP_NAME, Arrays.asList(this.getAssets().toArray(Class[]::new)));
result.put(APPS_GROUP_NAME, Arrays.asList(this.getApps().toArray(Class[]::new)));
result.put(APP_BLOCKS_GROUP_NAME, Arrays.asList(this.getAppBlocks().toArray(Class[]::new)));
result.put(CHARTS_GROUP_NAME, Arrays.asList(this.getCharts().toArray(Class[]::new)));
@@ -359,6 +368,12 @@ public class RegisteredPlugin {
b.append("] ");
}
if (!this.getAssets().isEmpty()) {
b.append("[Assets: ");
b.append(this.getAssets().stream().map(Class::getName).collect(Collectors.joining(", ")));
b.append("] ");
}
if (!this.getApps().isEmpty()) {
b.append("[Apps: ");
b.append(this.getApps().stream().map(Class::getName).collect(Collectors.joining(", ")));

View File

@@ -0,0 +1,25 @@
package io.kestra.core.plugins.notifications;
import io.kestra.core.models.property.Property;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.Map;
public interface ExecutionInterface {
@Schema(
title = "The execution id to use",
description = "Default is the current execution, " +
"change it to {{ trigger.executionId }} if you use this task with a Flow trigger to use the original execution."
)
Property<String> getExecutionId();
@Schema(
title = "Custom fields to be added on notification"
)
Property<Map<String, Object>> getCustomFields();
@Schema(
title = "Custom message to be added on notification"
)
Property<String> getCustomMessage();
}

View File

@@ -0,0 +1,140 @@
package io.kestra.core.plugins.notifications;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.retrys.Exponential;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.RetryUtils;
import io.kestra.core.utils.UriProvider;
import java.time.Duration;
import java.util.*;
public final class ExecutionService {
private ExecutionService() {}
public static Execution findExecution(RunContext runContext, Property<String> executionId) throws IllegalVariableEvaluationException, NoSuchElementException {
ExecutionRepositoryInterface executionRepository = ((DefaultRunContext) runContext).getApplicationContext().getBean(ExecutionRepositoryInterface.class);
RetryUtils.Instance<Execution, NoSuchElementException> retryInstance = RetryUtils
.of(Exponential.builder()
.delayFactor(2.0)
.interval(Duration.ofSeconds(1))
.maxInterval(Duration.ofSeconds(15))
.maxAttempts(-1)
.maxDuration(Duration.ofMinutes(10))
.build(),
runContext.logger()
);
var executionRendererId = runContext.render(executionId).as(String.class).orElse(null);
var flowTriggerExecutionState = getOptionalFlowTriggerExecutionState(runContext);
var flowVars = (Map<String, String>) runContext.getVariables().get("flow");
var isCurrentExecution = isCurrentExecution(runContext, executionRendererId);
if (isCurrentExecution) {
runContext.logger().info("Loading execution data for the current execution.");
}
return retryInstance.run(
NoSuchElementException.class,
() -> executionRepository.findById(flowVars.get("tenantId"), executionRendererId)
.filter(foundExecution -> isExecutionInTheWantedState(foundExecution, isCurrentExecution, flowTriggerExecutionState))
.orElseThrow(() -> new NoSuchElementException("Unable to find execution '" + executionRendererId + "'"))
);
}
/**
* ExecutionRepository can be out of sync in ElasticSearch stack, with this filter we try to mitigate that
*
* @param execution the Execution we fetched from ExecutionRepository
* @param isCurrentExecution true if this *Execution Task is configured to send a notification for the current Execution
* @param flowTriggerExecutionState the Execution State that triggered the Flow trigger, if any
* @return true if we think we fetched the right Execution data for our usecase
*/
public static boolean isExecutionInTheWantedState(Execution execution, boolean isCurrentExecution, Optional<String> flowTriggerExecutionState) {
if (isCurrentExecution) {
// we don't wait for current execution to be terminated as it could not be possible as long as this task is running
return true;
}
if (flowTriggerExecutionState.isPresent()) {
// we were triggered by a Flow trigger that can be, for example: PAUSED
if (flowTriggerExecutionState.get().equals(State.Type.RUNNING.toString())) {
// RUNNING special case: we take the first state we got
return true;
} else {
// to handle the case where the ExecutionRepository is out of sync in ElasticSearch stack,
// we try to match an Execution with the same state
return execution.getState().getCurrent().name().equals(flowTriggerExecutionState.get());
}
} else {
return execution.getState().getCurrent().isTerminated();
}
}
public static Map<String, Object> executionMap(RunContext runContext, ExecutionInterface executionInterface) throws IllegalVariableEvaluationException {
Execution execution = findExecution(runContext, executionInterface.getExecutionId());
UriProvider uriProvider = ((DefaultRunContext) runContext).getApplicationContext().getBean(UriProvider.class);
Map<String, Object> templateRenderMap = new HashMap<>();
templateRenderMap.put("duration", execution.getState().humanDuration());
templateRenderMap.put("startDate", execution.getState().getStartDate());
templateRenderMap.put("link", uriProvider.executionUrl(execution));
templateRenderMap.put("execution", JacksonMapper.toMap(execution));
runContext.render(executionInterface.getCustomMessage())
.as(String.class)
.ifPresent(s -> templateRenderMap.put("customMessage", s));
final Map<String, Object> renderedCustomFields = runContext.render(executionInterface.getCustomFields()).asMap(String.class, Object.class);
if (!renderedCustomFields.isEmpty()) {
templateRenderMap.put("customFields", renderedCustomFields);
}
var isCurrentExecution = isCurrentExecution(runContext, execution.getId());
List<TaskRun> taskRuns;
if (isCurrentExecution) {
taskRuns = execution.getTaskRunList();
} else {
taskRuns = execution.getTaskRunList().stream()
.filter(t -> (execution.hasFailed() ? State.Type.FAILED : State.Type.SUCCESS).equals(t.getState().getCurrent()))
.toList();
}
if (!ListUtils.isEmpty(taskRuns)) {
TaskRun lastTaskRun = taskRuns.getLast();
templateRenderMap.put("firstFailed", State.Type.FAILED.equals(lastTaskRun.getState().getCurrent()) ? lastTaskRun : false);
templateRenderMap.put("lastTask", lastTaskRun);
}
return templateRenderMap;
}
/**
* if there is a state, we assume this is a Flow trigger with type: {@link io.kestra.plugin.core.trigger.Flow.Output}
*
* @return the state of the execution that triggered the Flow trigger, or empty if another usecase/trigger
*/
private static Optional<String> getOptionalFlowTriggerExecutionState(RunContext runContext) {
var triggerVar = Optional.ofNullable(
runContext.getVariables().get("trigger")
);
return triggerVar.map(trigger -> ((Map<String, String>) trigger).get("state"));
}
private static boolean isCurrentExecution(RunContext runContext, String executionId) {
var executionVars = (Map<String, String>) runContext.getVariables().get("execution");
return executionId.equals(executionVars.get("id"));
}
}

View File

@@ -0,0 +1,16 @@
package io.kestra.core.plugins.serdes;
import com.fasterxml.jackson.databind.JsonDeserializer;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.assets.CustomAsset;
/**
* Specific {@link JsonDeserializer} for deserializing {@link Asset}.
*/
public final class AssetDeserializer extends PluginDeserializer<Asset> {
@Override
protected Class<? extends Plugin> fallbackClass() {
return CustomAsset.class;
}
}

View File

@@ -12,6 +12,7 @@ import io.kestra.core.models.dashboards.charts.DataChart;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.serializers.JacksonMapper;
import io.micronaut.context.exceptions.NoSuchBeanException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,7 +29,7 @@ import java.util.Optional;
* The {@link PluginDeserializer} uses the {@link PluginRegistry} to found the plugin class corresponding to
* a plugin type.
*/
public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer<T> {
public class PluginDeserializer<T extends Plugin> extends JsonDeserializer<T> {
private static final Logger log = LoggerFactory.getLogger(PluginDeserializer.class);
@@ -72,7 +73,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
// By default, if no plugin-registry is configured retrieve
// the one configured from the static Kestra's context.
pluginRegistry = KestraContext.getContext().getPluginRegistry();
} catch (IllegalStateException ignore) {
} catch (IllegalStateException | NoSuchBeanException ignore) {
// This error can only happen if the KestraContext is not initialized (i.e. in unit tests).
log.error("No plugin registry was initialized. Use default implementation.");
pluginRegistry = DefaultPluginRegistry.getOrCreate();
@@ -92,6 +93,10 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
identifier
);
pluginType = pluginRegistry.findClassByIdentifier(identifier);
if (pluginType == null) {
pluginType = fallbackClass();
}
}
if (pluginType == null) {
@@ -152,4 +157,8 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
return isVersioningSupported && version != null && !version.isEmpty() ? type + ":" + version : type;
}
protected Class<? extends Plugin> fallbackClass() {
return null;
}
}

View File

@@ -0,0 +1,12 @@
package io.kestra.core.runners;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.queues.QueueException;
import java.util.List;
public interface AssetEmitter {
void upsert(Asset asset) throws QueueException;
List<Asset> outputs();
}

View File

@@ -6,11 +6,13 @@ import com.google.common.base.CaseFormat;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.assets.AssetsDeclaration;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.services.AssetManagerFactory;
import io.kestra.core.plugins.PluginConfigurations;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.Storage;
@@ -54,6 +56,7 @@ public class DefaultRunContext extends RunContext {
private MetricRegistry meterRegistry;
private VersionProvider version;
private KVStoreService kvStoreService;
private AssetManagerFactory assetManagerFactory;
private Optional<String> secretKey;
private WorkingDir workingDir;
private Validator validator;
@@ -73,6 +76,8 @@ public class DefaultRunContext extends RunContext {
private Task task;
private AbstractTrigger trigger;
private volatile AssetEmitter assetEmitter;
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
@@ -161,6 +166,7 @@ public class DefaultRunContext extends RunContext {
this.secretKey = applicationContext.getProperty("kestra.encryption.secret-key", String.class);
this.validator = applicationContext.getBean(Validator.class);
this.localPath = applicationContext.getBean(LocalPathFactory.class).createLocalPath(this);
this.assetManagerFactory = applicationContext.getBean(AssetManagerFactory.class);
}
}
@@ -537,6 +543,23 @@ public class DefaultRunContext extends RunContext {
return flow != null ? flow.get("tenantId") : null;
}
/**
* {@inheritDoc}
*/
@Override
public TaskRunInfo taskRunInfo() {
Optional<Map<String, Object>> maybeTaskRunMap = Optional.ofNullable(this.getVariables().get("taskrun"))
.map(Map.class::cast);
return new TaskRunInfo(
(String) this.getVariables().get("executionId"),
(String) this.getVariables().get("taskId"),
maybeTaskRunMap.map(m -> (String) m.get("id"))
.orElse(null),
maybeTaskRunMap.map(m -> (String) m.get("value"))
.orElse(null)
);
}
/**
* {@inheritDoc}
*/
@@ -545,12 +568,7 @@ public class DefaultRunContext extends RunContext {
public FlowInfo flowInfo() {
Map<String, Object> flow = (Map<String, Object>) this.getVariables().get("flow");
// normally only tests should not have the flow variable
return flow == null ? new FlowInfo(null, null, null, null) : new FlowInfo(
(String) flow.get("tenantId"),
(String) flow.get("namespace"),
(String) flow.get("id"),
(Integer) flow.get("revision")
);
return flow == null ? new FlowInfo(null, null, null, null) : FlowInfo.from(flow);
}
/**
@@ -594,6 +612,25 @@ public class DefaultRunContext extends RunContext {
return new AclCheckerImpl(this.applicationContext, flowInfo());
}
@Override
public AssetEmitter assets() throws IllegalVariableEvaluationException {
if (this.assetEmitter == null) {
synchronized (this) {
if (this.assetEmitter == null) {
this.assetEmitter = assetManagerFactory.of(
Optional.ofNullable(task).map(Task::getAssets)
.or(() -> Optional.ofNullable(trigger).map(AbstractTrigger::getAssets))
.flatMap(throwFunction(asset -> this.render(asset).as(AssetsDeclaration.class)))
.map(AssetsDeclaration::isEnableAuto)
.orElse(false)
);
}
}
}
return this.assetEmitter;
}
@Override
public LocalPath localPath() {
return localPath;

View File

@@ -143,6 +143,8 @@ public abstract class RunContext implements PropertyContext {
@Deprecated(forRemoval = true)
public abstract String tenantId();
public abstract TaskRunInfo taskRunInfo();
public abstract FlowInfo flowInfo();
/**
@@ -190,7 +192,19 @@ public abstract class RunContext implements PropertyContext {
*/
public abstract LocalPath localPath();
public record TaskRunInfo(String executionId, String taskId, String taskRunId, Object value) {
}
public record FlowInfo(String tenantId, String namespace, String id, Integer revision) {
public static FlowInfo from(Map<String, Object> flowInfoMap) {
return new FlowInfo(
(String) flowInfoMap.get("tenantId"),
(String) flowInfoMap.get("namespace"),
(String) flowInfoMap.get("id"),
(Integer) flowInfoMap.get("revision")
);
}
}
/**
@@ -206,6 +220,11 @@ public abstract class RunContext implements PropertyContext {
*/
public abstract AclChecker acl();
/**
* Get access to the Assets handler.
*/
public abstract AssetEmitter assets() throws IllegalVariableEvaluationException;
/**
* Clone this run context for a specific plugin.
* @return a new run context with the plugin configuration of the given plugin.

View File

@@ -1,6 +1,7 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.executions.TaskRun;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -10,6 +11,7 @@ import java.util.ArrayList;
import java.util.List;
import jakarta.validation.constraints.NotNull;
import lombok.With;
@Value
@AllArgsConstructor
@@ -21,8 +23,7 @@ public class WorkerTaskResult implements HasUID {
List<TaskRun> dynamicTaskRuns;
public WorkerTaskResult(TaskRun taskRun) {
this.taskRun = taskRun;
this.dynamicTaskRuns = new ArrayList<>();
this(taskRun, new ArrayList<>());
}
/**

View File

@@ -0,0 +1,25 @@
package io.kestra.core.services;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.runners.AssetEmitter;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.List;
@Singleton
public class AssetManagerFactory {
public AssetEmitter of(boolean enabled) {
return new AssetEmitter() {
@Override
public void upsert(Asset asset) {
throw new UnsupportedOperationException();
}
@Override
public List<Asset> outputs() {
return new ArrayList<>();
}
};
}
}

View File

@@ -0,0 +1,32 @@
package io.kestra.core.services;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.assets.AssetIdentifier;
import io.kestra.core.models.assets.AssetUser;
import io.kestra.core.queues.QueueException;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.util.Collections;
import java.util.List;
@Singleton
public class AssetService implements Runnable {
@PostConstruct
public void start() {
this.run();
}
public void asyncUpsert(AssetUser assetUser, Asset asset) throws QueueException {
// No-op
}
public void assetLineage(AssetUser assetUser, List<AssetIdentifier> inputs, List<AssetIdentifier> outputs) throws QueueException {
// No-op
}
public void run() {
// No-op
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.test.flow;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import jakarta.validation.constraints.NotNull;
@@ -8,6 +9,7 @@ import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.util.List;
import java.util.Map;
@Getter
@@ -25,5 +27,7 @@ public class TaskFixture {
private Map<String, Object> outputs;
private List<Asset> assets;
private Property<String> description;
}

View File

@@ -19,15 +19,7 @@ import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -155,6 +147,8 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
.map(task -> task.getId())
.collect(Collectors.toList());
violations.addAll(assetsViolations(allTasks));
if (!invalidTasks.isEmpty()) {
violations.add("Invalid output reference: use outputs[key-name] instead of outputs.key-name — keys with dashes require bracket notation, offending tasks:" +
" [" + String.join(", ", invalidTasks) + "]");
@@ -181,6 +175,12 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
}
}
protected List<String> assetsViolations(List<Task> allTasks) {
return allTasks.stream().filter(task -> task.getAssets() != null)
.map(taskWithAssets -> "Task '" + taskWithAssets.getId() + "' can't have any `assets` because assets are only available in Enterprise Edition.")
.toList();
}
private static boolean checkObjectFieldsWithPatterns(Object object, List<Pattern> patterns) {
if (object == null) {
return true;

View File

@@ -0,0 +1,268 @@
package io.kestra.assets.assets;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.assets.*;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
@MicronautTest
public class AssetTest {
@Test
void custom() throws JsonProcessingException {
String namespace = TestsUtils.randomNamespace();
String id = TestsUtils.randomString();
String type = "MY_OWN_ASSET_TYPE";
String displayName = "My own asset";
String description = "This is my asset";
String metadataKey = "owner";
String metadataValue = "data-team";
Asset asset = JacksonMapper.ofYaml().readValue("""
namespace: %s
id: %s
type: %s
displayName: %s
description: %s
metadata:
%s: %s""".formatted(
namespace,
id,
type,
displayName,
description,
metadataKey,
metadataValue
), Asset.class);
assertThat(asset).isInstanceOf(CustomAsset.class);
assertThat(asset.getNamespace()).isEqualTo(namespace);
assertThat(asset.getId()).isEqualTo(id);
assertThat(asset.getType()).isEqualTo(type);
assertThat(asset.getDisplayName()).isEqualTo(displayName);
assertThat(asset.getDescription()).isEqualTo(description);
assertThat(asset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
}
@Test
void external() throws JsonProcessingException {
String namespace = TestsUtils.randomNamespace();
String id = TestsUtils.randomString();
String type = "EXTERNAL";
String displayName = "External asset";
String description = "This is an external asset";
String metadataKey = "owner";
String metadataValue = "external-team";
Asset asset = JacksonMapper.ofYaml().readValue("""
namespace: %s
id: %s
type: %s
displayName: %s
description: %s
metadata:
%s: %s""".formatted(
namespace,
id,
type,
displayName,
description,
metadataKey,
metadataValue
), Asset.class);
assertThat(asset).isInstanceOf(ExternalAsset.class);
assertThat(asset.getNamespace()).isEqualTo(namespace);
assertThat(asset.getId()).isEqualTo(id);
assertThat(asset.getType()).isEqualTo(type);
assertThat(asset.getDisplayName()).isEqualTo(displayName);
assertThat(asset.getDescription()).isEqualTo(description);
assertThat(asset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
}
@Test
void dataset() throws JsonProcessingException {
String namespace = TestsUtils.randomNamespace();
String id = TestsUtils.randomString();
String displayName = "My Dataset";
String description = "This is my dataset";
String system = "S3";
String location = "s3://my-bucket/my-dataset";
String format = "parquet";
String metadataKey = "owner";
String metadataValue = "data-team";
Asset asset = JacksonMapper.ofYaml().readValue("""
namespace: %s
id: %s
type: %s
displayName: %s
description: %s
system: %s
location: %s
format: %s
metadata:
%s: %s""".formatted(
namespace,
id,
DatasetAsset.ASSET_TYPE,
displayName,
description,
system,
location,
format,
metadataKey,
metadataValue
), Asset.class);
assertThat(asset).isInstanceOf(DatasetAsset.class);
DatasetAsset datasetAsset = (DatasetAsset) asset;
assertThat(datasetAsset.getNamespace()).isEqualTo(namespace);
assertThat(datasetAsset.getId()).isEqualTo(id);
assertThat(datasetAsset.getDisplayName()).isEqualTo(displayName);
assertThat(datasetAsset.getDescription()).isEqualTo(description);
assertThat(datasetAsset.getSystem()).isEqualTo(system);
assertThat(datasetAsset.getLocation()).isEqualTo(location);
assertThat(datasetAsset.getFormat()).isEqualTo(format);
assertThat(datasetAsset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
}
@Test
void file() throws JsonProcessingException {
String namespace = TestsUtils.randomNamespace();
String id = TestsUtils.randomString();
String displayName = "My File";
String description = "This is my file";
String system = "local";
String path = "/data/my-file.txt";
String metadataKey = "owner";
String metadataValue = "file-team";
Asset asset = JacksonMapper.ofYaml().readValue("""
namespace: %s
id: %s
type: %s
displayName: %s
description: %s
system: %s
path: %s
metadata:
%s: %s""".formatted(
namespace,
id,
FileAsset.ASSET_TYPE,
displayName,
description,
system,
path,
metadataKey,
metadataValue
), Asset.class);
assertThat(asset).isInstanceOf(FileAsset.class);
FileAsset fileAsset = (FileAsset) asset;
assertThat(fileAsset.getNamespace()).isEqualTo(namespace);
assertThat(fileAsset.getId()).isEqualTo(id);
assertThat(fileAsset.getDisplayName()).isEqualTo(displayName);
assertThat(fileAsset.getDescription()).isEqualTo(description);
assertThat(fileAsset.getSystem()).isEqualTo(system);
assertThat(fileAsset.getPath()).isEqualTo(path);
assertThat(fileAsset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
}
@Test
void table() throws JsonProcessingException {
String namespace = TestsUtils.randomNamespace();
String id = TestsUtils.randomString();
String displayName = "My Table";
String description = "This is my table";
String system = "postgres";
String database = "mydb";
String schema = "my_schema";
String name = "mytable";
String metadataKey = "owner";
String metadataValue = "table-team";
Asset asset = JacksonMapper.ofYaml().readValue("""
namespace: %s
id: %s
type: %s
displayName: %s
description: %s
system: %s
database: %s
schema: %s
name: %s
metadata:
%s: %s""".formatted(
namespace,
id,
TableAsset.ASSET_TYPE,
displayName,
description,
system,
database,
schema,
name,
metadataKey,
metadataValue
), Asset.class);
assertThat(asset).isInstanceOf(TableAsset.class);
TableAsset tableAsset = (TableAsset) asset;
assertThat(tableAsset.getNamespace()).isEqualTo(namespace);
assertThat(tableAsset.getId()).isEqualTo(id);
assertThat(tableAsset.getDisplayName()).isEqualTo(displayName);
assertThat(tableAsset.getDescription()).isEqualTo(description);
assertThat(tableAsset.getSystem()).isEqualTo(system);
assertThat(tableAsset.getDatabase()).isEqualTo(database);
assertThat(tableAsset.getSchema()).isEqualTo(schema);
assertThat(tableAsset.getName()).isEqualTo(name);
assertThat(tableAsset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
}
@Test
void vm() throws JsonProcessingException {
String namespace = TestsUtils.randomNamespace();
String id = TestsUtils.randomString();
String displayName = "My VM";
String description = "This is my vm";
String provider = "aws";
String region = "us-east-1";
String state = "running";
String metadataKey = "owner";
String metadataValue = "vm-team";
Asset asset = JacksonMapper.ofYaml().readValue("""
namespace: %s
id: %s
type: %s
displayName: %s
description: %s
provider: %s
region: %s
state: %s
metadata:
%s: %s""".formatted(
namespace,
id,
VmAsset.ASSET_TYPE,
displayName,
description,
provider,
region,
state,
metadataKey,
metadataValue
), Asset.class);
assertThat(asset).isInstanceOf(VmAsset.class);
VmAsset vmAsset = (VmAsset) asset;
assertThat(vmAsset.getNamespace()).isEqualTo(namespace);
assertThat(vmAsset.getId()).isEqualTo(id);
assertThat(vmAsset.getDisplayName()).isEqualTo(displayName);
assertThat(vmAsset.getDescription()).isEqualTo(description);
assertThat(vmAsset.getProvider()).isEqualTo(provider);
assertThat(vmAsset.getRegion()).isEqualTo(region);
assertThat(vmAsset.getState()).isEqualTo(state);
assertThat(vmAsset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
}
}

View File

@@ -95,4 +95,4 @@ class PluginDeserializerTest {
public record TestPlugin(String type) implements Plugin {
}
}
}

View File

@@ -81,6 +81,9 @@ public abstract class AbstractRunnerTest {
@Inject
private AfterExecutionTestCase afterExecutionTestCase;
@Inject
private AssetTestCase assetTestCase;
@Test
@ExecuteFlow("flows/valids/full.yaml")
void full(Execution execution) {
@@ -558,4 +561,10 @@ public abstract class AbstractRunnerTest {
public void shouldCallTasksAfterListener(Execution execution) {
afterExecutionTestCase.shouldCallTasksAfterListener(execution);
}
}
@Test
@LoadFlows(value = "flows/valids/assets.yaml", tenantId = "abstract-runner-test-assets")
public void assets() throws QueueException, TimeoutException {
assetTestCase.staticAndDynamicAssets("abstract-runner-test-assets");
}
}

View File

@@ -0,0 +1,220 @@
package io.kestra.core.runners;
import io.kestra.core.models.assets.*;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.queues.QueueException;
import io.kestra.core.services.AssetManagerFactory;
import io.kestra.core.services.AssetService;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Replaces;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
import static org.assertj.core.api.Assertions.assertThat;
@Singleton
public class AssetTestCase {
@Inject
private AssetService mockedAssetService;
@Inject
private TestRunnerUtils testRunnerUtils;
private static final List<Asset> capturedAsyncCreate = new CopyOnWriteArrayList<>();
private static final List<Pair<AssetUser, Pair<List<AssetIdentifier>, List<AssetIdentifier>>>> capturedAssetLineage = new CopyOnWriteArrayList<>();
private static final List<Asset> capturedEnabledDynamicAssets = new CopyOnWriteArrayList<>();
private static final List<Asset> capturedDisabledDynamicAssets = new CopyOnWriteArrayList<>();
public void staticAndDynamicAssets(String tenantId) throws QueueException, TimeoutException {
Execution execution = testRunnerUtils.runOne(tenantId, "io.kestra.tests", "assets");
Mockito.verify(mockedAssetService, Mockito.times(1)).run();
// region assets-in-taskruns
List<TaskRun> taskRuns = execution.getTaskRunList().stream().toList();
assertThat(taskRuns).map(TaskRun::getAssets).map(AssetsInOut::getInputs).satisfiesExactlyInAnyOrder(
assets -> assertThat(assets).isEmpty(),
assets -> assertThat(assets).isEmpty(),
assets -> assertThat(assets).satisfiesExactlyInAnyOrder(
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-non-existing-input-asset-uid")
),
assets -> assertThat(assets).satisfiesExactlyInAnyOrder(
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-existing-input-uid")
)
);
assertThat(taskRuns).map(TaskRun::getAssets).map(AssetsInOut::getOutputs).satisfiesExactlyInAnyOrder(
assets -> assertThat(assets).satisfiesExactlyInAnyOrder(
AssetTestCase::assertEnabledDynamicAsset,
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-emit-asset")
),
assets -> assertThat(assets).isEmpty(),
assets -> assertThat(assets).satisfiesExactlyInAnyOrder(
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-asset-non-existing-output-uid")
),
assets -> assertThat(assets).satisfiesExactlyInAnyOrder(
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-asset-existing-output-uid")
)
);
// endregion
// region dynamic-assets
assertThat(capturedEnabledDynamicAssets).anySatisfy(AssetTestCase::assertEnabledDynamicAsset);
assertThat(capturedEnabledDynamicAssets).noneMatch(asset -> asset.getId().equals("assets-flow-emit-asset-auto-false-uid"));
assertThat(capturedDisabledDynamicAssets).anySatisfy(AssetTestCase::assertDisabledDynamicAsset);
assertThat(capturedDisabledDynamicAssets).noneMatch(asset -> asset.getId().equals("assets-flow-emit-asset-uid"));
// endregion
// region asset-creation
assertThat(capturedAsyncCreate).satisfiesExactlyInAnyOrder(
AssetTestCase::assertEnabledDynamicAsset,
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-emit-asset"),
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-asset-non-existing-output-uid"),
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-asset-existing-output-uid")
);
assertThat(capturedAsyncCreate).noneMatch(asset -> asset.getId().equals("assets-flow-emit-asset-auto-false-uid"));
// endregion
// region asset-lineage
assertThat(capturedAssetLineage).satisfiesExactlyInAnyOrder(
assetLineage -> {
AssetUser assetUser = assetLineage.getLeft();
assertAssetExecution(tenantId, assetUser, execution);
assertThat(assetUser.taskRunId()).isEqualTo(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getTaskId().equals("emit-asset"))
.findFirst().map(TaskRun::getId).orElseThrow());
// No input assets
assertThat(assetLineage.getRight().getLeft()).isEmpty();
assertThat(assetLineage.getRight().getRight()).satisfiesExactlyInAnyOrder(
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-emit-asset-uid"),
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-emit-asset")
);
},
// No lineage for the second taskrun due to `enableAuto: false`, below is for the third one
assetLineage -> {
AssetUser assetUser = assetLineage.getLeft();
assertAssetExecution(tenantId, assetUser, execution);
assertThat(assetUser.taskRunId()).isEqualTo(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getTaskId().equals("static-asset-non-existing-input"))
.findFirst().map(TaskRun::getId).orElseThrow());
assertThat(assetLineage.getRight().getLeft()).satisfiesExactlyInAnyOrder(
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-non-existing-input-asset-uid")
);
assertThat(assetLineage.getRight().getRight()).satisfiesExactlyInAnyOrder(
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-non-existing-output-uid")
);
},
assetLineage -> {
AssetUser assetUser = assetLineage.getLeft();
assertAssetExecution(tenantId, assetUser, execution);
assertThat(assetUser.taskRunId()).isEqualTo(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getTaskId().equals("static-asset-existing-input"))
.findFirst().map(TaskRun::getId).orElseThrow());
assertThat(assetLineage.getRight().getLeft()).satisfiesExactlyInAnyOrder(
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-existing-input-uid")
);
assertThat(assetLineage.getRight().getRight()).satisfiesExactlyInAnyOrder(
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-existing-output-uid")
);
}
);
// endregion
}
private static void assertAssetExecution(String tenantId, AssetUser assetUser, Execution execution) {
assertThat(assetUser.tenantId()).isEqualTo(tenantId);
assertThat(assetUser.namespace()).isEqualTo("io.kestra.tests");
assertThat(assetUser.flowId()).isEqualTo("assets");
assertThat(assetUser.flowRevision()).isEqualTo(execution.getFlowRevision());
assertThat(assetUser.executionId()).isEqualTo(execution.getId());
}
private static void assertEnabledDynamicAsset(Asset asset) {
assertThat(asset).isInstanceOf(TableAsset.class);
TableAsset tableAsset = (TableAsset) asset;
assertThat(tableAsset.getId()).isEqualTo("assets-flow-emit-asset-uid");
assertThat(tableAsset.getType()).isEqualTo(TableAsset.ASSET_TYPE);
assertThat(tableAsset.getDisplayName()).isEqualTo("My Table Asset");
assertThat(tableAsset.getDescription()).isEqualTo("This is my table asset");
assertThat(tableAsset.getSystem()).isEqualTo("MY_DB_SYSTEM");
assertThat(tableAsset.getDatabase()).isEqualTo("my_database");
assertThat(tableAsset.getSchema()).isEqualTo("my_schema");
assertThat(tableAsset.getName()).isEqualTo("my_table");
assertThat(tableAsset.getMetadata().get("owner")).isEqualTo("data-team");
}
private static void assertDisabledDynamicAsset(Asset asset) {
assertThat(asset).isInstanceOf(TableAsset.class);
TableAsset tableAsset = (TableAsset) asset;
assertThat(tableAsset.getId()).isEqualTo("assets-flow-emit-asset-auto-false-uid");
assertThat(tableAsset.getType()).isEqualTo(TableAsset.ASSET_TYPE);
assertThat(tableAsset.getDisplayName()).isEqualTo("My Table Asset");
assertThat(tableAsset.getDescription()).isEqualTo("This is my table asset");
assertThat(tableAsset.getSystem()).isEqualTo("MY_DB_SYSTEM");
assertThat(tableAsset.getDatabase()).isEqualTo("my_database");
assertThat(tableAsset.getSchema()).isEqualTo("my_schema");
assertThat(tableAsset.getName()).isEqualTo("my_table");
assertThat(tableAsset.getMetadata().get("owner")).isEqualTo("data-team");
}
@Factory
static class MockFactory {
@Singleton
@Replaces(AssetService.class)
public AssetService mockedAssetService() {
return Mockito.spy(new AssetService() {
@Override
public void asyncUpsert(AssetUser assetUser, Asset asset) {
capturedAsyncCreate.add(asset);
}
@Override
public void assetLineage(AssetUser assetUser, List<AssetIdentifier> inputs, List<AssetIdentifier> outputs) {
capturedAssetLineage.add(Pair.of(assetUser, Pair.of(inputs, outputs)));
}
});
}
@Singleton
@Replaces(AssetManagerFactory.class)
public AssetManagerFactory mockedAssetManagerFactory() {
return Mockito.spy(new AssetManagerFactory() {
@Override
public AssetEmitter of(boolean enabled) {
if (!enabled) {
return new AssetEmitter() {
@Override
public void upsert(Asset asset) {
capturedDisabledDynamicAssets.add(asset);
}
@Override
public List<Asset> outputs() {
return new ArrayList<>();
}
};
}
return new AssetEmitter() {
private final List<Asset> localCapturedAssets = new CopyOnWriteArrayList<>();
@Override
public void upsert(Asset asset) {
localCapturedAssets.add(asset);
capturedEnabledDynamicAssets.add(asset);
}
@Override
public List<Asset> outputs() {
return localCapturedAssets;
}
};
}
});
}
}
}

View File

@@ -0,0 +1,30 @@
package io.kestra.core.runners.test;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.assets.TableAsset;
import io.kestra.core.models.tasks.*;
import io.kestra.core.runners.RunContext;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Plugin
public class AssetEmitter extends Task implements RunnableTask<VoidOutput> {
@NotNull
@PluginProperty
private Asset assetToEmit;
@Override
public VoidOutput run(RunContext runContext) throws Exception {
runContext.assets().upsert(assetToEmit);
return null;
}
}

View File

@@ -1,5 +1,7 @@
package io.kestra.core.validations;
import io.kestra.core.models.assets.AssetIdentifier;
import io.kestra.core.models.assets.AssetsDeclaration;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.validations.ModelValidator;
@@ -7,7 +9,9 @@ import io.kestra.core.serializers.YamlParser;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.TestsUtils;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.plugin.core.log.Log;
import jakarta.inject.Inject;
import jakarta.validation.ConstraintViolation;
import org.junit.jupiter.api.Test;
import io.kestra.core.models.validations.ValidateConstraintViolation;
import io.kestra.core.services.FlowService;
@@ -229,6 +233,31 @@ class FlowValidationTest {
assertThat(validate.get().getMessage()).contains("Duplicate preconditions with id [flows]");
}
@Test
void eeAllowsDefiningAssets() {
Flow flow = Flow.builder()
.id(TestsUtils.randomString())
.namespace(TestsUtils.randomNamespace())
.tasks(List.of(
Log.builder()
.id("log")
.type(Log.class.getName())
.message("any")
.assets(io.kestra.core.models.property.Property.ofValue(
new AssetsDeclaration(true, List.of(new AssetIdentifier(null, null, "anyId")), null))
)
.build()
))
.build();
Optional<ConstraintViolationException> violations = modelValidator.isValid(flow);
assertThat(violations.isPresent()).isEqualTo(true);
assertThat(violations.get().getConstraintViolations().stream().map(ConstraintViolation::getMessage)).satisfiesExactly(
message -> assertThat(message).contains("Task 'log' can't have any `assets` because assets are only available in Enterprise Edition.")
);
};
private Flow parse(String path) {
URL resource = TestsUtils.class.getClassLoader().getResource(path);
assert resource != null;
@@ -237,4 +266,4 @@ class FlowValidationTest {
return YamlParser.parse(file, Flow.class);
}
}
}

View File

@@ -0,0 +1,83 @@
id: assets
namespace: io.kestra.tests
tasks:
- id: emit-asset
type: io.kestra.core.runners.test.AssetEmitter
assetToEmit:
namespace: io.kestra.tests
id: assets-flow-emit-asset-uid
type: TABLE
displayName: My Table Asset
description: This is my table asset
system: MY_DB_SYSTEM
database: my_database
schema: my_schema
name: my_table
metadata:
owner: data-team
assets:
outputs:
- id: assets-flow-static-emit-asset
type: TABLE
displayName: My Static Table Asset
description: This is my static table asset
system: MY_DB_SYSTEM
database: my_database
schema: my_schema
name: my_static_table
metadata:
owner: data-team
- id: emit-asset-auto-false
type: io.kestra.core.runners.test.AssetEmitter
assets:
enableAuto: false
assetToEmit:
namespace: io.kestra.tests
id: assets-flow-emit-asset-auto-false-uid
type: TABLE
displayName: My Table Asset
description: This is my table asset
system: MY_DB_SYSTEM
database: my_database
schema: my_schema
name: my_table
metadata:
owner: data-team
# Expected to create an 'EXTERNAL' asset automatically as it doesn't exist
- id: static-asset-non-existing-input
type: io.kestra.plugin.core.debug.Return
format: "whatever"
assets:
inputs:
- id: assets-flow-static-asset-non-existing-input-asset-uid
outputs:
- namespace: io.kestra.tests
id: assets-flow-static-asset-non-existing-output-uid
type: TABLE
displayName: My Static Table Asset
description: This is my static table asset
system: MY_DB_SYSTEM
database: my_database
schema: my_schema
name: my_static_table
metadata:
owner: data-team
- id: static-asset-existing-input
type: io.kestra.plugin.core.debug.Return
format: "whatever"
assets:
inputs:
- id: assets-flow-static-asset-existing-input-uid
outputs:
- namespace: io.kestra.tests
id: assets-flow-static-asset-existing-output-uid
type: TABLE
displayName: My Static Table Asset
description: This is my static table asset
system: MY_DB_SYSTEM
database: my_database
schema: my_schema
name: my_static_table
metadata:
owner: data-team

View File

@@ -4,6 +4,10 @@ import io.kestra.core.debug.Breakpoint;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Label;
import io.kestra.core.models.assets.AssetIdentifier;
import io.kestra.core.models.assets.AssetUser;
import io.kestra.core.models.assets.AssetsDeclaration;
import io.kestra.core.models.assets.AssetsInOut;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
@@ -96,6 +100,12 @@ public class ExecutorService {
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
private QueueInterface<LogEntry> logQueue;
@Inject
private AssetService assetService;
@Inject
private RunContextInitializer runContextInitializer;
protected FlowMetaStoreInterface flowExecutorInterface() {
// bean is injected late, so we need to wait
if (this.flowExecutorInterface == null) {
@@ -896,21 +906,35 @@ public class ExecutorService {
boolean hasMockedWorkerTask = false;
record FixtureAndTaskRun(TaskFixture fixture, TaskRun taskRun) {}
if (executor.getExecution().getFixtures() != null) {
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
RunContext runContext = runContextInitializer.forExecutor((DefaultRunContext) runContextFactory.of(
executor.getFlow(),
executor.getExecution()
));
List<WorkerTaskResult> workerTaskResults = executor.getExecution()
.getTaskRunList()
.stream()
.filter(taskRun -> taskRun.getState().getCurrent().isCreated())
.flatMap(taskRun -> executor.getExecution().getFixtureForTaskRun(taskRun).stream().map(fixture -> new FixtureAndTaskRun(fixture, taskRun)))
.map(throwFunction(fixtureAndTaskRun -> WorkerTaskResult.builder()
.taskRun(fixtureAndTaskRun.taskRun()
.withState(Optional.ofNullable(fixtureAndTaskRun.fixture().getState()).orElse(State.Type.SUCCESS))
.withOutputs(
variablesService.of(StorageContext.forTask(fixtureAndTaskRun.taskRun),
fixtureAndTaskRun.fixture().getOutputs() == null ? null : runContext.render(fixtureAndTaskRun.fixture().getOutputs()))
)
)
.build()
.map(throwFunction(fixtureAndTaskRun -> {
Optional<AssetsDeclaration> renderedAssetsDeclaration = runContext.render(executor.getFlow().findTaskByTaskId(fixtureAndTaskRun.taskRun.getTaskId()).getAssets()).as(AssetsDeclaration.class);
return WorkerTaskResult.builder()
.taskRun(fixtureAndTaskRun.taskRun()
.withState(Optional.ofNullable(fixtureAndTaskRun.fixture().getState()).orElse(State.Type.SUCCESS))
.withOutputs(
variablesService.of(StorageContext.forTask(fixtureAndTaskRun.taskRun),
fixtureAndTaskRun.fixture().getOutputs() == null ? null : runContext.render(fixtureAndTaskRun.fixture().getOutputs()))
)
.withAssets(new AssetsInOut(
renderedAssetsDeclaration.map(AssetsDeclaration::getInputs).orElse(Collections.emptyList()).stream()
.map(assetIdentifier -> assetIdentifier.withTenantId(executor.getFlow().getTenantId()))
.toList(),
fixtureAndTaskRun.fixture().getAssets() == null ? null : fixtureAndTaskRun.fixture().getAssets().stream()
.map(asset -> asset.withTenantId(executor.getFlow().getTenantId()))
.toList()
))
)
.build();
}
))
.toList();
@@ -1172,6 +1196,47 @@ public class ExecutorService {
metricRegistry.tags(workerTaskResult)
)
.record(taskRun.getState().getDurationOrComputeIt());
if (
!taskRun.getState().isFailed()
&& taskRun.getAssets() != null &&
(!taskRun.getAssets().getInputs().isEmpty() || !taskRun.getAssets().getOutputs().isEmpty())
) {
AssetUser assetUser = new AssetUser(
taskRun.getTenantId(),
taskRun.getNamespace(),
taskRun.getFlowId(),
newExecution.getFlowRevision(),
taskRun.getExecutionId(),
taskRun.getTaskId(),
taskRun.getId()
);
List<AssetIdentifier> outputIdentifiers = taskRun.getAssets().getOutputs().stream()
.map(asset -> asset.withTenantId(taskRun.getTenantId()))
.map(AssetIdentifier::of)
.toList();
List<AssetIdentifier> inputAssets = taskRun.getAssets().getInputs().stream()
.map(assetIdentifier -> assetIdentifier.withTenantId(taskRun.getTenantId()))
.toList();
try {
assetService.assetLineage(
assetUser,
inputAssets,
outputIdentifiers
);
} catch (QueueException e) {
log.warn("Unable to submit asset lineage event for {} -> {}", inputAssets, outputIdentifiers, e);
}
taskRun.getAssets().getOutputs().forEach(asset -> {
try {
assetService.asyncUpsert(assetUser, asset);
} catch (QueueException e) {
log.warn("Unable to submit asset upsert event for asset {}", asset.getId(), e);
}
});
}
}
}

View File

@@ -0,0 +1,22 @@
ALTER TABLE queues ALTER COLUMN "type" ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.MultipleConditionEvent',
'io.kestra.ee.assets.AssetLineageEvent',
'io.kestra.ee.assets.AssetUpsertCommand',
'io.kestra.ee.assets.AssetStateEvent'
) NOT NULL

View File

@@ -0,0 +1,3 @@
ALTER TABLE triggers
ADD COLUMN "disabled" BOOL
GENERATED ALWAYS AS (JQ_BOOLEAN("value", '.disabled')) NOT NULL;

View File

@@ -0,0 +1,22 @@
ALTER TABLE queues MODIFY COLUMN `type` ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.MultipleConditionEvent',
'io.kestra.ee.assets.AssetLineageEvent',
'io.kestra.ee.assets.AssetUpsertCommand',
'io.kestra.ee.assets.AssetStateEvent'
) NOT NULL;

View File

@@ -0,0 +1,3 @@
ALTER TABLE triggers
ADD COLUMN `disabled` BOOL
GENERATED ALWAYS AS (value ->> '$.disabled' = 'true') STORED NOT NULL

View File

@@ -0,0 +1,3 @@
ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.ee.assets.AssetLineageEvent';
ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.ee.assets.AssetUpsertCommand';
ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.ee.assets.AssetStateEvent';

View File

@@ -0,0 +1,4 @@
ALTER TABLE triggers
ADD COLUMN "disabled" BOOL
GENERATED ALWAYS AS (CAST(value ->> 'disabled' AS BOOL)) STORED NOT NULL;

View File

@@ -324,6 +324,14 @@ public abstract class AbstractJdbcRepository {
}
}
if(field == QueryFilter.Field.TRIGGER_STATE){
return applyTriggerStateCondition(value, operation);
}
if (field.equals(QueryFilter.Field.METADATA)) {
return findMetadataCondition((Map<?, ?>) value, operation);
}
// Convert the field name to lowercase and quote it
Name columnName = getColumnName(field);
@@ -341,7 +349,7 @@ public abstract class AbstractJdbcRepository {
case CONTAINS -> DSL.field(columnName).like("%" + value + "%");
case REGEX -> DSL.field(columnName).likeRegex((String) value);
case PREFIX -> DSL.field(columnName).like(value + "%")
.or(DSL.field(columnName).eq(value));
.or(DSL.field(columnName).eq(value));
default -> throw new InvalidQueryFiltersException("Unsupported operation: " + operation);
};
}
@@ -376,6 +384,10 @@ public abstract class AbstractJdbcRepository {
throw new InvalidQueryFiltersException("Unsupported operation: " + operation);
}
protected Condition findMetadataCondition(Map<?, ?> metadata, QueryFilter.Op operation) {
throw new InvalidQueryFiltersException("Unsupported operation: " + operation);
}
// Generate the condition for Field.STATE
@SuppressWarnings("unchecked")
private Condition generateStateCondition(Object value, QueryFilter.Op operation) {
@@ -469,6 +481,23 @@ public abstract class AbstractJdbcRepository {
};
}
private Condition applyTriggerStateCondition(Object value, QueryFilter.Op operation) {
String triggerState = value.toString();
Boolean isDisabled = switch (triggerState) {
case "disabled" -> true;
case "enabled" -> false;
default -> null;
};
if (isDisabled == null) {
return DSL.noCondition();
}
return switch (operation) {
case EQUALS -> field("disabled").eq(isDisabled);
case NOT_EQUALS -> field("disabled").ne(isDisabled);
default -> throw new InvalidQueryFiltersException("Unsupported operation for Trigger State: " + operation);
};
}
protected Field<Date> formatDateField(String dateField, DateUtils.GroupType groupType) {
throw new UnsupportedOperationException("formatDateField() not implemented");
}

View File

@@ -7,10 +7,12 @@ import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import io.micronaut.test.annotation.TransactionMode;
import io.micronaut.test.condition.TestActiveCondition;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
import java.lang.annotation.*;
@Tag("integration")
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE, ElementType.TYPE})
@ExtendWith(KestraTestExtension.class)

View File

@@ -74,7 +74,7 @@ abstract public class TestsUtils {
* @param prefix
* @return
*/
private static String randomString(String... prefix) {
public static String randomString(String... prefix) {
if (prefix.length == 0) {
prefix = new String[]{String.join("-", stackTraceToParts())};
}

View File

@@ -103,6 +103,10 @@ export function useValues(label: string | undefined, t?: ReturnType<typeof useI1
STATUSES: buildFromArray(["PENDING", "ACCEPTED", "EXPIRED"]),
AGGREGATIONS: buildFromArray(["SUM", "AVG", "MIN", "MAX"]),
RELATIVE_DATE,
TRIGGER_STATES:[
{label: t("filter.triggerState.enabled"), value: "enabled"},
{label: t("filter.triggerState.disabled"), value: "disabled"}
]
};
return {VALUES, getRelativeDateLabel};

View File

@@ -41,9 +41,9 @@ export const useTriggerFilter = (): ComputedRef<FilterConfiguration> => {
return [...current, `${(previousCombination ? previousCombination + "." : "")}${part}`];
}, []);
}))].map(namespace => ({
label: namespace,
value: namespace
}));
label: namespace,
value: namespace
}));
}
return [];
},
@@ -116,8 +116,22 @@ export const useTriggerFilter = (): ComputedRef<FilterConfiguration> => {
],
valueType: "text",
searchable: true,
},
{
key: "triggerState",
label: t("filter.triggerState.label"),
description: t("filter.triggerState.description"),
comparators: [
Comparators.EQUALS,
Comparators.NOT_EQUALS
],
valueType: "select",
valueProvider: async () => {
const {VALUES} = useValues("triggers");
return VALUES.TRIGGER_STATES;
}
}
]
};
});
};
};

View File

@@ -4,7 +4,7 @@
id="side-menu"
:menu
@update:collapsed="onToggleCollapse"
width="268px"
width="280px"
:collapsed="collapsed"
linkComponentName="LeftMenuLink"
hideToggle

View File

@@ -1,45 +1,70 @@
import {computed} from "vue";
import {useRoute, useRouter} from "vue-router";
import {useRoute, useRouter, type RouteRecordNameGeneric} from "vue-router";
import {useI18n} from "vue-i18n";
import {useMiscStore} from "override/stores/misc";
import {getDashboard} from "../../components/dashboard/composables/useDashboards";
// Main icons
import ChartLineVariant from "vue-material-design-icons/ChartLineVariant.vue";
import FileTreeOutline from "vue-material-design-icons/FileTreeOutline.vue";
import LayersTripleOutline from "vue-material-design-icons/LayersTripleOutline.vue";
import ContentCopy from "vue-material-design-icons/ContentCopy.vue";
import TimelineClockOutline from "vue-material-design-icons/TimelineClockOutline.vue";
import TimelineTextOutline from "vue-material-design-icons/TimelineTextOutline.vue";
import BallotOutline from "vue-material-design-icons/BallotOutline.vue";
import ShieldAccountVariantOutline from "vue-material-design-icons/ShieldAccountVariantOutline.vue";
import ViewDashboardVariantOutline from "vue-material-design-icons/ViewDashboardVariantOutline.vue";
import Connection from "vue-material-design-icons/Connection.vue";
import DotsSquare from "vue-material-design-icons/DotsSquare.vue";
import FormatListGroupPlus from "vue-material-design-icons/FormatListGroupPlus.vue";
import DatabaseOutline from "vue-material-design-icons/DatabaseOutline.vue";
import ShieldKeyOutline from "vue-material-design-icons/ShieldKeyOutline.vue";
import PlayOutline from "vue-material-design-icons/PlayOutline.vue";
import FileDocumentOutline from "vue-material-design-icons/FileDocumentOutline.vue";
import FlaskOutline from "vue-material-design-icons/FlaskOutline.vue";
// import PackageVariantClosed from "vue-material-design-icons/PackageVariantClosed.vue";
import FolderOpenOutline from "vue-material-design-icons/FolderOpenOutline.vue";
import PuzzleOutline from "vue-material-design-icons/PuzzleOutline.vue";
import ShapePlusOutline from "vue-material-design-icons/ShapePlusOutline.vue";
import OfficeBuildingOutline from "vue-material-design-icons/OfficeBuildingOutline.vue";
import ServerNetworkOutline from "vue-material-design-icons/ServerNetworkOutline.vue";
// Blueprints icons
import Wrench from "vue-material-design-icons/Wrench.vue";
// Tenant Administration icons
import Monitor from "vue-material-design-icons/Monitor.vue";
import DatabaseOutline from "vue-material-design-icons/DatabaseOutline.vue";
import LockOutline from "vue-material-design-icons/LockOutline.vue";
import LightningBolt from "vue-material-design-icons/LightningBolt.vue";
import Battery40 from "vue-material-design-icons/Battery40.vue";
import ShieldAccount from "vue-material-design-icons/ShieldAccount.vue";
export type MenuItem = {
title: string;
routes?: RouteRecordNameGeneric[];
href?: {
path?: string,
name: string,
params?: Record<string, any>,
query?: Record<string, any>
},
child?: MenuItem[],
disabled?: boolean,
name: string;
params?: Record<string, any>;
query?: Record<string, any>;
};
icon?: {
element?: any;
class?: any;
};
child?: MenuItem[];
attributes?: {
locked?: boolean;
};
hidden?: boolean;
};
export function useLeftMenu() {
const {t} = useI18n({useScope: "global"});
const $route = useRoute();
const $router = useRouter();
const miscStore = useMiscStore();
const {t} = useI18n({useScope: "global"});
const configs = useMiscStore().configs;
/**
* Returns all route names that start with the given route
* @param route
* @returns
* Returns the names of all registered routes whose name starts with the given prefix.
*
* @param route - The route name prefix to match against.
* @returns An array of route names starting with the provided prefix.
*/
function routeStartWith(route: string) {
return $router
@@ -50,140 +75,145 @@ export function useLeftMenu() {
.map((r) => r.name);
}
const flatMenuItems = (items: MenuItem[]): MenuItem[] => {
return items.flatMap(item => item.child ? [item, ...flatMenuItems(item.child)] : [item])
}
const menu = computed(() => {
const generatedMenu = [
const menu = computed<MenuItem[]>(() => {
return [
{
title: t("dashboards.labels.plural"),
href: {
name: "home",
params: {dashboard: getDashboard($route, "id")},
params: {
dashboard: getDashboard($route, "id"),
},
},
title: t("dashboards.labels.plural"),
icon: {
element: ViewDashboardVariantOutline,
class: "menu-icon",
element: ChartLineVariant,
},
},
{
href: {name: "flows/list"},
routes: routeStartWith("flows"),
title: t("flows"),
routes: routeStartWith("flows"),
href: {
name: "flows/list",
},
icon: {
element: FileTreeOutline,
class: "menu-icon",
},
exact: false,
},
{
href: {name: "apps/list"},
routes: routeStartWith("apps"),
title: t("apps"),
routes: routeStartWith("apps"),
href: {
name: "apps/list",
},
icon: {
element: FormatListGroupPlus,
class: "menu-icon",
element: LayersTripleOutline,
},
attributes: {
locked: true,
},
},
{
href: {name: "templates/list"},
routes: routeStartWith("templates"),
title: t("templates"),
icon: {
element: ContentCopy,
class: "menu-icon",
},
hidden: !miscStore.configs?.isTemplateEnabled,
},
{
href: {name: "executions/list"},
routes: routeStartWith("executions"),
title: t("executions"),
routes: routeStartWith("executions"),
href: {
name: "executions/list",
},
icon: {
element: TimelineClockOutline,
class: "menu-icon",
element: PlayOutline,
},
},
{
href: {name: "logs/list"},
routes: routeStartWith("logs"),
title: t("logs"),
routes: routeStartWith("logs"),
href: {
name: "logs/list",
},
icon: {
element: TimelineTextOutline,
class: "menu-icon",
element: FileDocumentOutline,
},
},
{
href: {name: "tests/list"},
routes: routeStartWith("tests"),
title: t("demos.tests.label"),
routes: routeStartWith("tests"),
href: {
name: "tests/list",
},
icon: {
element: FlaskOutline,
class: "menu-icon"
},
attributes: {
locked: true,
},
},
// TODO: To add Assets entry here in future release
// Uncomment PackageVariantClosed on line 25 and use as the icon
{
href: {name: "namespaces/list"},
routes: routeStartWith("namespaces"),
title: t("namespaces"),
routes: routeStartWith("namespaces"),
href: {
name: "namespaces/list",
},
icon: {
element: DotsSquare,
class: "menu-icon",
element: FolderOpenOutline,
},
},
{
href: {name: "kv/list"},
routes: routeStartWith("kv"),
title: t("kv.name"),
title: t("templates"),
routes: routeStartWith("templates"),
href: {
name: "templates/list",
},
icon: {
element: DatabaseOutline,
class: "menu-icon",
element: ContentCopy,
},
hidden: !configs?.isTemplateEnabled,
},
{
title: t("plugins.names"),
routes: routeStartWith("plugins"),
href: {
name: "plugins/list",
},
icon: {
element: PuzzleOutline,
},
},
{
href: {name: "secrets/list"},
routes: routeStartWith("secrets"),
title: t("secret.names"),
icon: {
element: ShieldKeyOutline,
class: "menu-icon",
},
attributes: {
locked: true,
},
},
{
routes: routeStartWith("blueprints"),
title: t("blueprints.title"),
routes: routeStartWith("blueprints"),
icon: {
element: BallotOutline,
class: "menu-icon",
element: ShapePlusOutline,
},
child: [
{
title: t("blueprints.custom"),
routes: routeStartWith("blueprints/flow"),
attributes: {
locked: true,
},
routes: routeStartWith("blueprints/flow/custom"),
href: {
name: "blueprints",
params: {kind: "flow", tab: "custom"},
params: {
kind: "flow",
tab: "custom",
},
},
icon: {
element: Wrench,
},
attributes: {
locked: true,
},
},
{
title: t("blueprints.flows"),
routes: routeStartWith("blueprints/flow"),
routes: routeStartWith("blueprints/flow/community"),
href: {
name: "blueprints",
params: {kind: "flow", tab: "community"},
params: {
kind: "flow",
tab: "community",
},
},
icon: {
element: FileTreeOutline,
},
},
{
@@ -191,91 +221,144 @@ export function useLeftMenu() {
routes: routeStartWith("blueprints/dashboard"),
href: {
name: "blueprints",
params: {kind: "dashboard", tab: "community"},
params: {
kind: "dashboard",
tab: "community",
},
},
icon: {
element: ChartLineVariant,
},
},
],
},
{
href: {name: "plugins/list"},
routes: routeStartWith("plugins"),
title: t("plugins.names"),
title: t("tenant_administration"),
routes: [
"admin/stats",
"kv",
"secrets",
"admin/triggers",
"admin/auditlogs",
"admin/iam",
"admin/concurrency-limits",
]
.map(routeStartWith)
.find((routes) => routes.length > 0),
icon: {
element: Connection,
class: "menu-icon",
},
},
{
title: t("administration"),
routes: routeStartWith("admin"),
icon: {
element: ShieldAccountVariantOutline,
class: "menu-icon",
element: OfficeBuildingOutline,
},
child: [
{
href: {name: "admin/iam"},
routes: routeStartWith("admin/iam"),
title: t("iam"),
attributes: {
locked: true,
},
},
{
href: {name: "admin/auditlogs/list"},
routes: routeStartWith("admin/auditlogs"),
title: t("auditlogs"),
attributes: {
locked: true,
},
},
{
href: {name: "admin/triggers"},
routes: routeStartWith("admin/triggers"),
title: t("triggers"),
},
{
href: {name: "admin/instance"},
routes: routeStartWith("admin/instance"),
title: t("instance"),
attributes: {
locked: true,
},
},
{
href: {name: "admin/tenants/list"},
routes: routeStartWith("admin/tenants"),
title: t("tenant.names"),
attributes: {
locked: true,
},
},
{
href: {name: "admin/concurrency-limits"},
routes: routeStartWith("admin/concurrency-limits"),
title: t("concurrency limits"),
hidden: !miscStore.configs?.isConcurrencyViewEnabled,
},
{
href: {name: "admin/stats"},
routes: routeStartWith("admin/stats"),
title: t("system overview"),
routes: routeStartWith("admin/stats"),
href: {
name: "admin/stats",
},
icon: {
element: Monitor,
},
},
{
title: t("kv.name"),
routes: routeStartWith("kv"),
href: {
name: "kv/list",
},
icon: {
element: DatabaseOutline,
},
},
{
title: t("secret.names"),
routes: routeStartWith("secrets"),
href: {
name: "secrets/list",
},
icon: {
element: LockOutline,
},
attributes: {
locked: true,
},
},
{
title: t("triggers"),
routes: routeStartWith("admin/triggers"),
href: {
name: "admin/triggers",
},
icon: {
element: LightningBolt,
},
},
{
title: t("auditlogs"),
routes: routeStartWith("admin/auditlogs"),
href: {
name: "admin/auditlogs/list",
},
icon: {
element: FileDocumentOutline,
},
attributes: {
locked: true,
},
},
{
title: t("concurrency limits"),
routes: routeStartWith("admin/concurrency-limits"),
href: {
name: "admin/concurrency-limits",
},
icon: {
element: Battery40,
},
hidden: !configs?.isConcurrencyViewEnabled,
},
{
title: t("iam"),
routes: routeStartWith("admin/iam"),
href: {
name: "admin/iam",
},
icon: {
element: ShieldAccount,
},
attributes: {
locked: true,
},
},
],
},
{
title: t("instance_administration"),
routes: routeStartWith("admin/instance"),
href: {
name: "admin/instance",
},
icon: {
element: ServerNetworkOutline,
},
attributes: {
locked: true,
},
},
].map((item: MenuItem) => {
if (item.icon?.element) {
item.icon.class = "menu-icon"; // Add default class to all menu icons
}
];
flatMenuItems(generatedMenu).forEach(menuItem => {
if (menuItem.href !== undefined && menuItem.href?.name === $route.name) {
menuItem.href.query = {...$route.query, ...menuItem.href?.query};
if (item.href && item.href?.name === $route.name) {
item.href.query = {
...$route.query,
...item.href?.query,
};
}
return item;
});
return generatedMenu;
});
return {
routeStartWith,
menu
};
return {menu};
}

View File

@@ -110,7 +110,7 @@ export default [
//Admin
{name: "admin/triggers", path: "/:tenant?/admin/triggers", component: () => import("../components/admin/Triggers.vue")},
{name: "admin/stats", path: "/:tenant?/admin/stats", component: () => import("override/components/admin/stats/Stats.vue")},
{name: "admin/stats", path: "/:tenant?/admin/stats/:type?", component: () => import("override/components/admin/stats/Stats.vue")},
{name: "admin/concurrency-limits", path: "/:tenant?/admin/concurrency-limits", component: () => import("../components/admin/ConcurrencyLimits.vue")},
//Setup

View File

@@ -1,208 +1,233 @@
@import "@kestra-io/ui-libs/src/scss/variables.scss";
#app {
.vsm--item {
padding: 0 30px;
transition: padding 0.2s ease;
}
#app {
.vsm--icon {
transition: left 0.2s ease;
font-size: 1.5em;
background-color: transparent !important;
padding-bottom: 15px;
width: 30px !important;
z-index: 20; // in collapsed menu, keep the icon above the opening menu
.vsm--icon {
width: 20px;
margin-right: calc($spacer / 2);
transition: left 0.2s ease;
background-color: transparent !important;
padding-bottom: 15px;
svg {
position: relative;
margin-top: 13px;
svg {
height: 20px !important;
width: 20px !important;
position: relative;
margin-top: 13px;
}
}
.vsm--title {
font-size: $font-size-sm;
&>span {
width: 100%;
}
}
.vsm--child {
.vsm--item {
padding: 0;
.vsm--title {
font-size: $font-size-xs;
}
}
// Make Plugins icon appear as outline
.vsm--link[href*="plugins"] .vsm--icon svg {
fill: none !important;
stroke: currentColor !important;
stroke-width: 1.5 !important;
.vsm--icon {
width: 1rem;
svg {
height: 1rem !important;
width: 1rem !important;
}
}
}
.vsm--link {
height: 30px;
padding: 0.25rem 0.5rem;
margin-bottom: 0.3rem;
border-radius: .25rem;
transition: padding 0.2s ease;
color: var(--ks-content-primary);
box-shadow: none;
&_active,
body &_active:hover {
background-color: var(--ks-button-background-primary) !important;
color: var(--ks-button-content-primary);
font-weight: normal;
}
.vsm--item {
padding: 0 30px;
transition: padding 0.2s ease;
&.vsm--link_open,
&.vsm--link_open:hover {
background-color: var(--ks-background-left-menu);
color: var(--ks-content-primary);
}
.vsm--child {
.vsm--item {
padding: 0;
.vsm--title {
padding-left: 10px;
&_disabled {
pointer-events: auto;
opacity: 1;
}
&:hover,
body &_hover {
background-color: var(--ks-button-background-secondary-hover);
}
.el-tooltip__trigger {
display: flex;
}
&>span {
max-width: 100%;
}
}
.vsm--link_open {
position: relative !important;
z-index: 3;
}
.vsm_collapsed .vsm--link_open {
position: static !important;
}
.vsm--child .vsm--link {
padding: 0 0.2rem;
position: relative !important;
margin-left: 1.8rem;
&.vsm--link_level-3 {
margin-left: 3.6rem;
& span {
margin-left: calc($spacer / 4);
}
}
.vsm--icon {
margin-left: calc($spacer / 2);
color: var(--ks-content-secondary);
}
&.vsm--link_active .vsm--icon {
color: var(--ks-button-content-primary);
}
&:before {
content: "";
position: absolute;
left: -.8rem;
height: 150%;
border: 2px solid var(--ks-border-primary);
border-top: 0;
border-right: 0;
z-index: 2;
// mask the right half of the object and the top border
clip-path: polygon(50% 8px, 50% 100%, 0 100%, 0 8px);
}
}
.vsm--title span:first-child {
flex-grow: 0;
}
.vsm--link_open.vsm--link_active {
.vsm--title,
.vsm--icon {
color: var(--ks-button-content-primary);
}
}
.vsm--arrow_default {
width: 8px;
&:before {
border-left-width: 1px;
border-bottom-width: 1px;
height: 4px;
width: 4px;
top: 3px;
}
}
a.vsm--link_active[href="#"] {
cursor: initial !important;
}
.vsm--dropdown {
background-color: var(--ks-background-left-menu);
border-radius: 4px;
margin-bottom: .5rem;
.vsm--title {
top: 3px;
}
}
.vsm--scroll-thumb {
background: var(--ks-border-primary) !important;
border-radius: 8px;
}
.vsm--mobile-bg {
border-radius: 0 var(--bs-border-radius) var(--bs-border-radius) 0;
}
.vsm_collapsed {
.logo {
>* {
left: 10px;
span.img {
background-size: 207px 55px;
}
}
}
.vsm--link {
padding: 0.3rem 0.5rem;
margin-bottom: 0.3rem;
border-radius: .25rem;
transition: padding 0.2s ease;
color: var(--ks-content-primary);
box-shadow: none;
padding-left: 13px;
&_active, body &_active:hover {
background-color: var(--ks-button-background-primary) !important;
color: var(--ks-button-content-primary);
font-weight: normal;
}
&.vsm--link_open, &.vsm--link_open:hover {
background-color: var(--ks-background-left-menu);
color: var(--ks-content-primary);
}
&_disabled {
pointer-events: auto;
opacity: 1;
}
&:hover, body &_hover {
background-color: var(--ks-button-background-secondary-hover);
}
.el-tooltip__trigger {
display: flex;
}
& > span{
max-width: 100%;
}
}
.vsm--link_open{
position: relative !important;
z-index: 3;
}
.vsm_collapsed .vsm--link_open{
position: static !important;
}
.vsm--child .vsm--link{
padding: 0 0.2rem;
position: relative!important;
font-size: 14px;
margin-left: 1.8rem;
.vsm--icon {
margin-right:4px;
color: var(--ks-content-secondary);
}
&.vsm--link_active .vsm--icon{
&.vsm--link_hover {
background-color: var(--ks-button-background-primary);
color: var(--ks-button-content-primary);
}
&:before{
content: "";
position: absolute;
left: -.8rem;
top: -2.5rem;
border-radius: 8px;
width: 1.6rem;
height: 170%;
border: 2px solid var(--ks-border-primary);
border-top:0;
border-right:0;
z-index: 2;
// mask the right half of the object and the top border
clip-path: polygon(50% 8px, 50% 100%, 0 100%, 0 8px);
}
}
.vsm--title span:first-child{
flex-grow: 0;
}
.vsm--link_open.vsm--link_active {
.vsm--title, .vsm--icon {
color: var(--ks-button-content-primary);
}
}
.vsm--arrow_default{
width: 8px;
&:before{
border-left-width: 1px;
border-bottom-width: 1px;
height: 4px;
width: 4px;
top: 3px;
}
}
a.vsm--link_active[href="#"] {
cursor: initial !important;
}
.vsm--dropdown {
background-color: var(--ks-background-left-menu);
border-radius: 4px;
margin-bottom: .5rem;
.vsm--title {
top: 3px;
}
}
.vsm--scroll-thumb {
background: var(--ks-border-primary) !important;
border-radius: 8px;
}
.vsm--mobile-bg {
border-radius: 0 var(--bs-border-radius) var(--bs-border-radius) 0;
}
.vsm_collapsed {
.logo {
> * {
left: 10px;
span.img {
background-size: 207px 55px;
}
}
}
.vsm--link {
padding-left: 13px;
&.vsm--link_hover {
background-color: var(--ks-button-background-primary);
color: var(--ks-button-content-primary);
}
}
.vsm--item {
padding: 0 5px;
}
.el-button {
margin-right: 0;
}
}
.el-tooltip__trigger .lock-icon.material-design-icon > .material-design-icon__svg {
bottom: 0 !important;
margin-left: 5px;
}
.vsm--item {
position: relative;
&::after {
content: '';
position: absolute;
bottom: 0;
left: 0;
right: 10px;
height: 1.25rem;
z-index: 5;
background: linear-gradient(to top, var(--ks-background-left-menu), transparent);
opacity: 0.18;
}
padding: 0 5px;
}
}
.el-button {
margin-right: 0;
}
}
.el-tooltip__trigger .lock-icon.material-design-icon>.material-design-icon__svg {
bottom: 0 !important;
margin-left: 5px;
}
.vsm--item {
position: relative;
&::after {
content: '';
position: absolute;
bottom: 0;
left: 0;
right: 10px;
height: 1.25rem;
z-index: 5;
background: linear-gradient(to top, var(--ks-background-left-menu), transparent);
opacity: 0.18;
}
}
}

View File

@@ -1005,6 +1005,7 @@
"input_custom_duration": "oder benutzerdefinierte Dauer eingeben:",
"inputs": "Inputs",
"instance": "Instanz",
"instance_administration": "Instanzverwaltung",
"invalid bulk delete": "Ausführungen konnten nicht gelöscht werden",
"invalid bulk force run": "Konnte Ausführungen nicht erzwingen",
"invalid bulk kill": "Ausführungen konnten nicht beendet werden",
@@ -1737,6 +1738,7 @@
"names": "Mandanten"
},
"tenantId": "Mandanten-ID",
"tenant_administration": "Mandantenverwaltung",
"test-badge-text": "Test",
"test-badge-tooltip": "Diese Ausführung wurde durch einen Test erstellt",
"theme": "Modus",

View File

@@ -391,6 +391,8 @@
"conditions": "Conditions",
"triggerId": "Trigger ID",
"tenantId": "Tenant ID",
"tenant_administration": "Tenant Administration",
"instance_administration": "Instance Administration",
"codeDisabled": "Disabled in Flow",
"paused": "Paused",
"Fold auto": "Editor: automatic fold of multi-lines",
@@ -1727,6 +1729,12 @@
"label": "Trigger Execution ID",
"description": "Filter by trigger execution ID"
},
"triggerState":{
"label": " Trigger State",
"description": "Filter by trigger state",
"enabled": "Enabled",
"disabled": "Disabled"
},
"scope_flow": {
"label": "Scope",
"description": "Filter by flow scope"

View File

@@ -1005,6 +1005,7 @@
"input_custom_duration": "o ingrese duración personalizada:",
"inputs": "Entradas",
"instance": "Instancia",
"instance_administration": "Administración de Instancia",
"invalid bulk delete": "No se pudieron eliminar las ejecuciones",
"invalid bulk force run": "No se pudo forzar la ejecución de ejecuciones",
"invalid bulk kill": "No se pudieron matar las ejecuciones",
@@ -1737,6 +1738,7 @@
"names": "Arrendatarios"
},
"tenantId": "ID de Mandante",
"tenant_administration": "Administración de Mandantes",
"test-badge-text": "Prueba",
"test-badge-tooltip": "Esta ejecución fue creada por una prueba",
"theme": "Tema",

View File

@@ -1005,6 +1005,7 @@
"input_custom_duration": "ou saisir une durée personnalisée :",
"inputs": "Entrées",
"instance": "Instance",
"instance_administration": "Administration de l'Instance",
"invalid bulk delete": "Impossible de supprimer les exécutions",
"invalid bulk force run": "Impossible de forcer l'exécution des exécutions",
"invalid bulk kill": "Impossible d'arrêter les exécutions",
@@ -1737,6 +1738,7 @@
"names": "Mandants"
},
"tenantId": "ID du mandant",
"tenant_administration": "Administration des Mandants",
"test-badge-text": "Test",
"test-badge-tooltip": "Cette exécution a été créée par un Test",
"theme": "Thème",

View File

@@ -1005,6 +1005,7 @@
"input_custom_duration": "या कस्टम अवधि दर्ज करें:",
"inputs": "इनपुट्स",
"instance": "इंस्टेंस",
"instance_administration": "इंस्टेंस प्रशासन",
"invalid bulk delete": "निष्पादन हटाने में असमर्थ",
"invalid bulk force run": "निष्पादन को जबरन चलाने में असमर्थ",
"invalid bulk kill": "निष्पादन kill करने में असमर्थ",
@@ -1737,6 +1738,7 @@
"names": "मंडल"
},
"tenantId": "टेनेंट ID",
"tenant_administration": "किरायेदार प्रशासन",
"test-badge-text": "परीक्षण",
"test-badge-tooltip": "यह execution एक Test द्वारा बनाया गया था",
"theme": "थीम",

View File

@@ -1005,6 +1005,7 @@
"input_custom_duration": "oppure inserisci durata personalizzata:",
"inputs": "Inputs",
"instance": "Istanza",
"instance_administration": "Amministrazione dell'istanza",
"invalid bulk delete": "Impossibile eliminare le esecuzioni",
"invalid bulk force run": "Impossibile forzare l'esecuzione delle esecuzioni",
"invalid bulk kill": "Impossibile kill le esecuzioni",
@@ -1737,6 +1738,7 @@
"names": "Mandanti"
},
"tenantId": "ID del Mandante",
"tenant_administration": "Amministrazione del Mandante",
"test-badge-text": "Test",
"test-badge-tooltip": "Questa esecuzione è stata creata da un Test",
"theme": "Tema",

View File

@@ -1005,6 +1005,7 @@
"input_custom_duration": "またはカスタム期間を入力してください:",
"inputs": "Inputs",
"instance": "インスタンス",
"instance_administration": "インスタンス管理",
"invalid bulk delete": "実行を削除できませんでした",
"invalid bulk force run": "実行を強制的に開始できませんでした",
"invalid bulk kill": "実行をkillできませんでした",
@@ -1737,6 +1738,7 @@
"names": "テナント"
},
"tenantId": "テナントID",
"tenant_administration": "テナント管理",
"test-badge-text": "テスト",
"test-badge-tooltip": "この実行はテストによって作成されました",
"theme": "テーマ",

View File

@@ -1005,6 +1005,7 @@
"input_custom_duration": "또는 사용자 지정 기간 입력:",
"inputs": "Inputs",
"instance": "인스턴스",
"instance_administration": "인스턴스 관리",
"invalid bulk delete": "실행을 삭제할 수 없습니다",
"invalid bulk force run": "실행을 강제로 실행할 수 없습니다.",
"invalid bulk kill": "실행을 강제 종료할 수 없습니다",
@@ -1737,6 +1738,7 @@
"names": "테넌트"
},
"tenantId": "테넌트 ID",
"tenant_administration": "테넌트 관리",
"test-badge-text": "테스트",
"test-badge-tooltip": "이 실행은 테스트에 의해 생성되었습니다.",
"theme": "테마",

View File

@@ -1005,6 +1005,7 @@
"input_custom_duration": "lub wprowadź niestandardowy czas trwania:",
"inputs": "Inputs",
"instance": "Instancja",
"instance_administration": "Administracja Instancji",
"invalid bulk delete": "Nie można usunąć wykonań",
"invalid bulk force run": "Nie można wymusić uruchomienia wykonania",
"invalid bulk kill": "Nie można zabić wykonań",
@@ -1737,6 +1738,7 @@
"names": "Najemcy"
},
"tenantId": "Identyfikator Mandanta",
"tenant_administration": "Administracja Mandanta",
"test-badge-text": "Test",
"test-badge-tooltip": "To wykonanie zostało utworzone przez Test.",
"theme": "Motyw",

View File

@@ -1005,6 +1005,7 @@
"input_custom_duration": "ou insira uma duração personalizada:",
"inputs": "Inputs",
"instance": "Instância",
"instance_administration": "Administração da Instância",
"invalid bulk delete": "Não foi possível deletar execuções",
"invalid bulk force run": "Não foi possível forçar a execução das execuções",
"invalid bulk kill": "Não foi possível matar execuções",
@@ -1737,6 +1738,7 @@
"names": "Mandantes"
},
"tenantId": "ID do Mandante",
"tenant_administration": "Administração do Mandante",
"test-badge-text": "Teste",
"test-badge-tooltip": "Esta execução foi criada por um Teste",
"theme": "Tema",

View File

@@ -1005,6 +1005,7 @@
"input_custom_duration": "ou insira uma duração personalizada:",
"inputs": "Inputs",
"instance": "Instância",
"instance_administration": "Administração da Instância",
"invalid bulk delete": "Não foi possível excluir execuções",
"invalid bulk force run": "Não foi possível forçar a execução das execuções",
"invalid bulk kill": "Não foi possível matar execuções",
@@ -1737,6 +1738,7 @@
"names": "Clientes"
},
"tenantId": "ID do Cliente",
"tenant_administration": "Administração de Tenant",
"test-badge-text": "Teste",
"test-badge-tooltip": "Esta execução foi criada por um Teste",
"theme": "Tema",

View File

@@ -1005,6 +1005,7 @@
"input_custom_duration": "или введите пользовательскую продолжительность:",
"inputs": "Входные данные",
"instance": "Экземпляр",
"instance_administration": "Администрирование экземпляра",
"invalid bulk delete": "Не удалось удалить выполнения",
"invalid bulk force run": "Не удалось принудительно запустить executions",
"invalid bulk kill": "Не удалось убить выполнения",
@@ -1737,6 +1738,7 @@
"names": "Арендаторы"
},
"tenantId": "ID арендатора",
"tenant_administration": "Администрирование Манданта",
"test-badge-text": "Тест",
"test-badge-tooltip": "Это выполнение было создано тестом",
"theme": "Тема",

View File

@@ -1005,6 +1005,7 @@
"input_custom_duration": "或输入自定义持续时间:",
"inputs": "输入",
"instance": "实例",
"instance_administration": "实例管理",
"invalid bulk delete": "无法删除执行",
"invalid bulk force run": "无法强制运行执行",
"invalid bulk kill": "无法终止执行",
@@ -1737,6 +1738,7 @@
"names": "租户"
},
"tenantId": "租户 ID",
"tenant_administration": "租户管理",
"test-badge-text": "测试",
"test-badge-tooltip": "此执行由测试创建",
"theme": "主题",

View File

@@ -25,7 +25,7 @@ public class ConcurrencyLimitController {
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "/search")
@Operation(tags = {"Flows", "Executions"}, summary = "Search for flow concurrency limits")
@Operation(tags = {"Flows"}, summary = "Search for flow concurrency limits")
public PagedResults<ConcurrencyLimit> searchConcurrencyLimits() {
var results = concurrencyLimitService.find(tenantService.resolveTenant());
return PagedResults.of(new ArrayListTotal<>(results, results.size()));
@@ -33,7 +33,7 @@ public class ConcurrencyLimitController {
@ExecuteOn(TaskExecutors.IO)
@Put("/{namespace}/{flowId}")
@Operation(tags = {"Flows", "Executions"}, summary = "Update a flow concurrency limit")
@Operation(tags = {"Flows"}, summary = "Update a flow concurrency limit")
public HttpResponse<ConcurrencyLimit> updateConcurrencyLimit(@Body ConcurrencyLimit concurrencyLimit) {
var existing = concurrencyLimitService.findById(concurrencyLimit.getTenantId(), concurrencyLimit.getNamespace(), concurrencyLimit.getFlowId());
if (existing.isEmpty()) {

View File

@@ -9,6 +9,9 @@ import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Label;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.assets.AssetsDeclaration;
import io.kestra.core.models.assets.AssetsInOut;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.Output;
@@ -954,7 +957,15 @@ public class DefaultWorker implements Worker {
try {
Variables variables = variablesService.of(StorageContext.forTask(taskRun), workerTaskCallable.getTaskOutput());
taskRun = taskRun.withOutputs(variables);
if (workerTask.getTask().getAssets() != null) {
List<Asset> outputAssets = runContext.assets().outputs();
Optional<AssetsDeclaration> renderedAssetsDeclaration = runContext.render(workerTask.getTask().getAssets()).as(AssetsDeclaration.class);
renderedAssetsDeclaration.map(AssetsDeclaration::getOutputs).ifPresent(outputAssets::addAll);
taskRun = taskRun.withOutputs(variables).withAssets(new AssetsInOut(
renderedAssetsDeclaration.map(AssetsDeclaration::getInputs).orElse(null),
outputAssets
));
}
} catch (Exception e) {
logger.warn("Unable to save output on taskRun '{}'", taskRun, e);
}