Compare commits

...

1 Commits

Author SHA1 Message Date
Loïc Mathieu
210aeddc51 feat(lineage): PoC 2025-09-22 17:47:42 +02:00
13 changed files with 156 additions and 3 deletions

View File

@@ -0,0 +1,24 @@
package io.kestra.core.lineage;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.utils.IdUtils;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Getter
@SuperBuilder(toBuilder = true)
@NoArgsConstructor
public class DataSet implements HasUID, TenantInterface, DeletedInterface {
private String tenantId;
private String namespace;
private String name;
private boolean deleted;
@Override
public String uid() {
return IdUtils.fromParts(tenantId, namespace, name);
}
}

View File

@@ -0,0 +1,20 @@
package io.kestra.core.lineage.events;
import io.kestra.core.lineage.DataSet;
public record DataSetEvent (DataSet dataSet, LineageEventType eventType) implements LineageEvent {
@Override
public String getTenantId() {
return dataSet.getTenantId();
}
@Override
public String uid() {
return dataSet.uid();
}
@Override
public String getType() {
return this.getClass().getName();
}
}

View File

@@ -0,0 +1,20 @@
package io.kestra.core.lineage.events;
import io.kestra.core.models.flows.FlowInterface;
public record JobEvent (FlowInterface flow, LineageEventType eventType) implements LineageEvent {
@Override
public String getTenantId() {
return flow.getTenantId();
}
@Override
public String uid() {
return flow.uid();
}
@Override
public String getType() {
return this.getClass().getName();
}
}

View File

@@ -0,0 +1,10 @@
package io.kestra.core.lineage.events;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
public sealed interface LineageEvent extends TenantInterface, HasUID permits DataSetEvent, JobEvent, RunEvent {
String getType();
}

View File

@@ -0,0 +1,7 @@
package io.kestra.core.lineage.events;
public enum LineageEventType {
CREATE,
UPDATE,
DELETE,
}

View File

@@ -0,0 +1,20 @@
package io.kestra.core.lineage.events;
import io.kestra.core.models.executions.Execution;
public record RunEvent(Execution execution, LineageEventType eventType) implements LineageEvent {
@Override
public String getTenantId() {
return execution.getTenantId();
}
@Override
public String uid() {
return execution.getId();
}
@Override
public String getType() {
return this.getClass().getName();
}
}

View File

@@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.lineage.Lineage;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
@@ -125,6 +126,9 @@ public class Flow extends AbstractFlow implements HasUID {
@PluginProperty
List<SLA> sla;
// TODO add validation
Lineage lineage;
public Stream<String> allTypes() {
return Stream.of(
Optional.ofNullable(triggers).orElse(Collections.emptyList()).stream().map(AbstractTrigger::getType),

View File

@@ -10,6 +10,7 @@ import io.kestra.core.models.HasSource;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.Label;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.lineage.Lineage;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.serializers.JacksonMapper;
@@ -55,6 +56,10 @@ public interface FlowInterface extends FlowId, DeletedInterface, TenantInterface
return List.of();
}
default Lineage getLineage() {
return null;
}
String getSource();
@Override

View File

@@ -43,6 +43,7 @@ public class FlowWithSource extends Flow {
.concurrency(this.concurrency)
.retry(this.retry)
.sla(this.sla)
.lineage(this.lineage)
.build();
}
@@ -85,6 +86,7 @@ public class FlowWithSource extends Flow {
.concurrency(flow.concurrency)
.retry(flow.retry)
.sla(flow.sla)
.lineage(flow.lineage)
.build();
}
}

View File

@@ -9,6 +9,7 @@ import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.Label;
import io.kestra.core.models.flows.lineage.Lineage;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.tasks.GenericTask;
import io.kestra.core.models.triggers.GenericTrigger;
@@ -45,6 +46,8 @@ public class GenericFlow extends AbstractFlow implements HasUID {
private Concurrency concurrency;
Lineage lineage;
private List<GenericTask> tasks;
private List<GenericTrigger> triggers;

View File

@@ -0,0 +1,13 @@
package io.kestra.core.models.flows.lineage;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@SuperBuilder
@Getter
@NoArgsConstructor
public class DataSet {
private String name;
private String namespace;
}

View File

@@ -0,0 +1,16 @@
package io.kestra.core.models.flows.lineage;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
@SuperBuilder
@Getter
@NoArgsConstructor
public class Lineage {
private String namespace;
private List<DataSet> inputs;
private List<DataSet> outputs;
}

View File

@@ -6,6 +6,9 @@ import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.lineage.events.LineageEvent;
import io.kestra.core.lineage.events.LineageEventType;
import io.kestra.core.lineage.events.RunEvent;
import io.kestra.core.models.Label;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.*;
@@ -117,7 +120,6 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@Slf4j
@Validated
@Controller("/api/v1/{tenant}/executions")
public class ExecutionController {
@Nullable
@@ -178,6 +180,9 @@ public class ExecutionController {
@Inject
private ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
@Inject
private ApplicationEventPublisher<LineageEvent> lineageEventPublisher;
@Inject
private RunContextFactory runContextFactory;
@@ -195,8 +200,6 @@ public class ExecutionController {
@Inject
private Optional<OpenTelemetry> openTelemetry;
@Inject
private ExecutionStreamingService executionStreamingService;
@Inject
private LocalPathFactory localPathFactory;
@@ -600,6 +603,9 @@ public class ExecutionController {
executionQueue.emit(result);
eventPublisher.publishEvent(new CrudEvent<>(result, CrudEventType.CREATE));
if (flow.getLineage() != null) {
lineageEventPublisher.publishEvent(new RunEvent(result, LineageEventType.CREATE));
}
if (webhook.get().getWait()) {
var subscriberId = UUID.randomUUID().toString();
@@ -704,6 +710,9 @@ public class ExecutionController {
executionQueue.emit(executionWithInputs);
eventPublisher.publishEvent(new CrudEvent<>(executionWithInputs, CrudEventType.CREATE));
if (flow.getLineage() != null) {
lineageEventPublisher.publishEvent(new RunEvent(executionWithInputs, LineageEventType.CREATE));
}
if (!wait) {
return Mono.just(ExecutionResponse.fromExecution(