refactor: add GenericFlow to support un-typed flow deserialization

Add new FlowId, FlowInterface and GenericFlow classes to support
deserialization of flow with un-typed plugins (i.e., tasks, triggers)
in order to inject defaults prior to strongly-typed deserialization.
This commit is contained in:
Florian Hussonnois
2025-03-14 14:33:21 +01:00
committed by Florian Hussonnois
parent fc8732f96e
commit 8f29a72df7
124 changed files with 2420 additions and 1609 deletions

View File

@@ -2,6 +2,7 @@ package io.kestra.repository.h2;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.jdbc.repository.AbstractJdbcFlowRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -16,7 +17,7 @@ import java.util.Map;
@H2RepositoryEnabled
public class H2FlowRepository extends AbstractJdbcFlowRepository {
@Inject
public H2FlowRepository(@Named("flows") H2Repository<Flow> repository,
public H2FlowRepository(@Named("flows") H2Repository<FlowInterface> repository,
ApplicationContext applicationContext) {
super(repository, applicationContext);
}

View File

@@ -2,6 +2,7 @@ package io.kestra.repository.h2;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.jdbc.AbstractJdbcRepository;
import org.jooq.*;
import org.jooq.impl.DSL;
@@ -14,7 +15,7 @@ import static io.kestra.core.models.QueryFilter.Op.EQUALS;
import static io.kestra.jdbc.repository.AbstractJdbcRepository.field;
public abstract class H2FlowRepositoryService {
public static Condition findCondition(AbstractJdbcRepository<Flow> jdbcRepository, String query, Map<String, String> labels) {
public static Condition findCondition(AbstractJdbcRepository<? extends FlowInterface> jdbcRepository, String query, Map<String, String> labels) {
List<Condition> conditions = new ArrayList<>();
if (query != null) {
@@ -35,7 +36,7 @@ public abstract class H2FlowRepositoryService {
return conditions.isEmpty() ? DSL.trueCondition() : DSL.and(conditions);
}
public static Condition findSourceCodeCondition(AbstractJdbcRepository<Flow> jdbcRepository, String query) {
public static Condition findSourceCodeCondition(AbstractJdbcRepository<? extends FlowInterface> jdbcRepository, String query) {
return jdbcRepository.fullTextCondition(List.of("source_code"), query);
}

View File

@@ -28,7 +28,7 @@ public class H2Queue<T> extends JdbcQueue<T> {
AbstractJdbcRepository.field("offset")
)
.from(this.table)
.where(AbstractJdbcRepository.field("type").eq(this.cls.getName()))
.where(AbstractJdbcRepository.field("type").eq(queueType()))
.and(DSL.or(List.of(
AbstractJdbcRepository.field("consumers").isNull(),
DSL.condition("NOT(ARRAY_CONTAINS(\"consumers\", ?))", queueType)

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
@@ -87,8 +88,8 @@ public class H2QueueFactory implements QueueFactoryInterface {
@Singleton
@Named(QueueFactoryInterface.FLOW_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<FlowWithSource> flow() {
return new H2Queue<>(FlowWithSource.class, applicationContext);
public QueueInterface<FlowInterface> flow() {
return new H2Queue<>(FlowInterface.class, applicationContext);
}
@Override

View File

@@ -0,0 +1,40 @@
ALTER TABLE queues ALTER COLUMN "type" ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.models.flows.FlowWithSource',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface'
) NOT NULL;
UPDATE queues set "type" = 'io.kestra.core.models.flows.FlowInterface' WHERE "type" = 'io.kestra.core.models.flows.FlowWithSource';
ALTER TABLE queues ALTER COLUMN "type" ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface'
) NOT NULL;

View File

@@ -1,12 +1,12 @@
package io.kestra.runner.h2;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.FlowListenersTest;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.jdbc.JooqDSLContextWrapper;
import jakarta.inject.Inject;
@@ -26,12 +26,15 @@ class H2FlowListenersTest extends FlowListenersTest {
@Inject
@Named(QueueFactoryInterface.FLOW_NAMED)
QueueInterface<FlowWithSource> flowQueue;
QueueInterface<FlowInterface> flowQueue;
@Inject
PluginDefaultService pluginDefaultService;
@Test
public void all() {
// we don't inject FlowListeners to remove a flaky test
this.suite(new FlowListeners(flowRepository, flowQueue));
this.suite(new FlowListeners(flowRepository, flowQueue, pluginDefaultService));
}
@BeforeEach