mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 20:00:14 -05:00
Compare commits
1 Commits
debug-flak
...
feat/linea
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
210aeddc51 |
24
core/src/main/java/io/kestra/core/lineage/DataSet.java
Normal file
24
core/src/main/java/io/kestra/core/lineage/DataSet.java
Normal file
@@ -0,0 +1,24 @@
|
||||
package io.kestra.core.lineage;
|
||||
|
||||
import io.kestra.core.models.DeletedInterface;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.TenantInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
@Getter
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@NoArgsConstructor
|
||||
public class DataSet implements HasUID, TenantInterface, DeletedInterface {
|
||||
private String tenantId;
|
||||
private String namespace;
|
||||
private String name;
|
||||
private boolean deleted;
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return IdUtils.fromParts(tenantId, namespace, name);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package io.kestra.core.lineage.events;
|
||||
|
||||
import io.kestra.core.lineage.DataSet;
|
||||
|
||||
public record DataSetEvent (DataSet dataSet, LineageEventType eventType) implements LineageEvent {
|
||||
@Override
|
||||
public String getTenantId() {
|
||||
return dataSet.getTenantId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return dataSet.uid();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return this.getClass().getName();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package io.kestra.core.lineage.events;
|
||||
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
|
||||
public record JobEvent (FlowInterface flow, LineageEventType eventType) implements LineageEvent {
|
||||
@Override
|
||||
public String getTenantId() {
|
||||
return flow.getTenantId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return flow.uid();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return this.getClass().getName();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package io.kestra.core.lineage.events;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.TenantInterface;
|
||||
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
|
||||
public sealed interface LineageEvent extends TenantInterface, HasUID permits DataSetEvent, JobEvent, RunEvent {
|
||||
String getType();
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package io.kestra.core.lineage.events;
|
||||
|
||||
public enum LineageEventType {
|
||||
CREATE,
|
||||
UPDATE,
|
||||
DELETE,
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package io.kestra.core.lineage.events;
|
||||
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
|
||||
public record RunEvent(Execution execution, LineageEventType eventType) implements LineageEvent {
|
||||
@Override
|
||||
public String getTenantId() {
|
||||
return execution.getTenantId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return execution.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return this.getClass().getName();
|
||||
}
|
||||
}
|
||||
@@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.flows.lineage.Lineage;
|
||||
import io.kestra.core.models.flows.sla.SLA;
|
||||
import io.kestra.core.models.listeners.Listener;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
@@ -125,6 +126,9 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
@PluginProperty
|
||||
List<SLA> sla;
|
||||
|
||||
// TODO add validation
|
||||
Lineage lineage;
|
||||
|
||||
public Stream<String> allTypes() {
|
||||
return Stream.of(
|
||||
Optional.ofNullable(triggers).orElse(Collections.emptyList()).stream().map(AbstractTrigger::getType),
|
||||
|
||||
@@ -10,6 +10,7 @@ import io.kestra.core.models.HasSource;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.TenantInterface;
|
||||
import io.kestra.core.models.flows.lineage.Lineage;
|
||||
import io.kestra.core.models.flows.sla.SLA;
|
||||
import io.kestra.core.models.tasks.WorkerGroup;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
@@ -55,6 +56,10 @@ public interface FlowInterface extends FlowId, DeletedInterface, TenantInterface
|
||||
return List.of();
|
||||
}
|
||||
|
||||
default Lineage getLineage() {
|
||||
return null;
|
||||
}
|
||||
|
||||
String getSource();
|
||||
|
||||
@Override
|
||||
|
||||
@@ -43,6 +43,7 @@ public class FlowWithSource extends Flow {
|
||||
.concurrency(this.concurrency)
|
||||
.retry(this.retry)
|
||||
.sla(this.sla)
|
||||
.lineage(this.lineage)
|
||||
.build();
|
||||
}
|
||||
|
||||
@@ -85,6 +86,7 @@ public class FlowWithSource extends Flow {
|
||||
.concurrency(flow.concurrency)
|
||||
.retry(flow.retry)
|
||||
.sla(flow.sla)
|
||||
.lineage(flow.lineage)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.flows.lineage.Lineage;
|
||||
import io.kestra.core.models.flows.sla.SLA;
|
||||
import io.kestra.core.models.tasks.GenericTask;
|
||||
import io.kestra.core.models.triggers.GenericTrigger;
|
||||
@@ -45,6 +46,8 @@ public class GenericFlow extends AbstractFlow implements HasUID {
|
||||
|
||||
private Concurrency concurrency;
|
||||
|
||||
Lineage lineage;
|
||||
|
||||
private List<GenericTask> tasks;
|
||||
|
||||
private List<GenericTrigger> triggers;
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
package io.kestra.core.models.flows.lineage;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
@SuperBuilder
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public class DataSet {
|
||||
private String name;
|
||||
private String namespace;
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
package io.kestra.core.models.flows.lineage;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@SuperBuilder
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public class Lineage {
|
||||
private String namespace;
|
||||
private List<DataSet> inputs;
|
||||
private List<DataSet> outputs;
|
||||
}
|
||||
@@ -6,6 +6,9 @@ import io.kestra.core.events.CrudEvent;
|
||||
import io.kestra.core.events.CrudEventType;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.lineage.events.LineageEvent;
|
||||
import io.kestra.core.lineage.events.LineageEventType;
|
||||
import io.kestra.core.lineage.events.RunEvent;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.executions.*;
|
||||
@@ -117,7 +120,6 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@Slf4j
|
||||
@Validated
|
||||
@Controller("/api/v1/{tenant}/executions")
|
||||
public class ExecutionController {
|
||||
@Nullable
|
||||
@@ -178,6 +180,9 @@ public class ExecutionController {
|
||||
@Inject
|
||||
private ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
|
||||
|
||||
@Inject
|
||||
private ApplicationEventPublisher<LineageEvent> lineageEventPublisher;
|
||||
|
||||
@Inject
|
||||
private RunContextFactory runContextFactory;
|
||||
|
||||
@@ -195,8 +200,6 @@ public class ExecutionController {
|
||||
|
||||
@Inject
|
||||
private Optional<OpenTelemetry> openTelemetry;
|
||||
@Inject
|
||||
private ExecutionStreamingService executionStreamingService;
|
||||
|
||||
@Inject
|
||||
private LocalPathFactory localPathFactory;
|
||||
@@ -600,6 +603,9 @@ public class ExecutionController {
|
||||
|
||||
executionQueue.emit(result);
|
||||
eventPublisher.publishEvent(new CrudEvent<>(result, CrudEventType.CREATE));
|
||||
if (flow.getLineage() != null) {
|
||||
lineageEventPublisher.publishEvent(new RunEvent(result, LineageEventType.CREATE));
|
||||
}
|
||||
|
||||
if (webhook.get().getWait()) {
|
||||
var subscriberId = UUID.randomUUID().toString();
|
||||
@@ -704,6 +710,9 @@ public class ExecutionController {
|
||||
|
||||
executionQueue.emit(executionWithInputs);
|
||||
eventPublisher.publishEvent(new CrudEvent<>(executionWithInputs, CrudEventType.CREATE));
|
||||
if (flow.getLineage() != null) {
|
||||
lineageEventPublisher.publishEvent(new RunEvent(executionWithInputs, LineageEventType.CREATE));
|
||||
}
|
||||
|
||||
if (!wait) {
|
||||
return Mono.just(ExecutionResponse.fromExecution(
|
||||
|
||||
Reference in New Issue
Block a user