mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 14:00:23 -05:00
Compare commits
2 Commits
feat/agent
...
feat/asset
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6c07ecd3d4 | ||
|
|
950223e0a8 |
@@ -24,6 +24,9 @@ dependencies {
|
||||
// reactor
|
||||
api "io.projectreactor:reactor-core"
|
||||
|
||||
// awaitility
|
||||
api "org.awaitility:awaitility"
|
||||
|
||||
// micronaut
|
||||
api "io.micronaut.data:micronaut-data-model"
|
||||
implementation "io.micronaut:micronaut-http-server-netty"
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.docs;
|
||||
|
||||
import com.fasterxml.classmate.ResolvedType;
|
||||
import com.fasterxml.classmate.members.HierarchicType;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
@@ -22,6 +23,8 @@ import com.github.victools.jsonschema.module.swagger2.Swagger2Module;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.assets.CustomAsset;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ScheduleCondition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
@@ -63,7 +66,7 @@ import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class JsonSchemaGenerator {
|
||||
|
||||
|
||||
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
|
||||
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
|
||||
|
||||
@@ -276,10 +279,10 @@ public class JsonSchemaGenerator {
|
||||
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
|
||||
.with(Option.PLAIN_DEFINITION_KEYS)
|
||||
.with(Option.ALLOF_CLEANUP_AT_THE_END);
|
||||
|
||||
|
||||
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
|
||||
// to be able to return an CustomDefinition with an empty node when the ResolvedType can't be found.
|
||||
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
|
||||
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider() {
|
||||
@Override
|
||||
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
|
||||
try {
|
||||
@@ -321,7 +324,7 @@ public class JsonSchemaGenerator {
|
||||
// inline some type
|
||||
builder.forTypesInGeneral()
|
||||
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
|
||||
|
||||
|
||||
@Override
|
||||
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
|
||||
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {
|
||||
@@ -589,11 +592,31 @@ public class JsonSchemaGenerator {
|
||||
// The `const` property is used by editors for auto-completion based on that schema.
|
||||
builder.forTypesInGeneral().withTypeAttributeOverride((collectedTypeAttributes, scope, context) -> {
|
||||
final Class<?> pluginType = scope.getType().getErasedType();
|
||||
if (pluginType.getAnnotation(Plugin.class) != null) {
|
||||
Plugin pluginAnnotation = pluginType.getAnnotation(Plugin.class);
|
||||
if (pluginAnnotation != null) {
|
||||
ObjectNode properties = (ObjectNode) collectedTypeAttributes.get("properties");
|
||||
if (properties != null) {
|
||||
String typeConst = pluginType.getName();
|
||||
// This is needed so that assets can have arbitrary types while still being able to be identified as assets.
|
||||
if (pluginType == CustomAsset.class) {
|
||||
properties.set("type", context.getGeneratorConfig().createObjectNode()
|
||||
.put("type", "string")
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (Asset.class.isAssignableFrom(pluginType)) {
|
||||
// For Asset types, we want to be able to use a simple-string type. Convention is that first alias is that string type.
|
||||
typeConst = pluginAnnotation.aliases().length > 0 ? pluginAnnotation.aliases()[0] : pluginType.getName();
|
||||
Arrays.stream(pluginType.getDeclaredMethods())
|
||||
.filter(m -> m.isAnnotationPresent(JsonProperty.class))
|
||||
.forEach(m -> properties.set(m.getAnnotation(JsonProperty.class).value(), context.getGeneratorConfig().createObjectNode()
|
||||
.put("type", "string")
|
||||
));
|
||||
}
|
||||
|
||||
properties.set("type", context.getGeneratorConfig().createObjectNode()
|
||||
.put("const", pluginType.getName())
|
||||
.put("const", typeConst)
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -764,6 +787,14 @@ public class JsonSchemaGenerator {
|
||||
consumer.accept(typeContext.resolve(clz));
|
||||
}
|
||||
}).toList();
|
||||
} else if (declaredType.getErasedType() == Asset.class) {
|
||||
return getRegisteredPlugins()
|
||||
.stream()
|
||||
.flatMap(registeredPlugin -> registeredPlugin.getAssets().stream())
|
||||
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
|
||||
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
|
||||
.map(typeContext::resolve)
|
||||
.toList();
|
||||
}
|
||||
|
||||
return null;
|
||||
|
||||
@@ -103,12 +103,48 @@ public record QueryFilter(
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS);
|
||||
}
|
||||
},
|
||||
METADATA("metadata") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS);
|
||||
}
|
||||
},
|
||||
FLOW_ID("flowId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
|
||||
}
|
||||
},
|
||||
FLOW_REVISION("flowRevision") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
ID("id") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
|
||||
}
|
||||
},
|
||||
ASSET_ID("assetId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
|
||||
}
|
||||
},
|
||||
TYPE("type") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
|
||||
}
|
||||
},
|
||||
CREATED("created") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.GREATER_THAN_OR_EQUAL_TO, Op.GREATER_THAN, Op.LESS_THAN_OR_EQUAL_TO, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
|
||||
}
|
||||
},
|
||||
UPDATED("updated") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
@@ -163,6 +199,18 @@ public record QueryFilter(
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
TASK_ID("taskId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
TASK_RUN_ID("taskRunId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
CHILD_FILTER("childFilter") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
@@ -312,6 +360,34 @@ public record QueryFilter(
|
||||
Field.UPDATED
|
||||
);
|
||||
}
|
||||
},
|
||||
ASSET {
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(
|
||||
Field.QUERY,
|
||||
Field.ID,
|
||||
Field.TYPE,
|
||||
Field.NAMESPACE,
|
||||
Field.METADATA,
|
||||
Field.UPDATED
|
||||
);
|
||||
}
|
||||
},
|
||||
ASSET_USAGE {
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(
|
||||
Field.ASSET_ID,
|
||||
Field.NAMESPACE,
|
||||
Field.FLOW_ID,
|
||||
Field.FLOW_REVISION,
|
||||
Field.EXECUTION_ID,
|
||||
Field.TASK_ID,
|
||||
Field.TASK_RUN_ID,
|
||||
Field.CREATED
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
public abstract List<Field> supportedField();
|
||||
|
||||
111
core/src/main/java/io/kestra/core/models/assets/Asset.java
Normal file
111
core/src/main/java/io/kestra/core/models/assets/Asset.java
Normal file
@@ -0,0 +1,111 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonAnySetter;
|
||||
import io.kestra.core.models.DeletedInterface;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
import jakarta.validation.constraints.Size;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public abstract class Asset implements HasUID, DeletedInterface, Plugin {
|
||||
@Hidden
|
||||
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
|
||||
protected String tenantId;
|
||||
|
||||
@Pattern(regexp = "^[a-z0-9][a-z0-9._-]*")
|
||||
@Size(min = 1, max = 150)
|
||||
protected String namespace;
|
||||
|
||||
@NotBlank
|
||||
@Pattern(regexp = "^[a-zA-Z0-9][a-zA-Z0-9._-]*")
|
||||
@Size(min = 1, max = 150)
|
||||
protected String id;
|
||||
|
||||
@NotBlank
|
||||
protected String type;
|
||||
|
||||
protected String displayName;
|
||||
|
||||
protected String description;
|
||||
|
||||
protected Map<String, Object> metadata;
|
||||
|
||||
@Nullable
|
||||
@Hidden
|
||||
private Instant created;
|
||||
|
||||
@Nullable
|
||||
@Hidden
|
||||
private Instant updated;
|
||||
|
||||
@Hidden
|
||||
private boolean deleted;
|
||||
|
||||
public Asset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String type,
|
||||
String displayName,
|
||||
String description,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
this.tenantId = tenantId;
|
||||
this.namespace = namespace;
|
||||
this.id = id;
|
||||
this.type = type;
|
||||
this.displayName = displayName;
|
||||
this.description = description;
|
||||
this.metadata = Optional.ofNullable(metadata).map(HashMap::new).orElse(new HashMap<>());
|
||||
Instant now = Instant.now();
|
||||
this.created = Optional.ofNullable(created).orElse(now);
|
||||
this.updated = Optional.ofNullable(updated).orElse(now);
|
||||
this.deleted = deleted;
|
||||
}
|
||||
|
||||
public <T extends Asset> T toUpdated() {
|
||||
if (this.created == null) {
|
||||
this.created = Instant.now();
|
||||
}
|
||||
this.updated = Instant.now();
|
||||
return (T) this;
|
||||
}
|
||||
|
||||
public Asset toDeleted() {
|
||||
this.deleted = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
@JsonAnySetter
|
||||
public void setMetadata(String name, Object value) {
|
||||
metadata.put(name, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return Asset.uid(tenantId, id);
|
||||
}
|
||||
|
||||
public static String uid(String tenantId, String id) {
|
||||
return IdUtils.fromParts(tenantId, id);
|
||||
}
|
||||
|
||||
public Asset withTenantId(String tenantId) {
|
||||
this.tenantId = tenantId;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
|
||||
public record AssetIdentifier(@Hidden String tenantId, @Hidden String namespace, String id){
|
||||
|
||||
public AssetIdentifier withTenantId(String tenantId) {
|
||||
return new AssetIdentifier(tenantId, this.namespace, this.id);
|
||||
}
|
||||
|
||||
public String uid() {
|
||||
return IdUtils.fromParts(tenantId, id);
|
||||
}
|
||||
|
||||
public static AssetIdentifier of(Asset asset) {
|
||||
return new AssetIdentifier(asset.getTenantId(), asset.getNamespace(), asset.getId());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.flows.FlowId;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
|
||||
/**
|
||||
* Represents an entity that used an asset
|
||||
*/
|
||||
public record AssetUser(String tenantId, String namespace, String flowId, Integer flowRevision, String executionId, String taskId, String taskRunId) implements HasUID {
|
||||
public String uid() {
|
||||
return IdUtils.fromParts(tenantId, namespace, flowId, String.valueOf(flowRevision), executionId, taskRunId);
|
||||
}
|
||||
|
||||
public FlowId toFlowId() {
|
||||
return FlowId.of(tenantId, namespace, flowId, flowRevision);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@Getter
|
||||
public class AssetsDeclaration extends AssetsInOut {
|
||||
private boolean enableAuto;
|
||||
|
||||
@JsonCreator
|
||||
public AssetsDeclaration(Boolean enableAuto, List<AssetIdentifier> inputs, List<Asset> outputs) {
|
||||
super(inputs, outputs);
|
||||
|
||||
this.enableAuto = Optional.ofNullable(enableAuto).orElse(false);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@Getter
|
||||
public class AssetsInOut {
|
||||
private List<AssetIdentifier> inputs;
|
||||
|
||||
private List<Asset> outputs;
|
||||
|
||||
@JsonCreator
|
||||
public AssetsInOut(List<AssetIdentifier> inputs, List<Asset> outputs) {
|
||||
this.inputs = Optional.ofNullable(inputs).orElse(Collections.emptyList());
|
||||
this.outputs = Optional.ofNullable(outputs).orElse(Collections.emptyList());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin
|
||||
public class CustomAsset extends Asset {
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public CustomAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String type,
|
||||
String displayName,
|
||||
String description,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, type, displayName, description, metadata, created, updated, deleted);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin(aliases = DatasetAsset.ASSET_TYPE)
|
||||
public class DatasetAsset extends Asset {
|
||||
public static final String ASSET_TYPE = "DATASET";
|
||||
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public DatasetAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String displayName,
|
||||
String description,
|
||||
String system,
|
||||
String location,
|
||||
String format,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
|
||||
|
||||
this.setSystem(system);
|
||||
this.setLocation(location);
|
||||
this.setFormat(format);
|
||||
}
|
||||
|
||||
@JsonProperty("system")
|
||||
public String getSystem() {
|
||||
return Optional.ofNullable(metadata.get("system")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("location")
|
||||
public String getLocation() {
|
||||
return Optional.ofNullable(metadata.get("location")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("format")
|
||||
public String getFormat() {
|
||||
return Optional.ofNullable(metadata.get("format")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
public void setSystem(String system) {
|
||||
if (system != null) {
|
||||
metadata.put("system", system);
|
||||
}
|
||||
}
|
||||
|
||||
public void setLocation(String location) {
|
||||
if (location != null) {
|
||||
metadata.put("location", location);
|
||||
}
|
||||
}
|
||||
|
||||
public void setFormat(String format) {
|
||||
if (format != null) {
|
||||
metadata.put("format", format);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin(aliases = ExternalAsset.ASSET_TYPE)
|
||||
public class ExternalAsset extends Asset {
|
||||
public static final String ASSET_TYPE = "EXTERNAL";
|
||||
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public ExternalAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String displayName,
|
||||
String description,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin(aliases = FileAsset.ASSET_TYPE)
|
||||
public class FileAsset extends Asset {
|
||||
public static final String ASSET_TYPE = "FILE";
|
||||
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public FileAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String displayName,
|
||||
String description,
|
||||
String system,
|
||||
String path,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
|
||||
|
||||
this.setSystem(system);
|
||||
this.setPath(path);
|
||||
}
|
||||
|
||||
@JsonProperty("system")
|
||||
public String getSystem() {
|
||||
return Optional.ofNullable(metadata.get("system")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("path")
|
||||
public String getPath() {
|
||||
return Optional.ofNullable(metadata.get("path")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
public void setSystem(String system) {
|
||||
if (system != null) {
|
||||
metadata.put("system", system);
|
||||
}
|
||||
}
|
||||
|
||||
public void setPath(String path) {
|
||||
if (path != null) {
|
||||
metadata.put("path", path);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,86 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin(aliases = TableAsset.ASSET_TYPE)
|
||||
public class TableAsset extends Asset {
|
||||
public static final String ASSET_TYPE = "TABLE";
|
||||
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public TableAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String displayName,
|
||||
String description,
|
||||
String system,
|
||||
String database,
|
||||
String schema,
|
||||
String name,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
|
||||
|
||||
this.setSystem(system);
|
||||
this.setDatabase(database);
|
||||
this.setSchema(schema);
|
||||
this.setName(name);
|
||||
}
|
||||
|
||||
@JsonProperty("system")
|
||||
public String getSystem() {
|
||||
return Optional.ofNullable(metadata.get("system")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("database")
|
||||
public String getDatabase() {
|
||||
return Optional.ofNullable(metadata.get("database")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("schema")
|
||||
public String getSchema() {
|
||||
return Optional.ofNullable(metadata.get("schema")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("name")
|
||||
public String getName() {
|
||||
return Optional.ofNullable(metadata.get("name")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
public void setSystem(String system) {
|
||||
if (system != null) {
|
||||
metadata.put("system", system);
|
||||
}
|
||||
}
|
||||
|
||||
public void setDatabase(String database) {
|
||||
if (database != null) {
|
||||
metadata.put("database", database);
|
||||
}
|
||||
}
|
||||
|
||||
public void setSchema(String schema) {
|
||||
if (schema != null) {
|
||||
metadata.put("schema", schema);
|
||||
}
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
if (name != null) {
|
||||
metadata.put("name", name);
|
||||
}
|
||||
}
|
||||
}
|
||||
73
core/src/main/java/io/kestra/core/models/assets/VmAsset.java
Normal file
73
core/src/main/java/io/kestra/core/models/assets/VmAsset.java
Normal file
@@ -0,0 +1,73 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin(aliases = VmAsset.ASSET_TYPE)
|
||||
public class VmAsset extends Asset {
|
||||
public static final String ASSET_TYPE = "VM";
|
||||
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public VmAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String displayName,
|
||||
String description,
|
||||
String provider,
|
||||
String region,
|
||||
String state,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
|
||||
|
||||
this.setProvider(provider);
|
||||
this.setRegion(region);
|
||||
this.setState(state);
|
||||
}
|
||||
|
||||
@JsonProperty("provider")
|
||||
public String getProvider() {
|
||||
return Optional.ofNullable(metadata.get("provider")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("region")
|
||||
public String getRegion() {
|
||||
return Optional.ofNullable(metadata.get("region")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("state")
|
||||
public String getState() {
|
||||
return Optional.ofNullable(metadata.get("state")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
public void setProvider(String provider) {
|
||||
if (provider != null) {
|
||||
metadata.put("provider", provider);
|
||||
}
|
||||
}
|
||||
|
||||
public void setRegion(String region) {
|
||||
if (region != null) {
|
||||
metadata.put("region", region);
|
||||
}
|
||||
}
|
||||
|
||||
public void setState(String state) {
|
||||
if (state != null) {
|
||||
metadata.put("state", state);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.models.executions;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import io.kestra.core.models.TenantInterface;
|
||||
import io.kestra.core.models.assets.AssetsInOut;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
@@ -57,6 +58,10 @@ public class TaskRun implements TenantInterface {
|
||||
@Schema(implementation = Object.class)
|
||||
Variables outputs;
|
||||
|
||||
@With
|
||||
@Nullable
|
||||
AssetsInOut assets;
|
||||
|
||||
@NotNull
|
||||
State state;
|
||||
|
||||
@@ -87,6 +92,7 @@ public class TaskRun implements TenantInterface {
|
||||
this.value,
|
||||
this.attempts,
|
||||
this.outputs,
|
||||
this.assets,
|
||||
this.state.withState(state),
|
||||
this.iteration,
|
||||
this.dynamic,
|
||||
@@ -114,6 +120,7 @@ public class TaskRun implements TenantInterface {
|
||||
this.value,
|
||||
newAttempts,
|
||||
this.outputs,
|
||||
this.assets,
|
||||
this.state.withState(state),
|
||||
this.iteration,
|
||||
this.dynamic,
|
||||
@@ -137,6 +144,7 @@ public class TaskRun implements TenantInterface {
|
||||
this.value,
|
||||
newAttempts,
|
||||
this.outputs,
|
||||
this.assets,
|
||||
this.state.withState(State.Type.FAILED),
|
||||
this.iteration,
|
||||
this.dynamic,
|
||||
@@ -156,6 +164,7 @@ public class TaskRun implements TenantInterface {
|
||||
.value(this.getValue())
|
||||
.attempts(this.getAttempts())
|
||||
.outputs(this.getOutputs())
|
||||
.assets(this.getAssets())
|
||||
.state(state == null ? this.getState() : state)
|
||||
.iteration(this.getIteration())
|
||||
.build();
|
||||
@@ -242,6 +251,7 @@ public class TaskRun implements TenantInterface {
|
||||
", parentTaskRunId=" + this.getParentTaskRunId() +
|
||||
", state=" + this.getState().getCurrent().toString() +
|
||||
", outputs=" + this.getOutputs() +
|
||||
", assets=" + this.getAssets() +
|
||||
", attempts=" + this.getAttempts() +
|
||||
")";
|
||||
}
|
||||
|
||||
@@ -5,11 +5,13 @@ import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.assets.AssetsDeclaration;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.plugin.core.flow.WorkingDirectory;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.Size;
|
||||
import lombok.Builder;
|
||||
@@ -78,6 +80,11 @@ abstract public class Task implements TaskInterface {
|
||||
@Valid
|
||||
private Cache taskCache;
|
||||
|
||||
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
|
||||
@Valid
|
||||
@Nullable
|
||||
private Property<AssetsDeclaration> assets;
|
||||
|
||||
public Optional<Task> findById(String id) {
|
||||
if (this.getId().equals(id)) {
|
||||
return Optional.of(this);
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
package io.kestra.core.models.tasks.runners;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.runners.AssetEmitter;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.event.Level;
|
||||
import org.slf4j.spi.LoggingEventBuilder;
|
||||
@@ -18,6 +21,7 @@ import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static io.kestra.core.runners.RunContextLogger.ORIGINAL_TIMESTAMP_KEY;
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
|
||||
/**
|
||||
* Service for matching and capturing structured data from task execution logs.
|
||||
@@ -76,6 +80,18 @@ public class TaskLogLineMatcher {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (match.assets() != null) {
|
||||
try {
|
||||
AssetEmitter assetEmitter = runContext.assets();
|
||||
match.assets().forEach(throwConsumer(assetEmitter::upsert));
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
logger.warn("Unable to get asset emitter for log '{}'", data, e);
|
||||
} catch (QueueException e) {
|
||||
logger.warn("Unable to emit asset for log '{}'", data, e);
|
||||
}
|
||||
}
|
||||
|
||||
return match;
|
||||
}
|
||||
|
||||
@@ -94,8 +110,9 @@ public class TaskLogLineMatcher {
|
||||
public record TaskLogMatch(
|
||||
Map<String, Object> outputs,
|
||||
List<AbstractMetricEntry<?>> metrics,
|
||||
List<LogLine> logs
|
||||
) {
|
||||
List<LogLine> logs,
|
||||
List<Asset> assets
|
||||
) {
|
||||
@Override
|
||||
public Map<String, Object> outputs() {
|
||||
return Optional.ofNullable(outputs).orElse(Map.of());
|
||||
|
||||
@@ -6,8 +6,10 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.assets.AssetsDeclaration;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.WorkerGroup;
|
||||
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
|
||||
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
|
||||
@@ -88,6 +90,9 @@ abstract public class AbstractTrigger implements TriggerInterface {
|
||||
)
|
||||
private boolean allowConcurrent = false;
|
||||
|
||||
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
|
||||
private Property<AssetsDeclaration> assets;
|
||||
|
||||
/**
|
||||
* For backward compatibility: we rename minLogLevel to logLevel.
|
||||
* @deprecated use {@link #logLevel} instead
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.plugins;
|
||||
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.plugins;
|
||||
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import io.kestra.core.app.AppPluginInterface;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
@@ -11,6 +12,7 @@ import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.logs.LogExporter;
|
||||
import io.kestra.core.models.tasks.runners.TaskRunner;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.plugins.serdes.AssetDeserializer;
|
||||
import io.kestra.core.plugins.serdes.PluginDeserializer;
|
||||
import io.kestra.core.secret.SecretPluginInterface;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
@@ -45,5 +47,6 @@ public class PluginModule extends SimpleModule {
|
||||
addDeserializer(SecretPluginInterface.class, new PluginDeserializer<>());
|
||||
addDeserializer(AppPluginInterface.class, new PluginDeserializer<>());
|
||||
addDeserializer(LogExporter.class, new PluginDeserializer<>());
|
||||
addDeserializer(Asset.class, new AssetDeserializer());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package io.kestra.core.plugins;
|
||||
import io.kestra.core.app.AppBlockInterface;
|
||||
import io.kestra.core.app.AppPluginInterface;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
@@ -108,6 +109,7 @@ public class PluginScanner {
|
||||
List<Class<? extends StorageInterface>> storages = new ArrayList<>();
|
||||
List<Class<? extends SecretPluginInterface>> secrets = new ArrayList<>();
|
||||
List<Class<? extends TaskRunner<?>>> taskRunners = new ArrayList<>();
|
||||
List<Class<? extends Asset>> assets = new ArrayList<>();
|
||||
List<Class<? extends AppPluginInterface>> apps = new ArrayList<>();
|
||||
List<Class<? extends AppBlockInterface>> appBlocks = new ArrayList<>();
|
||||
List<Class<? extends Chart<?>>> charts = new ArrayList<>();
|
||||
@@ -155,6 +157,10 @@ public class PluginScanner {
|
||||
//noinspection unchecked
|
||||
taskRunners.add((Class<? extends TaskRunner<?>>) runner.getClass());
|
||||
}
|
||||
case Asset asset -> {
|
||||
log.debug("Loading Asset plugin: '{}'", plugin.getClass());
|
||||
assets.add(asset.getClass());
|
||||
}
|
||||
case AppPluginInterface app -> {
|
||||
log.debug("Loading App plugin: '{}'", plugin.getClass());
|
||||
apps.add(app.getClass());
|
||||
@@ -223,6 +229,7 @@ public class PluginScanner {
|
||||
.conditions(conditions)
|
||||
.storages(storages)
|
||||
.secrets(secrets)
|
||||
.assets(assets)
|
||||
.apps(apps)
|
||||
.appBlocks(appBlocks)
|
||||
.taskRunners(taskRunners)
|
||||
|
||||
@@ -3,6 +3,7 @@ package io.kestra.core.plugins;
|
||||
import io.kestra.core.app.AppBlockInterface;
|
||||
import io.kestra.core.app.AppPluginInterface;
|
||||
import io.kestra.core.models.annotations.PluginSubGroup;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
@@ -39,6 +40,7 @@ public class RegisteredPlugin {
|
||||
public static final String STORAGES_GROUP_NAME = "storages";
|
||||
public static final String SECRETS_GROUP_NAME = "secrets";
|
||||
public static final String TASK_RUNNERS_GROUP_NAME = "task-runners";
|
||||
public static final String ASSETS_GROUP_NAME = "assets";
|
||||
public static final String APPS_GROUP_NAME = "apps";
|
||||
public static final String APP_BLOCKS_GROUP_NAME = "app-blocks";
|
||||
public static final String CHARTS_GROUP_NAME = "charts";
|
||||
@@ -56,6 +58,7 @@ public class RegisteredPlugin {
|
||||
private final List<Class<? extends StorageInterface>> storages;
|
||||
private final List<Class<? extends SecretPluginInterface>> secrets;
|
||||
private final List<Class<? extends TaskRunner<?>>> taskRunners;
|
||||
private final List<Class<? extends Asset>> assets;
|
||||
private final List<Class<? extends AppPluginInterface>> apps;
|
||||
private final List<Class<? extends AppBlockInterface>> appBlocks;
|
||||
private final List<Class<? extends Chart<?>>> charts;
|
||||
@@ -74,6 +77,7 @@ public class RegisteredPlugin {
|
||||
!storages.isEmpty() ||
|
||||
!secrets.isEmpty() ||
|
||||
!taskRunners.isEmpty() ||
|
||||
!assets.isEmpty() ||
|
||||
!apps.isEmpty() ||
|
||||
!appBlocks.isEmpty() ||
|
||||
!charts.isEmpty() ||
|
||||
@@ -145,6 +149,10 @@ public class RegisteredPlugin {
|
||||
return AppPluginInterface.class;
|
||||
}
|
||||
|
||||
if (this.getAssets().stream().anyMatch(r -> r.getName().equals(cls))) {
|
||||
return Asset.class;
|
||||
}
|
||||
|
||||
if (this.getLogExporters().stream().anyMatch(r -> r.getName().equals(cls))) {
|
||||
return LogExporter.class;
|
||||
}
|
||||
@@ -180,6 +188,7 @@ public class RegisteredPlugin {
|
||||
result.put(STORAGES_GROUP_NAME, Arrays.asList(this.getStorages().toArray(Class[]::new)));
|
||||
result.put(SECRETS_GROUP_NAME, Arrays.asList(this.getSecrets().toArray(Class[]::new)));
|
||||
result.put(TASK_RUNNERS_GROUP_NAME, Arrays.asList(this.getTaskRunners().toArray(Class[]::new)));
|
||||
result.put(ASSETS_GROUP_NAME, Arrays.asList(this.getAssets().toArray(Class[]::new)));
|
||||
result.put(APPS_GROUP_NAME, Arrays.asList(this.getApps().toArray(Class[]::new)));
|
||||
result.put(APP_BLOCKS_GROUP_NAME, Arrays.asList(this.getAppBlocks().toArray(Class[]::new)));
|
||||
result.put(CHARTS_GROUP_NAME, Arrays.asList(this.getCharts().toArray(Class[]::new)));
|
||||
@@ -359,6 +368,12 @@ public class RegisteredPlugin {
|
||||
b.append("] ");
|
||||
}
|
||||
|
||||
if (!this.getAssets().isEmpty()) {
|
||||
b.append("[Assets: ");
|
||||
b.append(this.getAssets().stream().map(Class::getName).collect(Collectors.joining(", ")));
|
||||
b.append("] ");
|
||||
}
|
||||
|
||||
if (!this.getApps().isEmpty()) {
|
||||
b.append("[Apps: ");
|
||||
b.append(this.getApps().stream().map(Class::getName).collect(Collectors.joining(", ")));
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
package io.kestra.core.plugins.serdes;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonDeserializer;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.assets.CustomAsset;
|
||||
|
||||
/**
|
||||
* Specific {@link JsonDeserializer} for deserializing {@link Asset}.
|
||||
*/
|
||||
public final class AssetDeserializer extends PluginDeserializer<Asset> {
|
||||
@Override
|
||||
protected Class<? extends Plugin> fallbackClass() {
|
||||
return CustomAsset.class;
|
||||
}
|
||||
}
|
||||
@@ -29,7 +29,7 @@ import java.util.Optional;
|
||||
* The {@link PluginDeserializer} uses the {@link PluginRegistry} to found the plugin class corresponding to
|
||||
* a plugin type.
|
||||
*/
|
||||
public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer<T> {
|
||||
public class PluginDeserializer<T extends Plugin> extends JsonDeserializer<T> {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PluginDeserializer.class);
|
||||
|
||||
@@ -93,6 +93,10 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
|
||||
identifier
|
||||
);
|
||||
pluginType = pluginRegistry.findClassByIdentifier(identifier);
|
||||
|
||||
if (pluginType == null) {
|
||||
pluginType = fallbackClass();
|
||||
}
|
||||
}
|
||||
|
||||
if (pluginType == null) {
|
||||
@@ -153,4 +157,8 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
|
||||
|
||||
return isVersioningSupported && version != null && !version.isEmpty() ? type + ":" + version : type;
|
||||
}
|
||||
|
||||
protected Class<? extends Plugin> fallbackClass() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
12
core/src/main/java/io/kestra/core/runners/AssetEmitter.java
Normal file
12
core/src/main/java/io/kestra/core/runners/AssetEmitter.java
Normal file
@@ -0,0 +1,12 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface AssetEmitter {
|
||||
void upsert(Asset asset) throws QueueException;
|
||||
|
||||
List<Asset> outputs();
|
||||
}
|
||||
@@ -6,11 +6,13 @@ import com.google.common.base.CaseFormat;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.assets.AssetsDeclaration;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.services.AssetManagerFactory;
|
||||
import io.kestra.core.plugins.PluginConfigurations;
|
||||
import io.kestra.core.services.KVStoreService;
|
||||
import io.kestra.core.storages.Storage;
|
||||
@@ -54,6 +56,7 @@ public class DefaultRunContext extends RunContext {
|
||||
private MetricRegistry meterRegistry;
|
||||
private VersionProvider version;
|
||||
private KVStoreService kvStoreService;
|
||||
private AssetManagerFactory assetManagerFactory;
|
||||
private Optional<String> secretKey;
|
||||
private WorkingDir workingDir;
|
||||
private Validator validator;
|
||||
@@ -73,6 +76,8 @@ public class DefaultRunContext extends RunContext {
|
||||
private Task task;
|
||||
private AbstractTrigger trigger;
|
||||
|
||||
private volatile AssetEmitter assetEmitter;
|
||||
|
||||
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
|
||||
|
||||
|
||||
@@ -161,6 +166,7 @@ public class DefaultRunContext extends RunContext {
|
||||
this.secretKey = applicationContext.getProperty("kestra.encryption.secret-key", String.class);
|
||||
this.validator = applicationContext.getBean(Validator.class);
|
||||
this.localPath = applicationContext.getBean(LocalPathFactory.class).createLocalPath(this);
|
||||
this.assetManagerFactory = applicationContext.getBean(AssetManagerFactory.class);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -537,6 +543,23 @@ public class DefaultRunContext extends RunContext {
|
||||
return flow != null ? flow.get("tenantId") : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public TaskRunInfo taskRunInfo() {
|
||||
Optional<Map<String, Object>> maybeTaskRunMap = Optional.ofNullable(this.getVariables().get("taskrun"))
|
||||
.map(Map.class::cast);
|
||||
return new TaskRunInfo(
|
||||
(String) this.getVariables().get("executionId"),
|
||||
(String) this.getVariables().get("taskId"),
|
||||
maybeTaskRunMap.map(m -> (String) m.get("id"))
|
||||
.orElse(null),
|
||||
maybeTaskRunMap.map(m -> (String) m.get("value"))
|
||||
.orElse(null)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@@ -545,12 +568,7 @@ public class DefaultRunContext extends RunContext {
|
||||
public FlowInfo flowInfo() {
|
||||
Map<String, Object> flow = (Map<String, Object>) this.getVariables().get("flow");
|
||||
// normally only tests should not have the flow variable
|
||||
return flow == null ? new FlowInfo(null, null, null, null) : new FlowInfo(
|
||||
(String) flow.get("tenantId"),
|
||||
(String) flow.get("namespace"),
|
||||
(String) flow.get("id"),
|
||||
(Integer) flow.get("revision")
|
||||
);
|
||||
return flow == null ? new FlowInfo(null, null, null, null) : FlowInfo.from(flow);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -594,6 +612,25 @@ public class DefaultRunContext extends RunContext {
|
||||
return new AclCheckerImpl(this.applicationContext, flowInfo());
|
||||
}
|
||||
|
||||
@Override
|
||||
public AssetEmitter assets() throws IllegalVariableEvaluationException {
|
||||
if (this.assetEmitter == null) {
|
||||
synchronized (this) {
|
||||
if (this.assetEmitter == null) {
|
||||
this.assetEmitter = assetManagerFactory.of(
|
||||
Optional.ofNullable(task).map(Task::getAssets)
|
||||
.or(() -> Optional.ofNullable(trigger).map(AbstractTrigger::getAssets))
|
||||
.flatMap(throwFunction(asset -> this.render(asset).as(AssetsDeclaration.class)))
|
||||
.map(AssetsDeclaration::isEnableAuto)
|
||||
.orElse(false)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return this.assetEmitter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LocalPath localPath() {
|
||||
return localPath;
|
||||
|
||||
@@ -143,6 +143,8 @@ public abstract class RunContext implements PropertyContext {
|
||||
@Deprecated(forRemoval = true)
|
||||
public abstract String tenantId();
|
||||
|
||||
public abstract TaskRunInfo taskRunInfo();
|
||||
|
||||
public abstract FlowInfo flowInfo();
|
||||
|
||||
/**
|
||||
@@ -190,7 +192,19 @@ public abstract class RunContext implements PropertyContext {
|
||||
*/
|
||||
public abstract LocalPath localPath();
|
||||
|
||||
public record TaskRunInfo(String executionId, String taskId, String taskRunId, Object value) {
|
||||
|
||||
}
|
||||
|
||||
public record FlowInfo(String tenantId, String namespace, String id, Integer revision) {
|
||||
public static FlowInfo from(Map<String, Object> flowInfoMap) {
|
||||
return new FlowInfo(
|
||||
(String) flowInfoMap.get("tenantId"),
|
||||
(String) flowInfoMap.get("namespace"),
|
||||
(String) flowInfoMap.get("id"),
|
||||
(Integer) flowInfoMap.get("revision")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -206,6 +220,11 @@ public abstract class RunContext implements PropertyContext {
|
||||
*/
|
||||
public abstract AclChecker acl();
|
||||
|
||||
/**
|
||||
* Get access to the Assets handler.
|
||||
*/
|
||||
public abstract AssetEmitter assets() throws IllegalVariableEvaluationException;
|
||||
|
||||
/**
|
||||
* Clone this run context for a specific plugin.
|
||||
* @return a new run context with the plugin configuration of the given plugin.
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
@@ -10,6 +11,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.With;
|
||||
|
||||
@Value
|
||||
@AllArgsConstructor
|
||||
@@ -21,8 +23,7 @@ public class WorkerTaskResult implements HasUID {
|
||||
List<TaskRun> dynamicTaskRuns;
|
||||
|
||||
public WorkerTaskResult(TaskRun taskRun) {
|
||||
this.taskRun = taskRun;
|
||||
this.dynamicTaskRuns = new ArrayList<>();
|
||||
this(taskRun, new ArrayList<>());
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.runners.AssetEmitter;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Singleton
|
||||
public class AssetManagerFactory {
|
||||
public AssetEmitter of(boolean enabled) {
|
||||
return new AssetEmitter() {
|
||||
@Override
|
||||
public void upsert(Asset asset) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Asset> outputs() {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
32
core/src/main/java/io/kestra/core/services/AssetService.java
Normal file
32
core/src/main/java/io/kestra/core/services/AssetService.java
Normal file
@@ -0,0 +1,32 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.assets.AssetIdentifier;
|
||||
import io.kestra.core.models.assets.AssetUser;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
@Singleton
|
||||
public class AssetService implements Runnable {
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
this.run();
|
||||
}
|
||||
|
||||
public void asyncUpsert(AssetUser assetUser, Asset asset) throws QueueException {
|
||||
// No-op
|
||||
}
|
||||
|
||||
public void assetLineage(AssetUser assetUser, List<AssetIdentifier> inputs, List<AssetIdentifier> outputs) throws QueueException {
|
||||
// No-op
|
||||
}
|
||||
|
||||
public void run() {
|
||||
// No-op
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.test.flow;
|
||||
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
@@ -8,6 +9,7 @@ import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Getter
|
||||
@@ -25,5 +27,7 @@ public class TaskFixture {
|
||||
|
||||
private Map<String, Object> outputs;
|
||||
|
||||
private List<Asset> assets;
|
||||
|
||||
private Property<String> description;
|
||||
}
|
||||
|
||||
@@ -19,15 +19,7 @@ import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@@ -155,6 +147,8 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
|
||||
.map(task -> task.getId())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
violations.addAll(assetsViolations(allTasks));
|
||||
|
||||
if (!invalidTasks.isEmpty()) {
|
||||
violations.add("Invalid output reference: use outputs[key-name] instead of outputs.key-name — keys with dashes require bracket notation, offending tasks:" +
|
||||
" [" + String.join(", ", invalidTasks) + "]");
|
||||
@@ -181,6 +175,12 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
|
||||
}
|
||||
}
|
||||
|
||||
protected List<String> assetsViolations(List<Task> allTasks) {
|
||||
return allTasks.stream().filter(task -> task.getAssets() != null)
|
||||
.map(taskWithAssets -> "Task '" + taskWithAssets.getId() + "' can't have any `assets` because assets are only available in Enterprise Edition.")
|
||||
.toList();
|
||||
}
|
||||
|
||||
private static boolean checkObjectFieldsWithPatterns(Object object, List<Pattern> patterns) {
|
||||
if (object == null) {
|
||||
return true;
|
||||
|
||||
268
core/src/test/java/io/kestra/assets/assets/AssetTest.java
Normal file
268
core/src/test/java/io/kestra/assets/assets/AssetTest.java
Normal file
@@ -0,0 +1,268 @@
|
||||
package io.kestra.assets.assets;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.models.assets.*;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@MicronautTest
|
||||
public class AssetTest {
|
||||
@Test
|
||||
void custom() throws JsonProcessingException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
String id = TestsUtils.randomString();
|
||||
String type = "MY_OWN_ASSET_TYPE";
|
||||
String displayName = "My own asset";
|
||||
String description = "This is my asset";
|
||||
String metadataKey = "owner";
|
||||
String metadataValue = "data-team";
|
||||
Asset asset = JacksonMapper.ofYaml().readValue("""
|
||||
namespace: %s
|
||||
id: %s
|
||||
type: %s
|
||||
displayName: %s
|
||||
description: %s
|
||||
metadata:
|
||||
%s: %s""".formatted(
|
||||
namespace,
|
||||
id,
|
||||
type,
|
||||
displayName,
|
||||
description,
|
||||
metadataKey,
|
||||
metadataValue
|
||||
), Asset.class);
|
||||
|
||||
assertThat(asset).isInstanceOf(CustomAsset.class);
|
||||
assertThat(asset.getNamespace()).isEqualTo(namespace);
|
||||
assertThat(asset.getId()).isEqualTo(id);
|
||||
assertThat(asset.getType()).isEqualTo(type);
|
||||
assertThat(asset.getDisplayName()).isEqualTo(displayName);
|
||||
assertThat(asset.getDescription()).isEqualTo(description);
|
||||
assertThat(asset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void external() throws JsonProcessingException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
String id = TestsUtils.randomString();
|
||||
String type = "EXTERNAL";
|
||||
String displayName = "External asset";
|
||||
String description = "This is an external asset";
|
||||
String metadataKey = "owner";
|
||||
String metadataValue = "external-team";
|
||||
Asset asset = JacksonMapper.ofYaml().readValue("""
|
||||
namespace: %s
|
||||
id: %s
|
||||
type: %s
|
||||
displayName: %s
|
||||
description: %s
|
||||
metadata:
|
||||
%s: %s""".formatted(
|
||||
namespace,
|
||||
id,
|
||||
type,
|
||||
displayName,
|
||||
description,
|
||||
metadataKey,
|
||||
metadataValue
|
||||
), Asset.class);
|
||||
|
||||
assertThat(asset).isInstanceOf(ExternalAsset.class);
|
||||
assertThat(asset.getNamespace()).isEqualTo(namespace);
|
||||
assertThat(asset.getId()).isEqualTo(id);
|
||||
assertThat(asset.getType()).isEqualTo(type);
|
||||
assertThat(asset.getDisplayName()).isEqualTo(displayName);
|
||||
assertThat(asset.getDescription()).isEqualTo(description);
|
||||
assertThat(asset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void dataset() throws JsonProcessingException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
String id = TestsUtils.randomString();
|
||||
String displayName = "My Dataset";
|
||||
String description = "This is my dataset";
|
||||
String system = "S3";
|
||||
String location = "s3://my-bucket/my-dataset";
|
||||
String format = "parquet";
|
||||
String metadataKey = "owner";
|
||||
String metadataValue = "data-team";
|
||||
Asset asset = JacksonMapper.ofYaml().readValue("""
|
||||
namespace: %s
|
||||
id: %s
|
||||
type: %s
|
||||
displayName: %s
|
||||
description: %s
|
||||
system: %s
|
||||
location: %s
|
||||
format: %s
|
||||
metadata:
|
||||
%s: %s""".formatted(
|
||||
namespace,
|
||||
id,
|
||||
DatasetAsset.ASSET_TYPE,
|
||||
displayName,
|
||||
description,
|
||||
system,
|
||||
location,
|
||||
format,
|
||||
metadataKey,
|
||||
metadataValue
|
||||
), Asset.class);
|
||||
|
||||
assertThat(asset).isInstanceOf(DatasetAsset.class);
|
||||
DatasetAsset datasetAsset = (DatasetAsset) asset;
|
||||
assertThat(datasetAsset.getNamespace()).isEqualTo(namespace);
|
||||
assertThat(datasetAsset.getId()).isEqualTo(id);
|
||||
assertThat(datasetAsset.getDisplayName()).isEqualTo(displayName);
|
||||
assertThat(datasetAsset.getDescription()).isEqualTo(description);
|
||||
assertThat(datasetAsset.getSystem()).isEqualTo(system);
|
||||
assertThat(datasetAsset.getLocation()).isEqualTo(location);
|
||||
assertThat(datasetAsset.getFormat()).isEqualTo(format);
|
||||
assertThat(datasetAsset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void file() throws JsonProcessingException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
String id = TestsUtils.randomString();
|
||||
String displayName = "My File";
|
||||
String description = "This is my file";
|
||||
String system = "local";
|
||||
String path = "/data/my-file.txt";
|
||||
String metadataKey = "owner";
|
||||
String metadataValue = "file-team";
|
||||
Asset asset = JacksonMapper.ofYaml().readValue("""
|
||||
namespace: %s
|
||||
id: %s
|
||||
type: %s
|
||||
displayName: %s
|
||||
description: %s
|
||||
system: %s
|
||||
path: %s
|
||||
metadata:
|
||||
%s: %s""".formatted(
|
||||
namespace,
|
||||
id,
|
||||
FileAsset.ASSET_TYPE,
|
||||
displayName,
|
||||
description,
|
||||
system,
|
||||
path,
|
||||
metadataKey,
|
||||
metadataValue
|
||||
), Asset.class);
|
||||
|
||||
assertThat(asset).isInstanceOf(FileAsset.class);
|
||||
FileAsset fileAsset = (FileAsset) asset;
|
||||
assertThat(fileAsset.getNamespace()).isEqualTo(namespace);
|
||||
assertThat(fileAsset.getId()).isEqualTo(id);
|
||||
assertThat(fileAsset.getDisplayName()).isEqualTo(displayName);
|
||||
assertThat(fileAsset.getDescription()).isEqualTo(description);
|
||||
assertThat(fileAsset.getSystem()).isEqualTo(system);
|
||||
assertThat(fileAsset.getPath()).isEqualTo(path);
|
||||
assertThat(fileAsset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void table() throws JsonProcessingException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
String id = TestsUtils.randomString();
|
||||
String displayName = "My Table";
|
||||
String description = "This is my table";
|
||||
String system = "postgres";
|
||||
String database = "mydb";
|
||||
String schema = "my_schema";
|
||||
String name = "mytable";
|
||||
String metadataKey = "owner";
|
||||
String metadataValue = "table-team";
|
||||
Asset asset = JacksonMapper.ofYaml().readValue("""
|
||||
namespace: %s
|
||||
id: %s
|
||||
type: %s
|
||||
displayName: %s
|
||||
description: %s
|
||||
system: %s
|
||||
database: %s
|
||||
schema: %s
|
||||
name: %s
|
||||
metadata:
|
||||
%s: %s""".formatted(
|
||||
namespace,
|
||||
id,
|
||||
TableAsset.ASSET_TYPE,
|
||||
displayName,
|
||||
description,
|
||||
system,
|
||||
database,
|
||||
schema,
|
||||
name,
|
||||
metadataKey,
|
||||
metadataValue
|
||||
), Asset.class);
|
||||
|
||||
assertThat(asset).isInstanceOf(TableAsset.class);
|
||||
TableAsset tableAsset = (TableAsset) asset;
|
||||
assertThat(tableAsset.getNamespace()).isEqualTo(namespace);
|
||||
assertThat(tableAsset.getId()).isEqualTo(id);
|
||||
assertThat(tableAsset.getDisplayName()).isEqualTo(displayName);
|
||||
assertThat(tableAsset.getDescription()).isEqualTo(description);
|
||||
assertThat(tableAsset.getSystem()).isEqualTo(system);
|
||||
assertThat(tableAsset.getDatabase()).isEqualTo(database);
|
||||
assertThat(tableAsset.getSchema()).isEqualTo(schema);
|
||||
assertThat(tableAsset.getName()).isEqualTo(name);
|
||||
assertThat(tableAsset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void vm() throws JsonProcessingException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
String id = TestsUtils.randomString();
|
||||
String displayName = "My VM";
|
||||
String description = "This is my vm";
|
||||
String provider = "aws";
|
||||
String region = "us-east-1";
|
||||
String state = "running";
|
||||
String metadataKey = "owner";
|
||||
String metadataValue = "vm-team";
|
||||
Asset asset = JacksonMapper.ofYaml().readValue("""
|
||||
namespace: %s
|
||||
id: %s
|
||||
type: %s
|
||||
displayName: %s
|
||||
description: %s
|
||||
provider: %s
|
||||
region: %s
|
||||
state: %s
|
||||
metadata:
|
||||
%s: %s""".formatted(
|
||||
namespace,
|
||||
id,
|
||||
VmAsset.ASSET_TYPE,
|
||||
displayName,
|
||||
description,
|
||||
provider,
|
||||
region,
|
||||
state,
|
||||
metadataKey,
|
||||
metadataValue
|
||||
), Asset.class);
|
||||
|
||||
assertThat(asset).isInstanceOf(VmAsset.class);
|
||||
VmAsset vmAsset = (VmAsset) asset;
|
||||
assertThat(vmAsset.getNamespace()).isEqualTo(namespace);
|
||||
assertThat(vmAsset.getId()).isEqualTo(id);
|
||||
assertThat(vmAsset.getDisplayName()).isEqualTo(displayName);
|
||||
assertThat(vmAsset.getDescription()).isEqualTo(description);
|
||||
assertThat(vmAsset.getProvider()).isEqualTo(provider);
|
||||
assertThat(vmAsset.getRegion()).isEqualTo(region);
|
||||
assertThat(vmAsset.getState()).isEqualTo(state);
|
||||
assertThat(vmAsset.getMetadata().get(metadataKey)).isEqualTo(metadataValue);
|
||||
}
|
||||
}
|
||||
@@ -95,4 +95,4 @@ class PluginDeserializerTest {
|
||||
|
||||
public record TestPlugin(String type) implements Plugin {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,6 +81,9 @@ public abstract class AbstractRunnerTest {
|
||||
@Inject
|
||||
private AfterExecutionTestCase afterExecutionTestCase;
|
||||
|
||||
@Inject
|
||||
private AssetTestCase assetTestCase;
|
||||
|
||||
@Test
|
||||
@ExecuteFlow("flows/valids/full.yaml")
|
||||
void full(Execution execution) {
|
||||
@@ -558,4 +561,10 @@ public abstract class AbstractRunnerTest {
|
||||
public void shouldCallTasksAfterListener(Execution execution) {
|
||||
afterExecutionTestCase.shouldCallTasksAfterListener(execution);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows(value = "flows/valids/assets.yaml", tenantId = "abstract-runner-test-assets")
|
||||
public void assets() throws QueueException, TimeoutException {
|
||||
assetTestCase.staticAndDynamicAssets("abstract-runner-test-assets");
|
||||
}
|
||||
}
|
||||
|
||||
220
core/src/test/java/io/kestra/core/runners/AssetTestCase.java
Normal file
220
core/src/test/java/io/kestra/core/runners/AssetTestCase.java
Normal file
@@ -0,0 +1,220 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.assets.*;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.services.AssetManagerFactory;
|
||||
import io.kestra.core.services.AssetService;
|
||||
import io.micronaut.context.annotation.Factory;
|
||||
import io.micronaut.context.annotation.Replaces;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@Singleton
|
||||
public class AssetTestCase {
|
||||
@Inject
|
||||
private AssetService mockedAssetService;
|
||||
@Inject
|
||||
private TestRunnerUtils testRunnerUtils;
|
||||
|
||||
private static final List<Asset> capturedAsyncCreate = new CopyOnWriteArrayList<>();
|
||||
private static final List<Pair<AssetUser, Pair<List<AssetIdentifier>, List<AssetIdentifier>>>> capturedAssetLineage = new CopyOnWriteArrayList<>();
|
||||
private static final List<Asset> capturedEnabledDynamicAssets = new CopyOnWriteArrayList<>();
|
||||
private static final List<Asset> capturedDisabledDynamicAssets = new CopyOnWriteArrayList<>();
|
||||
|
||||
public void staticAndDynamicAssets(String tenantId) throws QueueException, TimeoutException {
|
||||
Execution execution = testRunnerUtils.runOne(tenantId, "io.kestra.tests", "assets");
|
||||
|
||||
Mockito.verify(mockedAssetService, Mockito.times(1)).run();
|
||||
|
||||
// region assets-in-taskruns
|
||||
List<TaskRun> taskRuns = execution.getTaskRunList().stream().toList();
|
||||
assertThat(taskRuns).map(TaskRun::getAssets).map(AssetsInOut::getInputs).satisfiesExactlyInAnyOrder(
|
||||
assets -> assertThat(assets).isEmpty(),
|
||||
assets -> assertThat(assets).isEmpty(),
|
||||
assets -> assertThat(assets).satisfiesExactlyInAnyOrder(
|
||||
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-non-existing-input-asset-uid")
|
||||
),
|
||||
assets -> assertThat(assets).satisfiesExactlyInAnyOrder(
|
||||
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-existing-input-uid")
|
||||
)
|
||||
);
|
||||
assertThat(taskRuns).map(TaskRun::getAssets).map(AssetsInOut::getOutputs).satisfiesExactlyInAnyOrder(
|
||||
assets -> assertThat(assets).satisfiesExactlyInAnyOrder(
|
||||
AssetTestCase::assertEnabledDynamicAsset,
|
||||
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-emit-asset")
|
||||
),
|
||||
assets -> assertThat(assets).isEmpty(),
|
||||
assets -> assertThat(assets).satisfiesExactlyInAnyOrder(
|
||||
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-asset-non-existing-output-uid")
|
||||
),
|
||||
assets -> assertThat(assets).satisfiesExactlyInAnyOrder(
|
||||
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-asset-existing-output-uid")
|
||||
)
|
||||
);
|
||||
// endregion
|
||||
|
||||
// region dynamic-assets
|
||||
assertThat(capturedEnabledDynamicAssets).anySatisfy(AssetTestCase::assertEnabledDynamicAsset);
|
||||
assertThat(capturedEnabledDynamicAssets).noneMatch(asset -> asset.getId().equals("assets-flow-emit-asset-auto-false-uid"));
|
||||
|
||||
assertThat(capturedDisabledDynamicAssets).anySatisfy(AssetTestCase::assertDisabledDynamicAsset);
|
||||
assertThat(capturedDisabledDynamicAssets).noneMatch(asset -> asset.getId().equals("assets-flow-emit-asset-uid"));
|
||||
// endregion
|
||||
|
||||
// region asset-creation
|
||||
assertThat(capturedAsyncCreate).satisfiesExactlyInAnyOrder(
|
||||
AssetTestCase::assertEnabledDynamicAsset,
|
||||
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-emit-asset"),
|
||||
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-asset-non-existing-output-uid"),
|
||||
asset -> assertThat(asset.getId()).isEqualTo("assets-flow-static-asset-existing-output-uid")
|
||||
);
|
||||
assertThat(capturedAsyncCreate).noneMatch(asset -> asset.getId().equals("assets-flow-emit-asset-auto-false-uid"));
|
||||
// endregion
|
||||
|
||||
// region asset-lineage
|
||||
assertThat(capturedAssetLineage).satisfiesExactlyInAnyOrder(
|
||||
assetLineage -> {
|
||||
AssetUser assetUser = assetLineage.getLeft();
|
||||
assertAssetExecution(tenantId, assetUser, execution);
|
||||
assertThat(assetUser.taskRunId()).isEqualTo(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getTaskId().equals("emit-asset"))
|
||||
.findFirst().map(TaskRun::getId).orElseThrow());
|
||||
// No input assets
|
||||
assertThat(assetLineage.getRight().getLeft()).isEmpty();
|
||||
assertThat(assetLineage.getRight().getRight()).satisfiesExactlyInAnyOrder(
|
||||
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-emit-asset-uid"),
|
||||
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-emit-asset")
|
||||
);
|
||||
},
|
||||
// No lineage for the second taskrun due to `enableAuto: false`, below is for the third one
|
||||
assetLineage -> {
|
||||
AssetUser assetUser = assetLineage.getLeft();
|
||||
assertAssetExecution(tenantId, assetUser, execution);
|
||||
assertThat(assetUser.taskRunId()).isEqualTo(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getTaskId().equals("static-asset-non-existing-input"))
|
||||
.findFirst().map(TaskRun::getId).orElseThrow());
|
||||
assertThat(assetLineage.getRight().getLeft()).satisfiesExactlyInAnyOrder(
|
||||
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-non-existing-input-asset-uid")
|
||||
);
|
||||
assertThat(assetLineage.getRight().getRight()).satisfiesExactlyInAnyOrder(
|
||||
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-non-existing-output-uid")
|
||||
);
|
||||
},
|
||||
assetLineage -> {
|
||||
AssetUser assetUser = assetLineage.getLeft();
|
||||
assertAssetExecution(tenantId, assetUser, execution);
|
||||
assertThat(assetUser.taskRunId()).isEqualTo(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getTaskId().equals("static-asset-existing-input"))
|
||||
.findFirst().map(TaskRun::getId).orElseThrow());
|
||||
assertThat(assetLineage.getRight().getLeft()).satisfiesExactlyInAnyOrder(
|
||||
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-existing-input-uid")
|
||||
);
|
||||
assertThat(assetLineage.getRight().getRight()).satisfiesExactlyInAnyOrder(
|
||||
assetId -> assertThat(assetId.id()).isEqualTo("assets-flow-static-asset-existing-output-uid")
|
||||
);
|
||||
}
|
||||
);
|
||||
// endregion
|
||||
}
|
||||
|
||||
private static void assertAssetExecution(String tenantId, AssetUser assetUser, Execution execution) {
|
||||
assertThat(assetUser.tenantId()).isEqualTo(tenantId);
|
||||
assertThat(assetUser.namespace()).isEqualTo("io.kestra.tests");
|
||||
assertThat(assetUser.flowId()).isEqualTo("assets");
|
||||
assertThat(assetUser.flowRevision()).isEqualTo(execution.getFlowRevision());
|
||||
assertThat(assetUser.executionId()).isEqualTo(execution.getId());
|
||||
}
|
||||
|
||||
private static void assertEnabledDynamicAsset(Asset asset) {
|
||||
assertThat(asset).isInstanceOf(TableAsset.class);
|
||||
TableAsset tableAsset = (TableAsset) asset;
|
||||
assertThat(tableAsset.getId()).isEqualTo("assets-flow-emit-asset-uid");
|
||||
assertThat(tableAsset.getType()).isEqualTo(TableAsset.ASSET_TYPE);
|
||||
assertThat(tableAsset.getDisplayName()).isEqualTo("My Table Asset");
|
||||
assertThat(tableAsset.getDescription()).isEqualTo("This is my table asset");
|
||||
assertThat(tableAsset.getSystem()).isEqualTo("MY_DB_SYSTEM");
|
||||
assertThat(tableAsset.getDatabase()).isEqualTo("my_database");
|
||||
assertThat(tableAsset.getSchema()).isEqualTo("my_schema");
|
||||
assertThat(tableAsset.getName()).isEqualTo("my_table");
|
||||
assertThat(tableAsset.getMetadata().get("owner")).isEqualTo("data-team");
|
||||
}
|
||||
|
||||
private static void assertDisabledDynamicAsset(Asset asset) {
|
||||
assertThat(asset).isInstanceOf(TableAsset.class);
|
||||
TableAsset tableAsset = (TableAsset) asset;
|
||||
assertThat(tableAsset.getId()).isEqualTo("assets-flow-emit-asset-auto-false-uid");
|
||||
assertThat(tableAsset.getType()).isEqualTo(TableAsset.ASSET_TYPE);
|
||||
assertThat(tableAsset.getDisplayName()).isEqualTo("My Table Asset");
|
||||
assertThat(tableAsset.getDescription()).isEqualTo("This is my table asset");
|
||||
assertThat(tableAsset.getSystem()).isEqualTo("MY_DB_SYSTEM");
|
||||
assertThat(tableAsset.getDatabase()).isEqualTo("my_database");
|
||||
assertThat(tableAsset.getSchema()).isEqualTo("my_schema");
|
||||
assertThat(tableAsset.getName()).isEqualTo("my_table");
|
||||
assertThat(tableAsset.getMetadata().get("owner")).isEqualTo("data-team");
|
||||
}
|
||||
|
||||
@Factory
|
||||
static class MockFactory {
|
||||
@Singleton
|
||||
@Replaces(AssetService.class)
|
||||
public AssetService mockedAssetService() {
|
||||
return Mockito.spy(new AssetService() {
|
||||
@Override
|
||||
public void asyncUpsert(AssetUser assetUser, Asset asset) {
|
||||
capturedAsyncCreate.add(asset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assetLineage(AssetUser assetUser, List<AssetIdentifier> inputs, List<AssetIdentifier> outputs) {
|
||||
capturedAssetLineage.add(Pair.of(assetUser, Pair.of(inputs, outputs)));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Singleton
|
||||
@Replaces(AssetManagerFactory.class)
|
||||
public AssetManagerFactory mockedAssetManagerFactory() {
|
||||
return Mockito.spy(new AssetManagerFactory() {
|
||||
@Override
|
||||
public AssetEmitter of(boolean enabled) {
|
||||
if (!enabled) {
|
||||
return new AssetEmitter() {
|
||||
@Override
|
||||
public void upsert(Asset asset) {
|
||||
capturedDisabledDynamicAssets.add(asset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Asset> outputs() {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return new AssetEmitter() {
|
||||
private final List<Asset> localCapturedAssets = new CopyOnWriteArrayList<>();
|
||||
|
||||
@Override
|
||||
public void upsert(Asset asset) {
|
||||
localCapturedAssets.add(asset);
|
||||
capturedEnabledDynamicAssets.add(asset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Asset> outputs() {
|
||||
return localCapturedAssets;
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
package io.kestra.core.runners.test;
|
||||
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.assets.TableAsset;
|
||||
import io.kestra.core.models.tasks.*;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Plugin
|
||||
public class AssetEmitter extends Task implements RunnableTask<VoidOutput> {
|
||||
@NotNull
|
||||
@PluginProperty
|
||||
private Asset assetToEmit;
|
||||
|
||||
|
||||
@Override
|
||||
public VoidOutput run(RunContext runContext) throws Exception {
|
||||
runContext.assets().upsert(assetToEmit);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,7 @@
|
||||
package io.kestra.core.validations;
|
||||
|
||||
import io.kestra.core.models.assets.AssetIdentifier;
|
||||
import io.kestra.core.models.assets.AssetsDeclaration;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.models.validations.ModelValidator;
|
||||
@@ -7,7 +9,9 @@ import io.kestra.core.serializers.YamlParser;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.plugin.core.log.Log;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.validation.ConstraintViolation;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import io.kestra.core.models.validations.ValidateConstraintViolation;
|
||||
import io.kestra.core.services.FlowService;
|
||||
@@ -229,6 +233,31 @@ class FlowValidationTest {
|
||||
assertThat(validate.get().getMessage()).contains("Duplicate preconditions with id [flows]");
|
||||
}
|
||||
|
||||
@Test
|
||||
void eeAllowsDefiningAssets() {
|
||||
Flow flow = Flow.builder()
|
||||
.id(TestsUtils.randomString())
|
||||
.namespace(TestsUtils.randomNamespace())
|
||||
.tasks(List.of(
|
||||
Log.builder()
|
||||
.id("log")
|
||||
.type(Log.class.getName())
|
||||
.message("any")
|
||||
.assets(io.kestra.core.models.property.Property.ofValue(
|
||||
new AssetsDeclaration(true, List.of(new AssetIdentifier(null, null, "anyId")), null))
|
||||
)
|
||||
.build()
|
||||
))
|
||||
.build();
|
||||
|
||||
Optional<ConstraintViolationException> violations = modelValidator.isValid(flow);
|
||||
|
||||
assertThat(violations.isPresent()).isEqualTo(true);
|
||||
assertThat(violations.get().getConstraintViolations().stream().map(ConstraintViolation::getMessage)).satisfiesExactly(
|
||||
message -> assertThat(message).contains("Task 'log' can't have any `assets` because assets are only available in Enterprise Edition.")
|
||||
);
|
||||
};
|
||||
|
||||
private Flow parse(String path) {
|
||||
URL resource = TestsUtils.class.getClassLoader().getResource(path);
|
||||
assert resource != null;
|
||||
@@ -237,4 +266,4 @@ class FlowValidationTest {
|
||||
|
||||
return YamlParser.parse(file, Flow.class);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
83
core/src/test/resources/flows/valids/assets.yaml
Normal file
83
core/src/test/resources/flows/valids/assets.yaml
Normal file
@@ -0,0 +1,83 @@
|
||||
id: assets
|
||||
namespace: io.kestra.tests
|
||||
|
||||
tasks:
|
||||
- id: emit-asset
|
||||
type: io.kestra.core.runners.test.AssetEmitter
|
||||
assetToEmit:
|
||||
namespace: io.kestra.tests
|
||||
id: assets-flow-emit-asset-uid
|
||||
type: TABLE
|
||||
displayName: My Table Asset
|
||||
description: This is my table asset
|
||||
system: MY_DB_SYSTEM
|
||||
database: my_database
|
||||
schema: my_schema
|
||||
name: my_table
|
||||
metadata:
|
||||
owner: data-team
|
||||
assets:
|
||||
outputs:
|
||||
- id: assets-flow-static-emit-asset
|
||||
type: TABLE
|
||||
displayName: My Static Table Asset
|
||||
description: This is my static table asset
|
||||
system: MY_DB_SYSTEM
|
||||
database: my_database
|
||||
schema: my_schema
|
||||
name: my_static_table
|
||||
metadata:
|
||||
owner: data-team
|
||||
- id: emit-asset-auto-false
|
||||
type: io.kestra.core.runners.test.AssetEmitter
|
||||
assets:
|
||||
enableAuto: false
|
||||
assetToEmit:
|
||||
namespace: io.kestra.tests
|
||||
id: assets-flow-emit-asset-auto-false-uid
|
||||
type: TABLE
|
||||
displayName: My Table Asset
|
||||
description: This is my table asset
|
||||
system: MY_DB_SYSTEM
|
||||
database: my_database
|
||||
schema: my_schema
|
||||
name: my_table
|
||||
metadata:
|
||||
owner: data-team
|
||||
# Expected to create an 'EXTERNAL' asset automatically as it doesn't exist
|
||||
- id: static-asset-non-existing-input
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "whatever"
|
||||
assets:
|
||||
inputs:
|
||||
- id: assets-flow-static-asset-non-existing-input-asset-uid
|
||||
outputs:
|
||||
- namespace: io.kestra.tests
|
||||
id: assets-flow-static-asset-non-existing-output-uid
|
||||
type: TABLE
|
||||
displayName: My Static Table Asset
|
||||
description: This is my static table asset
|
||||
system: MY_DB_SYSTEM
|
||||
database: my_database
|
||||
schema: my_schema
|
||||
name: my_static_table
|
||||
metadata:
|
||||
owner: data-team
|
||||
- id: static-asset-existing-input
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "whatever"
|
||||
assets:
|
||||
inputs:
|
||||
- id: assets-flow-static-asset-existing-input-uid
|
||||
outputs:
|
||||
- namespace: io.kestra.tests
|
||||
id: assets-flow-static-asset-existing-output-uid
|
||||
type: TABLE
|
||||
displayName: My Static Table Asset
|
||||
description: This is my static table asset
|
||||
system: MY_DB_SYSTEM
|
||||
database: my_database
|
||||
schema: my_schema
|
||||
name: my_static_table
|
||||
metadata:
|
||||
owner: data-team
|
||||
@@ -4,6 +4,10 @@ import io.kestra.core.debug.Breakpoint;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.assets.AssetIdentifier;
|
||||
import io.kestra.core.models.assets.AssetUser;
|
||||
import io.kestra.core.models.assets.AssetsDeclaration;
|
||||
import io.kestra.core.models.assets.AssetsInOut;
|
||||
import io.kestra.core.models.executions.*;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
@@ -96,6 +100,12 @@ public class ExecutorService {
|
||||
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
|
||||
private QueueInterface<LogEntry> logQueue;
|
||||
|
||||
@Inject
|
||||
private AssetService assetService;
|
||||
|
||||
@Inject
|
||||
private RunContextInitializer runContextInitializer;
|
||||
|
||||
protected FlowMetaStoreInterface flowExecutorInterface() {
|
||||
// bean is injected late, so we need to wait
|
||||
if (this.flowExecutorInterface == null) {
|
||||
@@ -896,21 +906,35 @@ public class ExecutorService {
|
||||
boolean hasMockedWorkerTask = false;
|
||||
record FixtureAndTaskRun(TaskFixture fixture, TaskRun taskRun) {}
|
||||
if (executor.getExecution().getFixtures() != null) {
|
||||
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
|
||||
RunContext runContext = runContextInitializer.forExecutor((DefaultRunContext) runContextFactory.of(
|
||||
executor.getFlow(),
|
||||
executor.getExecution()
|
||||
));
|
||||
List<WorkerTaskResult> workerTaskResults = executor.getExecution()
|
||||
.getTaskRunList()
|
||||
.stream()
|
||||
.filter(taskRun -> taskRun.getState().getCurrent().isCreated())
|
||||
.flatMap(taskRun -> executor.getExecution().getFixtureForTaskRun(taskRun).stream().map(fixture -> new FixtureAndTaskRun(fixture, taskRun)))
|
||||
.map(throwFunction(fixtureAndTaskRun -> WorkerTaskResult.builder()
|
||||
.taskRun(fixtureAndTaskRun.taskRun()
|
||||
.withState(Optional.ofNullable(fixtureAndTaskRun.fixture().getState()).orElse(State.Type.SUCCESS))
|
||||
.withOutputs(
|
||||
variablesService.of(StorageContext.forTask(fixtureAndTaskRun.taskRun),
|
||||
fixtureAndTaskRun.fixture().getOutputs() == null ? null : runContext.render(fixtureAndTaskRun.fixture().getOutputs()))
|
||||
)
|
||||
)
|
||||
.build()
|
||||
.map(throwFunction(fixtureAndTaskRun -> {
|
||||
Optional<AssetsDeclaration> renderedAssetsDeclaration = runContext.render(executor.getFlow().findTaskByTaskId(fixtureAndTaskRun.taskRun.getTaskId()).getAssets()).as(AssetsDeclaration.class);
|
||||
return WorkerTaskResult.builder()
|
||||
.taskRun(fixtureAndTaskRun.taskRun()
|
||||
.withState(Optional.ofNullable(fixtureAndTaskRun.fixture().getState()).orElse(State.Type.SUCCESS))
|
||||
.withOutputs(
|
||||
variablesService.of(StorageContext.forTask(fixtureAndTaskRun.taskRun),
|
||||
fixtureAndTaskRun.fixture().getOutputs() == null ? null : runContext.render(fixtureAndTaskRun.fixture().getOutputs()))
|
||||
)
|
||||
.withAssets(new AssetsInOut(
|
||||
renderedAssetsDeclaration.map(AssetsDeclaration::getInputs).orElse(Collections.emptyList()).stream()
|
||||
.map(assetIdentifier -> assetIdentifier.withTenantId(executor.getFlow().getTenantId()))
|
||||
.toList(),
|
||||
fixtureAndTaskRun.fixture().getAssets() == null ? null : fixtureAndTaskRun.fixture().getAssets().stream()
|
||||
.map(asset -> asset.withTenantId(executor.getFlow().getTenantId()))
|
||||
.toList()
|
||||
))
|
||||
)
|
||||
.build();
|
||||
}
|
||||
))
|
||||
.toList();
|
||||
|
||||
@@ -1172,6 +1196,47 @@ public class ExecutorService {
|
||||
metricRegistry.tags(workerTaskResult)
|
||||
)
|
||||
.record(taskRun.getState().getDurationOrComputeIt());
|
||||
|
||||
if (
|
||||
!taskRun.getState().isFailed()
|
||||
&& taskRun.getAssets() != null &&
|
||||
(!taskRun.getAssets().getInputs().isEmpty() || !taskRun.getAssets().getOutputs().isEmpty())
|
||||
) {
|
||||
AssetUser assetUser = new AssetUser(
|
||||
taskRun.getTenantId(),
|
||||
taskRun.getNamespace(),
|
||||
taskRun.getFlowId(),
|
||||
newExecution.getFlowRevision(),
|
||||
taskRun.getExecutionId(),
|
||||
taskRun.getTaskId(),
|
||||
taskRun.getId()
|
||||
);
|
||||
|
||||
List<AssetIdentifier> outputIdentifiers = taskRun.getAssets().getOutputs().stream()
|
||||
.map(asset -> asset.withTenantId(taskRun.getTenantId()))
|
||||
.map(AssetIdentifier::of)
|
||||
.toList();
|
||||
List<AssetIdentifier> inputAssets = taskRun.getAssets().getInputs().stream()
|
||||
.map(assetIdentifier -> assetIdentifier.withTenantId(taskRun.getTenantId()))
|
||||
.toList();
|
||||
try {
|
||||
assetService.assetLineage(
|
||||
assetUser,
|
||||
inputAssets,
|
||||
outputIdentifiers
|
||||
);
|
||||
} catch (QueueException e) {
|
||||
log.warn("Unable to submit asset lineage event for {} -> {}", inputAssets, outputIdentifiers, e);
|
||||
}
|
||||
|
||||
taskRun.getAssets().getOutputs().forEach(asset -> {
|
||||
try {
|
||||
assetService.asyncUpsert(assetUser, asset);
|
||||
} catch (QueueException e) {
|
||||
log.warn("Unable to submit asset upsert event for asset {}", asset.getId(), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
ALTER TABLE queues ALTER COLUMN "type" ENUM(
|
||||
'io.kestra.core.models.executions.Execution',
|
||||
'io.kestra.core.models.templates.Template',
|
||||
'io.kestra.core.models.executions.ExecutionKilled',
|
||||
'io.kestra.core.runners.WorkerJob',
|
||||
'io.kestra.core.runners.WorkerTaskResult',
|
||||
'io.kestra.core.runners.WorkerInstance',
|
||||
'io.kestra.core.runners.WorkerTaskRunning',
|
||||
'io.kestra.core.models.executions.LogEntry',
|
||||
'io.kestra.core.models.triggers.Trigger',
|
||||
'io.kestra.ee.models.audits.AuditLog',
|
||||
'io.kestra.core.models.executions.MetricEntry',
|
||||
'io.kestra.core.runners.WorkerTriggerResult',
|
||||
'io.kestra.core.runners.SubflowExecutionResult',
|
||||
'io.kestra.core.server.ClusterEvent',
|
||||
'io.kestra.core.runners.SubflowExecutionEnd',
|
||||
'io.kestra.core.models.flows.FlowInterface',
|
||||
'io.kestra.core.runners.MultipleConditionEvent',
|
||||
'io.kestra.ee.assets.AssetLineageEvent',
|
||||
'io.kestra.ee.assets.AssetUpsertCommand',
|
||||
'io.kestra.ee.assets.AssetStateEvent'
|
||||
) NOT NULL
|
||||
@@ -0,0 +1,22 @@
|
||||
ALTER TABLE queues MODIFY COLUMN `type` ENUM(
|
||||
'io.kestra.core.models.executions.Execution',
|
||||
'io.kestra.core.models.templates.Template',
|
||||
'io.kestra.core.models.executions.ExecutionKilled',
|
||||
'io.kestra.core.runners.WorkerJob',
|
||||
'io.kestra.core.runners.WorkerTaskResult',
|
||||
'io.kestra.core.runners.WorkerInstance',
|
||||
'io.kestra.core.runners.WorkerTaskRunning',
|
||||
'io.kestra.core.models.executions.LogEntry',
|
||||
'io.kestra.core.models.triggers.Trigger',
|
||||
'io.kestra.ee.models.audits.AuditLog',
|
||||
'io.kestra.core.models.executions.MetricEntry',
|
||||
'io.kestra.core.runners.WorkerTriggerResult',
|
||||
'io.kestra.core.runners.SubflowExecutionResult',
|
||||
'io.kestra.core.server.ClusterEvent',
|
||||
'io.kestra.core.runners.SubflowExecutionEnd',
|
||||
'io.kestra.core.models.flows.FlowInterface',
|
||||
'io.kestra.core.runners.MultipleConditionEvent',
|
||||
'io.kestra.ee.assets.AssetLineageEvent',
|
||||
'io.kestra.ee.assets.AssetUpsertCommand',
|
||||
'io.kestra.ee.assets.AssetStateEvent'
|
||||
) NOT NULL;
|
||||
@@ -0,0 +1,3 @@
|
||||
ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.ee.assets.AssetLineageEvent';
|
||||
ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.ee.assets.AssetUpsertCommand';
|
||||
ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.ee.assets.AssetStateEvent';
|
||||
@@ -328,6 +328,10 @@ public abstract class AbstractJdbcRepository {
|
||||
return applyTriggerStateCondition(value, operation);
|
||||
}
|
||||
|
||||
if (field.equals(QueryFilter.Field.METADATA)) {
|
||||
return findMetadataCondition((Map<?, ?>) value, operation);
|
||||
}
|
||||
|
||||
// Convert the field name to lowercase and quote it
|
||||
Name columnName = getColumnName(field);
|
||||
|
||||
@@ -380,6 +384,10 @@ public abstract class AbstractJdbcRepository {
|
||||
throw new InvalidQueryFiltersException("Unsupported operation: " + operation);
|
||||
}
|
||||
|
||||
protected Condition findMetadataCondition(Map<?, ?> metadata, QueryFilter.Op operation) {
|
||||
throw new InvalidQueryFiltersException("Unsupported operation: " + operation);
|
||||
}
|
||||
|
||||
// Generate the condition for Field.STATE
|
||||
@SuppressWarnings("unchecked")
|
||||
private Condition generateStateCondition(Object value, QueryFilter.Op operation) {
|
||||
|
||||
@@ -74,7 +74,7 @@ abstract public class TestsUtils {
|
||||
* @param prefix
|
||||
* @return
|
||||
*/
|
||||
private static String randomString(String... prefix) {
|
||||
public static String randomString(String... prefix) {
|
||||
if (prefix.length == 0) {
|
||||
prefix = new String[]{String.join("-", stackTraceToParts())};
|
||||
}
|
||||
|
||||
@@ -9,6 +9,9 @@ import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.assets.AssetsDeclaration;
|
||||
import io.kestra.core.models.assets.AssetsInOut;
|
||||
import io.kestra.core.models.executions.*;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.tasks.Output;
|
||||
@@ -954,7 +957,15 @@ public class DefaultWorker implements Worker {
|
||||
|
||||
try {
|
||||
Variables variables = variablesService.of(StorageContext.forTask(taskRun), workerTaskCallable.getTaskOutput());
|
||||
taskRun = taskRun.withOutputs(variables);
|
||||
if (workerTask.getTask().getAssets() != null) {
|
||||
List<Asset> outputAssets = runContext.assets().outputs();
|
||||
Optional<AssetsDeclaration> renderedAssetsDeclaration = runContext.render(workerTask.getTask().getAssets()).as(AssetsDeclaration.class);
|
||||
renderedAssetsDeclaration.map(AssetsDeclaration::getOutputs).ifPresent(outputAssets::addAll);
|
||||
taskRun = taskRun.withOutputs(variables).withAssets(new AssetsInOut(
|
||||
renderedAssetsDeclaration.map(AssetsDeclaration::getInputs).orElse(null),
|
||||
outputAssets
|
||||
));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("Unable to save output on taskRun '{}'", taskRun, e);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user