chore(system): share JDBC repository code in an abstract CRUD repository

This commit is contained in:
Loïc Mathieu
2025-11-14 12:57:33 +01:00
parent 4fc6948037
commit 735697ac71
38 changed files with 668 additions and 797 deletions

View File

@@ -23,12 +23,12 @@ import java.util.Objects;
@Singleton
public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.UsageEvent> {
private final FlowRepositoryInterface flowRepository;
private final ExecutionRepositoryInterface executionRepository;
private final DashboardRepositoryInterface dashboardRepository;
private final boolean enabled;
@Inject
public FeatureUsageReport(FlowRepositoryInterface flowRepository,
ExecutionRepositoryInterface executionRepository,
@@ -37,26 +37,26 @@ public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.Us
this.flowRepository = flowRepository;
this.executionRepository = executionRepository;
this.dashboardRepository = dashboardRepository;
ServerType serverType = KestraContext.getContext().getServerType();
this.enabled = ServerType.EXECUTOR.equals(serverType) || ServerType.STANDALONE.equals(serverType);
}
@Override
public UsageEvent report(final Instant now, TimeInterval interval) {
return UsageEvent
.builder()
.flows(FlowUsage.of(flowRepository))
.executions(ExecutionUsage.of(executionRepository, interval.from(), interval.to()))
.dashboards(new Count(dashboardRepository.count()))
.dashboards(new Count(dashboardRepository.countAllForAllTenants()))
.build();
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public UsageEvent report(Instant now, TimeInterval interval, String tenant) {
Objects.requireNonNull(tenant, "tenant is null");
@@ -67,7 +67,7 @@ public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.Us
.executions(ExecutionUsage.of(tenant, executionRepository, interval.from(), interval.to()))
.build();
}
@SuperBuilder(toBuilder = true)
@Getter
@Jacksonized

View File

@@ -16,14 +16,14 @@ import java.util.Map;
import java.util.Optional;
public interface DashboardRepositoryInterface {
/**
* Gets the total number of Dashboards.
*
* @return the total number.
*/
long count();
long countAllForAllTenants();
Boolean isEnabled();
Optional<Dashboard> get(String tenantId, String id);

View File

@@ -39,7 +39,7 @@ public interface TriggerRepositoryInterface extends QueryBuilderInterface<Trigge
* @param tenantId the tenant of the triggers
* @return The count.
*/
int count(@Nullable String tenantId);
long countAll(@Nullable String tenantId);
/**
* Find all triggers that match the query, return a flux of triggers

View File

@@ -192,7 +192,7 @@ public abstract class AbstractTriggerRepositoryTest {
.build()
);
// When
int count = triggerRepository.count(tenant);
long count = triggerRepository.countAll(tenant);
// Then
assertThat(count).isEqualTo(1);
}

View File

@@ -2,6 +2,7 @@ package io.kestra.repository.h2;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.models.dashboards.Dashboard;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.QueryBuilderInterface;
import io.kestra.jdbc.repository.AbstractJdbcDashboardRepository;
import io.micronaut.context.event.ApplicationEventPublisher;
@@ -17,9 +18,10 @@ import java.util.List;
public class H2DashboardRepository extends AbstractJdbcDashboardRepository {
@Inject
public H2DashboardRepository(@Named("dashboards") H2Repository<Dashboard> repository,
QueueService queueService,
ApplicationEventPublisher<CrudEvent<Dashboard>> eventPublisher,
List<QueryBuilderInterface<?>> queryBuilders) {
super(repository, eventPublisher, queryBuilders);
super(repository, queueService, eventPublisher, queryBuilders);
}
@Override

View File

@@ -2,6 +2,7 @@ package io.kestra.repository.h2;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
@@ -22,10 +23,11 @@ import java.util.*;
public class H2ExecutionRepository extends AbstractJdbcExecutionRepository {
@Inject
public H2ExecutionRepository(@Named("executions") H2Repository<Execution> repository,
QueueService queueService,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage,
JdbcFilterService filterService) {
super(repository, applicationContext, executorStateStorage, filterService);
super(repository, queueService, applicationContext, executorStateStorage, filterService);
}
@Override

View File

@@ -1,27 +1,20 @@
package io.kestra.repository.h2;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcKvMetadataRepository;
import io.kestra.jdbc.services.JdbcFilterService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Singleton
@H2RepositoryEnabled
public class H2KvMetadataRepository extends AbstractJdbcKvMetadataRepository {
@Inject
public H2KvMetadataRepository(@Named("kvMetadata") H2Repository<PersistedKvMetadata> repository) {
super(repository);
public H2KvMetadataRepository(@Named("kvMetadata") H2Repository<PersistedKvMetadata> repository, QueueService queueService, ApplicationContext applicationContext) {
super(repository, queueService);
}

View File

@@ -1,6 +1,7 @@
package io.kestra.repository.h2;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcLogRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -19,8 +20,9 @@ import java.util.List;
public class H2LogRepository extends AbstractJdbcLogRepository {
@Inject
public H2LogRepository(@Named("logs") H2Repository<LogEntry> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, filterService);
super(repository, queueService, filterService);
}
@Override

View File

@@ -1,6 +1,7 @@
package io.kestra.repository.h2;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -17,8 +18,9 @@ import java.util.Date;
public class H2MetricRepository extends AbstractJdbcMetricRepository {
@Inject
public H2MetricRepository(@Named("metrics") H2Repository<MetricEntry> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, filterService);
super(repository, queueService, filterService);
}
@Override

View File

@@ -1,6 +1,7 @@
package io.kestra.repository.h2;
import io.kestra.core.models.Setting;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -12,7 +13,8 @@ import jakarta.inject.Singleton;
public class H2SettingRepository extends AbstractJdbcSettingRepository {
@Inject
public H2SettingRepository(@Named("settings") H2Repository<Setting> repository,
QueueService queueService,
ApplicationContext applicationContext) {
super(repository, applicationContext);
super(repository, queueService, applicationContext);
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.repository.h2;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcTemplateRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -17,8 +18,9 @@ import java.util.List;
public class H2TemplateRepository extends AbstractJdbcTemplateRepository {
@Inject
public H2TemplateRepository(@Named("templates") H2Repository<Template> repository,
QueueService queueService,
ApplicationContext applicationContext) {
super(repository, applicationContext);
super(repository, queueService, applicationContext);
}
@Override

View File

@@ -1,6 +1,7 @@
package io.kestra.repository.h2;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -17,8 +18,9 @@ import java.util.Date;
public class H2TriggerRepository extends AbstractJdbcTriggerRepository {
@Inject
public H2TriggerRepository(@Named("triggers") H2Repository<Trigger> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, filterService);
super(repository, queueService, filterService);
}
@Override

View File

@@ -2,6 +2,7 @@ package io.kestra.repository.mysql;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.models.dashboards.Dashboard;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.QueryBuilderInterface;
import io.kestra.jdbc.repository.AbstractJdbcDashboardRepository;
import io.micronaut.context.event.ApplicationEventPublisher;
@@ -17,9 +18,10 @@ import java.util.List;
public class MysqlDashboardRepository extends AbstractJdbcDashboardRepository {
@Inject
public MysqlDashboardRepository(@Named("dashboards") MysqlRepository<Dashboard> repository,
QueueService queueService,
ApplicationEventPublisher<CrudEvent<Dashboard>> eventPublisher,
List<QueryBuilderInterface<?>> queryBuilders) {
super(repository, eventPublisher, queryBuilders);
super(repository, queueService, eventPublisher, queryBuilders);
}
@Override

View File

@@ -2,6 +2,7 @@ package io.kestra.repository.mysql;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
@@ -25,10 +26,11 @@ import static io.kestra.core.models.QueryFilter.Op.EQUALS;
public class MysqlExecutionRepository extends AbstractJdbcExecutionRepository {
@Inject
public MysqlExecutionRepository(@Named("executions") MysqlRepository<Execution> repository,
QueueService queueService,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage,
JdbcFilterService filterService) {
super(repository, applicationContext, executorStateStorage, filterService);
super(repository, queueService, applicationContext, executorStateStorage, filterService);
}
@Override

View File

@@ -1,6 +1,7 @@
package io.kestra.repository.mysql;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcKvMetadataRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -17,9 +18,10 @@ import java.util.List;
public class MysqlKvMetadataRepository extends AbstractJdbcKvMetadataRepository {
@Inject
public MysqlKvMetadataRepository(
@Named("kvMetadata") MysqlRepository<PersistedKvMetadata> repository
@Named("kvMetadata") MysqlRepository<PersistedKvMetadata> repository,
QueueService queueService
) {
super(repository);
super(repository, queueService);
}
@Override

View File

@@ -1,6 +1,7 @@
package io.kestra.repository.mysql;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcLogRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -19,8 +20,9 @@ import java.util.Date;
public class MysqlLogRepository extends AbstractJdbcLogRepository {
@Inject
public MysqlLogRepository(@Named("logs") MysqlRepository<LogEntry> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, filterService);
super(repository, queueService, filterService);
}
@Override

View File

@@ -1,6 +1,7 @@
package io.kestra.repository.mysql;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -18,8 +19,9 @@ import java.util.Date;
public class MysqlMetricRepository extends AbstractJdbcMetricRepository {
@Inject
public MysqlMetricRepository(@Named("metrics") MysqlRepository<MetricEntry> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, filterService);
super(repository, queueService, filterService);
}
@Override

View File

@@ -1,6 +1,7 @@
package io.kestra.repository.mysql;
import io.kestra.core.models.Setting;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -12,7 +13,8 @@ import jakarta.inject.Singleton;
public class MysqlSettingRepository extends AbstractJdbcSettingRepository {
@Inject
public MysqlSettingRepository(@Named("settings") MysqlRepository<Setting> repository,
QueueService queueService,
ApplicationContext applicationContext) {
super(repository, applicationContext);
super(repository, queueService, applicationContext);
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.repository.mysql;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcTemplateRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -17,8 +18,9 @@ import java.util.Arrays;
public class MysqlTemplateRepository extends AbstractJdbcTemplateRepository {
@Inject
public MysqlTemplateRepository(@Named("templates") MysqlRepository<Template> repository,
QueueService queueService,
ApplicationContext applicationContext) {
super(repository, applicationContext);
super(repository, queueService, applicationContext);
}
@Override

View File

@@ -2,6 +2,7 @@ package io.kestra.repository.mysql;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.runners.ScheduleContextInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.kestra.jdbc.runner.JdbcSchedulerContext;
@@ -25,8 +26,9 @@ import java.util.List;
public class MysqlTriggerRepository extends AbstractJdbcTriggerRepository {
@Inject
public MysqlTriggerRepository(@Named("triggers") MysqlRepository<Trigger> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, filterService);
super(repository, queueService, filterService);
}
@Override

View File

@@ -2,6 +2,7 @@ package io.kestra.repository.postgres;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.models.dashboards.Dashboard;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.QueryBuilderInterface;
import io.kestra.jdbc.repository.AbstractJdbcDashboardRepository;
import io.micronaut.context.event.ApplicationEventPublisher;
@@ -17,9 +18,10 @@ import java.util.List;
public class PostgresDashboardRepository extends AbstractJdbcDashboardRepository {
@Inject
public PostgresDashboardRepository(@Named("dashboards") PostgresRepository<Dashboard> repository,
QueueService queueService,
ApplicationEventPublisher<CrudEvent<Dashboard>> eventPublisher,
List<QueryBuilderInterface<?>> queryBuilders) {
super(repository, eventPublisher, queryBuilders);
super(repository, queueService, eventPublisher, queryBuilders);
}
@Override

View File

@@ -3,6 +3,7 @@ package io.kestra.repository.postgres;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
@@ -24,10 +25,11 @@ import java.util.*;
public class PostgresExecutionRepository extends AbstractJdbcExecutionRepository {
@Inject
public PostgresExecutionRepository(@Named("executions") PostgresRepository<Execution> repository,
QueueService queueService,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage,
JdbcFilterService filterService) {
super(repository, applicationContext, executorStateStorage, filterService);
super(repository, queueService, applicationContext, executorStateStorage, filterService);
}
@Override

View File

@@ -1,6 +1,7 @@
package io.kestra.repository.postgres;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcKvMetadataRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -17,9 +18,10 @@ import java.util.List;
public class PostgresKvMetadataRepository extends AbstractJdbcKvMetadataRepository {
@Inject
public PostgresKvMetadataRepository(
@Named("kvMetadata") PostgresRepository<PersistedKvMetadata> repository
@Named("kvMetadata") PostgresRepository<PersistedKvMetadata> repository,
QueueService queueService
) {
super(repository);
super(repository, queueService);
}
@Override

View File

@@ -2,6 +2,7 @@ package io.kestra.repository.postgres;
import io.kestra.core.models.dashboards.filters.AbstractFilter;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcLogRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -26,8 +27,9 @@ public class PostgresLogRepository extends AbstractJdbcLogRepository {
@Inject
public PostgresLogRepository(@Named("logs") PostgresRepository<LogEntry> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, filterService);
super(repository, queueService, filterService);
}
@Override

View File

@@ -1,6 +1,7 @@
package io.kestra.repository.postgres;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -17,8 +18,9 @@ import java.util.Date;
public class PostgresMetricRepository extends AbstractJdbcMetricRepository {
@Inject
public PostgresMetricRepository(@Named("metrics") PostgresRepository<MetricEntry> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, filterService);
super(repository, queueService, filterService);
}
@Override

View File

@@ -1,6 +1,7 @@
package io.kestra.repository.postgres;
import io.kestra.core.models.Setting;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -12,7 +13,8 @@ import jakarta.inject.Singleton;
public class PostgresSettingRepository extends AbstractJdbcSettingRepository {
@Inject
public PostgresSettingRepository(@Named("settings") PostgresRepository<Setting> repository,
QueueService queueService,
ApplicationContext applicationContext) {
super(repository, applicationContext);
super(repository, queueService, applicationContext);
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.repository.postgres;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.queues.QueueService;
import io.kestra.jdbc.repository.AbstractJdbcTemplateRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -17,8 +18,9 @@ import java.util.Collections;
public class PostgresTemplateRepository extends AbstractJdbcTemplateRepository {
@Inject
public PostgresTemplateRepository(@Named("templates") PostgresRepository<Template> repository,
QueueService queueService,
ApplicationContext applicationContext) {
super(repository, applicationContext);
super(repository, queueService, applicationContext);
}
@Override

View File

@@ -1,6 +1,7 @@
package io.kestra.repository.postgres;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -17,8 +18,9 @@ import java.util.Date;
public class PostgresTriggerRepository extends AbstractJdbcTriggerRepository {
@Inject
public PostgresTriggerRepository(@Named("triggers") PostgresRepository<Trigger> repository,
QueueService queueService,
JdbcFilterService filterService) {
super(repository, filterService);
super(repository, queueService, filterService);
}
@Override

View File

@@ -0,0 +1,439 @@
package io.kestra.jdbc.repository;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.utils.ListUtils;
import io.micronaut.data.model.Pageable;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
/**
* Base JDBC repository for CRUD operations.
* <p>
* NOTE: it uses the <code>defaultFilter(tenantId)</code> for querying.
* If the child repository uses a default filter, it should override it.
* <p>
* For example, to avoid supporting allowDeleted:
* <pre>{@code
* @Override
* protected Condition defaultFilter(String tenantId) {
* return buildTenantCondition(tenantId);
* }
*
* @Override
* protected Condition defaultFilter() {
* return DSL.trueCondition();
* }
* }</pre>
*
* @param <T> the type of the persisted entity.
*/
public abstract class AbstractJdbcCrudRepository<T> extends AbstractJdbcRepository {
protected static final Field<String> KEY_FIELD = field("key", String.class);
protected static final Field<String> VALUE_FIELD = field("value", String.class);
protected io.kestra.jdbc.AbstractJdbcRepository<T> jdbcRepository;
protected QueueService queueService;
public AbstractJdbcCrudRepository(io.kestra.jdbc.AbstractJdbcRepository<T> jdbcRepository, QueueService queueService) {
this.jdbcRepository = jdbcRepository;
this.queueService = queueService;
}
/**
* Creates an item: persist it inside the database and return it.
* It uses an insert on conflict update to avoid concurrent write issues.
*/
public T create(T item) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(item);
this.jdbcRepository.persist(item, fields);
return item;
}
/**
* Save an item: persist it inside the database and return it.
* It uses an insert on conflict update to avoid concurrent write issues.
*/
public T save(T item) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(item);
this.jdbcRepository.persist(item, fields);
return item;
}
/**
* Creates an item: persist it inside the database and return it.
* It uses an insert on conflict update to avoid concurrent write issues.
*/
public T save(DSLContext context, T item) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(item);
this.jdbcRepository.persist(item, context, fields);
return item;
}
/**
* Save a list of items: persist them inside the database and return the updated count.
*/
public int saveBatch(List<T> items) {
if (ListUtils.isEmpty(items)) {
return 0;
}
return this.jdbcRepository.persistBatch(items);
}
/**
* Update an item: persist it inside the database and return it.
* It uses an update statement, so the item must be already present in the database.
*/
public T update(T current) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSL.using(configuration)
.update(this.jdbcRepository.getTable())
.set(this.jdbcRepository.persistFields((current)))
.where(KEY_FIELD.eq(queueService.key(current)))
.execute();
return current;
});
}
/**
* Find one item that matches the condition.
* <p>
* It uses LIMIT 1 and doesn't throw if the query returns more than one result.
*
* @see #findOne(String, Condition, boolean, OrderField...)
* @see #findOne(Condition, Condition, OrderField...)
*/
@SafeVarargs
protected final <F> Optional<T> findOne(String tenantId, Condition condition, OrderField<F>... orderByFields) {
return findOne(defaultFilter(tenantId), condition, orderByFields);
}
/**
* Find one item that matches the condition.
* You can use <code>allowDeleted</code> to decide whether deleted items should be included or not.
* <p>
* It uses LIMIT 1 and doesn't throw if the query returns more than one result.
*
* @see #findOne(String, Condition, OrderField...)
* @see #findOne(Condition, Condition, OrderField[])
*/
@SafeVarargs
protected final <F> Optional<T> findOne(String tenantId, Condition condition, boolean allowDeleted, OrderField<F>... orderByFields) {
return findOne(defaultFilter(tenantId, allowDeleted), condition, orderByFields);
}
/**
* Find one item that matches the condition.
* <p>
* It uses LIMIT 1 and doesn't throw if the query returns more than one result.
*
* @see #findOne(String, Condition, OrderField...)
* @see #findOne(String, Condition, boolean, OrderField...)
*/
@SafeVarargs
protected final <F> Optional<T> findOne(Condition defaultFilter, Condition condition, OrderField<F>... orderByFields) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
var select = DSL
.using(configuration)
.select(VALUE_FIELD)
.from(this.jdbcRepository.getTable())
.where(defaultFilter)
.and(condition);
if (orderByFields != null) {
select.orderBy(orderByFields);
}
select.limit(1);
return this.jdbcRepository.fetchOne(select);
});
}
/**
* List all items that match the condition.
*
* @see #findAsync(String, Condition, OrderField...)
* @see #findPage(Pageable, String, Condition, OrderField...)
*/
@SafeVarargs
protected final <F> List<T> find(String tenantId, Condition condition, OrderField<F>... orderByFields) {
return find(defaultFilter(tenantId), condition, orderByFields);
}
/**
* List all items that match the condition.
* You can use <code>allowDeleted</code> to decide whether deleted items should be included or not.
*
* @see #findAsync(String, Condition, boolean, OrderField...)
* @see #findPage(Pageable, String, Condition, boolean, OrderField...)
*/
@SafeVarargs
protected final <F> List<T> find(String tenantId, Condition condition, boolean allowDeleted, OrderField<F>... orderByFields) {
return find(defaultFilter(tenantId, allowDeleted), condition, orderByFields);
}
/**
* List all items that match the condition.
*
* @see #findAsync(Condition, Condition, OrderField...)
* @see #findPage(Pageable, Condition, Condition, OrderField...)
*/
@SafeVarargs
protected final <F> List<T> find(Condition defaultFilter, Condition condition, OrderField<F>... orderByFields) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
var select = DSL
.using(configuration)
.select(VALUE_FIELD)
.from(this.jdbcRepository.getTable())
.where(defaultFilter)
.and(condition);
if (orderByFields != null) {
select.orderBy(orderByFields);
}
return this.jdbcRepository.fetch(select);
});
}
/**
* Find all items that match the condition and return a reactive stream.
* To avoid any potential issues with databases that load all the resultset in memory, it batches the results by <code>FETCH_SIZE</code>.
*
* @see #find(String, Condition, OrderField...)
* @see #findPage(Pageable, String, Condition, OrderField...)
*/
@SafeVarargs
protected final <F> Flux<T> findAsync(String tenantId, Condition condition, OrderField<F>... orderByFields) {
return findAsync(defaultFilter(tenantId), condition, orderByFields);
}
/**
* Find all items that match the condition and return a reactive stream.
* To avoid any potential issues with databases that load all the resultset in memory, it batches the results by <code>FETCH_SIZE</code>.
* You can use <code>allowDeleted</code> to decide whether deleted items should be included or not.
*
* @see #find(String, Condition, boolean, OrderField...)
* @see #findPage(Pageable, String, Condition, boolean, OrderField...)
*/
@SafeVarargs
protected final <F> Flux<T> findAsync(String tenantId, Condition condition, boolean allowDeleted, OrderField<F>... orderByFields) {
return findAsync(defaultFilter(tenantId, allowDeleted), condition, orderByFields);
}
/**
* Find all items that match the condition and return a reactive stream.
* To avoid any potential issues with databases that load all the resultset in memory, it batches the results by <code>FETCH_SIZE</code>.
*
* @see #find(Condition, Condition, OrderField...)
* @see #findPage(Pageable, Condition, Condition, OrderField...)
*/
@SafeVarargs
protected final <F> Flux<T> findAsync(Condition defaultFilter, Condition condition, OrderField<F>... orderByFields) {
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
var select = context
.select(VALUE_FIELD)
.from(this.jdbcRepository.getTable())
.where(defaultFilter)
.and(condition);
if (orderByFields != null) {
select.orderBy(orderByFields);
}
try (Stream<Record1<String>> stream = select.fetchSize(FETCH_SIZE).stream()){
stream.map((Record record) -> jdbcRepository.map(record))
.forEach(emitter::next);
} finally {
emitter.complete();
}
}), FluxSink.OverflowStrategy.BUFFER);
}
/**
* Find a page of items that match the condition and return them.
*
* @see #find(String, Condition, OrderField...)
* @see #findAsync(String, Condition, OrderField...)
*/
@SafeVarargs
protected final <F> ArrayListTotal<T> findPage(Pageable pageable, String tenantId, Condition condition, OrderField<F>... orderByFields) {
return findPage(pageable, defaultFilter(tenantId), condition, orderByFields);
}
/**
* Find a page of items that match the condition and return them.
* You can use <code>allowDeleted</code> to decide whether deleted items should be included or not.
*
* @see #find(String, Condition, boolean, OrderField...)
* @see #findAsync(String, Condition, boolean, OrderField...)
*/
@SafeVarargs
protected final <F> ArrayListTotal<T> findPage(Pageable pageable, String tenantId, Condition condition, boolean allowDeleted, OrderField<F>... orderByFields) {
return findPage(pageable, defaultFilter(tenantId, allowDeleted), condition, orderByFields);
}
/**
* Find a page of items that match the condition and return them.
*
* @see #find(Condition, Condition, OrderField...)
* @see #findAsync(Condition, Condition, OrderField...)
*/
@SafeVarargs
protected final <F> ArrayListTotal<T> findPage(Pageable pageable, Condition defaultFilter, Condition condition, OrderField<F>... orderByFields) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
var select = context
.select(VALUE_FIELD)
.from(this.jdbcRepository.getTable())
.where(defaultFilter)
.and(condition);
if (orderByFields != null) {
select.orderBy(orderByFields);
}
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
/**
* Find all items.
*
* @see #findAllAsync(String)
*/
public List<T> findAll(String tenantId) {
return findAll(defaultFilter(tenantId));
}
/**
* Find all items.
*
* @see #findAllAsync(Condition)
*/
protected List<T> findAll(Condition defaultFilter) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
var select = DSL
.using(configuration)
.select(VALUE_FIELD)
.from(this.jdbcRepository.getTable())
.where(defaultFilter);
return this.jdbcRepository.fetch(select);
});
}
/**
* Find all items and return a reactive stream.
* To avoid any potential issues with databases that load all the resultset in memory, it batches the results by <code>FETCH_SIZE</code>.
*
* @see #findAll(String)
*/
public Flux<T> findAllAsync(String tenantId) {
return findAllAsync(defaultFilter(tenantId));
}
/**
* Find all items and return a reactive stream.
* To avoid any potential issues with databases that load all the resultset in memory, it batches the results by <code>FETCH_SIZE</code>.
*
* @see #findAll(Condition)
*/
protected Flux<T> findAllAsync(Condition defaultFilter) {
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
var select = context
.select(VALUE_FIELD)
.from(this.jdbcRepository.getTable())
.where(defaultFilter);
try (Stream<Record1<String>> stream = select.fetchSize(FETCH_SIZE).stream()){
stream.map((Record record) -> jdbcRepository.map(record))
.forEach(emitter::next);
} finally {
emitter.complete();
}
}), FluxSink.OverflowStrategy.BUFFER);
}
/**
* Find all items, for all tenants.
* WARNING: this method should never be used inside the API as it didn't enforce tenant selection!
*/
public List<T> findAllForAllTenants() {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
var select = DSL
.using(configuration)
.select(VALUE_FIELD)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter());
return this.jdbcRepository.fetch(select);
});
}
/**
* Count items that match the condition.
*
* @see #countAll(String)
* @see #countAllForAllTenants()
*/
protected long count(String tenantId, Condition condition) {
return this.jdbcRepository.count(this.defaultFilter(tenantId).and(condition));
}
/**
* Count all items.
*
* @see #count(String, Condition)
* @see #countAllForAllTenants()
*/
public long countAll(String tenantId) {
return this.jdbcRepository.count(this.defaultFilter(tenantId));
}
/**
* Count all items for all tenants.
* WARNING: this method should never be used inside the API as it didn't enforce tenant selection!
*
* @see #count(String, Condition)
* @see #countAll(String)
*/
public long countAllForAllTenants() {
return this.jdbcRepository.count(this.defaultFilter());
}
}

View File

@@ -7,6 +7,7 @@ import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.charts.DataChart;
import io.kestra.core.models.dashboards.charts.DataChartKPI;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.DashboardRepositoryInterface;
import io.kestra.core.repositories.QueryBuilderInterface;
@@ -14,7 +15,6 @@ import io.kestra.plugin.core.dashboard.chart.kpis.KpiOption;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.data.model.Pageable;
import jakarta.validation.ConstraintViolationException;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jooq.*;
import org.jooq.impl.DSL;
@@ -30,19 +30,17 @@ import java.util.Optional;
import static io.kestra.core.utils.MathUtils.roundDouble;
@Slf4j
@AllArgsConstructor
public abstract class AbstractJdbcDashboardRepository extends AbstractJdbcRepository implements DashboardRepositoryInterface {
protected io.kestra.jdbc.AbstractJdbcRepository<Dashboard> jdbcRepository;
public abstract class AbstractJdbcDashboardRepository extends AbstractJdbcCrudRepository<Dashboard> implements DashboardRepositoryInterface {
private final ApplicationEventPublisher<CrudEvent<Dashboard>> eventPublisher;
private final List<QueryBuilderInterface<?>> queryBuilders;
List<QueryBuilderInterface<?>> queryBuilders;
/**
* {@inheritDoc}
**/
@Override
public long count() {
return jdbcRepository.count(this.defaultFilter());
public AbstractJdbcDashboardRepository(io.kestra.jdbc.AbstractJdbcRepository<Dashboard> jdbcRepository,
QueueService queueService,
ApplicationEventPublisher<CrudEvent<Dashboard>> eventPublisher,
List<QueryBuilderInterface<?>> queryBuilders) {
super(jdbcRepository, queueService);
this.eventPublisher = eventPublisher;
this.queryBuilders = queryBuilders;
}
@@ -77,58 +75,12 @@ public abstract class AbstractJdbcDashboardRepository extends AbstractJdbcReposi
@Override
public ArrayListTotal<Dashboard> list(Pageable pageable, String tenantId, String query) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(
field("value")
)
.from(jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
select = select.and(this.findCondition(query));
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
@Override
public List<Dashboard> findAll(String tenantId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(
field("value")
)
.from(jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
return this.jdbcRepository.fetch(select);
});
return findPage(pageable, tenantId, this.findCondition(query));
}
@Override
public List<Dashboard> findAllWithNoAcl(String tenantId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(
field("value")
)
.from(jdbcRepository.getTable())
.where(this.defaultFilterWithNoACL(tenantId));
return this.jdbcRepository.fetch(select);
});
return findAll(this.defaultFilterWithNoACL(tenantId));
}
@Override

View File

@@ -17,6 +17,7 @@ import io.kestra.core.models.flows.FlowScope;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.Executor;
@@ -57,14 +58,13 @@ import java.util.stream.Stream;
import static io.kestra.core.models.QueryFilter.Field.KIND;
public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcRepository implements ExecutionRepositoryInterface, JdbcQueueIndexerInterface<Execution> {
public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcCrudRepository<Execution> implements ExecutionRepositoryInterface, JdbcQueueIndexerInterface<Execution> {
private static final int FETCH_SIZE = 100;
private static final Field<String> STATE_CURRENT_FIELD = field("state_current", String.class);
private static final Field<String> NAMESPACE_FIELD = field("namespace", String.class);
private static final Field<Object> START_DATE_FIELD = field("start_date");
private static final Condition NORMAL_KIND_CONDITION = field("kind").isNull();
protected final io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository;
private final ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
private final ApplicationContext applicationContext;
protected final AbstractJdbcExecutorStateStorage executorStateStorage;
@@ -100,11 +100,12 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
@SuppressWarnings("unchecked")
public AbstractJdbcExecutionRepository(
io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository,
QueueService queueService,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage,
JdbcFilterService filterService
) {
this.jdbcRepository = jdbcRepository;
super(jdbcRepository, queueService);
this.executorStateStorage = executorStateStorage;
this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.namespaceUtils = applicationContext.getBean(NamespaceUtils.class);
@@ -130,27 +131,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
@Override
public Flux<Execution> findAllByTriggerExecutionId(String tenantId,
String triggerExecutionId) {
return Flux.create(
emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(field("trigger_execution_id").eq(triggerExecutionId));
// fetchSize will fetch rows 100 by 100 even for databases where the driver loads all in memory
// using a stream will fetch lazily, otherwise all fetches would be done before starting emitting the items
try (var stream = select.fetchSize(FETCH_SIZE).stream()) {
stream.map(this.jdbcRepository::map).forEach(emitter::next);
} finally {
emitter.complete();
}
}),
FluxSink.OverflowStrategy.BUFFER
);
var condition = field("trigger_execution_id").eq(triggerExecutionId);
return findAsync(tenantId, condition);
}
/**
@@ -158,20 +140,10 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
**/
@Override
public Optional<Execution> findLatestForStates(String tenantId, String namespace, String flowId, List<State.Type> states) {
return jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
Select<Record1<Object>> from = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId, false))
.and(field("namespace").eq(namespace))
.and(field("flow_id").eq(flowId))
.and(statesFilter(states))
.orderBy(field("start_date").desc());
return this.jdbcRepository.fetchOne(from);
});
var condition = field("namespace").eq(namespace)
.and(field("flow_id").eq(flowId))
.and(this.statesFilter(states));
return findOne(tenantId, condition, field("start_date").desc());
}
@Override
@@ -185,19 +157,12 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
}
public Optional<Execution> findById(String tenantId, String id, boolean allowDeleted, boolean withAccessControl) {
return jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
Select<Record1<Object>> from = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(withAccessControl ? this.defaultFilter(tenantId, allowDeleted) : this.defaultFilterWithNoACL(tenantId, allowDeleted))
.and(field("key").eq(id));
return this.jdbcRepository.fetchOne(from);
});
Condition defaultFilter = withAccessControl ? this.defaultFilter(tenantId, allowDeleted) : this.defaultFilterWithNoACL(tenantId, allowDeleted);
Condition condition = field("key").eq(id);
return findOne(defaultFilter, condition);
}
abstract protected Condition findCondition(String query, Map<String, String> labels);
protected Condition findQueryCondition(String query) {
@@ -218,20 +183,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
@Nullable List<QueryFilter> filters
) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = this.findSelect(
context,
tenantId,
filters
);
return this.jdbcRepository.fetchPage(context, select, pageable);
});
return findPage(pageable, tenantId, this.computeFindCondition(filters));
}
@Override
@@ -283,27 +235,11 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
);
}
private SelectConditionStep<Record1<Object>> findSelect(
DSLContext context,
@Nullable String tenantId,
@Nullable List<QueryFilter> filters
) {
SelectConditionStep<Record1<Object>> select = context
.select(
field("value")
)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId, false));
private Condition computeFindCondition(@Nullable List<QueryFilter> filters) {
boolean hasKindFilter = filters != null && filters.stream()
.anyMatch(f -> KIND.value().equalsIgnoreCase(f.field().name()) );
if (!hasKindFilter) {
select = select.and(NORMAL_KIND_CONDITION);
}
select = select.and(this.filter(filters, "start_date", Resource.EXECUTION));
return select;
return hasKindFilter ? this.filter(filters, "start_date", Resource.EXECUTION) :
this.filter(filters, "start_date", Resource.EXECUTION).and(NORMAL_KIND_CONDITION);
}
private SelectConditionStep<Record1<Object>> findSelect(
@@ -345,43 +281,10 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
return select;
}
@Override
public Flux<Execution> findAllAsync(@Nullable String tenantId) {
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
try (Stream<Record1<Object>> stream = select.fetchSize(FETCH_SIZE).stream()) {
stream.map((Record record) -> jdbcRepository.map(record))
.forEach(emitter::next);
} finally {
emitter.complete();
}
}), FluxSink.OverflowStrategy.BUFFER);
}
@Override
public ArrayListTotal<Execution> findByFlowId(String tenantId, String namespace, String id, Pageable pageable) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(field("namespace").eq(namespace))
.and(field("flow_id").eq(id));
return this.jdbcRepository.fetchPage(context, select, pageable);
});
var condition = field("namespace").eq(namespace).and(field("flow_id").eq(id));
return findPage(pageable, tenantId, condition);
}
@Override
@@ -892,47 +795,6 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
});
}
@Override
public Execution save(Execution execution) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(execution);
this.jdbcRepository.persist(execution, fields);
return execution;
}
@Override
public Execution save(DSLContext dslContext, Execution execution) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(execution);
this.jdbcRepository.persist(execution, dslContext, fields);
return execution;
}
@Override
public int saveBatch(List<Execution> items) {
if (ListUtils.isEmpty(items)) {
return 0;
}
return this.jdbcRepository.persistBatch(items);
}
@Override
public Execution update(Execution execution) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSL.using(configuration)
.update(this.jdbcRepository.getTable())
.set(this.jdbcRepository.persistFields((execution)))
.where(field("key").eq(execution.getId()))
.execute();
return execution;
});
}
@SneakyThrows
@Override
public Execution delete(Execution execution) {

View File

@@ -1,7 +1,6 @@
package io.kestra.jdbc.repository;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.jdbc.runner.JdbcQueueIndexerInterface;
@@ -169,6 +168,7 @@ public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRep
.set(this.jdbcRepository.persistFields(flowTopology));
}
@Override
public FlowTopology save(FlowTopology flowTopology) {
this.jdbcRepository.persist(flowTopology);
@@ -184,6 +184,7 @@ public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRep
return flowTopology;
}
protected Condition buildTenantCondition(String prefix, String tenantId) {
return tenantId == null ? field(prefix + "_tenant_id").isNull() : field(prefix + "_tenant_id").eq(tenantId);
}

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.FetchVersion;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.TenantAndNamespace;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.micronaut.data.model.Pageable;
@@ -17,14 +18,13 @@ import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class AbstractJdbcKvMetadataRepository extends AbstractJdbcRepository implements KvMetadataRepositoryInterface {
protected final io.kestra.jdbc.AbstractJdbcRepository<PersistedKvMetadata> jdbcRepository;
public abstract class AbstractJdbcKvMetadataRepository extends AbstractJdbcCrudRepository<PersistedKvMetadata> implements KvMetadataRepositoryInterface {
@SuppressWarnings("unchecked")
public AbstractJdbcKvMetadataRepository(
io.kestra.jdbc.AbstractJdbcRepository<PersistedKvMetadata> jdbcRepository
io.kestra.jdbc.AbstractJdbcRepository<PersistedKvMetadata> jdbcRepository,
QueueService queueService
) {
this.jdbcRepository = jdbcRepository;
super(jdbcRepository, queueService);
}
private static Condition lastCondition(boolean isLast) {
@@ -44,38 +44,22 @@ public abstract class AbstractJdbcKvMetadataRepository extends AbstractJdbcRepos
@Override
public Optional<PersistedKvMetadata> findByName(String tenantId, String namespace, String name) {
return jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
Select<Record1<Object>> from = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId, true))
.and(field("namespace").eq(namespace))
.and(field("name").eq(name))
.and(lastCondition());
return this.jdbcRepository.fetchOne(from);
});
var condition = field("namespace").eq(namespace)
.and(field("name").eq(name))
.and(lastCondition());
return findOne(tenantId, condition, true);
}
private SelectConditionStep<Record1<Object>> findSelect(
DSLContext context,
@Nullable String tenantId,
private Condition findSelect(
@Nullable List<QueryFilter> filters,
boolean allowDeleted,
boolean allowExpired,
FetchVersion fetchBehavior
) {
SelectConditionStep<Record1<Object>> condition = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId, allowDeleted))
.and(allowExpired ? DSL.trueCondition() : DSL.or(
field("expiration_date").greaterThan(Instant.now()),
field("expiration_date").isNull()
))
.and(this.filter(filters, "updated", QueryFilter.Resource.KV_METADATA));
var condition = allowExpired ? DSL.trueCondition() : DSL.or(
field("expiration_date").greaterThan(Instant.now()),
field("expiration_date").isNull());
condition = condition.and(this.filter(filters, "updated", QueryFilter.Resource.KV_METADATA));
switch (fetchBehavior) {
case LATEST -> condition = condition.and(lastCondition());
@@ -87,22 +71,8 @@ public abstract class AbstractJdbcKvMetadataRepository extends AbstractJdbcRepos
@Override
public ArrayListTotal<PersistedKvMetadata> find(Pageable pageable, String tenantId, List<QueryFilter> filters, boolean allowDeleted, boolean allowExpired, FetchVersion fetchBehavior) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = this.findSelect(
context,
tenantId,
filters,
allowDeleted,
allowExpired,
fetchBehavior
);
return this.jdbcRepository.fetchPage(context, select, pageable);
});
var condition = findSelect(filters, allowExpired, fetchBehavior);
return this.findPage(pageable, tenantId, condition, allowDeleted);
}
@Override

View File

@@ -8,6 +8,7 @@ import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.filters.AbstractFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.utils.DateUtils;
@@ -22,22 +23,20 @@ import org.jooq.Record;
import org.jooq.impl.DSL;
import org.slf4j.event.Level;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository implements LogRepositoryInterface {
public abstract class AbstractJdbcLogRepository extends AbstractJdbcCrudRepository<LogEntry> implements LogRepositoryInterface {
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull();
public static final String DATE_COLUMN = "timestamp";
protected io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository;
private static final String DATE_COLUMN = "timestamp";
public AbstractJdbcLogRepository(io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository,
QueueService queueService,
JdbcFilterService filterService) {
this.jdbcRepository = jdbcRepository;
super(jdbcRepository, queueService);
this.filterService = filterService;
}
@@ -86,21 +85,8 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
@Nullable String tenantId,
@Nullable List<QueryFilter> filters
) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(NORMAL_KIND_CONDITION);
select = select.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
return this.jdbcRepository.fetchPage(context, select, pageable);
});
var condition = NORMAL_KIND_CONDITION.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
return findPage(pageable, tenantId, condition);
}
@Override
@@ -108,48 +94,8 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
@Nullable String tenantId,
List<QueryFilter> filters
){
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(NORMAL_KIND_CONDITION);
select = select.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
select.orderBy(field(DATE_COLUMN).asc());
try (Stream<Record1<Object>> stream = select.fetchSize(FETCH_SIZE).stream()){
stream.map((Record record) -> jdbcRepository.map(record))
.forEach(emitter::next);
} finally {
emitter.complete();
}
}), FluxSink.OverflowStrategy.BUFFER);
}
@Override
public Flux<LogEntry> findAllAsync(@Nullable String tenantId) {
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
try (Stream<Record1<Object>> stream = select.fetchSize(FETCH_SIZE).stream()){
stream.map((Record record) -> jdbcRepository.map(record))
.forEach(emitter::next);
} finally {
emitter.complete();
}
}), FluxSink.OverflowStrategy.BUFFER);
var condition = NORMAL_KIND_CONDITION.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
return findAsync(tenantId, condition, field(DATE_COLUMN).asc());
}
@Override
@@ -302,23 +248,6 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
);
}
@Override
public LogEntry save(LogEntry log) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(log);
this.jdbcRepository.persist(log, fields);
return log;
}
@Override
public int saveBatch(List<LogEntry> items) {
if (ListUtils.isEmpty(items)) {
return 0;
}
return this.jdbcRepository.persistBatch(items);
}
@Override
public Integer purge(Execution execution) {
return this.jdbcRepository
@@ -457,47 +386,14 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
}
private ArrayListTotal<LogEntry> query(String tenantId, Condition condition, Level minLevel, Pageable pageable) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
select = select.and(condition);
if (minLevel != null) {
select = select.and(minLevel(minLevel));
}
return this.jdbcRepository.fetchPage(context, select, pageable
);
});
var theCondition = minLevel != null ? condition.and(minLevel(minLevel)) : condition;
return findPage(pageable, tenantId, theCondition);
}
private List<LogEntry> query(String tenantId, Condition condition, Level minLevel, boolean withAccessControl) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(withAccessControl ? this.defaultFilter(tenantId) : this.defaultFilterWithNoACL(tenantId));
select = select.and(condition);
if (minLevel != null) {
select = select.and(minLevel(minLevel));
}
return this.jdbcRepository.fetch(select
.orderBy(field(DATE_COLUMN).sort(SortOrder.ASC))
);
});
var defaultFilter = withAccessControl ? this.defaultFilter(tenantId) : this.defaultFilterWithNoACL(tenantId);
var theCondition = minLevel != null ? condition.and(minLevel(minLevel)) : condition;
return find(defaultFilter, theCondition, field(DATE_COLUMN).sort(SortOrder.ASC));
}
private Condition minLevel(Level minLevel) {
@@ -512,7 +408,6 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
return this.jdbcRepository.getDslContextWrapper().transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
ColumnDescriptor<Logs.Fields> columnDescriptor = dataFilter.getColumns();
String columnKey = this.getFieldsMapping().get(columnDescriptor.getField());
Field<?> field = columnToField(columnDescriptor, getFieldsMapping());
if (columnDescriptor.getAgg() != null) {
field = filterService.buildAggregation(field, columnDescriptor.getAgg());

View File

@@ -8,6 +8,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.executions.metrics.MetricAggregation;
import io.kestra.core.models.executions.metrics.MetricAggregations;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.utils.DateUtils;
@@ -20,8 +21,6 @@ import lombok.Getter;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.time.Duration;
import java.time.ZoneId;
@@ -31,15 +30,14 @@ import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepository implements MetricRepositoryInterface {
public abstract class AbstractJdbcMetricRepository extends AbstractJdbcCrudRepository<MetricEntry> implements MetricRepositoryInterface {
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull();
protected io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository;
public AbstractJdbcMetricRepository(io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository,
QueueService queueService,
JdbcFilterService filterService) {
this.jdbcRepository = jdbcRepository;
super(jdbcRepository, queueService);
this.filterService = filterService;
}
@@ -71,54 +69,33 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
@Override
public ArrayListTotal<MetricEntry> findByExecutionId(String tenantId, String executionId, Pageable pageable) {
return this.query(
return this.findPage(
pageable,
tenantId,
field("execution_id").eq(executionId)
, pageable
);
}
@Override
public ArrayListTotal<MetricEntry> findByExecutionIdAndTaskId(String tenantId, String executionId, String taskId, Pageable pageable) {
return this.query(
return this.findPage(
pageable,
tenantId,
field("execution_id").eq(executionId)
.and(field("task_id").eq(taskId)),
pageable
.and(field("task_id").eq(taskId))
);
}
@Override
public ArrayListTotal<MetricEntry> findByExecutionIdAndTaskRunId(String tenantId, String executionId, String taskRunId, Pageable pageable) {
return this.query(
return this.findPage(
pageable,
tenantId,
field("execution_id").eq(executionId)
.and(field("taskrun_id").eq(taskRunId)),
pageable
.and(field("taskrun_id").eq(taskRunId))
);
}
@Override
public Flux<MetricEntry> findAllAsync(@io.micronaut.core.annotation.Nullable String tenantId) {
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
try (Stream<Record1<Object>> stream = select.fetchSize(FETCH_SIZE).stream()){
stream.map((Record record) -> jdbcRepository.map(record))
.forEach(emitter::next);
} finally {
emitter.complete();
}
}), FluxSink.OverflowStrategy.BUFFER);
}
@Override
public List<String> flowMetrics(
String tenantId,
@@ -198,23 +175,6 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
.build();
}
@Override
public MetricEntry save(MetricEntry metric) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(metric);
this.jdbcRepository.persist(metric, fields);
return metric;
}
@Override
public int saveBatch(List<MetricEntry> items) {
if (ListUtils.isEmpty(items)) {
return 0;
}
return this.jdbcRepository.persistBatch(items);
}
@Override
public Integer purge(Execution execution) {
return this.jdbcRepository
@@ -264,22 +224,6 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
});
}
private ArrayListTotal<MetricEntry> query(String tenantId, Condition condition, Pageable pageable) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
select = select.and(condition);
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
private List<MetricAggregation> aggregate(
String tenantId,
Condition condition,
@@ -396,7 +340,6 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
return this.jdbcRepository.getDslContextWrapper().transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
ColumnDescriptor<Metrics.Fields> columnDescriptor = dataFilter.getColumns();
String columnKey = this.getFieldsMapping().get(columnDescriptor.getField());
Field<?> field = columnToField(columnDescriptor, getFieldsMapping());
if (columnDescriptor.getAgg() != null) {
field = filterService.buildAggregation(field, columnDescriptor.getAgg());

View File

@@ -3,30 +3,28 @@ package io.kestra.jdbc.repository;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.models.Setting;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.SettingRepositoryInterface;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import lombok.SneakyThrows;
import org.jooq.Field;
import org.jooq.Record1;
import org.jooq.Select;
import org.jooq.SelectJoinStep;
import org.jooq.*;
import org.jooq.impl.DSL;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public abstract class AbstractJdbcSettingRepository extends AbstractJdbcRepository implements SettingRepositoryInterface {
protected final io.kestra.jdbc.AbstractJdbcRepository<Setting> jdbcRepository;
public abstract class AbstractJdbcSettingRepository extends AbstractJdbcCrudRepository<Setting> implements SettingRepositoryInterface {
private final ApplicationEventPublisher<CrudEvent<Setting>> eventPublisher;
@SuppressWarnings("unchecked")
public AbstractJdbcSettingRepository(
io.kestra.jdbc.AbstractJdbcRepository<Setting> jdbcRepository,
QueueService queueService,
ApplicationContext applicationContext
) {
this.jdbcRepository = jdbcRepository;
super(jdbcRepository, queueService);
this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
}
@@ -36,31 +34,12 @@ public abstract class AbstractJdbcSettingRepository extends AbstractJdbcReposito
@Override
public Optional<Setting> findByKey(String key) {
return jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
Select<Record1<Object>> from = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(field("key").eq(key));
return this.jdbcRepository.fetchOne(from);
});
return findOne(DSL.trueCondition(), field("key").eq(key));
}
@Override
public List<Setting> findAll() {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectJoinStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable());
return this.jdbcRepository.fetch(select);
});
return findAll(DSL.trueCondition());
}
@Override
@@ -85,4 +64,14 @@ public abstract class AbstractJdbcSettingRepository extends AbstractJdbcReposito
return setting;
}
@Override
protected Condition defaultFilter(String tenantId) {
return buildTenantCondition(tenantId);
}
@Override
protected Condition defaultFilter() {
return DSL.trueCondition();
}
}

View File

@@ -6,6 +6,7 @@ import io.kestra.core.models.templates.Template;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.TemplateRepositoryInterface;
import io.micronaut.context.ApplicationContext;
@@ -20,78 +21,26 @@ import java.util.Optional;
import jakarta.annotation.Nullable;
import jakarta.validation.ConstraintViolationException;
public abstract class AbstractJdbcTemplateRepository extends AbstractJdbcRepository implements TemplateRepositoryInterface {
public abstract class AbstractJdbcTemplateRepository extends AbstractJdbcCrudRepository<Template> implements TemplateRepositoryInterface {
private final QueueInterface<Template> templateQueue;
private final ApplicationEventPublisher<CrudEvent<Template>> eventPublisher;
protected io.kestra.jdbc.AbstractJdbcRepository<Template> jdbcRepository;
@SuppressWarnings("unchecked")
public AbstractJdbcTemplateRepository(io.kestra.jdbc.AbstractJdbcRepository<Template> jdbcRepository, ApplicationContext applicationContext) {
this.jdbcRepository = jdbcRepository;
public AbstractJdbcTemplateRepository(io.kestra.jdbc.AbstractJdbcRepository<Template> jdbcRepository, QueueService queueService, ApplicationContext applicationContext) {
super(jdbcRepository, queueService);
this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.templateQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.TEMPLATE_NAMED));
}
@Override
public Optional<Template> findById(String tenantId, String namespace, String id) {
return jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
Select<Record1<Object>> from = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(field("namespace").eq(namespace))
.and(field("id").eq(id));
return this.jdbcRepository.fetchOne(from);
});
}
@Override
public List<Template> findAll(String tenantId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
return this.jdbcRepository.fetch(select);
});
var condition = field("namespace").eq(namespace).and(field("id").eq(id));
return findOne(tenantId, condition);
}
@Override
public List<Template> findAllWithNoAcl(String tenantId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilterWithNoACL(tenantId));
return this.jdbcRepository.fetch(select);
});
}
@Override
public List<Template> findAllForAllTenants() {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter());
return this.jdbcRepository.fetch(select);
});
return findAll(this.defaultFilterWithNoACL(tenantId));
}
abstract protected Condition findCondition(String query);
@@ -102,70 +51,35 @@ public abstract class AbstractJdbcTemplateRepository extends AbstractJdbcReposit
@Nullable String tenantId,
@Nullable String namespace
) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
Condition condition = computeCondition(query, namespace);
SelectConditionStep<Record1<Object>> select = context
.select(
field("value")
)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
if (query != null) {
select.and(this.findCondition(query));
}
if (namespace != null) {
select.and(DSL.or(field("namespace").eq(namespace), field("namespace").likeIgnoreCase(namespace + ".%")));
}
return this.jdbcRepository.fetchPage(context, select, pageable);
});
return findPage(pageable, tenantId, condition);
}
@Override
public List<Template> find(@Nullable String query, @Nullable String tenantId, @Nullable String namespace) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
Condition condition = computeCondition(query, namespace);
SelectConditionStep<Record1<Object>> select = context
.select(
field("value")
)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
return find(tenantId, condition);
}
if (query != null) {
select.and(this.findCondition(query));
}
private Condition computeCondition(@Nullable String query, @Nullable String namespace) {
Condition condition = DSL.trueCondition();
if (namespace != null) {
select.and(DSL.or(field("namespace").eq(namespace), field("namespace").likeIgnoreCase(namespace + ".%")));
}
if (query != null) {
condition = condition.and(this.findCondition(query));
}
if (namespace != null) {
condition = condition.and(DSL.or(field("namespace").eq(namespace), field("namespace").likeIgnoreCase(namespace + ".%")));
}
return this.jdbcRepository.fetch(select);
});
return condition;
}
@Override
public List<Template> findByNamespace(String tenantId, String namespace) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(field("namespace").eq(namespace))
.and(this.defaultFilter(tenantId));
return this.jdbcRepository.fetch(select);
});
var condition = field("namespace").eq(namespace);
return this.find(tenantId, condition);
}
@Override

View File

@@ -12,6 +12,7 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.runners.ScheduleContextInterface;
@@ -23,13 +24,11 @@ import io.kestra.jdbc.services.JdbcFilterService;
import io.kestra.plugin.core.dashboard.data.ITriggers;
import io.kestra.plugin.core.dashboard.data.Triggers;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable;
import lombok.Getter;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.time.ZonedDateTime;
import java.time.temporal.Temporal;
@@ -37,11 +36,9 @@ import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcRepository implements TriggerRepositoryInterface, JdbcQueueIndexerInterface<Trigger> {
public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcCrudRepository<Trigger> implements TriggerRepositoryInterface, JdbcQueueIndexerInterface<Trigger> {
public static final Field<Object> NAMESPACE_FIELD = field("namespace");
protected io.kestra.jdbc.AbstractJdbcRepository<Trigger> jdbcRepository;
private final JdbcFilterService filterService;
@Getter
@@ -66,83 +63,21 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
}
public AbstractJdbcTriggerRepository(io.kestra.jdbc.AbstractJdbcRepository<Trigger> jdbcRepository,
QueueService queueService,
JdbcFilterService filterService) {
this.jdbcRepository = jdbcRepository;
super(jdbcRepository, queueService);
this.filterService = filterService;
}
@Override
public Optional<Trigger> findLast(TriggerContext trigger) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(field("key").eq(trigger.uid()));
return this.jdbcRepository.fetchOne(select);
});
return findOne(DSL.trueCondition(), field("key").eq(trigger.uid()));
}
@Override
public Optional<Trigger> findByExecution(Execution execution) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(
field("execution_id").eq(execution.getId())
);
return this.jdbcRepository.fetchOne(select);
});
}
@Override
public List<Trigger> findAll(String tenantId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
var select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
return this.jdbcRepository.fetch(select);
});
}
@Override
public List<Trigger> findAllForAllTenants() {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectJoinStep<Record1<Object>> select = DSL
.using(configuration)
.select(field("value"))
.from(this.jdbcRepository.getTable());
return this.jdbcRepository.fetch(select);
});
}
@Override
public int count(@Nullable String tenantId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> DSL
.using(configuration)
.selectCount()
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.fetchOne(0, int.class));
return findOne(execution.getTenantId(), field("execution_id").eq(execution.getId()));
}
public List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContextInterface) {
@@ -188,24 +123,7 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface) {
JdbcSchedulerContext jdbcSchedulerContext = (JdbcSchedulerContext) scheduleContextInterface;
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(trigger);
this.jdbcRepository.persist(trigger, jdbcSchedulerContext.getContext(), fields);
return trigger;
}
@Override
public Trigger save(Trigger trigger) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(trigger);
this.jdbcRepository.persist(trigger, fields);
return trigger;
}
@Override
public Trigger save(DSLContext dslContext, Trigger trigger) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(trigger);
this.jdbcRepository.persist(trigger, dslContext, fields);
save(jdbcSchedulerContext.getContext(), trigger);
return trigger;
}
@@ -224,26 +142,12 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
});
}
@Override
public void delete(Trigger trigger) {
this.jdbcRepository.delete(trigger);
}
@Override
public Trigger update(Trigger trigger) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSL.using(configuration)
.update(this.jdbcRepository.getTable())
.set(this.jdbcRepository.persistFields((trigger)))
.where(field("key").eq(trigger.uid()))
.execute();
return trigger;
});
}
// Allow to update a trigger from a flow & an abstract trigger
// using forUpdate to avoid the lastTrigger to be updated by another thread
// before doing the update
@@ -295,84 +199,47 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
}
@Override
public ArrayListTotal<Trigger> find(Pageable pageable, String tenantId, List<QueryFilter> filters) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<?> select = generateSelect(context, tenantId, filters);
return this.jdbcRepository.fetchPage(context, select, pageable);
});
var condition = filter(filters, "next_execution_date", Resource.TRIGGER);
return findPage(pageable, tenantId, condition);
}
private SelectConditionStep<?> generateSelect(DSLContext context, String tenantId, List<QueryFilter> filters){
SelectConditionStep<?> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
return select.and(filter(filters, "next_execution_date", Resource.TRIGGER));
}
@Override
public ArrayListTotal<Trigger> find(Pageable pageable, String query, String tenantId, String namespace, String flowId, String workerId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
var condition = this.fullTextCondition(query).and(this.defaultFilter());
SelectConditionStep<Record1<Object>> select = context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(this.fullTextCondition(query))
.and(this.defaultFilter(tenantId));
if (namespace != null) {
condition = condition.and(DSL.or(NAMESPACE_FIELD.eq(namespace), NAMESPACE_FIELD.likeIgnoreCase(namespace + ".%")));
}
if (namespace != null) {
select.and(DSL.or(NAMESPACE_FIELD.eq(namespace), NAMESPACE_FIELD.likeIgnoreCase(namespace + ".%")));
}
if (flowId != null) {
condition = condition.and(field("flow_id").eq(flowId));
}
if (flowId != null) {
select.and(field("flow_id").eq(flowId));
}
if (workerId != null) {
condition = condition.and(field("worker_id").eq(workerId));
}
if (workerId != null) {
select.and(field("worker_id").eq(workerId));
}
select.and(this.defaultFilter());
return this.jdbcRepository.fetchPage(context, select, pageable);
});
return findPage(pageable, tenantId, condition);
}
/** {@inheritDoc} */
@Override
public Flux<Trigger> find(String tenantId, List<QueryFilter> filters) {
return Flux.create(
emitter -> this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
SelectConditionStep<?> select = generateSelect(context, tenantId, filters);
select.fetch()
.map(this.jdbcRepository::map)
.forEach(emitter::next);
emitter.complete();
}),
FluxSink.OverflowStrategy.BUFFER
);
var condition = filter(filters, "next_execution_date", Resource.TRIGGER);
return findAsync(tenantId, condition);
}
protected Condition fullTextCondition(String query) {
return query == null ? DSL.trueCondition() : jdbcRepository.fullTextCondition(List.of("fulltext"), query);
}
@Override
protected Condition findQueryCondition(String query) {
return fullTextCondition(query);
}
@Override
protected Condition defaultFilter(String tenantId, boolean allowDeleted) {
return buildTenantCondition(tenantId);
}