|
|
|
|
@@ -1,6 +1,7 @@
|
|
|
|
|
package io.kestra.core.repositories;
|
|
|
|
|
|
|
|
|
|
import com.devskiller.friendly_id.FriendlyId;
|
|
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
|
|
import com.google.common.collect.ImmutableMap;
|
|
|
|
|
import io.kestra.core.exceptions.InvalidQueryFiltersException;
|
|
|
|
|
import io.kestra.core.junit.annotations.KestraTest;
|
|
|
|
|
@@ -23,6 +24,7 @@ import io.kestra.core.models.flows.State.Type;
|
|
|
|
|
import io.kestra.core.models.property.Property;
|
|
|
|
|
import io.kestra.core.models.tasks.ResolvedTask;
|
|
|
|
|
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
|
|
|
|
|
import io.kestra.core.serializers.JacksonMapper;
|
|
|
|
|
import io.kestra.core.utils.IdUtils;
|
|
|
|
|
import io.kestra.core.utils.NamespaceUtils;
|
|
|
|
|
import io.kestra.core.utils.TestsUtils;
|
|
|
|
|
@@ -38,17 +40,18 @@ import org.junit.jupiter.params.provider.MethodSource;
|
|
|
|
|
import org.slf4j.event.Level;
|
|
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.time.Duration;
|
|
|
|
|
import java.time.Instant;
|
|
|
|
|
import java.time.ZoneId;
|
|
|
|
|
import java.time.ZonedDateTime;
|
|
|
|
|
import java.sql.Timestamp;
|
|
|
|
|
import java.time.*;
|
|
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
|
|
import java.time.temporal.ChronoUnit;
|
|
|
|
|
import java.util.*;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
|
|
|
|
|
import static io.kestra.core.models.flows.FlowScope.USER;
|
|
|
|
|
import static java.time.temporal.ChronoUnit.MINUTES;
|
|
|
|
|
import static java.time.temporal.ChronoUnit.SECONDS;
|
|
|
|
|
import static org.assertj.core.api.Assertions.assertThat;
|
|
|
|
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
|
|
|
|
import static org.mockito.Mockito.doReturn;
|
|
|
|
|
@@ -119,7 +122,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
Random rand = new Random();
|
|
|
|
|
doReturn(Duration.ofSeconds(rand.nextInt(150)))
|
|
|
|
|
doReturn(Optional.of(Duration.ofSeconds(rand.nextInt(150))))
|
|
|
|
|
.when(finalState)
|
|
|
|
|
.getDuration();
|
|
|
|
|
|
|
|
|
|
@@ -602,8 +605,10 @@ public abstract class AbstractExecutionRepositoryTest {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
protected void fetchData() throws IOException {
|
|
|
|
|
String tenantId = "data-tenant";
|
|
|
|
|
protected void dashboard_fetchData() throws IOException {
|
|
|
|
|
var tenantId = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
|
|
|
|
var executionDuration = Duration.ofMinutes(220);
|
|
|
|
|
var executionCreateDate = Instant.now();
|
|
|
|
|
Execution execution = Execution.builder()
|
|
|
|
|
.tenantId(tenantId)
|
|
|
|
|
.id(IdUtils.create())
|
|
|
|
|
@@ -611,29 +616,30 @@ public abstract class AbstractExecutionRepositoryTest {
|
|
|
|
|
.flowId("some-execution")
|
|
|
|
|
.flowRevision(1)
|
|
|
|
|
.labels(Label.from(Map.of("country", "FR")))
|
|
|
|
|
.state(new State(State.Type.CREATED, List.of(new State.History(State.Type.CREATED, Instant.now()))))
|
|
|
|
|
.state(new State(Type.SUCCESS,
|
|
|
|
|
List.of(new State.History(State.Type.CREATED, executionCreateDate), new State.History(Type.SUCCESS, executionCreateDate.plus(executionDuration)))))
|
|
|
|
|
.taskRunList(List.of())
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
execution = executionRepository.save(execution);
|
|
|
|
|
|
|
|
|
|
var now = ZonedDateTime.now();
|
|
|
|
|
ArrayListTotal<Map<String, Object>> data = executionRepository.fetchData(tenantId, Executions.builder()
|
|
|
|
|
.type(Executions.class.getName())
|
|
|
|
|
.columns(Map.of(
|
|
|
|
|
"count", ColumnDescriptor.<Executions.Fields>builder().field(Executions.Fields.ID).agg(AggregationType.COUNT).build(),
|
|
|
|
|
"country", ColumnDescriptor.<Executions.Fields>builder().field(Executions.Fields.LABELS).labelKey("country").build(),
|
|
|
|
|
"date", ColumnDescriptor.<Executions.Fields>builder().field(Executions.Fields.START_DATE).build()
|
|
|
|
|
"id", ColumnDescriptor.<Executions.Fields>builder().field(Executions.Fields.ID).build(),
|
|
|
|
|
"date", ColumnDescriptor.<Executions.Fields>builder().field(Executions.Fields.START_DATE).build(),
|
|
|
|
|
"duration", ColumnDescriptor.<Executions.Fields>builder().field(Executions.Fields.DURATION).build()
|
|
|
|
|
)).build(),
|
|
|
|
|
ZonedDateTime.now().minus(1, ChronoUnit.HOURS),
|
|
|
|
|
ZonedDateTime.now(),
|
|
|
|
|
now.minusHours(1),
|
|
|
|
|
now,
|
|
|
|
|
null
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertThat(data.getTotal()).isEqualTo(1L);
|
|
|
|
|
assertThat(data.get(0).get("count")).isEqualTo(1L);
|
|
|
|
|
assertThat(data.get(0).get("country")).isEqualTo("FR");
|
|
|
|
|
Instant startDate = execution.getState().getStartDate();
|
|
|
|
|
assertThat(data.get(0).get("date")).isEqualTo(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").format(ZonedDateTime.ofInstant(startDate, ZoneId.systemDefault()).withSecond(0).withNano(0)));
|
|
|
|
|
assertThat(data).first().hasFieldOrPropertyWithValue("count", 1);
|
|
|
|
|
assertThat(data).first().hasFieldOrPropertyWithValue("id", execution.getId());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static Execution buildWithCreatedDate(String tenant, Instant instant) {
|
|
|
|
|
@@ -776,6 +782,197 @@ inject(tenant);
|
|
|
|
|
.containsOnly(exec1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
record ExecutionSortTestData(Execution createdExecution, Execution successExecution, Execution runningExecution, Execution failedExecution){
|
|
|
|
|
static ExecutionSortTestData insertExecutionsTestData(String tenant, ExecutionRepositoryInterface executionRepository) {
|
|
|
|
|
final Instant clock = Instant.now();
|
|
|
|
|
final AtomicInteger passedTime = new AtomicInteger();
|
|
|
|
|
var ten = 10;
|
|
|
|
|
|
|
|
|
|
var createdExecution = Execution.builder()
|
|
|
|
|
.id("createdExecution__" + FriendlyId.createFriendlyId())
|
|
|
|
|
.namespace(NAMESPACE)
|
|
|
|
|
.tenantId(tenant)
|
|
|
|
|
.flowId(FLOW)
|
|
|
|
|
.flowRevision(1)
|
|
|
|
|
.state(
|
|
|
|
|
State.of(
|
|
|
|
|
State.Type.CREATED,
|
|
|
|
|
List.of(
|
|
|
|
|
new State.History(State.Type.CREATED, clock)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
).build();
|
|
|
|
|
executionRepository.save(createdExecution);
|
|
|
|
|
|
|
|
|
|
var successExecution = Execution.builder()
|
|
|
|
|
.id("successExecution__" + FriendlyId.createFriendlyId())
|
|
|
|
|
.namespace(NAMESPACE)
|
|
|
|
|
.tenantId(tenant)
|
|
|
|
|
.flowId(FLOW)
|
|
|
|
|
.flowRevision(1)
|
|
|
|
|
.state(
|
|
|
|
|
State.of(
|
|
|
|
|
State.Type.SUCCESS,
|
|
|
|
|
List.of(
|
|
|
|
|
new State.History(State.Type.CREATED, clock.plus(passedTime.addAndGet(ten), SECONDS)),
|
|
|
|
|
new State.History(Type.QUEUED, clock.plus(passedTime.get(), SECONDS)),
|
|
|
|
|
new State.History(State.Type.RUNNING, clock.plus(passedTime.addAndGet(ten), SECONDS)),
|
|
|
|
|
new State.History(State.Type.SUCCESS, clock.plus(passedTime.addAndGet(ten), SECONDS))
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
).build();
|
|
|
|
|
try {
|
|
|
|
|
var res= JacksonMapper.ofJson().writeValueAsString(successExecution);
|
|
|
|
|
System.out.println(res);
|
|
|
|
|
} catch (JsonProcessingException e) {
|
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
|
}
|
|
|
|
|
assertThat(successExecution.getState().getDuration().get()).isCloseTo(Duration.ofSeconds(20), Duration.ofMillis(3));
|
|
|
|
|
executionRepository.save(successExecution);
|
|
|
|
|
|
|
|
|
|
var runningExecution = Execution.builder()
|
|
|
|
|
.id("runningExecution__" + FriendlyId.createFriendlyId())
|
|
|
|
|
.namespace(NAMESPACE)
|
|
|
|
|
.tenantId(tenant)
|
|
|
|
|
.flowId(FLOW)
|
|
|
|
|
.flowRevision(1)
|
|
|
|
|
.state(
|
|
|
|
|
State.of(
|
|
|
|
|
State.Type.RUNNING,
|
|
|
|
|
List.of(
|
|
|
|
|
new State.History(State.Type.CREATED, clock.plus(passedTime.addAndGet(ten), SECONDS)),
|
|
|
|
|
new State.History(State.Type.RUNNING, clock.plus(passedTime.addAndGet(ten), SECONDS))
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
).build();
|
|
|
|
|
assertThat(runningExecution.getState().getDuration()).isEmpty();
|
|
|
|
|
executionRepository.save(runningExecution);
|
|
|
|
|
|
|
|
|
|
var failedExecution = Execution.builder()
|
|
|
|
|
.id("failedExecution__" + FriendlyId.createFriendlyId())
|
|
|
|
|
.namespace(NAMESPACE)
|
|
|
|
|
.tenantId(tenant)
|
|
|
|
|
.flowId(FLOW)
|
|
|
|
|
.flowRevision(1)
|
|
|
|
|
.state(
|
|
|
|
|
State.of(
|
|
|
|
|
Type.FAILED,
|
|
|
|
|
List.of(
|
|
|
|
|
new State.History(State.Type.CREATED, clock.plus(passedTime.addAndGet(ten), SECONDS)),
|
|
|
|
|
new State.History(Type.FAILED, clock.plus(passedTime.addAndGet(ten), SECONDS))
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
).build();
|
|
|
|
|
assertThat(failedExecution.getState().getDuration().get()).isCloseTo(Duration.ofSeconds(10), Duration.ofMillis(3));
|
|
|
|
|
executionRepository.save(failedExecution);
|
|
|
|
|
|
|
|
|
|
return new ExecutionSortTestData(createdExecution, successExecution, runningExecution, failedExecution);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
protected void findShouldSortCorrectlyOnDurationAsc() {
|
|
|
|
|
// given
|
|
|
|
|
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
|
|
|
|
var testData = ExecutionSortTestData.insertExecutionsTestData(tenant, executionRepository);
|
|
|
|
|
|
|
|
|
|
// when
|
|
|
|
|
List<QueryFilter> emptyFilters = null;
|
|
|
|
|
var sort = Sort.of(Sort.Order.asc("state_duration"));
|
|
|
|
|
var sortedByShortestDuration = executionRepository.find(Pageable.from(sort), tenant, emptyFilters);
|
|
|
|
|
|
|
|
|
|
// then
|
|
|
|
|
assertThat(sortedByShortestDuration.stream())
|
|
|
|
|
.as("assert non-terminated are at the top (list position 0 and 1)")
|
|
|
|
|
.map(Execution::getId)
|
|
|
|
|
.elements(0, 1).containsExactlyInAnyOrder(
|
|
|
|
|
testData.runningExecution().getId(),
|
|
|
|
|
testData.createdExecution().getId()
|
|
|
|
|
);
|
|
|
|
|
assertThat(sortedByShortestDuration.stream())
|
|
|
|
|
.as("assert terminated are at the bot and sorted")
|
|
|
|
|
.map(Execution::getId)
|
|
|
|
|
.elements(2, 3).containsExactly(
|
|
|
|
|
testData.failedExecution().getId(),
|
|
|
|
|
testData.successExecution().getId()
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
protected void findShouldSortCorrectlyOnDurationDesc() {
|
|
|
|
|
// given
|
|
|
|
|
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
|
|
|
|
var testData = ExecutionSortTestData.insertExecutionsTestData(tenant, executionRepository);
|
|
|
|
|
|
|
|
|
|
// when
|
|
|
|
|
List<QueryFilter> emptyFilters = null;
|
|
|
|
|
var sort = Sort.of(Sort.Order.desc("state_duration"));
|
|
|
|
|
var sortedByLongestDuration = executionRepository.find(Pageable.from(sort), tenant, emptyFilters);
|
|
|
|
|
|
|
|
|
|
// then
|
|
|
|
|
assertThat(sortedByLongestDuration.stream())
|
|
|
|
|
.as("assert terminated are at the top and sorted")
|
|
|
|
|
.map(Execution::getId)
|
|
|
|
|
.elements(0, 1).containsExactly(
|
|
|
|
|
testData.successExecution().getId(),
|
|
|
|
|
testData.failedExecution().getId()
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertThat(sortedByLongestDuration.stream())
|
|
|
|
|
.as("assert non-terminated are at the bottom (list position 2 and 3)")
|
|
|
|
|
.map(Execution::getId)
|
|
|
|
|
.elements(2, 3).containsExactlyInAnyOrder(
|
|
|
|
|
testData.runningExecution().getId(),
|
|
|
|
|
testData.createdExecution().getId()
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
protected void findShouldOrderByStartDateAsc() {
|
|
|
|
|
// given
|
|
|
|
|
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
|
|
|
|
var testData = ExecutionSortTestData.insertExecutionsTestData(tenant, executionRepository);
|
|
|
|
|
|
|
|
|
|
// when
|
|
|
|
|
List<QueryFilter> emptyFilters = null;
|
|
|
|
|
var sort = Sort.of(Sort.Order.asc("start_date"));
|
|
|
|
|
var page = Pageable.from(1, 1, sort);
|
|
|
|
|
var findByMoreRecentStartDate = executionRepository.find(
|
|
|
|
|
page,
|
|
|
|
|
tenant,
|
|
|
|
|
emptyFilters
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// then
|
|
|
|
|
assertThat(findByMoreRecentStartDate.stream())
|
|
|
|
|
.as("assert order when finding by first start date")
|
|
|
|
|
.map(Execution::getId)
|
|
|
|
|
.containsExactly(testData.createdExecution().getId());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
protected void findShouldOrderByStartDateDesc() {
|
|
|
|
|
// given
|
|
|
|
|
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
|
|
|
|
var testData = ExecutionSortTestData.insertExecutionsTestData(tenant, executionRepository);
|
|
|
|
|
|
|
|
|
|
// when
|
|
|
|
|
List<QueryFilter> emptyFilters = null;
|
|
|
|
|
var sort = Sort.of(Sort.Order.desc("start_date"));
|
|
|
|
|
var page = Pageable.from(1, 1, sort);
|
|
|
|
|
var findByMoreRecentStartDate = executionRepository.find(
|
|
|
|
|
page,
|
|
|
|
|
tenant,
|
|
|
|
|
emptyFilters
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// then
|
|
|
|
|
assertThat(findByMoreRecentStartDate.stream())
|
|
|
|
|
.as("assert order when finding by last start date")
|
|
|
|
|
.map(Execution::getId)
|
|
|
|
|
.containsExactly(testData.failedExecution().getId());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
protected void shouldReturnLastExecutionsWhenInputsAreNull() {
|
|
|
|
|
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
|
|
|
|
|