feat(jdbc): adapt ui to remove lucene expression

This commit is contained in:
Ludovic DEHON
2022-06-01 13:48:08 +02:00
parent 6ea7118fdf
commit eb8868143b
43 changed files with 616 additions and 291 deletions

2
.gitignore vendored
View File

@@ -17,7 +17,7 @@ out/
### Configurations
docker-compose.override.yml
cli/src/main/resources/application-override.yml
cli/src/main/resources/application-*.yml
*/src/test/resources/application-test.yml
/local

View File

@@ -25,19 +25,31 @@ public class ExecutionUsage {
.atStartOfDay(ZoneId.systemDefault())
.minusDays(1);
List<DailyExecutionStatistics> dailyTaskRunsCount = null;
try {
dailyTaskRunsCount = executionRepository.dailyStatistics(
null,
null,
null,
startDate,
ZonedDateTime.now(),
true
);
} catch (UnsupportedOperationException ignored) {
}
return ExecutionUsage.builder()
.dailyExecutionsCount(executionRepository.dailyStatistics(
"*",
null,
null,
null,
startDate,
ZonedDateTime.now(),
false
))
.dailyTaskRunsCount(executionRepository.dailyStatistics(
"*",
startDate,
ZonedDateTime.now(),
true
))
.dailyTaskRunsCount(dailyTaskRunsCount)
.build();
}
}

View File

@@ -12,23 +12,64 @@ import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import javax.annotation.Nullable;
public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Execution> {
Optional<Execution> findById(String id);
ArrayListTotal<Execution> findByFlowId(String namespace, String id, Pageable pageable);
ArrayListTotal<Execution> find(String query, Pageable pageable, List<State.Type> state);
ArrayListTotal<Execution> find(
Pageable pageable,
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state
);
ArrayListTotal<TaskRun> findTaskRun(String query, Pageable pageable, List<State.Type> state);
ArrayListTotal<TaskRun> findTaskRun(
Pageable pageable,
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> states
);
Integer maxTaskRunSetting();
List<DailyExecutionStatistics> dailyStatistics(String query, ZonedDateTime startDate, ZonedDateTime endDate, boolean isTaskRun);
List<DailyExecutionStatistics> dailyStatistics(
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
boolean isTaskRun
);
Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlowStatistics(String query, ZonedDateTime startDate, ZonedDateTime endDate, boolean groupByNamespaceOnly);
Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlowStatistics(
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
boolean groupByNamespaceOnly
);
List<ExecutionCount> executionCounts(List<Flow> flows, String query, ZonedDateTime startDate, ZonedDateTime endDate);
List<ExecutionCount> executionCounts(
List<Flow> flows,
@Nullable List<State.Type> states,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate
);
Execution save(Execution flow);
default Function<String, String> sortMapping() throws IllegalArgumentException {
return s -> s;
}
}

View File

@@ -5,6 +5,7 @@ import io.micronaut.data.model.Pageable;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import javax.annotation.Nullable;
import javax.validation.ConstraintViolationException;
import java.util.List;
import java.util.Optional;
@@ -41,9 +42,9 @@ public interface FlowRepositoryInterface {
List<Flow> findByNamespace(String namespace);
ArrayListTotal<Flow> find(String query, Pageable pageable);
ArrayListTotal<Flow> find(Pageable pageable, @Nullable String query, @Nullable String namespace);
ArrayListTotal<SearchResult<Flow>> findSourceCode(String query, Pageable pageable);
ArrayListTotal<SearchResult<Flow>> findSourceCode(Pageable pageable, @Nullable String query, @Nullable String namespace);
List<String> findDistinctNamespace();

View File

@@ -4,7 +4,9 @@ import io.micronaut.data.model.Pageable;
import io.kestra.core.models.executions.LogEntry;
import org.slf4j.event.Level;
import java.time.ZonedDateTime;
import java.util.List;
import javax.annotation.Nullable;
public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry> {
List<LogEntry> findByExecutionId(String id, Level minLevel);
@@ -13,7 +15,13 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
List<LogEntry> findByExecutionIdAndTaskRunId(String executionId, String taskRunId, Level minLevel);
ArrayListTotal<LogEntry> find(String query, Pageable pageable, Level minLevel);
ArrayListTotal<LogEntry> find(
Pageable pageable,
@Nullable String query,
@Nullable Level minLevel,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate
);
LogEntry save(LogEntry log);
}

View File

@@ -1,16 +1,14 @@
package io.kestra.runner.memory;
package io.kestra.core.schedulers;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import java.util.Optional;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
@MemoryQueueEnabled
public class MemorySchedulerExecutionState implements SchedulerExecutionStateInterface {
public class SchedulerExecutionState implements SchedulerExecutionStateInterface {
@Inject
private ExecutionRepositoryInterface executionRepository;

View File

@@ -118,18 +118,9 @@ public class Counts extends Task implements RunnableTask<Counts.Output> {
.getApplicationContext()
.getBean(ExecutionRepositoryInterface.class);
String query = null;
if (this.states != null) {
query = "state.current:(" + this.states
.stream()
.map(Enum::name)
.collect(Collectors.joining(" OR "))
+ ")";
}
List<ExecutionCount> executionCounts = executionRepository.executionCounts(
flows,
query,
this.states,
startDate != null ? ZonedDateTime.parse(runContext.render(startDate)) : null,
endDate != null ? ZonedDateTime.parse(runContext.render(endDate)) : null
);

View File

@@ -105,7 +105,7 @@ public abstract class AbstractExecutionRepositoryTest {
protected void find() {
inject();
ArrayListTotal<Execution> executions = executionRepository.find("*", Pageable.from(1, 10), null);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, null);
assertThat(executions.getTotal(), is(28L));
assertThat(executions.size(), is(10));
}
@@ -114,7 +114,7 @@ public abstract class AbstractExecutionRepositoryTest {
protected void findTaskRun() {
inject();
ArrayListTotal<TaskRun> executions = executionRepository.findTaskRun("*", Pageable.from(1, 10), null);
ArrayListTotal<TaskRun> executions = executionRepository.findTaskRun(Pageable.from(1, 10), null, null, null, null, null, null);
assertThat(executions.getTotal(), is(71L));
assertThat(executions.size(), is(10));
}
@@ -155,7 +155,9 @@ public abstract class AbstractExecutionRepositoryTest {
Thread.sleep(500);
Map<String, Map<String, List<DailyExecutionStatistics>>> result = executionRepository.dailyGroupByFlowStatistics(
"*",
null,
null,
null,
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
false
@@ -180,7 +182,9 @@ public abstract class AbstractExecutionRepositoryTest {
assertThat(second.getExecutionCounts().get(State.Type.CREATED), is(0L));
result = executionRepository.dailyGroupByFlowStatistics(
"*",
null,
null,
null,
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
true
@@ -210,7 +214,9 @@ public abstract class AbstractExecutionRepositoryTest {
Thread.sleep(500);
List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
"*",
null,
null,
null,
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
false
@@ -235,7 +241,9 @@ public abstract class AbstractExecutionRepositoryTest {
}
List<DailyExecutionStatistics> result = executionRepository.dailyStatistics(
"*",
null,
null,
null,
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now(),
true
@@ -270,7 +278,7 @@ public abstract class AbstractExecutionRepositoryTest {
new io.kestra.core.models.executions.statistics.Flow(NAMESPACE, "third"),
new Flow(NAMESPACE, "missing")
),
"*",
null,
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now()
);

View File

@@ -332,6 +332,11 @@ public abstract class AbstractFlowRepositoryTest {
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.DELETE).count(), is(1L));
}
@Test
void findDistinctNamespace() {
List<String> distinctNamespace = flowRepository.findDistinctNamespace();
assertThat((long) distinctNamespace.size(), is(2L));
}
@Singleton
public static class FlowListener implements ApplicationEventListener<CrudEvent<Flow>> {

View File

@@ -19,7 +19,7 @@ public abstract class AbstractLogRepositoryTest {
@Inject
protected LogRepositoryInterface logRepository;
private static LogEntry.LogEntryBuilder logEntry() {
private static LogEntry.LogEntryBuilder logEntry(Level level) {
return LogEntry.builder()
.flowId(IdUtils.create())
.namespace("io.kestra.unittest")
@@ -28,25 +28,28 @@ public abstract class AbstractLogRepositoryTest {
.taskRunId(IdUtils.create())
.attemptNumber(0)
.timestamp(Instant.now())
.level(Level.INFO)
.level(level)
.thread("")
.message("john doe");
}
@Test
void all() {
LogEntry.LogEntryBuilder builder = logEntry();
LogEntry.LogEntryBuilder builder = logEntry(Level.INFO);
ArrayListTotal<LogEntry> find = logRepository.find("*", Pageable.UNPAGED, null);
ArrayListTotal<LogEntry> find = logRepository.find(Pageable.UNPAGED, null, null, null, null);
assertThat(find.size(), is(0));
LogEntry save = logRepository.save(builder.build());
find = logRepository.find("doe", Pageable.UNPAGED, null);
find = logRepository.find(Pageable.UNPAGED, "doe", null, null, null);
assertThat(find.size(), is(1));
assertThat(find.get(0).getExecutionId(), is(save.getExecutionId()));
find = logRepository.find("*", Pageable.UNPAGED, null);
find = logRepository.find(Pageable.UNPAGED, "doe", Level.WARN,null, null);
assertThat(find.size(), is(0));
find = logRepository.find(Pageable.UNPAGED, null, null, null, null);
assertThat(find.size(), is(1));
assertThat(find.get(0).getExecutionId(), is(save.getExecutionId()));

View File

@@ -113,11 +113,13 @@ CREATE TABLE `executions` (
) GENERATED ALWAYS AS (value ->> '$.state.current') STORED NOT NULL,
`state_duration` BIGINT GENERATED ALWAYS AS (value ->> '$.state.duration' * 1000) STORED NOT NULL,
`start_date` DATETIME(6) GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.state.startDate' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED NOT NULL,
`end_date` DATETIME(6) GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.state.endDate' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED,
INDEX ix_id (id),
INDEX ix_namespace (namespace),
INDEX ix_flowId (flow_id),
INDEX ix_state_current (state_current),
INDEX ix_start_date (start_date),
INDEX ix_end_date (end_date),
INDEX ix_state_duration (state_duration),
INDEX ix_deleted (deleted),
FULLTEXT ix_fulltext (namespace, flow_id, id)

View File

@@ -30,6 +30,6 @@ public class PostgresFlowRepository extends AbstractFlowRepository {
@Override
protected Condition findSourceCodeCondition(String query) {
return DSL.condition("source_code @@ TO_TSQUERY('simple', ?)", query);
return this.jdbcRepository.fullTextCondition(Collections.singletonList("FULLTEXT_INDEX(source_code)"), query);
}
}

View File

@@ -1,17 +1,18 @@
package io.kestra.repository.postgres;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.jdbc.repository.AbstractLogRepository;
import io.kestra.jdbc.repository.AbstractTriggerRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.impl.DSL;
import org.slf4j.event.Level;
import java.util.Collections;
import java.util.stream.Collectors;
@Singleton
@PostgresRepositoryEnabled
@@ -25,4 +26,15 @@ public class PostgresLogRepository extends AbstractLogRepository implements LogR
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query);
}
@Override
protected Condition minLevel(Level minLevel) {
return DSL.condition("level in (" +
LogEntry
.findLevelsByMin(minLevel)
.stream()
.map(s -> "'" + s + "'::log_level")
.collect(Collectors.joining(", ")) +
")");
}
}

View File

@@ -39,47 +39,49 @@ CREATE TYPE queue_type AS ENUM (
);
CREATE OR REPLACE FUNCTION FULLTEXT_REPLACE(text, text) RETURNS text
AS 'SELECT REGEXP_REPLACE(COALESCE($1, ''''), ''[^a-zA-Z\d:]'', $2, ''g'');'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
RETURNS NULL ON NULL INPUT
RETURN TRIM(BOTH $2 FROM REGEXP_REPLACE(COALESCE($1, ''), '[^a-zA-Z\d:]', $2, 'g'));
CREATE OR REPLACE FUNCTION FULLTEXT_INDEX(text) RETURNS tsvector
AS 'SELECT TO_TSVECTOR(''simple'', FULLTEXT_REPLACE($1, '' '')) || TO_TSVECTOR(''simple'', $1);'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
RETURNS NULL ON NULL INPUT
RETURN TO_TSVECTOR('simple', FULLTEXT_REPLACE($1, ' ')) || TO_TSVECTOR('simple', $1);
CREATE OR REPLACE FUNCTION FULLTEXT_SEARCH(text) RETURNS tsquery
AS 'SELECT TO_TSQUERY(''simple'', FULLTEXT_REPLACE($1, '':* & '') || '':*'');'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
RETURNS NULL ON NULL INPUT
RETURN CASE WHEN FULLTEXT_REPLACE($1, '') = '' THEN TO_TSQUERY('')
ELSE TO_TSQUERY('simple', FULLTEXT_REPLACE($1, ':* & ') || ':*')
END;
CREATE OR REPLACE FUNCTION STATE_FROMTEXT(text) RETURNS state_type
AS 'SELECT CAST($1 AS state_type);'
LANGUAGE SQL
IMMUTABLE;
IMMUTABLE
RETURN CAST($1 AS state_type);
CREATE OR REPLACE FUNCTION LOGLEVEL_FROMTEXT(text) RETURNS log_level
AS 'SELECT CAST($1 AS log_level);'
LANGUAGE SQL
IMMUTABLE;
IMMUTABLE
RETURN CAST($1 AS log_level);
CREATE OR REPLACE FUNCTION PARSE_ISO8601_DATETIME(text) RETURNS timestamp
AS 'SELECT $1::timestamp;'
CREATE OR REPLACE FUNCTION PARSE_ISO8601_DATETIME(text) RETURNS timestamptz
LANGUAGE SQL
IMMUTABLE;
IMMUTABLE
RETURN $1::timestamptz;
CREATE OR REPLACE FUNCTION PARSE_ISO8601_TIMESTAMP(text) RETURNS int
AS 'SELECT EXTRACT(epoch FROM $1::timestamptz AT TIME ZONE ''utc'') ;'
LANGUAGE SQL
IMMUTABLE;
IMMUTABLE
RETURN EXTRACT(epoch FROM $1::timestamptz AT TIME ZONE 'utc');
CREATE OR REPLACE FUNCTION PARSE_ISO8601_DURATION(text) RETURNS interval
AS 'SELECT $1::interval;'
LANGUAGE SQL
IMMUTABLE;
IMMUTABLE
RETURN $1::interval;;
/* ----------------------- queues ----------------------- */
CREATE TABLE queues (
@@ -147,6 +149,7 @@ CREATE TABLE executions (
state_current state_type NOT NULL GENERATED ALWAYS AS (STATE_FROMTEXT(value #>> '{state, current}')) STORED,
state_duration BIGINT NOT NULL GENERATED ALWAYS AS (EXTRACT(MILLISECONDS FROM PARSE_ISO8601_DURATION(value #>> '{state, duration}'))) STORED,
start_date TIMESTAMP NOT NULL GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value #>> '{state, startDate}')) STORED,
end_date TIMESTAMP GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value #>> '{state, endDate}')) STORED,
fulltext TSVECTOR GENERATED ALWAYS AS (
FULLTEXT_INDEX(CAST(value ->> 'namespace' AS varchar)) ||
FULLTEXT_INDEX(CAST(value ->> 'flowId' AS varchar)) ||
@@ -159,6 +162,7 @@ CREATE INDEX executions_namespace ON executions (namespace);
CREATE INDEX executions_flow_id ON executions (flow_id);
CREATE INDEX executions_state_current ON executions (state_current);
CREATE INDEX executions_start_date ON executions (start_date);
CREATE INDEX executions_end_date ON executions (end_date);
CREATE INDEX executions_state_duration ON executions (state_duration);
CREATE INDEX executions_deleted ON executions (deleted);
CREATE INDEX executions_fulltext ON executions USING GIN (fulltext);
@@ -189,7 +193,7 @@ CREATE TABLE logs (
attempt_number INT GENERATED ALWAYS AS (CAST(value ->> 'attemptNumber' AS INTEGER)) STORED,
trigger_id VARCHAR(150) GENERATED ALWAYS AS (value ->> 'triggerId') STORED,
level log_level NOT NULL GENERATED ALWAYS AS (LOGLEVEL_FROMTEXT(value ->> 'level')) STORED,
timestamp TIMESTAMP NOT NULL GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value ->> 'timestamp')) STORED,
timestamp TIMESTAMPTZ NOT NULL GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value ->> 'timestamp')) STORED,
fulltext TSVECTOR GENERATED ALWAYS AS (
FULLTEXT_INDEX(CAST(value ->> 'namespace' AS varchar)) ||
FULLTEXT_INDEX(CAST(value ->> 'flowId' AS varchar)) ||

View File

@@ -7,4 +7,6 @@
<appender-ref ref="STDOUT" />
<appender-ref ref="STDERR" />
</root>
<logger name="org.jooq" level="TRACE" />
</configuration>

View File

@@ -141,12 +141,11 @@ public abstract class AbstractJdbcRepository<T> {
if (i < 0) {
return r;
} else {
return r.substring(0, i) + "<mark>" + r.substring(i, i + query.length()) + "</mark>" + r.substring(i + query.length());
return r.substring(0, i) + "[mark]" + r.substring(i, i + query.length()) + "[/mark]" + r.substring(i + query.length());
}
})
.collect(Collectors.toList());
return Collections.singletonList(String.join("\n", fragments));
}

View File

@@ -57,7 +57,15 @@ public abstract class AbstractExecutionRepository extends AbstractRepository imp
abstract protected Condition findCondition(String query);
public ArrayListTotal<Execution> find(String query, Pageable pageable, List<State.Type> state) {
public ArrayListTotal<Execution> find(
Pageable pageable,
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state
) {
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
@@ -71,6 +79,21 @@ public abstract class AbstractExecutionRepository extends AbstractRepository imp
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter());
if (flowId != null && namespace != null) {
select = select.and(DSL.field("namespace").eq(namespace));
select = select.and(DSL.field("flow_id").eq(flowId));
} else if (namespace != null) {
select = select.and(DSL.field("namespace").likeIgnoreCase(namespace + "%"));
}
if (startDate != null) {
select = select.and(DSL.field("start_date").greaterOrEqual(startDate.toOffsetDateTime()));
}
if (endDate != null) {
select = select.and(DSL.field("end_date").lessOrEqual(endDate.toOffsetDateTime()));
}
if (state != null) {
select = select.and(DSL.field("state_current")
.in(state.stream().map(Enum::name).collect(Collectors.toList())));
@@ -105,7 +128,15 @@ public abstract class AbstractExecutionRepository extends AbstractRepository imp
}
@Override
public ArrayListTotal<TaskRun> findTaskRun(String query, Pageable pageable, List<State.Type> state) {
public ArrayListTotal<TaskRun> findTaskRun(
Pageable pageable,
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> states
) {
throw new UnsupportedOperationException();
}
@@ -115,7 +146,14 @@ public abstract class AbstractExecutionRepository extends AbstractRepository imp
}
@Override
public List<DailyExecutionStatistics> dailyStatistics(String query, ZonedDateTime startDate, ZonedDateTime endDate, boolean isTaskRun) {
public List<DailyExecutionStatistics> dailyStatistics(
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
boolean isTaskRun
) {
if (isTaskRun) {
throw new UnsupportedOperationException();
}
@@ -172,8 +210,8 @@ public abstract class AbstractExecutionRepository extends AbstractRepository imp
.select(selectFields)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter())
.and(DSL.field("start_date").greaterOrEqual(finalStartDate.toInstant()))
.and(DSL.field("start_date").lessOrEqual(finalEndDate.toInstant()));
.and(DSL.field("start_date").greaterOrEqual(finalStartDate.toOffsetDateTime()))
.and(DSL.field("start_date").lessOrEqual(finalEndDate.toOffsetDateTime()));
if (query != null) {
select.and(this.findCondition(query));
@@ -190,8 +228,16 @@ public abstract class AbstractExecutionRepository extends AbstractRepository imp
});
}
@Override
public Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlowStatistics(String query, ZonedDateTime startDate, ZonedDateTime endDate, boolean groupByNamespaceOnly) {
public Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlowStatistics(
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
boolean groupByNamespaceOnly
) {
List<Field<?>> fields = new ArrayList<>();
fields.add(DSL.date(DSL.field("start_date", Date.class)).as("start_date"));
@@ -270,6 +316,7 @@ public abstract class AbstractExecutionRepository extends AbstractRepository imp
if (result == null) {
return DailyExecutionStatistics.builder()
.startDate(date.toLocalDate())
.duration(DailyExecutionStatistics.Duration.builder().build())
.build();
}
@@ -298,7 +345,12 @@ public abstract class AbstractExecutionRepository extends AbstractRepository imp
}
@Override
public List<ExecutionCount> executionCounts(List<Flow> flows, String query, ZonedDateTime startDate, ZonedDateTime endDate) {
public List<ExecutionCount> executionCounts(
List<Flow> flows,
@Nullable List<State.Type> states,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate
) {
ZonedDateTime finalStartDate = startDate == null ? ZonedDateTime.now().minusDays(30) : startDate;
ZonedDateTime finalEndDate = endDate == null ? ZonedDateTime.now() : endDate;
@@ -314,11 +366,12 @@ public abstract class AbstractExecutionRepository extends AbstractRepository imp
))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter())
.and(DSL.field("start_date").greaterOrEqual(finalStartDate.toInstant()))
.and(DSL.field("start_date").lessOrEqual(finalEndDate.toInstant()));
.and(DSL.field("start_date").greaterOrEqual(finalStartDate.toOffsetDateTime()))
.and(DSL.field("end_date").lessOrEqual(finalEndDate.toOffsetDateTime()));
if (query != null) {
select.and(this.findCondition(query));
if (states != null) {
select = select.and(DSL.field("state_current")
.in(states.stream().map(Enum::name).collect(Collectors.toList())));
}
// add flow & namespace filters
@@ -417,4 +470,19 @@ public abstract class AbstractExecutionRepository extends AbstractRepository imp
return null;
});
}
@Override
public Function<String, String> sortMapping() throws IllegalArgumentException {
Map<String, String> mapper = Map.of(
"id", "id",
"state.startDate", "start_date",
"state.endDate", "end_date",
"state.duration", "state_duration",
"namespace", "namespace",
"flowId", "flow_id",
"state.current", "state_current"
);
return mapper::get;
}
}

View File

@@ -23,6 +23,7 @@ import org.jooq.*;
import org.jooq.impl.DSL;
import java.util.*;
import javax.annotation.Nullable;
import javax.validation.ConstraintViolationException;
@Singleton
@@ -187,7 +188,7 @@ public abstract class AbstractFlowRepository extends AbstractRepository implemen
abstract protected Condition findCondition(String query);
public ArrayListTotal<Flow> find(String query, Pageable pageable) {
public ArrayListTotal<Flow> find(Pageable pageable, @Nullable String query, @Nullable String namespace) {
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
@@ -199,6 +200,10 @@ public abstract class AbstractFlowRepository extends AbstractRepository implemen
select.and(this.findCondition(query));
}
if (namespace != null) {
select.and(DSL.field("namespace").likeIgnoreCase(namespace + "%"));
}
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
@@ -206,7 +211,7 @@ public abstract class AbstractFlowRepository extends AbstractRepository implemen
abstract protected Condition findSourceCodeCondition(String query);
@Override
public ArrayListTotal<SearchResult<Flow>> findSourceCode(String query, Pageable pageable) {
public ArrayListTotal<SearchResult<Flow>> findSourceCode(Pageable pageable, @Nullable String query, @Nullable String namespace) {
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
@@ -218,13 +223,17 @@ public abstract class AbstractFlowRepository extends AbstractRepository implemen
select.and(this.findSourceCodeCondition(query));
}
if (namespace != null) {
select.and(DSL.field("namespace").likeIgnoreCase(namespace + "%"));
}
return this.jdbcRepository.fetchPage(
context,
select,
pageable,
record -> new SearchResult<>(
this.jdbcRepository.map(record),
this.jdbcRepository.fragments(query, record.getValue("value", String.class))
this.jdbcRepository.fragments(query, record.getValue("source_code", String.class))
)
);
});
@@ -334,9 +343,9 @@ public abstract class AbstractFlowRepository extends AbstractRepository implemen
.transactionResult(configuration -> DSL
.using(configuration)
.select(DSL.field("namespace"))
.from(lastRevision(false))
.from(lastRevision(true))
.where(this.defaultFilter())
.groupBy(DSL.grouping(DSL.field("namespace")))
.groupBy(DSL.field("namespace"))
.fetch()
.map(record -> record.getValue("namespace", String.class))
);

View File

@@ -11,8 +11,10 @@ import org.jooq.*;
import org.jooq.impl.DSL;
import org.slf4j.event.Level;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@Singleton
public abstract class AbstractLogRepository extends AbstractRepository implements LogRepositoryInterface, JdbcIndexerInterface<LogEntry> {
@@ -24,7 +26,13 @@ public abstract class AbstractLogRepository extends AbstractRepository implement
abstract protected Condition findCondition(String query);
public ArrayListTotal<LogEntry> find(String query, Pageable pageable, Level minLevel) {
public ArrayListTotal<LogEntry> find(
Pageable pageable,
@Nullable String query,
@Nullable Level minLevel,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate
) {
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
@@ -37,11 +45,19 @@ public abstract class AbstractLogRepository extends AbstractRepository implement
.where(this.defaultFilter());
if (minLevel != null) {
select.and(minLevel(minLevel));
select = select.and(minLevel(minLevel));
}
if (query != null) {
select.and(this.findCondition(query));
select = select.and(this.findCondition(query));
}
if (startDate != null) {
select = select.and(DSL.field("timestamp").greaterOrEqual(startDate.toOffsetDateTime()));
}
if (endDate != null) {
select = select.and(DSL.field("timestamp").lessOrEqual(endDate.toOffsetDateTime()));
}
return this.jdbcRepository.fetchPage(context, select, pageable);
@@ -112,7 +128,7 @@ public abstract class AbstractLogRepository extends AbstractRepository implement
});
}
private static Condition minLevel(Level minLevel) {
protected Condition minLevel(Level minLevel) {
return DSL.field("level").in(LogEntry.findLevelsByMin(minLevel));
}
}

View File

@@ -157,7 +157,7 @@ public abstract class AbstractTemplateRepository extends AbstractRepository impl
.select(DSL.field("namespace"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter())
.groupBy(DSL.grouping(DSL.field("namespace")))
.groupBy(DSL.field("namespace"))
.fetch()
.map(record -> record.getValue("namespace", String.class))
);

View File

@@ -225,11 +225,6 @@ public class JdbcExecutor implements ExecutorInterface {
});
}
// Listeners need the last emit
if (conditionService.isTerminatedWithListeners(flow, execution)) {
this.executionQueue.emit(execution);
}
// multiple condition
if (
conditionService.isTerminatedWithListeners(flow, execution) &&

View File

@@ -0,0 +1,30 @@
package io.kestra.jdbc.runner;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.schedulers.SchedulerTriggerStateInterface;
import jakarta.inject.Singleton;
import java.util.Optional;
@Singleton
@JdbcRunnerEnabled
public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface {
protected TriggerRepositoryInterface triggerRepository;
public JdbcSchedulerTriggerState(TriggerRepositoryInterface triggerRepository) {
this.triggerRepository = triggerRepository;
}
@Override
public Optional<Trigger> findLast(TriggerContext context) {
return this.triggerRepository.findLast(context);
}
@Override
public Trigger save(Trigger trigger) {
// noop save with trigger queue
return trigger;
}
}

View File

@@ -27,16 +27,16 @@ public abstract class AbstractJdbcFlowRepositoryTest extends io.kestra.core.repo
@Test
void find() {
List<Flow> save = flowRepository.find(null, Pageable.from(1, 100, Sort.of(Sort.Order.asc("id"))));
List<Flow> save = flowRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc("id"))), null, null);
assertThat((long) save.size(), is(Helpers.FLOWS_COUNT));
save = flowRepository.find("trigger-multiplecondition", Pageable.from(1, 10, Sort.UNSORTED));
save = flowRepository.find(Pageable.from(1, 10, Sort.UNSORTED), "trigger-multiplecondition", null);
assertThat((long) save.size(), is(3L));
}
@Test
void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode("io.kestra.core.models.conditions.types.MultipleCondition", Pageable.from(1, 10, Sort.UNSORTED));
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.core.models.conditions.types.MultipleCondition", null);
assertThat((long) search.size(), is(1L));
@@ -47,7 +47,7 @@ public abstract class AbstractJdbcFlowRepositoryTest extends io.kestra.core.repo
.equals("trigger-multiplecondition-listener"))
.findFirst()
.orElseThrow();
assertThat(flow.getFragments().get(0), containsString("types.MultipleCondition</mark>"));
assertThat(flow.getFragments().get(0), containsString("types.MultipleCondition[/mark]"));
}
@BeforeEach

View File

@@ -14,6 +14,7 @@ import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.ShardOperationFailedException;
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
@@ -38,6 +39,7 @@ import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.QueryStringQueryBuilder;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.search.SearchHit;
@@ -104,6 +106,28 @@ abstract public class AbstractElasticSearchRepository<T> {
.must(QueryBuilders.matchQuery("deleted", false));
}
protected static QueryStringQueryBuilder queryString(@Nullable String query) {
if (query == null) {
return QueryBuilders.queryStringQuery("*");
}
String lucene;
List<String> words = Arrays.stream(query.split("[^a-zA-Z0-9_.-]+"))
.filter(r -> !r.equals(""))
.map(QueryParser::escape)
.collect(Collectors.toList());
if (words.size() == 1) {
lucene = "(" + query + ")^5 OR " + query;
} else {
lucene = "(*" + String.join("*", words) + "*)^3 OR (*" + String.join("* AND *", words) + "*)";
}
return QueryBuilders.queryStringQuery(lucene);
}
protected Optional<T> getRequest(String index, String id) {
BoolQueryBuilder bool = this.defaultFilter()
.must(QueryBuilders.termQuery("_id", id));
@@ -322,10 +346,11 @@ abstract public class AbstractElasticSearchRepository<T> {
}
protected ArrayListTotal<T> findQueryString(String index, String query, Pageable pageable) {
BoolQueryBuilder bool = this.defaultFilter()
.must(QueryBuilders.queryStringQuery(query)
.field("*.fulltext")
);
BoolQueryBuilder bool = this.defaultFilter();
if (query != null) {
bool.must(QueryBuilders.queryStringQuery(query).field("*.fulltext"));
}
SearchSourceBuilder sourceBuilder = this.searchSource(bool, Optional.empty(), pageable);

View File

@@ -10,7 +10,6 @@ import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.data.model.Pageable;
import org.apache.lucene.search.join.ScoreMode;
import org.opensearch.action.search.SearchRequest;
@@ -50,6 +49,8 @@ import java.util.stream.Collectors;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import javax.annotation.Nullable;
@Singleton
@ElasticSearchRepositoryEnabled
public class ElasticSearchExecutionRepository extends AbstractElasticSearchRepository<Execution> implements ExecutionRepositoryInterface {
@@ -81,6 +82,8 @@ public class ElasticSearchExecutionRepository extends AbstractElasticSearchRepos
@Override
public Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlowStatistics(
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
boolean groupByNamespaceOnly
@@ -106,7 +109,7 @@ public class ElasticSearchExecutionRepository extends AbstractElasticSearchRepos
);
SearchSourceBuilder sourceBuilder = this.searchSource(
this.dateFilters(query, startDate, endDate),
this.filters(query, startDate, endDate, namespace, flowId, null),
Optional.of(Collections.singletonList(
agg
)),
@@ -121,16 +124,16 @@ public class ElasticSearchExecutionRepository extends AbstractElasticSearchRepos
((ParsedStringTerms) searchResponse.getAggregations().get(NAMESPACE_AGG)).getBuckets()
.forEach(namespaceBucket -> {
final String namespace = namespaceBucket.getKeyAsString();
final String currentNamespace = namespaceBucket.getKeyAsString();
if (groupByNamespaceOnly) {
this.parseDateAgg(namespaceBucket, result, namespace, "*");
this.parseDateAgg(namespaceBucket, result, currentNamespace, "*");
} else {
((ParsedStringTerms) namespaceBucket.getAggregations().get(FLOW_AGG)).getBuckets()
.forEach(flowBucket -> {
final String flowId = flowBucket.getKeyAsString();
final String currentFlowId = flowBucket.getKeyAsString();
this.parseDateAgg(flowBucket, result, namespace, flowId);
this.parseDateAgg(flowBucket, result, currentNamespace, currentFlowId);
});
}
});
@@ -174,7 +177,12 @@ public class ElasticSearchExecutionRepository extends AbstractElasticSearchRepos
}
@Override
public List<ExecutionCount> executionCounts(List<Flow> flows, String query, ZonedDateTime startDate, ZonedDateTime endDate) {
public List<ExecutionCount> executionCounts(
List<Flow> flows,
@javax.annotation.Nullable List<State.Type> states,
@javax.annotation.Nullable ZonedDateTime startDate,
@javax.annotation.Nullable ZonedDateTime endDate
) {
if (startDate == null) {
startDate = ZonedDateTime.now().minusDays(30);
}
@@ -184,7 +192,7 @@ public class ElasticSearchExecutionRepository extends AbstractElasticSearchRepos
}
SearchSourceBuilder sourceBuilder = this.searchSource(
this.dateFilters(query, startDate, endDate),
this.filters(null, startDate, endDate, null, null, states),
Optional.of(Collections.singletonList(
AggregationBuilders.filters(
"FILTERS",
@@ -228,6 +236,8 @@ public class ElasticSearchExecutionRepository extends AbstractElasticSearchRepos
@Override
public List<DailyExecutionStatistics> dailyStatistics(
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
boolean isTaskRun
@@ -248,7 +258,7 @@ public class ElasticSearchExecutionRepository extends AbstractElasticSearchRepos
}
SearchSourceBuilder sourceBuilder = this.searchSource(
this.dateFilters(query, startDate, endDate),
this.filters(query, startDate, endDate, namespace, flowId, null),
Optional.of(Collections.singletonList(
agg
)),
@@ -280,19 +290,34 @@ public class ElasticSearchExecutionRepository extends AbstractElasticSearchRepos
}
}
private BoolQueryBuilder dateFilters(String query, ZonedDateTime startDate, ZonedDateTime endDate) {
private BoolQueryBuilder filters(String query, ZonedDateTime startDate, ZonedDateTime endDate, String namespace, String flowId, List<State.Type> state) {
BoolQueryBuilder bool = this.defaultFilter();
bool.must(QueryBuilders.rangeQuery("state.startDate")
.gte(startDate)
);
if (query != null) {
bool.must(queryString(query).field("*.fulltext"));
}
bool.must(QueryBuilders.rangeQuery("state.startDate")
.lte(endDate)
);
if (flowId != null && namespace != null) {
bool = bool.must(QueryBuilders.matchQuery("flowId", flowId));
bool = bool.must(QueryBuilders.matchQuery("namespace", namespace));
} else if (namespace != null) {
bool = bool.must(QueryBuilders.prefixQuery("namespace", namespace));
}
if (startDate != null) {
bool.must(QueryBuilders.rangeQuery("state.startDate").gte(startDate));
}
if (endDate != null) {
bool.must(QueryBuilders.rangeQuery("state.startDate").lte(endDate));
}
if (state != null) {
bool = bool.must(QueryBuilders.termsQuery("state.current", stateConvert(state)));
}
if (query != null) {
bool.must(QueryBuilders.queryStringQuery(query).field("*.fulltext"));
bool.must(queryString(query).field("*.fulltext"));
}
return bool;
@@ -360,25 +385,40 @@ public class ElasticSearchExecutionRepository extends AbstractElasticSearchRepos
}
@Override
public ArrayListTotal<Execution> find(String query, Pageable pageable, List<State.Type> state) {
BoolQueryBuilder bool = this.defaultFilter()
.must(QueryBuilders.queryStringQuery(query).field("*.fulltext"));
if (state != null) {
bool = bool.must(QueryBuilders.termsQuery("state.current", stateConvert(state)));
}
public ArrayListTotal<Execution> find(
Pageable pageable,
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state
) {
BoolQueryBuilder bool = this.filters(query, startDate, endDate, namespace, flowId, state);
SearchSourceBuilder sourceBuilder = this.searchSource(bool, Optional.empty(), pageable);
return this.query(INDEX_NAME, sourceBuilder);
}
@Override
public ArrayListTotal<TaskRun> findTaskRun(String query, Pageable pageable, @Nullable List<State.Type> state) {
BoolQueryBuilder filterAggQuery = QueryBuilders
.boolQuery()
.filter(QueryBuilders.queryStringQuery(query));
public ArrayListTotal<TaskRun> findTaskRun(
Pageable pageable,
@javax.annotation.Nullable String query,
@javax.annotation.Nullable String namespace,
@javax.annotation.Nullable String flowId,
@javax.annotation.Nullable ZonedDateTime startDate,
@javax.annotation.Nullable ZonedDateTime endDate,
@Nullable List<State.Type> states
) {
BoolQueryBuilder filterAggQuery = QueryBuilders.boolQuery();
if (state != null) {
filterAggQuery = filterAggQuery.must(QueryBuilders.termsQuery("taskRunList.state.current", stateConvert(state)));
if (query != null) {
filterAggQuery.must(QueryBuilders.queryStringQuery(query).field("*.fulltext"));
}
if (states != null) {
filterAggQuery = filterAggQuery.must(QueryBuilders.termsQuery("taskRunList.state.current", stateConvert(states)));
}
NestedAggregationBuilder nestedAgg = AggregationBuilders
@@ -397,7 +437,7 @@ public class ElasticSearchExecutionRepository extends AbstractElasticSearchRepos
BoolQueryBuilder mainQuery = this.defaultFilter()
.filter(QueryBuilders.nestedQuery(
"taskRunList",
QueryBuilders.queryStringQuery(query).field("*.fulltext"),
query != null ? QueryBuilders.queryStringQuery(query).field("*.fulltext") : QueryBuilders.matchAllQuery(),
ScoreMode.Total
));

View File

@@ -15,10 +15,7 @@ import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.text.Text;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.MatchQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.*;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
@@ -41,6 +38,8 @@ import java.util.stream.Collectors;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import javax.annotation.Nullable;
import javax.validation.ConstraintViolationException;
@Singleton
@@ -188,9 +187,16 @@ public class ElasticSearchFlowRepository extends AbstractElasticSearchRepository
}
@Override
public ArrayListTotal<Flow> find(String query, Pageable pageable) {
BoolQueryBuilder bool = this.defaultFilter()
.must(QueryBuilders.queryStringQuery(query).field("*.fulltext"));
public ArrayListTotal<Flow> find(Pageable pageable, @Nullable String query, @Nullable String namespace) {
BoolQueryBuilder bool = this.defaultFilter();
if (query != null) {
bool.must(queryString(query).field("*.fulltext"));
}
if (namespace != null) {
bool.must(QueryBuilders.prefixQuery("namespace", namespace));
}
SearchSourceBuilder sourceBuilder = this.searchSource(bool, Optional.empty(), pageable);
sourceBuilder.fetchSource("*", "sourceCode");
@@ -199,9 +205,13 @@ public class ElasticSearchFlowRepository extends AbstractElasticSearchRepository
}
@Override
public ArrayListTotal<SearchResult<Flow>> findSourceCode(String query, Pageable pageable) {
public ArrayListTotal<SearchResult<Flow>> findSourceCode(Pageable pageable, @Nullable String query, @Nullable String namespace) {
BoolQueryBuilder bool = this.defaultFilter()
.must(QueryBuilders.queryStringQuery(query).field("sourceCode"));
.must(queryString(query).field("sourceCode"));
if (namespace != null) {
bool.must(QueryBuilders.prefixQuery("namespace", namespace));
}
SearchSourceBuilder sourceBuilder = this.searchSource(bool, Optional.empty(), pageable);
sourceBuilder.fetchSource("*", "sourceCode");

View File

@@ -15,11 +15,14 @@ import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.utils.IdUtils;
import org.slf4j.event.Level;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import javax.annotation.Nullable;
@Singleton
@ElasticSearchRepositoryEnabled
public class ElasticSearchLogRepository extends AbstractElasticSearchRepository<LogEntry> implements LogRepositoryInterface {
@@ -36,14 +39,31 @@ public class ElasticSearchLogRepository extends AbstractElasticSearchRepository<
}
@Override
public ArrayListTotal<LogEntry> find(String query, Pageable pageable, Level minLevel) {
BoolQueryBuilder bool = this.defaultFilter()
.must(QueryBuilders.queryStringQuery(query).field("*.fulltext"));
public ArrayListTotal<LogEntry> find(
Pageable pageable,
@Nullable String query,
@Nullable Level minLevel,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate
) {
BoolQueryBuilder bool = this.defaultFilter();
if (query != null) {
bool.must(QueryBuilders.queryStringQuery(query).field("*.fulltext"));
}
if (minLevel != null) {
bool.must(minLevel(minLevel));
}
if (startDate != null) {
bool.must(QueryBuilders.rangeQuery("timestamp").gte(startDate));
}
if (endDate != null) {
bool.must(QueryBuilders.rangeQuery("timestamp").lte(endDate));
}
SearchSourceBuilder sourceBuilder = this.searchSource(bool, Optional.empty(), pageable)
.sort("timestamp", SortOrder.DESC);

View File

@@ -39,14 +39,14 @@ class ElasticSearchFlowRepositoryTest extends AbstractFlowRepositoryTest {
@Test
void find() {
List<Flow> save = flowRepository.find("*", Pageable.from(1, 100, Sort.UNSORTED));
List<Flow> save = flowRepository.find(Pageable.from(1, 100, Sort.UNSORTED), null, null);
assertThat((long) save.size(), is(Helpers.FLOWS_COUNT));
}
@Test
void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode("*types.MultipleCondition*", Pageable.from(1, 10, Sort.UNSORTED));
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "*types.MultipleCondition*", null);
assertThat((long) search.size(), is(1L));
assertThat(search.get(0).getModel().getId(), is("trigger-multiplecondition-listener"));

View File

@@ -16,18 +16,20 @@ import java.util.*;
import java.util.stream.Collectors;
import jakarta.inject.Singleton;
import javax.annotation.Nullable;
@Singleton
@MemoryRepositoryEnabled
public class MemoryExecutionRepository implements ExecutionRepositoryInterface {
private Map<String, Execution> executions = new HashMap<>();
private final Map<String, Execution> executions = new HashMap<>();
@Override
public ArrayListTotal<Execution> find(String query, Pageable pageable, List<State.Type> state) {
public ArrayListTotal<Execution> find(Pageable pageable, String query, String namespace, String flowId, ZonedDateTime startDate, ZonedDateTime endDate, List<State.Type> state) {
throw new UnsupportedOperationException();
}
@Override
public ArrayListTotal<TaskRun> findTaskRun(String query, Pageable pageable, List<State.Type> state) {
public ArrayListTotal<TaskRun> findTaskRun(Pageable pageable, @io.micronaut.core.annotation.Nullable String query, @io.micronaut.core.annotation.Nullable String namespace, @io.micronaut.core.annotation.Nullable String flowId, @io.micronaut.core.annotation.Nullable ZonedDateTime startDate, @io.micronaut.core.annotation.Nullable ZonedDateTime endDate, @io.micronaut.core.annotation.Nullable List<State.Type> states) {
throw new UnsupportedOperationException();
}
@@ -60,17 +62,29 @@ public class MemoryExecutionRepository implements ExecutionRepositoryInterface {
}
@Override
public Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlowStatistics(String query, ZonedDateTime startDate, ZonedDateTime endDate, boolean groupByNamespaceOnly) {
public Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlowStatistics(@Nullable String query, @Nullable String namespace, @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, boolean groupByNamespaceOnly) {
throw new UnsupportedOperationException();
}
@Override
public List<ExecutionCount> executionCounts(List<Flow> flows, String query, ZonedDateTime startDate, ZonedDateTime endDate) {
public List<ExecutionCount> executionCounts(
List<Flow> flows,
@Nullable List<State.Type> states,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate
) {
throw new UnsupportedOperationException();
}
@Override
public List<DailyExecutionStatistics> dailyStatistics(String query, ZonedDateTime startDate, ZonedDateTime endDate, boolean isTaskRun) {
public List<DailyExecutionStatistics> dailyStatistics(
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
boolean isTaskRun
) {
throw new UnsupportedOperationException();
}

View File

@@ -21,6 +21,7 @@ import org.apache.commons.lang3.NotImplementedException;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import javax.annotation.Nullable;
import javax.validation.ConstraintViolationException;
import java.util.*;
import java.util.stream.Collectors;
@@ -99,7 +100,7 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
.collect(Collectors.toList());
}
public ArrayListTotal<Flow> find(String query, Pageable pageable) {
public ArrayListTotal<Flow> find(Pageable pageable, @Nullable String query, @Nullable String namespace) {
//TODO Non used query, returns just all at the moment
if (pageable.getNumber() < 1) {
throw new ValueException("Page cannot be < 1");
@@ -109,7 +110,7 @@ public class MemoryFlowRepository implements FlowRepositoryInterface {
}
@Override
public ArrayListTotal<SearchResult<Flow>> findSourceCode(String query, Pageable pageable) {
public ArrayListTotal<SearchResult<Flow>> findSourceCode(Pageable pageable, @Nullable String query, @Nullable String namespace) {
throw new NotImplementedException();
}

View File

@@ -6,10 +6,13 @@ import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.LogRepositoryInterface;
import org.slf4j.event.Level;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import jakarta.inject.Singleton;
import javax.annotation.Nullable;
@Singleton
@MemoryRepositoryEnabled
public class MemoryLogRepository implements LogRepositoryInterface {
@@ -31,7 +34,13 @@ public class MemoryLogRepository implements LogRepositoryInterface {
}
@Override
public ArrayListTotal<LogEntry> find(String query, Pageable pageable, Level minLevel) {
public ArrayListTotal<LogEntry> find(
Pageable pageable,
@Nullable String query,
@Nullable Level minLevel,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate
) {
throw new UnsupportedOperationException();
}

View File

@@ -1,6 +1,8 @@
package io.kestra.runner.kafka;
import io.kestra.core.runners.Executor;
import io.kestra.core.schedulers.SchedulerExecutionState;
import io.micronaut.context.annotation.Replaces;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import io.kestra.core.models.executions.Execution;
@@ -12,6 +14,7 @@ import jakarta.inject.Singleton;
@Slf4j
@KafkaQueueEnabled
@Singleton
@Replaces(SchedulerExecutionState.class)
public class KafkaSchedulerExecutionState implements SchedulerExecutionStateInterface {
private final ReadOnlyKeyValueStore<String, Executor> store;

View File

@@ -14,8 +14,8 @@
@input="onDataTableValue('status', $event)"
/>
<date-range
:start="$route.query.start"
:end="$route.query.end"
:start-date="$route.query.startDate"
:end-date="$route.query.endDate"
@input="onDataTableValue($event)"
/>
<refresh-button class="float-right" @onRefresh="load" />
@@ -37,7 +37,7 @@
:responsive="true"
striped
hover
sort-by="state.startDate"
sort-by="startDate"
sort-desc
:items="executions"
:fields="fields"
@@ -117,8 +117,8 @@
import Kicon from "../Kicon"
import RestoreUrl from "../../mixins/restoreUrl";
import State from "../../utils/state";
import qb from "../../utils/queryBuilder";
import Id from "../Id";
import _merge from "lodash/merge";
export default {
mixins: [RouteContext, RestoreUrl, DataTableActions],
@@ -247,58 +247,43 @@
return State.isRunning(item.state.current);
},
onStatusChange() {
this.load(this.onDataLoaded);
},
loadQuery(stats) {
let filter = [];
let query = this.queryWithFilter();
loadQuery(base, stats) {
let queryFilter = this.queryWithFilter();
if (query.namespace) {
filter.push(`namespace:${query.namespace}*`)
}
if (query.q) {
filter.push(qb.toLucene(query.q));
}
if (query.start && !stats) {
filter.push(`state.startDate:[${query.start} TO *]`)
}
if (query.end && !stats) {
filter.push(`state.endDate:[* TO ${query.end}]`)
if (stats) {
delete queryFilter["startDate"];
delete queryFilter["endDate"];
}
if (this.$route.name === "flows/update") {
filter.push(`namespace:${this.$route.params.namespace}`);
filter.push(`flowId:${this.$route.params.id}`);
queryFilter["namespace"] = this.$route.params.namespace;
queryFilter["flowId"] = this.$route.params.id;
}
return filter.join(" AND ") || "*"
return _merge(base, queryFilter)
},
loadData(callback) {
if (this.embed === false) {
this.dailyReady = false;
this.$store
.dispatch("stat/daily", {
q: this.loadQuery(true),
.dispatch("stat/daily", this.loadQuery({
startDate: this.$moment(this.startDate).add(-1, "day").startOf("day").toISOString(true),
endDate: this.$moment(this.endDate).endOf("day").toISOString(true)
})
}, true))
.then(() => {
this.dailyReady = true;
});
}
this.$store.dispatch("execution/findExecutions", {
this.$store.dispatch("execution/findExecutions", this.loadQuery({
size: parseInt(this.$route.query.size || this.internalPageSize),
page: parseInt(this.$route.query.page || this.internalPageNumber),
q: this.loadQuery(false),
sort: this.$route.query.sort || "state.startDate:desc",
state: this.$route.query.status ? [this.$route.query.status] : this.statuses
}).finally(callback);
}, false)).finally(callback);
},
durationFrom(item) {
return (+new Date() - new Date(item.state.startDate).getTime()) / 1000

View File

@@ -116,6 +116,7 @@
<script>
import {mapState} from "vuex";
import _merge from "lodash/merge";
import permission from "../../models/permission";
import action from "../../models/action";
import NamespaceSelect from "../namespace/NamespaceSelect";
@@ -133,7 +134,6 @@
import TriggerAvatar from "./TriggerAvatar";
import MarkdownTooltip from "../layout/MarkdownTooltip"
import Kicon from "../Kicon"
import qb from "../../utils/queryBuilder";
export default {
mixins: [RouteContext, RestoreUrl, DataTableActions],
@@ -230,42 +230,31 @@
return [];
}
},
loadQuery() {
let filter = []
let query = this.queryWithFilter();
loadQuery(base) {
let queryFilter = this.queryWithFilter();
if (query.namespace) {
filter.push(`namespace:${query.namespace}*`)
}
if (query.q) {
filter.push(qb.toLucene(query.q));
}
return filter.join(" AND ") || "*"
return _merge(base, queryFilter)
},
loadData(callback) {
this.dailyReady = false;
if (this.user.hasAny(permission.EXECUTION)) {
this.$store
.dispatch("stat/daily", {
q: this.loadQuery(),
.dispatch("stat/daily", this.loadQuery({
startDate: this.$moment(this.startDate).add(-1, "day").startOf("day").toISOString(true),
endDate: this.$moment(this.endDate).endOf("day").toISOString(true)
})
}))
.then(() => {
this.dailyReady = true;
});
}
this.$store
.dispatch("flow/findFlows", {
q: this.loadQuery(),
.dispatch("flow/findFlows", this.loadQuery({
size: parseInt(this.$route.query.size || 25),
page: parseInt(this.$route.query.page || 1),
sort: this.$route.query.sort || "id:asc"
})
}))
.then(flows => {
this.dailyGroupByFlowReady = false;
callback();

View File

@@ -18,7 +18,7 @@
</template>
<template #table>
<div v-if="search === undefined">
<div v-if="search === undefined || search.length === 0">
<b-alert variant="light" class="text-muted" show>
{{ $t('no result') }}
</b-alert>
@@ -52,8 +52,8 @@
import RestoreUrl from "../../mixins/restoreUrl";
import DataTable from "../layout/DataTable";
import SearchField from "../layout/SearchField";
import qb from "../../utils/queryBuilder";
import _escape from "lodash/escape"
import _merge from "lodash/merge";
export default {
mixins: [RouteContext, RestoreUrl, DataTableActions],
@@ -89,30 +89,19 @@
.replaceAll("[mark]", "<mark>")
.replaceAll("[/mark]", "</mark>")
},
loadQuery() {
let filter = []
let query = this.queryWithFilter();
loadQuery(base) {
let queryFilter = this.queryWithFilter();
if (query.namespace) {
filter.push(`namespace:${query.namespace}*`)
}
if (query.q) {
filter.push(qb.toLucene(query.q));
}
return filter.join(" AND ") || "*"
return _merge(base, queryFilter)
},
loadData(callback) {
const query = this.loadQuery();
if (query !== "*") {
if (this.$route.query["q"] !== undefined) {
this.$store
.dispatch("flow/searchFlows", {
q: query,
.dispatch("flow/searchFlows", this.loadQuery({
size: parseInt(this.$route.query.size || 25),
page: parseInt(this.$route.query.page || 1),
sort: this.$route.query.sort
})
}))
.finally(() => {
this.saveRestoreUrl();
})
@@ -121,6 +110,7 @@
this.$store.commit("flow/setSearch", undefined);
callback();
}
}
}
};

View File

@@ -136,11 +136,11 @@
}
},
props: {
start: {
startDate: {
type: String,
default: undefined
},
end: {
endDate: {
type: String,
default: undefined
}
@@ -148,14 +148,14 @@
methods: {
onDate(value) {
this.$emit("input", {
"start": value[0] ? moment(value[0]).toISOString(true) : undefined,
"end": value[1] ? moment(value[1]).toISOString(true) : undefined
"startDate": value[0] ? moment(value[0]).toISOString(true) : undefined,
"endDate": value[1] ? moment(value[1]).toISOString(true) : undefined
});
}
},
computed: {
date() {
return [new Date(this.start), new Date(this.end)];
return [new Date(this.startDate), new Date(this.endDate)];
}
}
};

View File

@@ -15,8 +15,8 @@
@input="onDataTableValue('level', $event)"
/>
<date-range
:start="$route.query.start"
:end="$route.query.end"
:start-date="$route.query.startDate"
:end-date="$route.query.endDate"
@input="onDataTableValue($event)"
/>
<refresh-button class="float-right" @onRefresh="load" />
@@ -51,7 +51,6 @@
import LogLine from "../logs/LogLine";
import {mapState} from "vuex";
import RouteContext from "../../mixins/routeContext";
import qb from "../../utils/queryBuilder";
import RestoreUrl from "../../mixins/restoreUrl";
import DataTableActions from "../../mixins/dataTableActions";
import NamespaceSelect from "../namespace/NamespaceSelect";
@@ -60,6 +59,7 @@
import LogLevelSelector from "./LogLevelSelector";
import DataTable from "../../components/layout/DataTable";
import RefreshButton from "../../components/layout/RefreshButton";
import _merge from "lodash/merge";
export default {
mixins: [RouteContext, RestoreUrl, DataTableActions],
@@ -93,42 +93,26 @@
}
},
methods: {
loadQuery() {
let filter = []
let query = this.queryWithFilter();
if (query.namespace) {
filter.push(`namespace:${query.namespace}*`)
}
if (query.start) {
filter.push(`timestamp:[${query.start} TO *]`)
}
if (query.end) {
filter.push(`timestamp:[* TO ${query.end}]`)
}
if (query.q) {
filter.push(qb.toTextLucene(query.q));
}
loadQuery(base) {
let queryFilter = this.queryWithFilter();
if (this.isFlowEdit) {
filter.push(`namespace:${this.$route.params.namespace}`)
filter.push(`flowId:${this.$route.params.id}`)
queryFilter["namespace"] = this.$route.params.namespace;
queryFilter["flowId"] = this.$route.params.id;
}
return filter.join(" AND ") || "*"
delete queryFilter["level"];
return _merge(base, queryFilter)
},
load() {
this.isLoading = true
this.$store
.dispatch("log/findLogs", {
q: this.loadQuery(),
.dispatch("log/findLogs", this.loadQuery({
page: this.$route.query.page || this.internalPageNumber,
size: this.$route.query.size || this.internalPageSize,
minLevel: this.$route.query.level || this.logLevel
})
}))
.finally(() => {
this.isLoading = false
this.saveRestoreUrl();

View File

@@ -14,8 +14,8 @@
@input="onDataTableValue('status', $event)"
/>
<date-range
:start="$route.query.start"
:end="$route.query.end"
:start-date="$route.query.startDate"
:end-date="$route.query.endDate"
@input="onDataTableValue($event)"
/>
<refresh-button class="float-right" @onRefresh="load" />

View File

@@ -2,6 +2,7 @@ package io.kestra.webserver.controllers;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.convert.format.Format;
import io.micronaut.data.model.Pageable;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
@@ -55,6 +56,7 @@ import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
@@ -100,32 +102,50 @@ public class ExecutionController {
@Get(uri = "executions/search", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Search for executions")
public PagedResults<Execution> find(
@Parameter(description = "Lucene string filter") @QueryValue(value = "q") String query,
@Parameter(description = "The current page") @QueryValue(value = "page", defaultValue = "1") int page,
@Parameter(description = "The current page size") @QueryValue(value = "size", defaultValue = "10") int size,
@Parameter(description = "The sort of current page") @Nullable @QueryValue(value = "sort") List<String> sort,
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix") @Nullable String namespace,
@Parameter(description = "A flow id filter") @Nullable String flowId,
@Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate,
@Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate,
@Parameter(description = "A state filter") @Nullable @QueryValue(value = "state") List<State.Type> state
) {
return PagedResults.of(
executionRepository
.find(query, PageableUtils.from(page, size, sort), state)
);
return PagedResults.of(executionRepository.find(
PageableUtils.from(page, size, sort, executionRepository.sortMapping()),
query,
namespace,
flowId,
startDate,
endDate,
state
));
}
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "taskruns/search", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Executions"}, summary = "Search for taskruns")
public PagedResults<TaskRun> findTaskRun(
@Parameter(description = "Lucene string filter") @QueryValue(value = "q") String query,
@Parameter(description = "The current page") @QueryValue(value = "page", defaultValue = "1") int page,
@Parameter(description = "The current page size") @QueryValue(value = "size", defaultValue = "10") int size,
@Nullable @QueryValue(value = "state") List<State.Type> state,
@Parameter(description = "The sort of current page") @Nullable @QueryValue(value = "sort") List<String> sort
@Parameter(description = "The sort of current page") @Nullable @QueryValue(value = "sort") List<String> sort,
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix") @Nullable String namespace,
@Parameter(description = "A flow id filter") @Nullable String flowId,
@Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate,
@Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate,
@Parameter(description = "A state filter") @Nullable @QueryValue(value = "state") List<State.Type> state
) {
return PagedResults.of(
executionRepository
.findTaskRun(query, PageableUtils.from(page, size, sort), state)
);
return PagedResults.of(executionRepository.findTaskRun(
PageableUtils.from(page, size, sort, executionRepository.sortMapping()),
query,
namespace,
flowId,
startDate,
endDate,
state
));
}
@ExecuteOn(TaskExecutors.IO)

View File

@@ -81,24 +81,26 @@ public class FlowController {
@Get(uri = "/search", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Flows"}, summary = "Search for flows")
public PagedResults<Flow> find(
@Parameter(description = "Lucene string filter") @QueryValue(value = "q") String query,
@Parameter(description = "The current page") @QueryValue(value = "page", defaultValue = "1") int page,
@Parameter(description = "The current page size") @QueryValue(value = "size", defaultValue = "10") int size,
@Parameter(description = "The sort of current page") @Nullable @QueryValue(value = "sort") List<String> sort
@Parameter(description = "The sort of current page") @Nullable @QueryValue(value = "sort") List<String> sort,
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix") @Nullable @QueryValue(value = "namespace") String namespace
) throws HttpStatusException {
return PagedResults.of(flowRepository.find(query, PageableUtils.from(page, size, sort)));
return PagedResults.of(flowRepository.find(PageableUtils.from(page, size, sort), query, namespace));
}
@ExecuteOn(TaskExecutors.IO)
@Get(uri = "/source", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Flows"}, summary = "Search for flows source code")
public PagedResults<SearchResult<Flow>> source(
@Parameter(description = "Lucene string filter") @QueryValue(value = "q") String query,
@Parameter(description = "The current page") @QueryValue(value = "page", defaultValue = "1") int page,
@Parameter(description = "The current page size") @QueryValue(value = "size", defaultValue = "10") int size,
@Parameter(description = "The sort of current page") @Nullable @QueryValue(value = "sort") List<String> sort
@Parameter(description = "The sort of current page") @Nullable @QueryValue(value = "sort") List<String> sort,
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix") @Nullable @QueryValue(value = "namespace") String namespace
) throws HttpStatusException {
return PagedResults.of(flowRepository.findSourceCode(query, PageableUtils.from(page, size, sort)));
return PagedResults.of(flowRepository.findSourceCode(PageableUtils.from(page, size, sort), query, namespace));
}
@ExecuteOn(TaskExecutors.IO)

View File

@@ -1,6 +1,7 @@
package io.kestra.webserver.controllers;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.convert.format.Format;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
@@ -11,7 +12,6 @@ import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.validation.Validated;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
@@ -22,6 +22,7 @@ import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import org.slf4j.event.Level;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import io.micronaut.core.annotation.Nullable;
@@ -43,14 +44,16 @@ public class LogController {
@Get(uri = "logs/search", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Logs"}, summary = "Search for logs")
public PagedResults<LogEntry> find(
@Parameter(description = "Lucene string filter") @QueryValue(value = "q") String query,
@Parameter(description = "Lucene string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "The current page") @QueryValue(value = "page", defaultValue = "1") int page,
@Parameter(description = "The current page size") @QueryValue(value = "size", defaultValue = "10") int size,
@Parameter(description = "The sort of current page") @Nullable @QueryValue(value = "sort") List<String> sort,
@Parameter(description = "The min log level filter") @Nullable @QueryValue(value = "minLevel") Level minLevel
) {
@Parameter(description = "The min log level filter") @Nullable @QueryValue(value = "minLevel") Level minLevel,
@Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate,
@Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate
) {
return PagedResults.of(
logRepository.find(query, PageableUtils.from(page, size, sort), minLevel)
logRepository.find(PageableUtils.from(page, size, sort), query, minLevel, startDate, endDate)
);
}

View File

@@ -31,30 +31,37 @@ public class StatsController {
@Post(uri = "executions/daily", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Stats"}, summary = "Get daily statistics for executions")
public List<DailyExecutionStatistics> dailyStatistics(
@Parameter(description = "Lucene string filter") String q,
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix") @Nullable String namespace,
@Parameter(description = "A flow id filter") @Nullable String flowId,
@Parameter(description = "The start datetime, default to now - 30 days") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate,
@Parameter(description = "The end datetime, default to now") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate
) {
// @TODO: seems to be converted back to utc by micronaut
return executionRepository.dailyStatistics(
q,
query,
namespace,
flowId,
startDate != null ? startDate.withZoneSameInstant(ZoneId.systemDefault()) : null,
endDate != null ? endDate.withZoneSameInstant(ZoneId.systemDefault()) : null,
false
);
}
@ExecuteOn(TaskExecutors.IO)
@Post(uri = "taskruns/daily", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Stats"}, summary = "Get daily statistics for taskRuns")
public List<DailyExecutionStatistics> taskRunsDailyStatistics(
@Parameter(description = "Lucene string filter") String q,
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix") @Nullable String namespace,
@Parameter(description = "A flow id filter") @Nullable String flowId,
@Parameter(description = "The start datetime, default to now - 30 days") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate,
@Parameter(description = "The end datetime, default to now") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate
) {
return executionRepository.dailyStatistics(
q,
query,
namespace,
flowId,
startDate != null ? startDate.withZoneSameInstant(ZoneId.systemDefault()) : null,
endDate != null ? endDate.withZoneSameInstant(ZoneId.systemDefault()) : null,
true
@@ -65,14 +72,18 @@ public class StatsController {
@Post(uri = "executions/daily/group-by-flow", produces = MediaType.TEXT_JSON)
@Operation(tags = {"Stats"}, summary = "Get daily statistics for executions group by namespaces and flows")
public Map<String, Map<String, List<DailyExecutionStatistics>>> dailyGroupByFlowStatistics(
@Parameter(description = "Lucene string filter") String q,
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "A namespace filter prefix") @Nullable String namespace,
@Parameter(description = "A flow id filter") @Nullable String flowId,
@Parameter(description = "The start datetime, default to now - 30 days") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime startDate,
@Parameter(description = "The end datetime, default to now") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") ZonedDateTime endDate,
@Parameter(description = "Return only namespace result and skip flows") @Nullable Boolean namespaceOnly
) {
return executionRepository.dailyGroupByFlowStatistics(
q,
query,
namespace,
flowId,
startDate != null ? startDate.withZoneSameInstant(ZoneId.systemDefault()) : null,
endDate != null ? endDate.withZoneSameInstant(ZoneId.systemDefault()) : null,
namespaceOnly != null && namespaceOnly

View File

@@ -6,18 +6,27 @@ import io.micronaut.http.HttpStatus;
import io.micronaut.http.exceptions.HttpStatusException;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
public class PageableUtils {
public static Pageable from(int page, int size, List<String> sort, Function<String, String> sortMapper) throws HttpStatusException {
return Pageable.from(
page,
size,
sort(sort, sortMapper)
);
}
public static Pageable from(int page, int size, List<String> sort) throws HttpStatusException {
return Pageable.from(
page,
size,
sort(sort)
sort(sort, null)
);
}
public static Sort sort(List<String> sort) {
protected static Sort sort(List<String> sort, Function<String, String> sortMapper) {
return sort == null ? null :
Sort.of(sort
.stream()
@@ -26,7 +35,13 @@ public class PageableUtils {
if (split.length != 2) {
throw new HttpStatusException(HttpStatus.UNPROCESSABLE_ENTITY, "Invalid sort parameter");
}
return split[1].equals("asc") ? Sort.Order.asc(split[0]) : Sort.Order.desc(split[0]);
String col = split[0];
if (sortMapper != null) {
col = sortMapper.apply(col);
}
return split[1].equals("asc") ? Sort.Order.asc(col) : Sort.Order.desc(col);
})
.collect(Collectors.toList())
);