Mirror of https://github.com/kestra-io/kestra.git (synced 2025-12-25 02:14:38 -05:00)
@@ -6,8 +6,16 @@ import io.micronaut.data.model.Pageable;

import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

public interface QueryBuilderInterface<F extends Enum<F>> {
    default Set<F> dateFields() {
        return Collections.emptySet();
    }

    F dateFilterField();

    ArrayListTotal<Map<String, Object>> fetchData(String tenantId, DataFilter<F, ? extends ColumnDescriptor<F>> filter, ZonedDateTime startDate, ZonedDateTime endDate, Pageable pageable) throws IOException;
}
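The two new accessors are overridden by each concrete repository further down: dateFields() appears to list the columns that need date bucketing before grouping, while dateFilterField() names the column the start/end range is applied to. For example, the log repository below implements them as:

    @Override
    public Set<Logs.Fields> dateFields() {
        return Set.of(Logs.Fields.DATE);
    }

    @Override
    public Logs.Fields dateFilterField() {
        return Logs.Fields.DATE;
    }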
@@ -5,11 +5,15 @@ import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.services.JdbcFilterService;
import io.micronaut.context.ApplicationContext;
import io.kestra.core.utils.DateUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;

import java.util.Date;
import java.util.Map;

@Singleton
@@ -27,4 +31,22 @@ public class H2ExecutionRepository extends AbstractJdbcExecutionRepository {
    protected Condition findCondition(String query, Map<String, String> labels) {
        return H2ExecutionRepositoryService.findCondition(this.jdbcRepository, query, labels);
    }

    @Override
    protected Field<Date> formatDateField(String dateField, DateUtils.GroupType groupType) {
        switch (groupType) {
            case MONTH:
                return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'yyyy-MM')", Date.class);
            case WEEK:
                return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'YYYY-ww')", Date.class);
            case DAY:
                return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'yyyy-MM-dd')", Date.class);
            case HOUR:
                return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'yyyy-MM-dd HH:00:00')", Date.class);
            case MINUTE:
                return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'yyyy-MM-dd HH:mm:00')", Date.class);
            default:
                throw new IllegalArgumentException("Unsupported GroupType: " + groupType);
        }
    }
}
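As a point of reference, the H2 variant splices the quoted column name straight into a plain-SQL string, and H2's FORMATDATETIME uses Java date-time pattern letters, so the 'YYYY-ww' WEEK case is the week-based year plus week number. A minimal standalone sketch of what the MONTH case renders to, assuming only jOOQ on the classpath and an illustrative start_date column:

import org.jooq.Field;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;

import java.util.Date;

public class H2DateBucketSketch {
    public static void main(String[] args) {
        // Same construction as the MONTH branch above, with the column name hard-coded.
        Field<Date> month = DSL.field("FORMATDATETIME(\"start_date\", 'yyyy-MM')", Date.class);
        // A plain-SQL field renders verbatim, e.g. FORMATDATETIME("start_date", 'yyyy-MM')
        System.out.println(DSL.using(SQLDialect.H2).render(month));
    }
}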
@@ -1,13 +1,17 @@
package io.kestra.repository.h2;

import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcLogRepository;
import io.kestra.jdbc.services.JdbcFilterService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;

import java.util.Date;
import java.util.List;

@Singleton
@@ -23,5 +27,23 @@ public class H2LogRepository extends AbstractJdbcLogRepository {
    protected Condition findCondition(String query) {
        return this.jdbcRepository.fullTextCondition(List.of("fulltext"), query);
    }

    @Override
    protected Field<Date> formatDateField(String dateField, DateUtils.GroupType groupType) {
        switch (groupType) {
            case MONTH:
                return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'yyyy-MM')", Date.class);
            case WEEK:
                return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'YYYY-ww')", Date.class);
            case DAY:
                return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'yyyy-MM-dd')", Date.class);
            case HOUR:
                return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'yyyy-MM-dd HH:00:00')", Date.class);
            case MINUTE:
                return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'yyyy-MM-dd HH:mm:00')", Date.class);
            default:
                throw new IllegalArgumentException("Unsupported GroupType: " + groupType);
        }
    }
}
@@ -1,11 +1,16 @@
package io.kestra.repository.h2;

import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepository;
import io.kestra.jdbc.services.JdbcFilterService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Field;
import org.jooq.impl.DSL;

import java.util.Date;

@Singleton
@H2RepositoryEnabled
@@ -15,5 +20,23 @@ public class H2MetricRepository extends AbstractJdbcMetricRepository {
        JdbcFilterService filterService) {
        super(repository, filterService);
    }

    @Override
    protected Field<Date> formatDateField(String dateField, DateUtils.GroupType groupType) {
        switch (groupType) {
            case MONTH:
                return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'yyyy-MM')", Date.class);
            case WEEK:
                return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'YYYY-ww')", Date.class);
            case DAY:
                return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'yyyy-MM-dd')", Date.class);
            case HOUR:
                return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'yyyy-MM-dd HH:00:00')", Date.class);
            case MINUTE:
                return DSL.field("FORMATDATETIME(\"" + dateField + "\", 'yyyy-MM-dd HH:mm:00')", Date.class);
            default:
                throw new IllegalArgumentException("Unsupported GroupType: " + groupType);
        }
    }
}
@@ -1,6 +1,7 @@
package io.kestra.repository.mysql;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -10,8 +11,10 @@ import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;

import java.sql.Timestamp;
import java.util.Date;
import java.util.Map;

@Singleton
@@ -34,4 +37,22 @@ public class MysqlExecutionRepository extends AbstractJdbcExecutionRepository {
    protected Field<Integer> weekFromTimestamp(Field<Timestamp> timestampField) {
        return this.jdbcRepository.weekFromTimestamp(timestampField);
    }

    @Override
    protected Field<Date> formatDateField(String dateField, DateUtils.GroupType groupType) {
        switch (groupType) {
            case MONTH:
                return DSL.field("DATE_FORMAT({0}, '%Y-%m')", Date.class, DSL.field(dateField));
            case WEEK:
                return DSL.field("DATE_FORMAT({0}, '%x-%v')", Date.class, DSL.field(dateField));
            case DAY:
                return DSL.field("DATE({0})", Date.class, DSL.field(dateField));
            case HOUR:
                return DSL.field("DATE_FORMAT({0}, '%Y-%m-%d %H:00:00')", Date.class, DSL.field(dateField));
            case MINUTE:
                return DSL.field("DATE_FORMAT({0}, '%Y-%m-%d %H:%i:00')", Date.class, DSL.field(dateField));
            default:
                throw new IllegalArgumentException("Unsupported GroupType: " + groupType);
        }
    }
}
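Unlike the H2 version, the MySQL (and Postgres) variants hand the column to jOOQ through the {0} template placeholder instead of concatenating it into the SQL string, so it is rendered as a proper query part. Reusing the imports from the H2 sketch above, the MONTH case looks roughly like this (column name illustrative):

    Field<Date> month = DSL.field("DATE_FORMAT({0}, '%Y-%m')", Date.class, DSL.field("start_date"));
    // Renders roughly as: DATE_FORMAT(start_date, '%Y-%m')
    System.out.println(DSL.using(SQLDialect.MYSQL).render(month));

The '%x-%v' pair in the WEEK case is MySQL's week-based year with a Monday-first week number, which lines up with H2's 'YYYY-ww' above and Postgres's 'IYYY-IW' below.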
@@ -1,14 +1,18 @@
package io.kestra.repository.mysql;

import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcLogRepository;
import io.kestra.jdbc.services.JdbcFilterService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;

import java.util.Arrays;
import java.util.Date;

@Singleton
@MysqlRepositoryEnabled
@@ -26,5 +30,23 @@ public class MysqlLogRepository extends AbstractJdbcLogRepository {
            query
        );
    }

    @Override
    protected Field<Date> formatDateField(String dateField, DateUtils.GroupType groupType) {
        switch (groupType) {
            case MONTH:
                return DSL.field("DATE_FORMAT({0}, '%Y-%m')", Date.class, DSL.field(dateField));
            case WEEK:
                return DSL.field("DATE_FORMAT({0}, '%x-%v')", Date.class, DSL.field(dateField));
            case DAY:
                return DSL.field("DATE({0})", Date.class, DSL.field(dateField));
            case HOUR:
                return DSL.field("DATE_FORMAT({0}, '%Y-%m-%d %H:00:00')", Date.class, DSL.field(dateField));
            case MINUTE:
                return DSL.field("DATE_FORMAT({0}, '%Y-%m-%d %H:%i:00')", Date.class, DSL.field(dateField));
            default:
                throw new IllegalArgumentException("Unsupported GroupType: " + groupType);
        }
    }
}
@@ -1,14 +1,17 @@
package io.kestra.repository.mysql;

import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepository;
import io.kestra.jdbc.services.JdbcFilterService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Field;
import org.jooq.impl.DSL;

import java.sql.Timestamp;
import java.util.Date;

@Singleton
@MysqlRepositoryEnabled
@@ -23,5 +26,23 @@ public class MysqlMetricRepository extends AbstractJdbcMetricRepository {
    protected Field<Integer> weekFromTimestamp(Field<Timestamp> timestampField) {
        return this.jdbcRepository.weekFromTimestamp(timestampField);
    }

    @Override
    protected Field<Date> formatDateField(String dateField, DateUtils.GroupType groupType) {
        switch (groupType) {
            case MONTH:
                return DSL.field("DATE_FORMAT({0}, '%Y-%m')", Date.class, DSL.field(dateField));
            case WEEK:
                return DSL.field("DATE_FORMAT({0}, '%x-%v')", Date.class, DSL.field(dateField));
            case DAY:
                return DSL.field("DATE({0})", Date.class, DSL.field(dateField));
            case HOUR:
                return DSL.field("DATE_FORMAT({0}, '%Y-%m-%d %H:00:00')", Date.class, DSL.field(dateField));
            case MINUTE:
                return DSL.field("DATE_FORMAT({0}, '%Y-%m-%d %H:%i:00')", Date.class, DSL.field(dateField));
            default:
                throw new IllegalArgumentException("Unsupported GroupType: " + groupType);
        }
    }
}
@@ -2,6 +2,7 @@ package io.kestra.repository.postgres;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.services.JdbcFilterService;
@@ -10,12 +11,13 @@ import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;

import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Singleton
@PostgresRepositoryEnabled
@@ -44,4 +46,22 @@ public class PostgresExecutionRepository extends AbstractJdbcExecutionRepository
    protected Condition findCondition(String query, Map<String, String> labels) {
        return PostgresExecutionRepositoryService.findCondition(this.jdbcRepository, query, labels);
    }

    @Override
    protected Field<Date> formatDateField(String dateField, DateUtils.GroupType groupType) {
        switch (groupType) {
            case MONTH:
                return DSL.field("TO_CHAR({0}, 'YYYY-MM')", Date.class, DSL.field(dateField));
            case WEEK:
                return DSL.field("TO_CHAR({0}, 'IYYY-IW')", Date.class, DSL.field(dateField));
            case DAY:
                return DSL.field("DATE({0})", Date.class, DSL.field(dateField));
            case HOUR:
                return DSL.field("TO_CHAR({0}, 'YYYY-MM-DD HH24:00:00')", Date.class, DSL.field(dateField));
            case MINUTE:
                return DSL.field("TO_CHAR({0}, 'YYYY-MM-DD HH24:MI:00')", Date.class, DSL.field(dateField));
            default:
                throw new IllegalArgumentException("Unsupported GroupType: " + groupType);
        }
    }
}
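The Postgres variant follows the same shape with TO_CHAR: 'IYYY-IW' is the ISO week-numbering year and week, so WEEK buckets stay consistent with MySQL's '%x-%v' and H2's 'YYYY-ww', and DATE({0}) simply truncates to the day. Note that TO_CHAR returns text rather than a date, so the Date.class parameter presumably matters only for how jOOQ coerces the fetched value.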
@@ -1,16 +1,19 @@
package io.kestra.repository.postgres;

import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcLogRepository;
import io.kestra.jdbc.services.JdbcFilterService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;
import org.slf4j.event.Level;

import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

@@ -38,4 +41,22 @@ public class PostgresLogRepository extends AbstractJdbcLogRepository {
                .collect(Collectors.joining(", ")) +
            ")");
    }

    @Override
    protected Field<Date> formatDateField(String dateField, DateUtils.GroupType groupType) {
        switch (groupType) {
            case MONTH:
                return DSL.field("TO_CHAR({0}, 'YYYY-MM')", Date.class, DSL.field(dateField));
            case WEEK:
                return DSL.field("TO_CHAR({0}, 'IYYY-IW')", Date.class, DSL.field(dateField));
            case DAY:
                return DSL.field("DATE({0})", Date.class, DSL.field(dateField));
            case HOUR:
                return DSL.field("TO_CHAR({0}, 'YYYY-MM-DD HH24:00:00')", Date.class, DSL.field(dateField));
            case MINUTE:
                return DSL.field("TO_CHAR({0}, 'YYYY-MM-DD HH24:MI:00')", Date.class, DSL.field(dateField));
            default:
                throw new IllegalArgumentException("Unsupported GroupType: " + groupType);
        }
    }
}
@@ -1,11 +1,16 @@
package io.kestra.repository.postgres;

import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepository;
import io.kestra.jdbc.services.JdbcFilterService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Field;
import org.jooq.impl.DSL;

import java.util.Date;

@Singleton
@PostgresRepositoryEnabled
@@ -15,5 +20,23 @@ public class PostgresMetricRepository extends AbstractJdbcMetricRepository {
        JdbcFilterService filterService) {
        super(repository, filterService);
    }

    @Override
    protected Field<Date> formatDateField(String dateField, DateUtils.GroupType groupType) {
        switch (groupType) {
            case MONTH:
                return DSL.field("TO_CHAR({0}, 'YYYY-MM')", Date.class, DSL.field(dateField));
            case WEEK:
                return DSL.field("TO_CHAR({0}, 'IYYY-IW')", Date.class, DSL.field(dateField));
            case DAY:
                return DSL.field("DATE({0})", Date.class, DSL.field(dateField));
            case HOUR:
                return DSL.field("TO_CHAR({0}, 'YYYY-MM-DD HH24:00:00')", Date.class, DSL.field(dateField));
            case MINUTE:
                return DSL.field("TO_CHAR({0}, 'YYYY-MM-DD HH24:MI:00')", Date.class, DSL.field(dateField));
            default:
                throw new IllegalArgumentException("Unsupported GroupType: " + groupType);
        }
    }
}
@@ -79,6 +79,16 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
        Executions.Fields.TRIGGER_EXECUTION_ID, "trigger_execution_id"
    );

    @Override
    public Set<Executions.Fields> dateFields() {
        return Set.of(Executions.Fields.START_DATE, Executions.Fields.END_DATE);
    }

    @Override
    public Executions.Fields dateFilterField() {
        return Executions.Fields.START_DATE;
    }

    @SuppressWarnings("unchecked")
    public AbstractJdbcExecutionRepository(
        io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository,
@@ -1119,11 +1129,20 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
        .getDslContextWrapper()
        .transactionResult(configuration -> {
            DSLContext context = DSL.using(configuration);

            Map<String, ? extends ColumnDescriptor<Executions.Fields>> columnsWithoutDate = descriptors.getColumns().entrySet().stream()
                .filter(entry -> entry.getValue().getField() == null || !dateFields().contains(entry.getValue().getField()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

            // Generate custom fields for date as they probably need formatting
            List<Field<Date>> dateFields = generateDateFields(descriptors, fieldsMapping, startDate, endDate, dateFields());

            // Init request
            SelectConditionStep<Record> selectConditionStep = select(
                context,
                filterService,
                descriptors,
                columnsWithoutDate,
                dateFields,
                this.getFieldsMapping(),
                this.jdbcRepository.getTable(),
                tenantId
@@ -1132,8 +1151,17 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
            // Apply Where filter
            selectConditionStep = where(selectConditionStep, filterService, descriptors, fieldsMapping);

            List<? extends ColumnDescriptor<Executions.Fields>> columnsWithoutDateWithOutAggs = columnsWithoutDate.values().stream()
                .filter(column -> column.getAgg() == null)
                .toList();

            // Apply GroupBy for aggregation
            SelectHavingStep<Record> selectHavingStep = groupBy(selectConditionStep, descriptors, fieldsMapping);
            SelectHavingStep<Record> selectHavingStep = groupBy(
                selectConditionStep,
                columnsWithoutDateWithOutAggs,
                dateFields,
                fieldsMapping
            );

            // Apply OrderBy
            SelectSeekStepN<Record> selectSeekStep = orderBy(selectHavingStep, descriptors);
@@ -1160,7 +1188,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
            return NAMESPACE_FIELD;
        } else if (field.getName().equals(START_DATE_FIELD.getName())) {
            return START_DATE_FIELD;
        } else if (field.getName().equals(fieldsMapping.get(Executions.Fields.DURATION))) {
        }
        else if (field.getName().equals(fieldsMapping.get(Executions.Fields.DURATION))) {
            return DSL.field("{0} / 1000", Long.class, field);
        }
        return field;
@@ -1215,4 +1244,23 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
        return selectConditionStep;
    }
}

    abstract protected Field<Date> formatDateField(String dateField, DateUtils.GroupType groupType);

    protected <F extends Enum<F>> List<Field<Date>> generateDateFields(
        DataFilter<F, ? extends ColumnDescriptor<F>> descriptors,
        Map<F, String> fieldsMapping,
        ZonedDateTime startDate,
        ZonedDateTime endDate,
        Set<F> dateFields
    ) {
        return descriptors.getColumns().entrySet().stream()
            .filter(entry -> entry.getValue().getAgg() == null && dateFields.contains(entry.getValue().getField()))
            .map(entry -> {
                Duration duration = Duration.between(startDate, endDate);
                return formatDateField(fieldsMapping.get(entry.getValue().getField()), DateUtils.groupByType(duration)).as(entry.getKey());
            })
            .toList();
    }
}
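In short, generateDateFields() picks a single bucket size for the whole query: it measures the start/end window, maps it to a GroupType via DateUtils.groupByType(duration), and asks the dialect-specific formatDateField() for a formatted column aliased to the descriptor key. Reusing the imports from the earlier jOOQ sketch, an aliased Postgres HOUR bucket renders along these lines (column and alias names illustrative):

    Field<Date> bucket = DSL.field("TO_CHAR({0}, 'YYYY-MM-DD HH24:00:00')", Date.class, DSL.field("\"timestamp\"")).as("date");
    // Something like: select TO_CHAR("timestamp", 'YYYY-MM-DD HH24:00:00') as "date"
    System.out.println(DSL.using(SQLDialect.POSTGRES).select(bucket).getSQL());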
@@ -13,12 +13,13 @@ import io.kestra.jdbc.services.JdbcFilterService;
import io.kestra.plugin.core.dashboard.data.Logs;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable;
import java.util.stream.Stream;
import lombok.Getter;
import org.jooq.Record;
import org.jooq.*;
import org.jooq.impl.DSL;
import org.slf4j.event.Level;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.time.Duration;
import java.time.Instant;
@@ -29,8 +30,7 @@ import java.time.temporal.ChronoUnit;
import java.util.Comparator;
import java.util.*;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.stream.Stream;

public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository implements LogRepositoryInterface {

@@ -64,6 +64,16 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
        Logs.Fields.MESSAGE, "message"
    );

    @Override
    public Set<Logs.Fields> dateFields() {
        return Set.of(Logs.Fields.DATE);
    }

    @Override
    public Logs.Fields dateFilterField() {
        return Logs.Fields.DATE;
    }

    @Override
    public ArrayListTotal<LogEntry> find(
        Pageable pageable,
@@ -636,21 +646,39 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
        .getDslContextWrapper()
        .transactionResult(configuration -> {
            DSLContext context = DSL.using(configuration);

            Map<String, ? extends ColumnDescriptor<Logs.Fields>> columnsWithoutDate = descriptors.getColumns().entrySet().stream()
                .filter(entry -> entry.getValue().getField() == null || !dateFields().contains(entry.getValue().getField()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

            // Generate custom fields for date as they probably need formatting
            List<Field<Date>> dateFields = generateDateFields(descriptors, fieldsMapping, startDate, endDate, dateFields());

            // Init request
            SelectConditionStep<Record> selectConditionStep = select(
                context,
                this.getFilterService(),
                descriptors,
                filterService,
                columnsWithoutDate,
                dateFields,
                this.getFieldsMapping(),
                this.jdbcRepository.getTable(),
                tenantId
            );

            // Apply Where filter
            selectConditionStep = where(selectConditionStep, this.getFilterService(), descriptors, fieldsMapping);
            selectConditionStep = where(selectConditionStep, filterService, descriptors, fieldsMapping);

            List<? extends ColumnDescriptor<Logs.Fields>> columnsWithoutDateWithOutAggs = columnsWithoutDate.values().stream()
                .filter(column -> column.getAgg() == null)
                .toList();

            // Apply GroupBy for aggregation
            SelectHavingStep<Record> selectHavingStep = groupBy(selectConditionStep, descriptors, fieldsMapping);
            SelectHavingStep<Record> selectHavingStep = groupBy(
                selectConditionStep,
                columnsWithoutDateWithOutAggs,
                dateFields,
                fieldsMapping
            );

            // Apply OrderBy
            SelectSeekStepN<Record> selectSeekStep = orderBy(selectHavingStep, descriptors);
@@ -658,11 +686,29 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
            // Fetch and paginate if provided
            List<Map<String, Object>> results = fetchSeekStep(selectSeekStep, pageable);

            // Fetch total count for pagination
            int total = context.fetchCount(selectConditionStep);

            return new ArrayListTotal<>(results, total);
        });
    }

    abstract protected Field<Date> formatDateField(String dateField, DateUtils.GroupType groupType);

    protected <F extends Enum<F>> List<Field<Date>> generateDateFields(
        DataFilter<F, ? extends ColumnDescriptor<F>> descriptors,
        Map<F, String> fieldsMapping,
        ZonedDateTime startDate,
        ZonedDateTime endDate,
        Set<F> dateFields
    ) {
        return descriptors.getColumns().entrySet().stream()
            .filter(entry -> entry.getValue().getAgg() == null && dateFields.contains(entry.getValue().getField()))
            .map(entry -> {
                Duration duration = Duration.between(startDate, endDate);
                return formatDateField(fieldsMapping.get(entry.getValue().getField()), DateUtils.groupByType(duration)).as(entry.getKey());
            })
            .toList();
    }
}
@@ -24,10 +24,9 @@ import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepository implements MetricRepositoryInterface {
    protected io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository;
@@ -54,6 +53,16 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
        Metrics.Fields.DATE, "timestamp"
    );

    @Override
    public Set<Metrics.Fields> dateFields() {
        return Set.of(Metrics.Fields.DATE);
    }

    @Override
    public Metrics.Fields dateFilterField() {
        return Metrics.Fields.DATE;
    }

    @Override
    public ArrayListTotal<MetricEntry> findByExecutionId(String tenantId, String executionId, Pageable pageable) {
        return this.query(
@@ -347,21 +356,39 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
        .getDslContextWrapper()
        .transactionResult(configuration -> {
            DSLContext context = DSL.using(configuration);

            Map<String, ? extends ColumnDescriptor<Metrics.Fields>> columnsWithoutDate = descriptors.getColumns().entrySet().stream()
                .filter(entry -> entry.getValue().getField() == null || !dateFields().contains(entry.getValue().getField()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

            // Generate custom fields for date as they probably need formatting
            List<Field<Date>> dateFields = generateDateFields(descriptors, fieldsMapping, startDate, endDate, dateFields());

            // Init request
            SelectConditionStep<Record> selectConditionStep = select(
                context,
                this.getFilterService(),
                descriptors,
                filterService,
                columnsWithoutDate,
                dateFields,
                this.getFieldsMapping(),
                this.jdbcRepository.getTable(),
                tenantId
            );

            // Apply Where filter
            selectConditionStep = where(selectConditionStep, this.getFilterService(), descriptors, fieldsMapping);
            selectConditionStep = where(selectConditionStep, filterService, descriptors, fieldsMapping);

            List<? extends ColumnDescriptor<Metrics.Fields>> columnsWithoutDateWithOutAggs = columnsWithoutDate.values().stream()
                .filter(column -> column.getAgg() == null)
                .toList();

            // Apply GroupBy for aggregation
            SelectHavingStep<Record> selectHavingStep = groupBy(selectConditionStep, descriptors, fieldsMapping);
            SelectHavingStep<Record> selectHavingStep = groupBy(
                selectConditionStep,
                columnsWithoutDateWithOutAggs,
                dateFields,
                fieldsMapping
            );

            // Apply OrderBy
            SelectSeekStepN<Record> selectSeekStep = orderBy(selectHavingStep, descriptors);
@@ -369,11 +396,29 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
            // Fetch and paginate if provided
            List<Map<String, Object>> results = fetchSeekStep(selectSeekStep, pageable);

            // Fetch total count for pagination
            int total = context.fetchCount(selectConditionStep);

            return new ArrayListTotal<>(results, total);
        });
    }

    abstract protected Field<Date> formatDateField(String dateField, DateUtils.GroupType groupType);

    protected <F extends Enum<F>> List<Field<Date>> generateDateFields(
        DataFilter<F, ? extends ColumnDescriptor<F>> descriptors,
        Map<F, String> fieldsMapping,
        ZonedDateTime startDate,
        ZonedDateTime endDate,
        Set<F> dateFields
    ) {
        return descriptors.getColumns().entrySet().stream()
            .filter(entry -> entry.getValue().getAgg() == null && dateFields.contains(entry.getValue().getField()))
            .map(entry -> {
                Duration duration = Duration.between(startDate, endDate);
                return formatDateField(fieldsMapping.get(entry.getValue().getField()), DateUtils.groupByType(duration)).as(entry.getKey());
            })
            .toList();
    }
}
@@ -18,6 +18,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public abstract class AbstractJdbcRepository {

@@ -39,7 +40,7 @@ public abstract class AbstractJdbcRepository {
    }

    protected Condition defaultFilterWithNoACL(String tenantId) {
        return defaultFilterWithNoACL(tenantId, false);
        return defaultFilterWithNoACL(tenantId, false);
    }

    protected Condition defaultFilterWithNoACL(String tenantId, boolean deleted) {
@@ -100,24 +101,27 @@
    protected <F extends Enum<F>> SelectConditionStep<Record> select(
        DSLContext context,
        JdbcFilterService filterService,
        DataFilter<F, ? extends ColumnDescriptor<F>> descriptors,
        Map<String, ? extends ColumnDescriptor<F>> descriptors,
        List<Field<Date>> dateFields,
        Map<F, String> fieldsMapping,
        Table<Record> table,
        String tenantId) {

        return context
            .select(
                descriptors.getColumns().entrySet().stream()
                    .map(entry -> {
                        ColumnDescriptor<F> col = entry.getValue();
                        String key = entry.getKey();
                        Field<?> field = columnToField(col, fieldsMapping);
                        if (col.getAgg() != null) {
                            field = filterService.buildAggregation(field, col.getAgg());
                        }
                        return field.as(key);
                    })
                    .toList()
                Stream.concat(
                    descriptors.entrySet().stream()
                        .map(entry -> {
                            ColumnDescriptor<F> col = entry.getValue();
                            String key = entry.getKey();
                            Field<?> field = columnToField(col, fieldsMapping);
                            if (col.getAgg() != null) {
                                field = filterService.buildAggregation(field, col.getAgg());
                            }
                            return field.as(key);
                        }),
                    dateFields.stream()
                ).toList()
            )
            .from(table)
            .where(this.defaultFilter(tenantId));
@@ -128,10 +132,10 @@
     * Used in the fetchData() method
     *
     * @param selectConditionStep the select condition step to which the filters will be applied
     * @param jdbcFilterService the service used to apply the filters
     * @param descriptors the data filter containing the filter conditions
     * @param fieldsMapping a map of field enums to their corresponding database column names
     * @param <F> the type of the fields enum
     * @param jdbcFilterService the service used to apply the filters
     * @param descriptors the data filter containing the filter conditions
     * @param fieldsMapping a map of field enums to their corresponding database column names
     * @param <F> the type of the fields enum
     * @return the select condition step with the applied filters
     */
    protected <F extends Enum<F>> SelectConditionStep<Record> where(SelectConditionStep<Record> selectConditionStep, JdbcFilterService jdbcFilterService, DataFilter<F, ? extends ColumnDescriptor<F>> descriptors, Map<F, String> fieldsMapping) {
@@ -143,27 +147,36 @@
     * Used in the fetchData() method
     *
     * @param selectConditionStep the select condition step to which the grouping will be applied
     * @param descriptors the data filter containing the column descriptors for grouping
     * @param fieldsMapping a map of field enums to their corresponding database column names
     * @param <F> the type of the fields enum
     * @param columnsNoDate the data filter containing the column descriptors for grouping
     * @param dateFields the data filter containing the column descriptors for grouping
     * @param fieldsMapping a map of field enums to their corresponding database column names
     * @param <F> the type of the fields enum
     * @return the select having step with the applied grouping
     */
    protected <F extends Enum<F>> SelectHavingStep<Record> groupBy(SelectConditionStep<Record> selectConditionStep, DataFilter<F, ? extends ColumnDescriptor<F>> descriptors, Map<F, String> fieldsMapping) {
    protected <F extends Enum<F>> SelectHavingStep<Record> groupBy(
        SelectConditionStep<Record> selectConditionStep,
        List<? extends ColumnDescriptor<F>> columnsNoDate,
        List<Field<Date>> dateFields,
        Map<F, String> fieldsMapping
    ) {
        return selectConditionStep.groupBy(
            descriptors.getColumns().values().stream()
                .filter(col -> col.getAgg() == null)
                .map(col -> field(fieldsMapping.get(col.getField())))
                .toList()
            Stream.concat(
                columnsNoDate.stream()
                    .filter(col -> col.getAgg() == null)
                    .map(col -> field(fieldsMapping.get(col.getField()))),
                dateFields.stream()
            ).toList()
        );
    }

    /**
     * Applies ordering to the given select step based on the provided descriptors.
     * Used in the fetchData() method
     *
     * @param selectHavingStep the select step to which the ordering will be applied
     * @param descriptors the data filter containing the order by information
     * @param <F> the type of the fields enum
     * @param descriptors the data filter containing the order by information
     * @param <F> the type of the fields enum
     * @return the select step with the applied ordering
     */
    protected <F extends Enum<F>> SelectSeekStepN<Record> orderBy(SelectHavingStep<Record> selectHavingStep, DataFilter<F, ? extends ColumnDescriptor<F>> descriptors) {
@@ -186,7 +199,7 @@
     * Used in the fetchData() method
     *
     * @param selectSeekStep the select step to fetch the results from
     * @param pageable the pageable object containing the pagination information
     * @param pageable the pageable object containing the pagination information
     * @return the list of fetched results
     */
    protected List<Map<String, Object>> fetchSeekStep(SelectSeekStepN<Record> selectSeekStep, @Nullable Pageable pageable) {
@@ -201,6 +214,4 @@
    protected <F extends Enum<F>> Field<?> columnToField(ColumnDescriptor<?> column, Map<F, String> fieldsMapping) {
        return column.getField() != null ? field(fieldsMapping.get(column.getField())) : null;
    }

}
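The reworked select() and groupBy() now receive the pre-built date fields separately and splice them in with Stream.concat, so the formatted buckets land both in the projection and in the GROUP BY clause while the remaining columns are handled as before. A self-contained jOOQ sketch of that shape (table and column names are illustrative, not Kestra's):

import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;

import java.util.Date;
import java.util.List;
import java.util.stream.Stream;

public class SelectGroupBySketch {
    public static void main(String[] args) {
        // Non-date columns, roughly what columnsWithoutDate contributes.
        List<Field<?>> plainColumns = List.of(DSL.field("namespace"), DSL.count().as("count"));
        // A pre-formatted, aliased date bucket, roughly what generateDateFields() produces.
        Field<Date> dateBucket = DSL.field("DATE({0})", Date.class, DSL.field("start_date")).as("date");

        DSLContext ctx = DSL.using(SQLDialect.MYSQL);
        String sql = ctx
            .select(Stream.concat(plainColumns.stream(), Stream.of(dateBucket)).toList())
            .from(DSL.table("executions"))
            .groupBy(DSL.field("namespace"), dateBucket)
            .getSQL();
        System.out.println(sql);
    }
}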
@@ -205,7 +205,7 @@ public class DashboardController {
    ) throws IOException {
        Chart<?> parsed = YAML_PARSER.parse(chart, Chart.class);

        return PagedResults.of(this.dashboardRepository.generate(tenantService.resolveTenant(), (DataChart) parsed, ZonedDateTime.now(), ZonedDateTime.now().minusDays(7), null));
        return PagedResults.of(this.dashboardRepository.generate(tenantService.resolveTenant(), (DataChart) parsed, ZonedDateTime.now().minusDays(8), ZonedDateTime.now(), null));
    }

    @ExecuteOn(TaskExecutors.IO)
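The controller change is an argument-order fix: the old call passed ZonedDateTime.now() as the start and now().minusDays(7) as the end, an inverted and therefore empty window, while the new call requests the last eight days with the start before the end, matching the (startDate, endDate) ordering used by fetchData() above.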