mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
@@ -158,6 +158,16 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
).trigger(executionTrigger).build());
|
||||
}
|
||||
|
||||
// add a NORMAL kind execution, it should be fetched correctly
|
||||
executionRepository.save(builder(
|
||||
tenantId,
|
||||
State.Type.SUCCESS,
|
||||
null
|
||||
)
|
||||
.trigger(executionTrigger)
|
||||
.kind(ExecutionKind.NORMAL)
|
||||
.build());
|
||||
|
||||
// add a test execution, this should be ignored in search & statistics
|
||||
executionRepository.save(builder(
|
||||
tenantId,
|
||||
@@ -182,16 +192,16 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
|
||||
static Stream<Arguments> filterCombinations() {
|
||||
return Stream.of(
|
||||
Arguments.of(QueryFilter.builder().field(Field.QUERY).value("unittest").operation(Op.EQUALS).build(), 28),
|
||||
Arguments.of(QueryFilter.builder().field(Field.SCOPE).value(List.of(USER)).operation(Op.EQUALS).build(), 28),
|
||||
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).value("io.kestra.unittest").operation(Op.EQUALS).build(), 28),
|
||||
Arguments.of(QueryFilter.builder().field(Field.QUERY).value("unittest").operation(Op.EQUALS).build(), 29),
|
||||
Arguments.of(QueryFilter.builder().field(Field.SCOPE).value(List.of(USER)).operation(Op.EQUALS).build(), 29),
|
||||
Arguments.of(QueryFilter.builder().field(Field.NAMESPACE).value("io.kestra.unittest").operation(Op.EQUALS).build(), 29),
|
||||
Arguments.of(QueryFilter.builder().field(Field.LABELS).value(Map.of("key", "value")).operation(Op.EQUALS).build(), 1),
|
||||
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).value(FLOW).operation(Op.EQUALS).build(), 15),
|
||||
Arguments.of(QueryFilter.builder().field(Field.START_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.GREATER_THAN).build(), 28),
|
||||
Arguments.of(QueryFilter.builder().field(Field.END_DATE).value(ZonedDateTime.now().plusMinutes(1)).operation(Op.LESS_THAN).build(), 28),
|
||||
Arguments.of(QueryFilter.builder().field(Field.FLOW_ID).value(FLOW).operation(Op.EQUALS).build(), 16),
|
||||
Arguments.of(QueryFilter.builder().field(Field.START_DATE).value(ZonedDateTime.now().minusMinutes(1)).operation(Op.GREATER_THAN).build(), 29),
|
||||
Arguments.of(QueryFilter.builder().field(Field.END_DATE).value(ZonedDateTime.now().plusMinutes(1)).operation(Op.LESS_THAN).build(), 29),
|
||||
Arguments.of(QueryFilter.builder().field(Field.STATE).value(Type.RUNNING).operation(Op.EQUALS).build(), 5),
|
||||
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("executionTriggerId").operation(Op.EQUALS).build(), 28),
|
||||
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(), 28)
|
||||
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("executionTriggerId").operation(Op.EQUALS).build(), 29),
|
||||
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(), 29)
|
||||
);
|
||||
}
|
||||
|
||||
@@ -219,7 +229,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
inject(tenant);
|
||||
|
||||
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), tenant, null);
|
||||
assertThat(executions.getTotal()).isEqualTo(28L);
|
||||
assertThat(executions.getTotal()).isEqualTo(29L);
|
||||
assertThat(executions.size()).isEqualTo(10);
|
||||
|
||||
List<QueryFilter> filters = List.of(QueryFilter.builder()
|
||||
@@ -283,7 +293,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.value("io.kestra")
|
||||
.build());
|
||||
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
|
||||
assertThat(executions.getTotal()).isEqualTo(28L);
|
||||
assertThat(executions.getTotal()).isEqualTo(29L);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -300,7 +310,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.value(executionTriggerId)
|
||||
.build());
|
||||
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
|
||||
assertThat(executions.getTotal()).isEqualTo(28L);
|
||||
assertThat(executions.getTotal()).isEqualTo(29L);
|
||||
assertThat(executions.size()).isEqualTo(10);
|
||||
assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId);
|
||||
filters = List.of(QueryFilter.builder()
|
||||
@@ -310,7 +320,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.build());
|
||||
|
||||
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters);
|
||||
assertThat(executions.getTotal()).isEqualTo(28L);
|
||||
assertThat(executions.getTotal()).isEqualTo(29L);
|
||||
assertThat(executions.size()).isEqualTo(10);
|
||||
assertThat(executions.getFirst().getTrigger().getVariables().get("executionId")).isEqualTo(executionTriggerId);
|
||||
|
||||
@@ -321,12 +331,12 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.build());
|
||||
|
||||
executions = executionRepository.find(Pageable.from(1, 10), tenant, filters );
|
||||
assertThat(executions.getTotal()).isEqualTo(28L);
|
||||
assertThat(executions.getTotal()).isEqualTo(29L);
|
||||
assertThat(executions.size()).isEqualTo(10);
|
||||
assertThat(executions.getFirst().getTrigger()).isNull();
|
||||
|
||||
executions = executionRepository.find(Pageable.from(1, 10), tenant, null);
|
||||
assertThat(executions.getTotal()).isEqualTo(56L);
|
||||
assertThat(executions.getTotal()).isEqualTo(58L);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -335,7 +345,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
inject(tenant);
|
||||
|
||||
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), tenant, null);
|
||||
assertThat(executions.getTotal()).isEqualTo(28L);
|
||||
assertThat(executions.getTotal()).isEqualTo(29L);
|
||||
assertThat(executions.size()).isEqualTo(10);
|
||||
|
||||
var filters = List.of(QueryFilter.builder()
|
||||
@@ -661,7 +671,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
inject(tenant);
|
||||
|
||||
List<Execution> executions = executionRepository.findAllAsync(tenant).collectList().block();
|
||||
assertThat(executions).hasSize(29); // used by the backup so it contains TEST executions
|
||||
assertThat(executions).hasSize(30); // used by the backup so it contains TEST executions
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -255,31 +255,33 @@ public abstract class AbstractLogRepositoryTest {
|
||||
for (int i = 0; i < 20; i++) {
|
||||
logRepository.save(builder.build());
|
||||
}
|
||||
// normal kind should also be retrieved
|
||||
logRepository.save(builder.executionKind(ExecutionKind.NORMAL).build());
|
||||
|
||||
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(tenant, executionId, null, Pageable.from(1, 50));
|
||||
|
||||
assertThat(find.size()).isEqualTo(50);
|
||||
assertThat(find.getTotal()).isEqualTo(101L);
|
||||
assertThat(find.getTotal()).isEqualTo(102L);
|
||||
|
||||
find = logRepository.findByExecutionId(tenant, executionId, null, Pageable.from(3, 50));
|
||||
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
assertThat(find.getTotal()).isEqualTo(101L);
|
||||
assertThat(find.size()).isEqualTo(2);
|
||||
assertThat(find.getTotal()).isEqualTo(102L);
|
||||
|
||||
find = logRepository.findByExecutionIdAndTaskId(tenant, executionId, logEntry2.getTaskId(), null, Pageable.from(1, 50));
|
||||
|
||||
assertThat(find.size()).isEqualTo(21);
|
||||
assertThat(find.getTotal()).isEqualTo(21L);
|
||||
assertThat(find.size()).isEqualTo(22);
|
||||
assertThat(find.getTotal()).isEqualTo(22L);
|
||||
|
||||
find = logRepository.findByExecutionIdAndTaskRunId(tenant, executionId, logEntry2.getTaskRunId(), null, Pageable.from(1, 10));
|
||||
|
||||
assertThat(find.size()).isEqualTo(10);
|
||||
assertThat(find.getTotal()).isEqualTo(21L);
|
||||
assertThat(find.getTotal()).isEqualTo(22L);
|
||||
|
||||
find = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(tenant, executionId, logEntry2.getTaskRunId(), null, 0, Pageable.from(1, 10));
|
||||
|
||||
assertThat(find.size()).isEqualTo(10);
|
||||
assertThat(find.getTotal()).isEqualTo(21L);
|
||||
assertThat(find.getTotal()).isEqualTo(22L);
|
||||
|
||||
find = logRepository.findByExecutionIdAndTaskRunId(tenant, executionId, logEntry2.getTaskRunId(), null, Pageable.from(10, 10));
|
||||
|
||||
|
||||
@@ -33,20 +33,22 @@ public abstract class AbstractMetricRepositoryTest {
|
||||
TaskRun taskRun1 = taskRun(tenant, executionId, "task");
|
||||
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"), null);
|
||||
MetricEntry testCounter = MetricEntry.of(taskRun1, counter("test"), ExecutionKind.TEST);
|
||||
MetricEntry normalCounter = MetricEntry.of(taskRun1, counter("normal"), ExecutionKind.NORMAL);
|
||||
TaskRun taskRun2 = taskRun(tenant, executionId, "task");
|
||||
MetricEntry timer = MetricEntry.of(taskRun2, timer(), null);
|
||||
metricRepository.save(counter);
|
||||
metricRepository.save(testCounter); // should only be retrieved by execution id
|
||||
metricRepository.save(normalCounter);
|
||||
metricRepository.save(timer);
|
||||
|
||||
List<MetricEntry> results = metricRepository.findByExecutionId(tenant, executionId, Pageable.from(1, 10));
|
||||
assertThat(results.size()).isEqualTo(3);
|
||||
assertThat(results.size()).isEqualTo(4);
|
||||
|
||||
results = metricRepository.findByExecutionIdAndTaskId(tenant, executionId, taskRun1.getTaskId(), Pageable.from(1, 10));
|
||||
assertThat(results.size()).isEqualTo(3);
|
||||
assertThat(results.size()).isEqualTo(4);
|
||||
|
||||
results = metricRepository.findByExecutionIdAndTaskRunId(tenant, executionId, taskRun1.getId(), Pageable.from(1, 10));
|
||||
assertThat(results.size()).isEqualTo(2);
|
||||
assertThat(results.size()).isEqualTo(3);
|
||||
|
||||
MetricAggregations aggregationResults = metricRepository.aggregateByFlowId(
|
||||
tenant,
|
||||
|
||||
@@ -9,6 +9,7 @@ import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
import io.kestra.core.models.dashboards.filters.*;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionKind;
|
||||
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
|
||||
import io.kestra.core.models.executions.statistics.ExecutionCount;
|
||||
import io.kestra.core.models.executions.statistics.ExecutionStatistics;
|
||||
@@ -63,7 +64,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcCrudRe
|
||||
private static final Field<String> STATE_CURRENT_FIELD = field("state_current", String.class);
|
||||
private static final Field<String> NAMESPACE_FIELD = field("namespace", String.class);
|
||||
private static final Field<Object> START_DATE_FIELD = field("start_date");
|
||||
private static final Condition NORMAL_KIND_CONDITION = field("kind").isNull();
|
||||
private static final Condition NORMAL_KIND_CONDITION = field("kind").isNull().or(field("kind").eq(ExecutionKind.NORMAL.name()));
|
||||
|
||||
private final ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
|
||||
private final ApplicationContext applicationContext;
|
||||
|
||||
@@ -7,6 +7,7 @@ import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
import io.kestra.core.models.dashboards.filters.AbstractFilter;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionKind;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.queues.QueueService;
|
||||
import io.kestra.core.repositories.ArrayListTotal;
|
||||
@@ -30,7 +31,7 @@ import java.util.stream.Collectors;
|
||||
|
||||
public abstract class AbstractJdbcLogRepository extends AbstractJdbcCrudRepository<LogEntry> implements LogRepositoryInterface {
|
||||
|
||||
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull();
|
||||
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull().or(field("execution_kind").eq(ExecutionKind.NORMAL.name()));
|
||||
private static final String DATE_COLUMN = "timestamp";
|
||||
|
||||
public AbstractJdbcLogRepository(io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository,
|
||||
|
||||
@@ -5,6 +5,7 @@ import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
import io.kestra.core.models.dashboards.filters.AbstractFilter;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionKind;
|
||||
import io.kestra.core.models.executions.MetricEntry;
|
||||
import io.kestra.core.models.executions.metrics.MetricAggregation;
|
||||
import io.kestra.core.models.executions.metrics.MetricAggregations;
|
||||
@@ -32,7 +33,7 @@ import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public abstract class AbstractJdbcMetricRepository extends AbstractJdbcCrudRepository<MetricEntry> implements MetricRepositoryInterface {
|
||||
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull();
|
||||
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull().or(field("execution_kind").eq(ExecutionKind.NORMAL.name()));
|
||||
|
||||
public AbstractJdbcMetricRepository(io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository,
|
||||
QueueService queueService,
|
||||
|
||||
Reference in New Issue
Block a user