mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
feat: implement Flows as a DataSource for dashboards (#11439)
* feat: implement Flows as a DataSource for dashboards * chore: review changes * fix: method signature changes from another commit apply in new flow fetchData method
This commit is contained in:
@@ -4,6 +4,7 @@ import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.SearchResult;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.*;
|
||||
import io.kestra.plugin.core.dashboard.data.Flows;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
@@ -11,7 +12,7 @@ import jakarta.validation.ConstraintViolationException;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
public interface FlowRepositoryInterface {
|
||||
public interface FlowRepositoryInterface extends QueryBuilderInterface<Flows.Fields> {
|
||||
|
||||
Optional<Flow> findById(String tenantId, String namespace, String id, Optional<Integer> revision, Boolean allowDeleted);
|
||||
|
||||
|
||||
@@ -0,0 +1,52 @@
|
||||
package io.kestra.plugin.core.dashboard.data;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.dashboards.ColumnDescriptor;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.QueryBuilderInterface;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
@EqualsAndHashCode
|
||||
@Schema(
|
||||
title = "Display Flow data in a dashboard chart."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
title = "Display a chart with a list of Flows.",
|
||||
full = true,
|
||||
code = { """
|
||||
charts:
|
||||
- id: list_flows
|
||||
type: io.kestra.plugin.core.dashboard.chart.Table
|
||||
data:
|
||||
type: io.kestra.plugin.core.dashboard.data.Flows
|
||||
columns:
|
||||
namespace:
|
||||
field: NAMESPACE
|
||||
id:
|
||||
field: ID
|
||||
"""
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
@JsonTypeName("Flows")
|
||||
public class Flows<C extends ColumnDescriptor<Flows.Fields>> extends DataFilter<Flows.Fields, C> implements IFlows {
|
||||
@Override
|
||||
public Class<? extends QueryBuilderInterface<Fields>> repositoryClass() {
|
||||
return FlowRepositoryInterface.class;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
package io.kestra.plugin.core.dashboard.data;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.dashboards.ColumnDescriptor;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.QueryBuilderInterface;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
@EqualsAndHashCode
|
||||
@Schema(
|
||||
title = "Display a chart with Flows KPI.",
|
||||
description = "Change."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
title = "Display count of Flows.",
|
||||
full = true,
|
||||
code = { """
|
||||
charts:
|
||||
- id: kpi
|
||||
type: io.kestra.plugin.core.dashboard.chart.KPI
|
||||
data:
|
||||
type: io.kestra.plugin.core.dashboard.data.FlowsKPI
|
||||
columns:
|
||||
field: ID
|
||||
agg: COUNT
|
||||
"""
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
@JsonTypeName("FlowsKPI")
|
||||
public class FlowsKPI<C extends ColumnDescriptor<FlowsKPI.Fields>> extends DataFilterKPI<FlowsKPI.Fields, C> implements IFlows {
|
||||
@Override
|
||||
public Class<? extends QueryBuilderInterface<Fields>> repositoryClass() {
|
||||
return FlowRepositoryInterface.class;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package io.kestra.plugin.core.dashboard.data;
|
||||
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.dashboards.filters.AbstractFilter;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public interface IFlows extends IData<IFlows.Fields> {
|
||||
|
||||
default List<AbstractFilter<IFlows.Fields>> whereWithGlobalFilters(List<QueryFilter> filters, ZonedDateTime startDate, ZonedDateTime endDate, List<AbstractFilter<IFlows.Fields>> where) {
|
||||
List<AbstractFilter<IFlows.Fields>> updatedWhere = where != null ? new ArrayList<>(where) : new ArrayList<>();
|
||||
|
||||
if (ListUtils.isEmpty(filters)) {
|
||||
return updatedWhere;
|
||||
}
|
||||
|
||||
List<QueryFilter> namespaceFilters = filters.stream().filter(f -> f.field().equals(QueryFilter.Field.NAMESPACE)).toList();
|
||||
if (!namespaceFilters.isEmpty()) {
|
||||
updatedWhere.removeIf(filter -> filter.getField().equals(IFlows.Fields.NAMESPACE));
|
||||
namespaceFilters.forEach(f -> {
|
||||
updatedWhere.add(f.toDashboardFilterBuilder(IFlows.Fields.NAMESPACE, f.value()));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
return updatedWhere;
|
||||
}
|
||||
|
||||
enum Fields {
|
||||
ID,
|
||||
NAMESPACE,
|
||||
REVISION
|
||||
}
|
||||
}
|
||||
@@ -1,16 +1,15 @@
|
||||
package io.kestra.repository.h2;
|
||||
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcFlowRepository;
|
||||
import io.kestra.jdbc.services.JdbcFilterService;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.jooq.Condition;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Singleton
|
||||
@@ -18,8 +17,9 @@ import java.util.Map;
|
||||
public class H2FlowRepository extends AbstractJdbcFlowRepository {
|
||||
@Inject
|
||||
public H2FlowRepository(@Named("flows") H2Repository<FlowInterface> repository,
|
||||
ApplicationContext applicationContext) {
|
||||
super(repository, applicationContext);
|
||||
ApplicationContext applicationContext,
|
||||
JdbcFilterService filterService) {
|
||||
super(repository, applicationContext, filterService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,16 +1,15 @@
|
||||
package io.kestra.repository.mysql;
|
||||
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcFlowRepository;
|
||||
import io.kestra.jdbc.services.JdbcFilterService;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.jooq.Condition;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Singleton
|
||||
@@ -18,8 +17,9 @@ import java.util.Map;
|
||||
public class MysqlFlowRepository extends AbstractJdbcFlowRepository {
|
||||
@Inject
|
||||
public MysqlFlowRepository(@Named("flows") MysqlRepository<FlowInterface> repository,
|
||||
ApplicationContext applicationContext) {
|
||||
super(repository, applicationContext);
|
||||
ApplicationContext applicationContext,
|
||||
JdbcFilterService filterService) {
|
||||
super(repository, applicationContext, filterService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,16 +1,15 @@
|
||||
package io.kestra.repository.postgres;
|
||||
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcFlowRepository;
|
||||
import io.kestra.jdbc.services.JdbcFilterService;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.jooq.Condition;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Singleton
|
||||
@@ -18,8 +17,9 @@ import java.util.Map;
|
||||
public class PostgresFlowRepository extends AbstractJdbcFlowRepository {
|
||||
@Inject
|
||||
public PostgresFlowRepository(@Named("flows") PostgresRepository<FlowInterface> repository,
|
||||
ApplicationContext applicationContext) {
|
||||
super(repository, applicationContext);
|
||||
ApplicationContext applicationContext,
|
||||
JdbcFilterService filterService) {
|
||||
super(repository, applicationContext, filterService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package io.kestra.jdbc.repository;
|
||||
|
||||
import io.kestra.core.events.CrudEvent;
|
||||
import io.kestra.core.events.CrudEventType;
|
||||
import io.kestra.core.models.dashboards.ColumnDescriptor;
|
||||
import io.kestra.core.models.dashboards.Dashboard;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
@@ -37,7 +36,7 @@ public abstract class AbstractJdbcDashboardRepository extends AbstractJdbcReposi
|
||||
private final ApplicationEventPublisher<CrudEvent<Dashboard>> eventPublisher;
|
||||
|
||||
List<QueryBuilderInterface<?>> queryBuilders;
|
||||
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@@ -45,8 +44,8 @@ public abstract class AbstractJdbcDashboardRepository extends AbstractJdbcReposi
|
||||
public long count() {
|
||||
return jdbcRepository.count(this.defaultFilter());
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public Optional<Dashboard> get(String tenantId, String id) {
|
||||
return jdbcRepository
|
||||
@@ -151,7 +150,7 @@ public abstract class AbstractJdbcDashboardRepository extends AbstractJdbcReposi
|
||||
|
||||
this.jdbcRepository.persist(dashboard, fields);
|
||||
this.eventPublisher.publishEvent(CrudEvent.of(previousDashboard, dashboard));
|
||||
|
||||
|
||||
return dashboard;
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,10 @@ import io.kestra.core.exceptions.FlowProcessingException;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.QueryFilter.Resource;
|
||||
import io.kestra.core.models.SearchResult;
|
||||
import io.kestra.core.models.dashboards.ColumnDescriptor;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
import io.kestra.core.models.dashboards.filters.AbstractFilter;
|
||||
import io.kestra.core.models.flows.*;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
@@ -23,14 +27,19 @@ import io.kestra.core.repositories.ArrayListTotal;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.kestra.core.services.PluginDefaultService;
|
||||
import io.kestra.core.utils.DateUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.NamespaceUtils;
|
||||
import io.kestra.jdbc.JdbcMapper;
|
||||
import io.kestra.jdbc.services.JdbcFilterService;
|
||||
import io.kestra.plugin.core.dashboard.data.Flows;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.event.ApplicationEventPublisher;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.inject.qualifiers.Qualifiers;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import lombok.Getter;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jooq.*;
|
||||
@@ -38,7 +47,9 @@ import org.jooq.Record;
|
||||
import org.jooq.impl.DSL;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
|
||||
@@ -59,10 +70,16 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
private final NamespaceUtils namespaceUtils;
|
||||
private final PluginDefaultService pluginDefaultService;
|
||||
|
||||
private final JdbcFilterService filterService;
|
||||
|
||||
protected io.kestra.jdbc.AbstractJdbcRepository<FlowInterface> jdbcRepository;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public AbstractJdbcFlowRepository(io.kestra.jdbc.AbstractJdbcRepository<FlowInterface> jdbcRepository, ApplicationContext applicationContext) {
|
||||
public AbstractJdbcFlowRepository(
|
||||
io.kestra.jdbc.AbstractJdbcRepository<FlowInterface> jdbcRepository,
|
||||
ApplicationContext applicationContext,
|
||||
JdbcFilterService filterService
|
||||
) {
|
||||
this.jdbcRepository = jdbcRepository;
|
||||
this.modelValidator = applicationContext.getBean(ModelValidator.class);
|
||||
this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
|
||||
@@ -75,7 +92,8 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
String namespace = record.get("namespace", String.class);
|
||||
String tenantId = record.get("tenant_id", String.class);
|
||||
try {
|
||||
Map<String, Object> map = MAPPER.readValue(source, new TypeReference<>(){});
|
||||
Map<String, Object> map = MAPPER.readValue(source, new TypeReference<>() {
|
||||
});
|
||||
|
||||
// Inject default plugin 'version' props before converting
|
||||
// to flow to correctly resolve to plugin type.
|
||||
@@ -97,6 +115,24 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
}
|
||||
}
|
||||
});
|
||||
this.filterService = filterService;
|
||||
}
|
||||
|
||||
@Getter
|
||||
private final Map<Flows.Fields, String> fieldsMapping = Map.of(
|
||||
Flows.Fields.ID, "key",
|
||||
Flows.Fields.NAMESPACE, "namespace",
|
||||
Flows.Fields.REVISION, "revision"
|
||||
);
|
||||
|
||||
@Override
|
||||
public Set<Flows.Fields> dateFields() {
|
||||
return Set.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flows.Fields dateFilterField() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -123,7 +159,7 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
.and(field("id", String.class).eq(id))
|
||||
);
|
||||
|
||||
return this.jdbcRepository.fetchOne(from).map(it -> (Flow)it);
|
||||
return this.jdbcRepository.fetchOne(from).map(it -> (Flow) it);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -137,21 +173,21 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
|
||||
from = revision
|
||||
.map(integer -> context
|
||||
.select(VALUE_FIELD, NAMESPACE_FIELD, TENANT_FIELD)
|
||||
.from(jdbcRepository.getTable())
|
||||
.where(this.noAclDefaultFilter(tenantId))
|
||||
.and(NAMESPACE_FIELD.eq(namespace))
|
||||
.and(field("id", String.class).eq(id))
|
||||
.and(field("revision", Integer.class).eq(integer))
|
||||
).orElseGet(() -> context
|
||||
.select(VALUE_FIELD, NAMESPACE_FIELD, TENANT_FIELD)
|
||||
.from(fromLastRevision(true))
|
||||
.where(this.noAclDefaultFilter(tenantId))
|
||||
.and(NAMESPACE_FIELD.eq(namespace))
|
||||
.and(field("id", String.class).eq(id))
|
||||
.select(VALUE_FIELD, NAMESPACE_FIELD, TENANT_FIELD)
|
||||
.from(jdbcRepository.getTable())
|
||||
.where(this.noAclDefaultFilter(tenantId))
|
||||
.and(NAMESPACE_FIELD.eq(namespace))
|
||||
.and(field("id", String.class).eq(id))
|
||||
.and(field("revision", Integer.class).eq(integer))
|
||||
).orElseGet(() -> context
|
||||
.select(VALUE_FIELD, NAMESPACE_FIELD, TENANT_FIELD)
|
||||
.from(fromLastRevision(true))
|
||||
.where(this.noAclDefaultFilter(tenantId))
|
||||
.and(NAMESPACE_FIELD.eq(namespace))
|
||||
.and(field("id", String.class).eq(id))
|
||||
);
|
||||
|
||||
return this.jdbcRepository.fetchOne(from).map(it -> (Flow)it);
|
||||
return this.jdbcRepository.fetchOne(from).map(it -> (Flow) it);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -256,7 +292,7 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
|
||||
@Override
|
||||
public List<FlowWithSource> findRevisions(String tenantId, String namespace, String id) {
|
||||
return jdbcRepository
|
||||
return jdbcRepository
|
||||
.getDslContextWrapper()
|
||||
.transactionResult(configuration -> {
|
||||
Select<Record4<String, String, String, String>> select = DSL
|
||||
@@ -269,7 +305,7 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
.orderBy(field("revision", Integer.class).asc());
|
||||
|
||||
return select.fetch()
|
||||
.map(record -> FlowWithSource.of((Flow)jdbcRepository.map(record), record.get(SOURCE_FIELD)));
|
||||
.map(record -> FlowWithSource.of((Flow) jdbcRepository.map(record), record.get(SOURCE_FIELD)));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -301,7 +337,7 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
.from(fromLastRevision(true))
|
||||
.where(this.defaultFilter(tenantId));
|
||||
|
||||
return (List)this.jdbcRepository.fetch(select);
|
||||
return (List) this.jdbcRepository.fetch(select);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -326,7 +362,7 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
select.fetch().forEach(
|
||||
item -> {
|
||||
try {
|
||||
Flow flow = (Flow)this.jdbcRepository.map(item);
|
||||
Flow flow = (Flow) this.jdbcRepository.map(item);
|
||||
flows.add(flow);
|
||||
} catch (Exception e) {
|
||||
log.error("Unable to load the following flow:\n{}", item.get("value", String.class), e);
|
||||
@@ -354,7 +390,7 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
.where(this.defaultFilter(tenantId));
|
||||
|
||||
return select.fetch().map(record -> FlowWithSource.of(
|
||||
(Flow)jdbcRepository.map(record),
|
||||
(Flow) jdbcRepository.map(record),
|
||||
record.get(SOURCE_FIELD)
|
||||
));
|
||||
});
|
||||
@@ -377,7 +413,7 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
.where(this.noAclDefaultFilter(tenantId));
|
||||
|
||||
return select.fetch().map(record -> FlowWithSource.of(
|
||||
(Flow)jdbcRepository.map(record),
|
||||
(Flow) jdbcRepository.map(record),
|
||||
record.get(SOURCE_FIELD)
|
||||
));
|
||||
});
|
||||
@@ -403,7 +439,7 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
// That's why we will try to deserialize each flow and log an error but not crash in case of exception.
|
||||
return select.fetch().stream().map(record -> {
|
||||
try {
|
||||
return FlowWithSource.of((Flow)jdbcRepository.map(record), record.get("source_code", String.class));
|
||||
return FlowWithSource.of((Flow) jdbcRepository.map(record), record.get("source_code", String.class));
|
||||
} catch (Exception e) {
|
||||
log.error("Unable to load the following flow:\n{}", record.get("value", String.class), e);
|
||||
return null;
|
||||
@@ -420,9 +456,9 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
.transactionResult(configuration -> {
|
||||
SelectConditionStep<Record3<Object, Object, Object>> select =
|
||||
findByNamespaceSelect(namespace)
|
||||
.and(this.defaultFilter(tenantId));
|
||||
.and(this.defaultFilter(tenantId));
|
||||
|
||||
return (List)this.jdbcRepository.fetch(select);
|
||||
return (List) this.jdbcRepository.fetch(select);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -436,7 +472,7 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
findByNamespacePrefixSelect(namespacePrefix)
|
||||
.and(this.defaultFilter(tenantId));
|
||||
|
||||
return (List)this.jdbcRepository.fetch(select);
|
||||
return (List) this.jdbcRepository.fetch(select);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -447,10 +483,10 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
.transactionResult(configuration -> {
|
||||
SelectConditionStep<Record3<Object, Object, Object>> select =
|
||||
findByNamespaceSelect(namespace)
|
||||
.and(this.defaultExecutionFilter(tenantId));
|
||||
.and(this.defaultExecutionFilter(tenantId));
|
||||
|
||||
return this.jdbcRepository.fetch(select);
|
||||
}).stream().map(it -> (Flow)it).map(FlowForExecution::of).toList();
|
||||
}).stream().map(it -> (Flow) it).map(FlowForExecution::of).toList();
|
||||
}
|
||||
|
||||
private SelectConditionStep<Record3<Object, Object, Object>> findByNamespaceSelect(String namespace) {
|
||||
@@ -491,7 +527,7 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
.and(this.defaultFilter(tenantId));
|
||||
|
||||
return select.fetch().map(record -> FlowWithSource.of(
|
||||
(Flow)jdbcRepository.map(record),
|
||||
(Flow) jdbcRepository.map(record),
|
||||
record.get(SOURCE_FIELD)
|
||||
));
|
||||
});
|
||||
@@ -515,7 +551,7 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
.and(this.defaultFilter(tenantId));
|
||||
|
||||
return select.fetch().map(record -> FlowWithSource.of(
|
||||
(Flow)jdbcRepository.map(record),
|
||||
(Flow) jdbcRepository.map(record),
|
||||
record.get(SOURCE_FIELD)
|
||||
));
|
||||
});
|
||||
@@ -584,8 +620,8 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
context,
|
||||
select,
|
||||
pageable,
|
||||
record ->FlowWithSource.of(
|
||||
(Flow)jdbcRepository.map(record),
|
||||
record -> FlowWithSource.of(
|
||||
(Flow) jdbcRepository.map(record),
|
||||
record.get("source_code", String.class)
|
||||
)
|
||||
);
|
||||
@@ -794,4 +830,95 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
|
||||
.where(defaultFilterWithNoACL(tenantId, false)));
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public ArrayListTotal<Map<String, Object>> fetchData(
|
||||
String tenantId,
|
||||
DataFilter<Flows.Fields, ? extends ColumnDescriptor<Flows.Fields>> descriptors,
|
||||
ZonedDateTime startDate,
|
||||
ZonedDateTime endDate,
|
||||
Pageable pageable
|
||||
) {
|
||||
return this.jdbcRepository
|
||||
.getDslContextWrapper()
|
||||
.transactionResult(configuration -> {
|
||||
DSLContext context = DSL.using(configuration);
|
||||
|
||||
Map<String, ? extends ColumnDescriptor<Flows.Fields>> columnsWithoutDate = descriptors.getColumns().entrySet().stream()
|
||||
.filter(entry -> entry.getValue().getField() == null || !dateFields().contains(entry.getValue().getField()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
|
||||
|
||||
boolean hasAgg = descriptors.getColumns().entrySet().stream().anyMatch(col -> col.getValue().getAgg() != null);
|
||||
// Generate custom fields for date as they probably need formatting
|
||||
// If they don't have aggs, we format datetime to minutes
|
||||
List<Field<Date>> dateFields = generateDateFields(descriptors, fieldsMapping, startDate, endDate, dateFields(), hasAgg ? null : DateUtils.GroupType.MINUTE);
|
||||
|
||||
|
||||
// Init request
|
||||
SelectConditionStep<Record> selectConditionStep = select(
|
||||
context,
|
||||
filterService,
|
||||
columnsWithoutDate,
|
||||
dateFields,
|
||||
this.getFieldsMapping(),
|
||||
this.jdbcRepository.getTable(),
|
||||
tenantId
|
||||
);
|
||||
|
||||
// Apply Where filter
|
||||
selectConditionStep = where(selectConditionStep, filterService, descriptors.getWhere(), fieldsMapping);
|
||||
|
||||
List<? extends ColumnDescriptor<Flows.Fields>> columnsWithoutDateWithOutAggs = columnsWithoutDate.values().stream()
|
||||
.filter(column -> column.getAgg() == null)
|
||||
.toList();
|
||||
|
||||
// Apply GroupBy for aggregation
|
||||
SelectHavingStep<Record> selectHavingStep = groupBy(
|
||||
selectConditionStep,
|
||||
columnsWithoutDateWithOutAggs,
|
||||
dateFields,
|
||||
fieldsMapping
|
||||
);
|
||||
|
||||
// Apply OrderBy
|
||||
SelectSeekStepN<Record> selectSeekStep = orderBy(selectHavingStep, descriptors);
|
||||
|
||||
// Fetch and paginate if provided
|
||||
return fetchSeekStep(selectSeekStep, pageable);
|
||||
});
|
||||
}
|
||||
|
||||
public Double fetchValue(String tenantId, DataFilterKPI<Flows.Fields, ? extends ColumnDescriptor<Flows.Fields>> dataFilter, ZonedDateTime startDate, ZonedDateTime endDate, boolean numeratorFilter) {
|
||||
return this.jdbcRepository.getDslContextWrapper().transactionResult(configuration -> {
|
||||
DSLContext context = DSL.using(configuration);
|
||||
ColumnDescriptor<Flows.Fields> columnDescriptor = dataFilter.getColumns();
|
||||
String columnKey = this.getFieldsMapping().get(columnDescriptor.getField());
|
||||
Field<?> field = columnToField(columnDescriptor, getFieldsMapping());
|
||||
if (columnDescriptor.getAgg() != null) {
|
||||
field = filterService.buildAggregation(field, columnDescriptor.getAgg());
|
||||
}
|
||||
|
||||
List<AbstractFilter<Flows.Fields>> filters = new ArrayList<>(ListUtils.emptyOnNull(dataFilter.getWhere()));
|
||||
if (numeratorFilter) {
|
||||
filters.addAll(dataFilter.getNumerator());
|
||||
}
|
||||
|
||||
SelectConditionStep selectStep = context
|
||||
.select(field)
|
||||
.from(this.jdbcRepository.getTable())
|
||||
.where(this.defaultFilter(tenantId));
|
||||
|
||||
var selectConditionStep = where(
|
||||
selectStep,
|
||||
filterService,
|
||||
filters,
|
||||
getFieldsMapping()
|
||||
);
|
||||
|
||||
Record result = selectConditionStep.fetchOne();
|
||||
|
||||
return result != null ? result.getValue(field, Double.class) : null;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user