mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-30 03:00:23 -05:00
Compare commits
1 Commits
dashboard-
...
feat/Asset
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8df161df9b |
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.assets.AssetExporter;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ScheduleCondition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
@@ -774,6 +775,14 @@ public class JsonSchemaGenerator {
|
||||
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
|
||||
.map(typeContext::resolve)
|
||||
.toList();
|
||||
} else if (declaredType.getErasedType() == AssetExporter.class) {
|
||||
return getRegisteredPlugins()
|
||||
.stream()
|
||||
.flatMap(registeredPlugin -> registeredPlugin.getAssetExporters().stream())
|
||||
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
|
||||
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
|
||||
.map(typeContext::resolve)
|
||||
.toList();
|
||||
}
|
||||
|
||||
return null;
|
||||
|
||||
@@ -9,9 +9,7 @@ import java.util.List;
|
||||
public class InvalidQueryFiltersException extends KestraRuntimeException {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
private static final String INVALID_QUERY_FILTER_MESSAGE = "Provided query filters are invalid";
|
||||
|
||||
private transient final List<String> invalids;
|
||||
private static final String INVALID_QUERY_FILTER_MESSAGE = "Provided query filters are invalid: %s";
|
||||
|
||||
/**
|
||||
* Creates a new {@link InvalidQueryFiltersException} instance.
|
||||
@@ -19,8 +17,7 @@ public class InvalidQueryFiltersException extends KestraRuntimeException {
|
||||
* @param invalids the invalid filters.
|
||||
*/
|
||||
public InvalidQueryFiltersException(final List<String> invalids) {
|
||||
super(INVALID_QUERY_FILTER_MESSAGE);
|
||||
this.invalids = invalids;
|
||||
super(INVALID_QUERY_FILTER_MESSAGE.formatted( String.join(", ", invalids)));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -29,15 +26,6 @@ public class InvalidQueryFiltersException extends KestraRuntimeException {
|
||||
* @param invalid the invalid filter.
|
||||
*/
|
||||
public InvalidQueryFiltersException(final String invalid) {
|
||||
super(INVALID_QUERY_FILTER_MESSAGE);
|
||||
this.invalids = List.of(invalid);
|
||||
}
|
||||
|
||||
|
||||
public String formatedInvalidObjects(){
|
||||
if (invalids == null || invalids.isEmpty()){
|
||||
return INVALID_QUERY_FILTER_MESSAGE;
|
||||
}
|
||||
return String.join(", ", invalids);
|
||||
super(INVALID_QUERY_FILTER_MESSAGE.formatted(invalid));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -388,6 +388,21 @@ public record QueryFilter(
|
||||
Field.CREATED
|
||||
);
|
||||
}
|
||||
},
|
||||
ASSET_LINEAGE_EVENT {
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(
|
||||
Field.ASSET_ID,
|
||||
Field.NAMESPACE,
|
||||
Field.FLOW_ID,
|
||||
Field.FLOW_REVISION,
|
||||
Field.EXECUTION_ID,
|
||||
Field.TASK_ID,
|
||||
Field.TASK_RUN_ID,
|
||||
Field.CREATED
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
public abstract List<Field> supportedField();
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.tasks.Output;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
|
||||
|
||||
@Plugin
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public abstract class AssetExporter<T extends Output> implements io.kestra.core.models.Plugin {
|
||||
@NotNull
|
||||
@NotBlank
|
||||
@Pattern(regexp="^[a-zA-Z0-9][a-zA-Z0-9_-]*")
|
||||
protected String id;
|
||||
|
||||
@NotBlank
|
||||
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
|
||||
protected String type;
|
||||
|
||||
public abstract T sendAssets(RunContext runContext, Flux<AssetLineage> records) throws Exception;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
|
||||
public record AssetLineage(String tenantId, String namespace, String flowId, Integer flowRevision, String executionId,
|
||||
String taskId, String taskRunId, List<Asset> inputs, List<Asset> outputs, Instant timestamp) {
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package io.kestra.core.plugins;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import io.kestra.core.app.AppPluginInterface;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.assets.AssetExporter;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
@@ -48,5 +49,6 @@ public class PluginModule extends SimpleModule {
|
||||
addDeserializer(AppPluginInterface.class, new PluginDeserializer<>());
|
||||
addDeserializer(LogExporter.class, new PluginDeserializer<>());
|
||||
addDeserializer(Asset.class, new AssetDeserializer());
|
||||
addDeserializer(AssetExporter.class, new PluginDeserializer<>());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import io.kestra.core.app.AppBlockInterface;
|
||||
import io.kestra.core.app.AppPluginInterface;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.assets.AssetExporter;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
@@ -110,6 +111,7 @@ public class PluginScanner {
|
||||
List<Class<? extends SecretPluginInterface>> secrets = new ArrayList<>();
|
||||
List<Class<? extends TaskRunner<?>>> taskRunners = new ArrayList<>();
|
||||
List<Class<? extends Asset>> assets = new ArrayList<>();
|
||||
List<Class<? extends AssetExporter<?>>> assetExporters = new ArrayList<>();
|
||||
List<Class<? extends AppPluginInterface>> apps = new ArrayList<>();
|
||||
List<Class<? extends AppBlockInterface>> appBlocks = new ArrayList<>();
|
||||
List<Class<? extends Chart<?>>> charts = new ArrayList<>();
|
||||
@@ -161,6 +163,11 @@ public class PluginScanner {
|
||||
log.debug("Loading Asset plugin: '{}'", plugin.getClass());
|
||||
assets.add(asset.getClass());
|
||||
}
|
||||
case AssetExporter<?> assetExporter -> {
|
||||
log.debug("Loading AssetExporter plugin: '{}'", plugin.getClass());
|
||||
//noinspection unchecked
|
||||
assetExporters.add((Class<? extends AssetExporter<?>>) assetExporter.getClass());
|
||||
}
|
||||
case AppPluginInterface app -> {
|
||||
log.debug("Loading App plugin: '{}'", plugin.getClass());
|
||||
apps.add(app.getClass());
|
||||
@@ -230,6 +237,7 @@ public class PluginScanner {
|
||||
.storages(storages)
|
||||
.secrets(secrets)
|
||||
.assets(assets)
|
||||
.assetExporters(assetExporters)
|
||||
.apps(apps)
|
||||
.appBlocks(appBlocks)
|
||||
.taskRunners(taskRunners)
|
||||
|
||||
@@ -4,6 +4,7 @@ import io.kestra.core.app.AppBlockInterface;
|
||||
import io.kestra.core.app.AppPluginInterface;
|
||||
import io.kestra.core.models.annotations.PluginSubGroup;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.assets.AssetExporter;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
@@ -41,6 +42,7 @@ public class RegisteredPlugin {
|
||||
public static final String SECRETS_GROUP_NAME = "secrets";
|
||||
public static final String TASK_RUNNERS_GROUP_NAME = "task-runners";
|
||||
public static final String ASSETS_GROUP_NAME = "assets";
|
||||
public static final String ASSETS_EXPORTERS_GROUP_NAME = "asset-exporters";
|
||||
public static final String APPS_GROUP_NAME = "apps";
|
||||
public static final String APP_BLOCKS_GROUP_NAME = "app-blocks";
|
||||
public static final String CHARTS_GROUP_NAME = "charts";
|
||||
@@ -59,6 +61,7 @@ public class RegisteredPlugin {
|
||||
private final List<Class<? extends SecretPluginInterface>> secrets;
|
||||
private final List<Class<? extends TaskRunner<?>>> taskRunners;
|
||||
private final List<Class<? extends Asset>> assets;
|
||||
private final List<Class<? extends AssetExporter<?>>> assetExporters;
|
||||
private final List<Class<? extends AppPluginInterface>> apps;
|
||||
private final List<Class<? extends AppBlockInterface>> appBlocks;
|
||||
private final List<Class<? extends Chart<?>>> charts;
|
||||
@@ -78,6 +81,7 @@ public class RegisteredPlugin {
|
||||
!secrets.isEmpty() ||
|
||||
!taskRunners.isEmpty() ||
|
||||
!assets.isEmpty() ||
|
||||
!assetExporters.isEmpty() ||
|
||||
!apps.isEmpty() ||
|
||||
!appBlocks.isEmpty() ||
|
||||
!charts.isEmpty() ||
|
||||
@@ -153,6 +157,10 @@ public class RegisteredPlugin {
|
||||
return Asset.class;
|
||||
}
|
||||
|
||||
if (this.getAssetExporters().stream().anyMatch(r -> r.getName().equals(cls))) {
|
||||
return AssetExporter.class;
|
||||
}
|
||||
|
||||
if (this.getLogExporters().stream().anyMatch(r -> r.getName().equals(cls))) {
|
||||
return LogExporter.class;
|
||||
}
|
||||
@@ -189,6 +197,7 @@ public class RegisteredPlugin {
|
||||
result.put(SECRETS_GROUP_NAME, Arrays.asList(this.getSecrets().toArray(Class[]::new)));
|
||||
result.put(TASK_RUNNERS_GROUP_NAME, Arrays.asList(this.getTaskRunners().toArray(Class[]::new)));
|
||||
result.put(ASSETS_GROUP_NAME, Arrays.asList(this.getAssets().toArray(Class[]::new)));
|
||||
result.put(ASSETS_EXPORTERS_GROUP_NAME, Arrays.asList(this.getAssetExporters().toArray(Class[]::new)));
|
||||
result.put(APPS_GROUP_NAME, Arrays.asList(this.getApps().toArray(Class[]::new)));
|
||||
result.put(APP_BLOCKS_GROUP_NAME, Arrays.asList(this.getAppBlocks().toArray(Class[]::new)));
|
||||
result.put(CHARTS_GROUP_NAME, Arrays.asList(this.getCharts().toArray(Class[]::new)));
|
||||
@@ -317,7 +326,7 @@ public class RegisteredPlugin {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
public long crc32() {
|
||||
return Optional.ofNullable(externalPlugin).map(ExternalPlugin::getCrc32).orElse(-1L);
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ public class QueryFilterTest {
|
||||
InvalidQueryFiltersException e = assertThrows(
|
||||
InvalidQueryFiltersException.class,
|
||||
() -> QueryFilter.validateQueryFilters(List.of(filter), resource));
|
||||
assertThat(e.formatedInvalidObjects()).contains("Operation");
|
||||
assertThat(e.getMessage()).contains("Operation");
|
||||
}
|
||||
|
||||
static Stream<Arguments> validOperationFilters() {
|
||||
|
||||
@@ -300,7 +300,7 @@ public abstract class AbstractJdbcRepository {
|
||||
}
|
||||
|
||||
// Special handling for START_DATE and END_DATE
|
||||
if (field == QueryFilter.Field.START_DATE || field == QueryFilter.Field.END_DATE || field == QueryFilter.Field.UPDATED) {
|
||||
if (field == QueryFilter.Field.START_DATE || field == QueryFilter.Field.END_DATE || field == QueryFilter.Field.UPDATED || field == QueryFilter.Field.CREATED) {
|
||||
if(dateColumn == null){
|
||||
throw new InvalidQueryFiltersException("When creating filtering on START_DATE and/or END_DATE, dateColumn is required but was null");
|
||||
}
|
||||
|
||||
@@ -161,7 +161,7 @@ public class ErrorController {
|
||||
|
||||
@Error(global = true)
|
||||
public HttpResponse<JsonError> error(HttpRequest<?> request, InvalidQueryFiltersException e) {
|
||||
return jsonError(request, e, HttpStatus.BAD_REQUEST, e.formatedInvalidObjects());
|
||||
return jsonError(request, e, HttpStatus.BAD_REQUEST, e.getMessage());
|
||||
}
|
||||
|
||||
@Error(global = true)
|
||||
|
||||
Reference in New Issue
Block a user