diff --git a/core/src/main/java/io/kestra/core/repositories/ExecutionRepositoryInterface.java b/core/src/main/java/io/kestra/core/repositories/ExecutionRepositoryInterface.java index 6bda016f18..f60b34dc9d 100644 --- a/core/src/main/java/io/kestra/core/repositories/ExecutionRepositoryInterface.java +++ b/core/src/main/java/io/kestra/core/repositories/ExecutionRepositoryInterface.java @@ -2,7 +2,6 @@ package io.kestra.core.repositories; import io.kestra.core.models.QueryFilter; import io.kestra.core.models.executions.Execution; -import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.executions.statistics.DailyExecutionStatistics; import io.kestra.core.models.executions.statistics.ExecutionCount; import io.kestra.core.models.executions.statistics.Flow; @@ -94,6 +93,8 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface findAllAsync(@Nullable String tenantId); + Flux findAsync(String tenantId, List filters); + Execution delete(Execution execution); Integer purge(Execution execution); diff --git a/core/src/main/java/io/kestra/core/repositories/FlowRepositoryInterface.java b/core/src/main/java/io/kestra/core/repositories/FlowRepositoryInterface.java index 692e4c881b..be3d53f008 100644 --- a/core/src/main/java/io/kestra/core/repositories/FlowRepositoryInterface.java +++ b/core/src/main/java/io/kestra/core/repositories/FlowRepositoryInterface.java @@ -8,6 +8,7 @@ import io.kestra.plugin.core.dashboard.data.Flows; import io.micronaut.data.model.Pageable; import jakarta.annotation.Nullable; import jakarta.validation.ConstraintViolationException; +import reactor.core.publisher.Flux; import java.util.List; import java.util.Optional; @@ -158,6 +159,8 @@ public interface FlowRepositoryInterface extends QueryBuilderInterface findAsync(String tenantId, List filters); + FlowWithSource create(GenericFlow flow); FlowWithSource update(GenericFlow flow, FlowInterface previous) throws ConstraintViolationException; diff --git a/core/src/main/java/io/kestra/core/repositories/TriggerRepositoryInterface.java b/core/src/main/java/io/kestra/core/repositories/TriggerRepositoryInterface.java index 169fe47a0e..3c3fbb757a 100644 --- a/core/src/main/java/io/kestra/core/repositories/TriggerRepositoryInterface.java +++ b/core/src/main/java/io/kestra/core/repositories/TriggerRepositoryInterface.java @@ -43,9 +43,9 @@ public interface TriggerRepositoryInterface extends QueryBuilderInterface find(String tenantId, List filters); + Flux findAsync(String tenantId, List filters); + default Function sortMapping() throws IllegalArgumentException { return Function.identity(); diff --git a/core/src/test/java/io/kestra/core/repositories/AbstractExecutionRepositoryTest.java b/core/src/test/java/io/kestra/core/repositories/AbstractExecutionRepositoryTest.java index 62b375bf8b..ed8edc57e8 100644 --- a/core/src/test/java/io/kestra/core/repositories/AbstractExecutionRepositoryTest.java +++ b/core/src/test/java/io/kestra/core/repositories/AbstractExecutionRepositoryTest.java @@ -840,4 +840,53 @@ inject(tenant); assertThat(byFlow.get(FLOW).getId()).isEqualTo(running.getId()); } + @Test + void findAsync() { + var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName()); + + Execution execA = Execution.builder() + .id(IdUtils.create()) + .tenantId(tenant) + .namespace(NAMESPACE) + .flowId("flowA") + .flowRevision(1) + .state(new State()) + .taskRunList(List.of()) + .build(); + + Execution execB = Execution.builder() + .id(IdUtils.create()) + .tenantId(tenant) + .namespace(NAMESPACE) + .flowId("flowB") + .flowRevision(1) + .state(new State()) + .taskRunList(List.of()) + .build(); + + Execution savedA = executionRepository.save(execA); + Execution savedB = executionRepository.save(execB); + + try { + List all = executionRepository.findAllAsync(tenant).collectList().block(); + assertThat(all).isNotNull(); + assertThat(all.stream().map(Execution::getId).toList()) + .containsExactlyInAnyOrder(savedA.getId(), savedB.getId()); + + // filtered using repository find (pageable) since findAllAsync has no filters + List filters = List.of(QueryFilter.builder() + .field(QueryFilter.Field.FLOW_ID) + .operation(QueryFilter.Op.EQUALS) + .value("flowA") + .build()); + + ArrayListTotal filtered = executionRepository.find(Pageable.UNPAGED, tenant, filters); + assertThat(filtered.getTotal()).isEqualTo(1L); + assertThat(filtered.getFirst().getFlowId()).isEqualTo("flowA"); + } finally { + executionRepository.delete(savedA); + executionRepository.delete(savedB); + } + } + } diff --git a/core/src/test/java/io/kestra/core/repositories/AbstractFlowRepositoryTest.java b/core/src/test/java/io/kestra/core/repositories/AbstractFlowRepositoryTest.java index 169bddcdc3..b9eca61329 100644 --- a/core/src/test/java/io/kestra/core/repositories/AbstractFlowRepositoryTest.java +++ b/core/src/test/java/io/kestra/core/repositories/AbstractFlowRepositoryTest.java @@ -29,7 +29,6 @@ import io.micronaut.data.model.Pageable; import jakarta.inject.Inject; import jakarta.inject.Singleton; import jakarta.validation.ConstraintViolationException; -import java.util.concurrent.CopyOnWriteArrayList; import lombok.*; import lombok.experimental.SuperBuilder; import org.junit.jupiter.api.BeforeAll; @@ -41,15 +40,14 @@ import org.slf4j.event.Level; import java.time.Duration; import java.time.ZonedDateTime; import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import static io.kestra.core.models.flows.FlowScope.SYSTEM; import static io.kestra.core.utils.NamespaceUtils.SYSTEM_FLOWS_DEFAULT_NAMESPACE; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; @KestraTest public abstract class AbstractFlowRepositoryTest { @@ -683,6 +681,47 @@ public abstract class AbstractFlowRepositoryTest { } } + @Test + void findAsync() { + String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName()); + + FlowWithSource flowA = builder(tenant, "flowA", "taskA").build(); + FlowWithSource flowB = builder(tenant, "flowB", "taskB").build(); + + FlowWithSource savedA = flowRepository.create(GenericFlow.of(flowA)); + FlowWithSource savedB = flowRepository.create(GenericFlow.of(flowB)); + + try { + List all = flowRepository.findAsync(tenant, null) + .collectList() + .block(Duration.ofSeconds(5)); + + assertThat(all).isNotNull(); + assertThat(all.stream().map(Flow::getId).toList()) + .containsExactlyInAnyOrder(savedA.getId(), savedB.getId()); + + // with a query filter targeting flowA -> only flowA + QueryFilter filter = QueryFilter.builder() + .field(Field.QUERY) + .value(savedA.getId()) + .operation(Op.EQUALS) + .build(); + + List filtered = flowRepository.findAsync(tenant, List.of(filter)) + .collectList() + .block(Duration.ofSeconds(5)); + + assertThat(filtered).isNotNull(); + assertThat(filtered).hasSize(1); + assertThat(filtered.getFirst().getId()).isEqualTo(savedA.getId()); + } finally { + deleteFlow(savedA); + deleteFlow(savedB); + } + } + + + private static Flow createTestFlowForNamespace(String tenantId, String namespace) { return Flow.builder() .id(IdUtils.create()) diff --git a/core/src/test/java/io/kestra/core/repositories/AbstractTriggerRepositoryTest.java b/core/src/test/java/io/kestra/core/repositories/AbstractTriggerRepositoryTest.java index 807f078293..a8ec0a8e62 100644 --- a/core/src/test/java/io/kestra/core/repositories/AbstractTriggerRepositoryTest.java +++ b/core/src/test/java/io/kestra/core/repositories/AbstractTriggerRepositoryTest.java @@ -74,7 +74,7 @@ public abstract class AbstractTriggerRepositoryTest { String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName()); triggerRepository.save(generateDefaultTrigger(tenant)); - List entries = triggerRepository.find(tenant, List.of(filter)).collectList().block(); + List entries = triggerRepository.findAsync(tenant, List.of(filter)).collectList().block(); assertThat(entries).hasSize(1); } @@ -196,4 +196,35 @@ public abstract class AbstractTriggerRepositoryTest { // Then assertThat(count).isEqualTo(1); } + + @Test + void findAsync() { + String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName()); + + Trigger.TriggerBuilder builderA = trigger(tenant).flowId("flowA").triggerId("tA"); + Trigger.TriggerBuilder builderB = trigger(tenant).flowId("flowB").triggerId("tB"); + + Trigger savedA = triggerRepository.save(builderA.build()); + Trigger savedB = triggerRepository.save(builderB.build()); + + try { + List all = triggerRepository.findAsync(tenant, null).collectList().block(); + assertThat(all).isNotNull(); + assertThat(all.stream().map(Trigger::getTriggerId).toList()) + .containsExactlyInAnyOrder(savedA.getTriggerId(), savedB.getTriggerId()); + + List filters = List.of(QueryFilter.builder() + .field(QueryFilter.Field.FLOW_ID) + .operation(QueryFilter.Op.EQUALS) + .value("flowA") + .build()); + + List filtered = triggerRepository.findAsync(tenant, filters).collectList().block(); + assertThat(filtered).hasSize(1); + assertThat(filtered.get(0).getFlowId()).isEqualTo("flowA"); + } finally { + triggerRepository.delete(savedA); + triggerRepository.delete(savedB); + } + } } diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepository.java index 82a8159f2f..b7b965d81b 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepository.java @@ -287,6 +287,15 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcCrudRe return findPage(pageable, tenantId, condition); } + @Override + public Flux findAsync(String tenantId, List filters) { + if (filters == null || filters.isEmpty()) { + return findAllAsync(tenantId); + } + Condition condition = this.filter(filters, null, Resource.EXECUTION); + return findAsync(defaultFilter(tenantId), condition); + } + @Override public List dailyStatisticsForAllTenants( @Nullable String query, diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcFlowRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcFlowRepository.java index 93d7d1c7de..1f3364f5e2 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcFlowRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcFlowRepository.java @@ -46,11 +46,14 @@ import lombok.extern.slf4j.Slf4j; import org.jooq.*; import org.jooq.Record; import org.jooq.impl.DSL; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import java.io.IOException; import java.time.ZonedDateTime; import java.util.*; import java.util.stream.Collectors; +import java.util.stream.Stream; import static io.kestra.core.utils.Rethrow.throwConsumer; @@ -802,6 +805,48 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository ); } + @Override + public Flux findAsync(String tenantId, List filters) { + return this.findAsync(tenantId, filters, Resource.FLOW); + } + + protected Flux findAsync(String tenantId, @Nullable List filters, QueryFilter.Resource resource) { + if (filters == null || filters.isEmpty()) { + return findAsync(defaultFilter(tenantId), null); + } + Condition condition = this.filter(filters, null, resource); + return findAsync(defaultFilter(tenantId), condition); + } + + protected Flux findAsync(Condition defaultFilter, Condition condition, OrderField... orderByFields) { + return Flux.create(emitter -> this.jdbcRepository + .getDslContextWrapper() + .transaction(configuration -> { + DSLContext context = DSL.using(configuration); + + var select = context + .select(SOURCE_FIELD, VALUE_FIELD, NAMESPACE_FIELD, TENANT_FIELD) + .from(this.jdbcRepository.getTable()) + .where(defaultFilter); + + if (condition != null) { + select = select.and(condition); + } + + if (orderByFields != null) { + select.orderBy(orderByFields); + } + + try (Stream> stream = select.fetchSize(FETCH_SIZE).stream()){ + stream + .map(record -> (Flow) jdbcRepository.map(record)) + .forEach(emitter::next); + } finally { + emitter.complete(); + } + }), FluxSink.OverflowStrategy.BUFFER); + } + @Override public Integer lastRevision(String tenantId, String namespace, String id) { return this.jdbcRepository diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java index f57f27747f..5d2c973a08 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java @@ -59,7 +59,7 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo @Override public Triggers.Fields dateFilterField() { - return null; + return Triggers.Fields.NEXT_EXECUTION_DATE; } public AbstractJdbcTriggerRepository(io.kestra.jdbc.AbstractJdbcRepository jdbcRepository, @@ -82,7 +82,7 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo public List findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContextInterface) { JdbcSchedulerContext jdbcSchedulerContext = (JdbcSchedulerContext) scheduleContextInterface; - + return jdbcSchedulerContext.getContext() .select(field("value")) .from(this.jdbcRepository.getTable()) @@ -98,9 +98,9 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo .fetch() .map(r -> this.jdbcRepository.deserialize(r.get("value", String.class))); } - + public List findByNextExecutionDateReadyButLockedTriggers(ZonedDateTime now) { - + return this.jdbcRepository.getDslContextWrapper() .transactionResult(configuration -> DSL.using(configuration) .select(field("value")) @@ -115,7 +115,7 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo .fetch() .map(r -> this.jdbcRepository.deserialize(r.get("value", String.class)))); } - + protected Temporal toNextExecutionTime(ZonedDateTime now) { return now.toOffsetDateTime(); } @@ -199,7 +199,7 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo } @Override public ArrayListTotal find(Pageable pageable, String tenantId, List filters) { - var condition = filter(filters, "next_execution_date", Resource.TRIGGER); + var condition = filter(filters, fieldsMapping.get(dateFilterField()), Resource.TRIGGER); return findPage(pageable, tenantId, condition); } @@ -223,11 +223,13 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepo return findPage(pageable, tenantId, condition); } - /** {@inheritDoc} */ @Override - public Flux find(String tenantId, List filters) { - var condition = filter(filters, "next_execution_date", Resource.TRIGGER); - return findAsync(tenantId, condition); + public Flux findAsync(String tenantId, List filters) { + if (filters == null || filters.isEmpty()) { + return findAllAsync(tenantId); + } + Condition condition = this.filter(filters, fieldsMapping.get(dateFilterField()), Resource.TRIGGER); + return findAsync(defaultFilter(tenantId), condition); } protected Condition fullTextCondition(String query) { diff --git a/ui/src/components/admin/Triggers.vue b/ui/src/components/admin/Triggers.vue index 8b38495007..aed0c296dc 100644 --- a/ui/src/components/admin/Triggers.vue +++ b/ui/src/components/admin/Triggers.vue @@ -1,5 +1,11 @@