Compare commits

...

1 Commits

Author SHA1 Message Date
Loïc Mathieu
8df161df9b feat(assets): add an AssetShipper task 2025-12-29 18:03:01 +01:00
11 changed files with 90 additions and 19 deletions

View File

@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.assets.AssetExporter;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.dashboards.DataFilter;
@@ -774,6 +775,14 @@ public class JsonSchemaGenerator {
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.map(typeContext::resolve)
.toList();
} else if (declaredType.getErasedType() == AssetExporter.class) {
return getRegisteredPlugins()
.stream()
.flatMap(registeredPlugin -> registeredPlugin.getAssetExporters().stream())
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.map(typeContext::resolve)
.toList();
}
return null;

View File

@@ -9,9 +9,7 @@ import java.util.List;
public class InvalidQueryFiltersException extends KestraRuntimeException {
@Serial
private static final long serialVersionUID = 1L;
private static final String INVALID_QUERY_FILTER_MESSAGE = "Provided query filters are invalid";
private transient final List<String> invalids;
private static final String INVALID_QUERY_FILTER_MESSAGE = "Provided query filters are invalid: %s";
/**
* Creates a new {@link InvalidQueryFiltersException} instance.
@@ -19,8 +17,7 @@ public class InvalidQueryFiltersException extends KestraRuntimeException {
* @param invalids the invalid filters.
*/
public InvalidQueryFiltersException(final List<String> invalids) {
super(INVALID_QUERY_FILTER_MESSAGE);
this.invalids = invalids;
super(INVALID_QUERY_FILTER_MESSAGE.formatted( String.join(", ", invalids)));
}
/**
@@ -29,15 +26,6 @@ public class InvalidQueryFiltersException extends KestraRuntimeException {
* @param invalid the invalid filter.
*/
public InvalidQueryFiltersException(final String invalid) {
super(INVALID_QUERY_FILTER_MESSAGE);
this.invalids = List.of(invalid);
}
public String formatedInvalidObjects(){
if (invalids == null || invalids.isEmpty()){
return INVALID_QUERY_FILTER_MESSAGE;
}
return String.join(", ", invalids);
super(INVALID_QUERY_FILTER_MESSAGE.formatted(invalid));
}
}

View File

@@ -388,6 +388,21 @@ public record QueryFilter(
Field.CREATED
);
}
},
ASSET_LINEAGE_EVENT {
@Override
public List<Field> supportedField() {
return List.of(
Field.ASSET_ID,
Field.NAMESPACE,
Field.FLOW_ID,
Field.FLOW_REVISION,
Field.EXECUTION_ID,
Field.TASK_ID,
Field.TASK_RUN_ID,
Field.CREATED
);
}
};
public abstract List<Field> supportedField();

View File

@@ -0,0 +1,32 @@
package io.kestra.core.models.assets;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.runners.RunContext;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import reactor.core.publisher.Flux;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@Plugin
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
public abstract class AssetExporter<T extends Output> implements io.kestra.core.models.Plugin {
@NotNull
@NotBlank
@Pattern(regexp="^[a-zA-Z0-9][a-zA-Z0-9_-]*")
protected String id;
@NotBlank
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
protected String type;
public abstract T sendAssets(RunContext runContext, Flux<AssetLineage> records) throws Exception;
}

View File

@@ -0,0 +1,8 @@
package io.kestra.core.models.assets;
import java.time.Instant;
import java.util.List;
public record AssetLineage(String tenantId, String namespace, String flowId, Integer flowRevision, String executionId,
String taskId, String taskRunId, List<Asset> inputs, List<Asset> outputs, Instant timestamp) {
}

View File

@@ -3,6 +3,7 @@ package io.kestra.core.plugins;
import com.fasterxml.jackson.databind.module.SimpleModule;
import io.kestra.core.app.AppPluginInterface;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.assets.AssetExporter;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
@@ -48,5 +49,6 @@ public class PluginModule extends SimpleModule {
addDeserializer(AppPluginInterface.class, new PluginDeserializer<>());
addDeserializer(LogExporter.class, new PluginDeserializer<>());
addDeserializer(Asset.class, new AssetDeserializer());
addDeserializer(AssetExporter.class, new PluginDeserializer<>());
}
}

View File

@@ -4,6 +4,7 @@ import io.kestra.core.app.AppBlockInterface;
import io.kestra.core.app.AppPluginInterface;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.assets.AssetExporter;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
@@ -110,6 +111,7 @@ public class PluginScanner {
List<Class<? extends SecretPluginInterface>> secrets = new ArrayList<>();
List<Class<? extends TaskRunner<?>>> taskRunners = new ArrayList<>();
List<Class<? extends Asset>> assets = new ArrayList<>();
List<Class<? extends AssetExporter<?>>> assetExporters = new ArrayList<>();
List<Class<? extends AppPluginInterface>> apps = new ArrayList<>();
List<Class<? extends AppBlockInterface>> appBlocks = new ArrayList<>();
List<Class<? extends Chart<?>>> charts = new ArrayList<>();
@@ -161,6 +163,11 @@ public class PluginScanner {
log.debug("Loading Asset plugin: '{}'", plugin.getClass());
assets.add(asset.getClass());
}
case AssetExporter<?> assetExporter -> {
log.debug("Loading AssetExporter plugin: '{}'", plugin.getClass());
//noinspection unchecked
assetExporters.add((Class<? extends AssetExporter<?>>) assetExporter.getClass());
}
case AppPluginInterface app -> {
log.debug("Loading App plugin: '{}'", plugin.getClass());
apps.add(app.getClass());
@@ -230,6 +237,7 @@ public class PluginScanner {
.storages(storages)
.secrets(secrets)
.assets(assets)
.assetExporters(assetExporters)
.apps(apps)
.appBlocks(appBlocks)
.taskRunners(taskRunners)

View File

@@ -4,6 +4,7 @@ import io.kestra.core.app.AppBlockInterface;
import io.kestra.core.app.AppPluginInterface;
import io.kestra.core.models.annotations.PluginSubGroup;
import io.kestra.core.models.assets.Asset;
import io.kestra.core.models.assets.AssetExporter;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
@@ -41,6 +42,7 @@ public class RegisteredPlugin {
public static final String SECRETS_GROUP_NAME = "secrets";
public static final String TASK_RUNNERS_GROUP_NAME = "task-runners";
public static final String ASSETS_GROUP_NAME = "assets";
public static final String ASSETS_EXPORTERS_GROUP_NAME = "asset-exporters";
public static final String APPS_GROUP_NAME = "apps";
public static final String APP_BLOCKS_GROUP_NAME = "app-blocks";
public static final String CHARTS_GROUP_NAME = "charts";
@@ -59,6 +61,7 @@ public class RegisteredPlugin {
private final List<Class<? extends SecretPluginInterface>> secrets;
private final List<Class<? extends TaskRunner<?>>> taskRunners;
private final List<Class<? extends Asset>> assets;
private final List<Class<? extends AssetExporter<?>>> assetExporters;
private final List<Class<? extends AppPluginInterface>> apps;
private final List<Class<? extends AppBlockInterface>> appBlocks;
private final List<Class<? extends Chart<?>>> charts;
@@ -78,6 +81,7 @@ public class RegisteredPlugin {
!secrets.isEmpty() ||
!taskRunners.isEmpty() ||
!assets.isEmpty() ||
!assetExporters.isEmpty() ||
!apps.isEmpty() ||
!appBlocks.isEmpty() ||
!charts.isEmpty() ||
@@ -153,6 +157,10 @@ public class RegisteredPlugin {
return Asset.class;
}
if (this.getAssetExporters().stream().anyMatch(r -> r.getName().equals(cls))) {
return AssetExporter.class;
}
if (this.getLogExporters().stream().anyMatch(r -> r.getName().equals(cls))) {
return LogExporter.class;
}
@@ -189,6 +197,7 @@ public class RegisteredPlugin {
result.put(SECRETS_GROUP_NAME, Arrays.asList(this.getSecrets().toArray(Class[]::new)));
result.put(TASK_RUNNERS_GROUP_NAME, Arrays.asList(this.getTaskRunners().toArray(Class[]::new)));
result.put(ASSETS_GROUP_NAME, Arrays.asList(this.getAssets().toArray(Class[]::new)));
result.put(ASSETS_EXPORTERS_GROUP_NAME, Arrays.asList(this.getAssetExporters().toArray(Class[]::new)));
result.put(APPS_GROUP_NAME, Arrays.asList(this.getApps().toArray(Class[]::new)));
result.put(APP_BLOCKS_GROUP_NAME, Arrays.asList(this.getAppBlocks().toArray(Class[]::new)));
result.put(CHARTS_GROUP_NAME, Arrays.asList(this.getCharts().toArray(Class[]::new)));
@@ -317,7 +326,7 @@ public class RegisteredPlugin {
}
return null;
}
public long crc32() {
return Optional.ofNullable(externalPlugin).map(ExternalPlugin::getCrc32).orElse(-1L);
}

View File

@@ -28,7 +28,7 @@ public class QueryFilterTest {
InvalidQueryFiltersException e = assertThrows(
InvalidQueryFiltersException.class,
() -> QueryFilter.validateQueryFilters(List.of(filter), resource));
assertThat(e.formatedInvalidObjects()).contains("Operation");
assertThat(e.getMessage()).contains("Operation");
}
static Stream<Arguments> validOperationFilters() {

View File

@@ -300,7 +300,7 @@ public abstract class AbstractJdbcRepository {
}
// Special handling for START_DATE and END_DATE
if (field == QueryFilter.Field.START_DATE || field == QueryFilter.Field.END_DATE || field == QueryFilter.Field.UPDATED) {
if (field == QueryFilter.Field.START_DATE || field == QueryFilter.Field.END_DATE || field == QueryFilter.Field.UPDATED || field == QueryFilter.Field.CREATED) {
if(dateColumn == null){
throw new InvalidQueryFiltersException("When creating filtering on START_DATE and/or END_DATE, dateColumn is required but was null");
}

View File

@@ -161,7 +161,7 @@ public class ErrorController {
@Error(global = true)
public HttpResponse<JsonError> error(HttpRequest<?> request, InvalidQueryFiltersException e) {
return jsonError(request, e, HttpStatus.BAD_REQUEST, e.formatedInvalidObjects());
return jsonError(request, e, HttpStatus.BAD_REQUEST, e.getMessage());
}
@Error(global = true)