mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
1 Commits
fix/remove
...
fix-execut
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
01af20ad6d |
@@ -86,10 +86,11 @@ public class State {
|
||||
|
||||
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
|
||||
public Duration getDuration() {
|
||||
return Duration.between(
|
||||
this.histories.getFirst().getDate(),
|
||||
this.histories.size() > 1 ? this.histories.get(this.histories.size() - 1).getDate() : Instant.now()
|
||||
);
|
||||
if(this.getEndDate().isPresent()){
|
||||
return Duration.between(this.getStartDate(), this.getEndDate().get());
|
||||
} else {
|
||||
return Duration.between(this.getStartDate(), Instant.now());
|
||||
}
|
||||
}
|
||||
|
||||
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
package io.kestra.core.models.executions;
|
||||
|
||||
import io.kestra.core.models.flows.State;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class StateDurationTest {
|
||||
|
||||
|
||||
private static final Instant NOW = Instant.now();
|
||||
private static final Instant ONE = NOW.minus(Duration.ofDays(1000));
|
||||
private static final Instant TWO = ONE.plus(Duration.ofHours(11));
|
||||
private static final Instant THREE = TWO.plus(Duration.ofHours(222));
|
||||
|
||||
@Test
|
||||
void justCreated() {
|
||||
var state = State.of(
|
||||
State.Type.CREATED,
|
||||
List.of(
|
||||
new State.History(State.Type.CREATED, ONE)
|
||||
)
|
||||
);
|
||||
assertThat(state.getDuration()).isCloseTo(Duration.between(ONE, NOW), Duration.ofMinutes(10));
|
||||
}
|
||||
|
||||
@Test
|
||||
void success() {
|
||||
var state = State.of(
|
||||
State.Type.SUCCESS,
|
||||
List.of(
|
||||
new State.History(State.Type.CREATED, ONE),
|
||||
new State.History(State.Type.RUNNING, TWO),
|
||||
new State.History(State.Type.SUCCESS, THREE)
|
||||
)
|
||||
);
|
||||
assertThat(state.getDuration()).isEqualTo(Duration.between(ONE, THREE));
|
||||
}
|
||||
|
||||
@Test
|
||||
void isRunning() {
|
||||
var state = State.of(
|
||||
State.Type.RUNNING,
|
||||
List.of(
|
||||
new State.History(State.Type.CREATED, ONE),
|
||||
new State.History(State.Type.RUNNING, TWO)
|
||||
)
|
||||
);
|
||||
assertThat(state.getDuration()).isCloseTo(Duration.between(ONE, NOW), Duration.ofMinutes(10));
|
||||
}
|
||||
}
|
||||
@@ -693,4 +693,91 @@ inject(tenant);
|
||||
assertThat(flowIds.size()).isEqualTo(lastExecutions.size());
|
||||
}
|
||||
|
||||
private static final Instant NOW = Instant.now();
|
||||
private static final Instant INSTANT_ONE = NOW.minus(Duration.ofDays(1000));
|
||||
private static final Instant INSTANT_TWO = INSTANT_ONE.plus(Duration.ofHours(11));
|
||||
private static final Instant INSTANT_THREE = INSTANT_TWO.plus(Duration.ofHours(222));
|
||||
|
||||
@Test
|
||||
protected void findShouldSortCorrectlyOnDurationAndDates() {
|
||||
// given
|
||||
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||
var createdExecution = Execution.builder()
|
||||
.id("createdExecution__"+FriendlyId.createFriendlyId())
|
||||
.namespace(NAMESPACE)
|
||||
.tenantId(tenant)
|
||||
.flowId(FLOW)
|
||||
.flowRevision(1)
|
||||
.state(
|
||||
State.of(
|
||||
State.Type.CREATED,
|
||||
List.of(
|
||||
new State.History(State.Type.CREATED, INSTANT_ONE)
|
||||
)
|
||||
)
|
||||
).build();
|
||||
assertThat(createdExecution.getState().getDuration()).isCloseTo(Duration.ofDays(1000), Duration.ofMinutes(10));
|
||||
executionRepository.save(createdExecution);
|
||||
|
||||
var successExecution = Execution.builder()
|
||||
.id("successExecution__"+FriendlyId.createFriendlyId())
|
||||
.namespace(NAMESPACE)
|
||||
.tenantId(tenant)
|
||||
.flowId(FLOW)
|
||||
.flowRevision(1)
|
||||
.state(
|
||||
State.of(
|
||||
State.Type.SUCCESS,
|
||||
List.of(
|
||||
new State.History(State.Type.CREATED, INSTANT_ONE),
|
||||
new State.History(State.Type.RUNNING, INSTANT_TWO),
|
||||
new State.History(State.Type.SUCCESS, INSTANT_THREE)
|
||||
)
|
||||
)
|
||||
).build();
|
||||
assertThat(successExecution.getState().getDuration()).isCloseTo(Duration.ofHours(233), Duration.ofMinutes(10));
|
||||
executionRepository.save(successExecution);
|
||||
|
||||
var runningExecution = Execution.builder()
|
||||
.id("runningExecution__"+FriendlyId.createFriendlyId())
|
||||
.namespace(NAMESPACE)
|
||||
.tenantId(tenant)
|
||||
.flowId(FLOW)
|
||||
.flowRevision(1)
|
||||
.state(
|
||||
State.of(
|
||||
State.Type.RUNNING,
|
||||
List.of(
|
||||
new State.History(State.Type.CREATED, INSTANT_TWO),
|
||||
new State.History(State.Type.RUNNING, INSTANT_THREE)
|
||||
)
|
||||
)
|
||||
).build();
|
||||
assertThat(runningExecution.getState().getDuration()).isCloseTo(Duration.ofDays(1000).minus(Duration.ofHours(11)), Duration.ofMinutes(10));
|
||||
executionRepository.save(runningExecution);
|
||||
|
||||
// when
|
||||
List<QueryFilter> emptyFilters = null;
|
||||
var sortedByShortestDuration = executionRepository.find(Pageable.from(Sort.of(Sort.Order.asc("state_duration"))), tenant, emptyFilters);
|
||||
// then
|
||||
assertThat(sortedByShortestDuration.stream())
|
||||
.as("assert order when finding by shortest duration")
|
||||
.usingRecursiveFieldByFieldElementComparatorOnFields("id")
|
||||
.containsExactly(
|
||||
successExecution,
|
||||
runningExecution,
|
||||
createdExecution
|
||||
);
|
||||
|
||||
// when
|
||||
var findByMoreRecentStartDate = executionRepository.find(Pageable.from(1,1, Sort.of(Sort.Order.desc("start_date"))), tenant, emptyFilters);
|
||||
// then
|
||||
assertThat(findByMoreRecentStartDate.stream())
|
||||
.as("assert order when finding by last start date")
|
||||
.usingRecursiveFieldByFieldElementComparatorOnFields("id")
|
||||
.containsExactly(
|
||||
runningExecution
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
ALTER TABLE executions
|
||||
ALTER
|
||||
COLUMN "state_duration" FLOAT NOT NULL GENERATED ALWAYS AS (
|
||||
CASE
|
||||
WHEN JQ_STRING("value", '.state.endDate') IS NULL -- in Execution.java end_date is empty if it is not terminated or paused
|
||||
THEN DATEDIFF('MILLISECOND', PARSEDATETIME(LEFT(JQ_STRING("value", '.state.startDate'), 23) || '+00:00',
|
||||
'yyyy-MM-dd''T''HH:mm:ss.SSSXXX'), CURRENT_TIMESTAMP)
|
||||
ELSE DATEDIFF('MILLISECOND', PARSEDATETIME(LEFT(JQ_STRING("value", '.state.startDate'), 23) || '+00:00',
|
||||
'yyyy-MM-dd''T''HH:mm:ss.SSSXXX'),
|
||||
PARSEDATETIME(LEFT(JQ_STRING("value", '.state.endDate'), 23) || '+00:00',
|
||||
'yyyy-MM-dd''T''HH:mm:ss.SSSXXX'))
|
||||
END
|
||||
);
|
||||
@@ -0,0 +1,12 @@
|
||||
ALTER TABLE executions
|
||||
MODIFY COLUMN `state_duration`
|
||||
BIGINT GENERATED ALWAYS AS (
|
||||
TIMESTAMPDIFF(
|
||||
MICROSECOND,
|
||||
CAST(JSON_UNQUOTE(JSON_EXTRACT(value, '$.state.startDate')) AS DATETIME(6)),
|
||||
COALESCE(
|
||||
CAST(JSON_UNQUOTE(JSON_EXTRACT(value, '$.state.endDate')) AS DATETIME(6)),
|
||||
CURRENT_TIMESTAMP(6)
|
||||
)
|
||||
) / 1000
|
||||
) STORED NOT NULL;
|
||||
@@ -0,0 +1,11 @@
|
||||
ALTER TABLE executions
|
||||
ALTER COLUMN "state_duration" TYPE BIGINT
|
||||
GENERATED ALWAYS AS (
|
||||
EXTRACT(EPOCH FROM (
|
||||
COALESCE(
|
||||
PARSE_ISO8601_DATETIME(value #>> '{state,endDate}'),
|
||||
CURRENT_TIMESTAMP
|
||||
)
|
||||
- PARSE_ISO8601_DATETIME(value #>> '{state,startDate}')
|
||||
)) * 1000
|
||||
) STORED;
|
||||
Reference in New Issue
Block a user