mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 14:00:23 -05:00
Compare commits
1 Commits
dependabot
...
fix-execut
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
01af20ad6d |
@@ -86,10 +86,11 @@ public class State {
|
|||||||
|
|
||||||
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
|
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
|
||||||
public Duration getDuration() {
|
public Duration getDuration() {
|
||||||
return Duration.between(
|
if(this.getEndDate().isPresent()){
|
||||||
this.histories.getFirst().getDate(),
|
return Duration.between(this.getStartDate(), this.getEndDate().get());
|
||||||
this.histories.size() > 1 ? this.histories.get(this.histories.size() - 1).getDate() : Instant.now()
|
} else {
|
||||||
);
|
return Duration.between(this.getStartDate(), Instant.now());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
|
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
|
||||||
|
|||||||
@@ -0,0 +1,55 @@
|
|||||||
|
package io.kestra.core.models.executions;
|
||||||
|
|
||||||
|
import io.kestra.core.models.flows.State;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
public class StateDurationTest {
|
||||||
|
|
||||||
|
|
||||||
|
private static final Instant NOW = Instant.now();
|
||||||
|
private static final Instant ONE = NOW.minus(Duration.ofDays(1000));
|
||||||
|
private static final Instant TWO = ONE.plus(Duration.ofHours(11));
|
||||||
|
private static final Instant THREE = TWO.plus(Duration.ofHours(222));
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void justCreated() {
|
||||||
|
var state = State.of(
|
||||||
|
State.Type.CREATED,
|
||||||
|
List.of(
|
||||||
|
new State.History(State.Type.CREATED, ONE)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
assertThat(state.getDuration()).isCloseTo(Duration.between(ONE, NOW), Duration.ofMinutes(10));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void success() {
|
||||||
|
var state = State.of(
|
||||||
|
State.Type.SUCCESS,
|
||||||
|
List.of(
|
||||||
|
new State.History(State.Type.CREATED, ONE),
|
||||||
|
new State.History(State.Type.RUNNING, TWO),
|
||||||
|
new State.History(State.Type.SUCCESS, THREE)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
assertThat(state.getDuration()).isEqualTo(Duration.between(ONE, THREE));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void isRunning() {
|
||||||
|
var state = State.of(
|
||||||
|
State.Type.RUNNING,
|
||||||
|
List.of(
|
||||||
|
new State.History(State.Type.CREATED, ONE),
|
||||||
|
new State.History(State.Type.RUNNING, TWO)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
assertThat(state.getDuration()).isCloseTo(Duration.between(ONE, NOW), Duration.ofMinutes(10));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -693,4 +693,91 @@ inject(tenant);
|
|||||||
assertThat(flowIds.size()).isEqualTo(lastExecutions.size());
|
assertThat(flowIds.size()).isEqualTo(lastExecutions.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final Instant NOW = Instant.now();
|
||||||
|
private static final Instant INSTANT_ONE = NOW.minus(Duration.ofDays(1000));
|
||||||
|
private static final Instant INSTANT_TWO = INSTANT_ONE.plus(Duration.ofHours(11));
|
||||||
|
private static final Instant INSTANT_THREE = INSTANT_TWO.plus(Duration.ofHours(222));
|
||||||
|
|
||||||
|
@Test
|
||||||
|
protected void findShouldSortCorrectlyOnDurationAndDates() {
|
||||||
|
// given
|
||||||
|
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
|
||||||
|
var createdExecution = Execution.builder()
|
||||||
|
.id("createdExecution__"+FriendlyId.createFriendlyId())
|
||||||
|
.namespace(NAMESPACE)
|
||||||
|
.tenantId(tenant)
|
||||||
|
.flowId(FLOW)
|
||||||
|
.flowRevision(1)
|
||||||
|
.state(
|
||||||
|
State.of(
|
||||||
|
State.Type.CREATED,
|
||||||
|
List.of(
|
||||||
|
new State.History(State.Type.CREATED, INSTANT_ONE)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).build();
|
||||||
|
assertThat(createdExecution.getState().getDuration()).isCloseTo(Duration.ofDays(1000), Duration.ofMinutes(10));
|
||||||
|
executionRepository.save(createdExecution);
|
||||||
|
|
||||||
|
var successExecution = Execution.builder()
|
||||||
|
.id("successExecution__"+FriendlyId.createFriendlyId())
|
||||||
|
.namespace(NAMESPACE)
|
||||||
|
.tenantId(tenant)
|
||||||
|
.flowId(FLOW)
|
||||||
|
.flowRevision(1)
|
||||||
|
.state(
|
||||||
|
State.of(
|
||||||
|
State.Type.SUCCESS,
|
||||||
|
List.of(
|
||||||
|
new State.History(State.Type.CREATED, INSTANT_ONE),
|
||||||
|
new State.History(State.Type.RUNNING, INSTANT_TWO),
|
||||||
|
new State.History(State.Type.SUCCESS, INSTANT_THREE)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).build();
|
||||||
|
assertThat(successExecution.getState().getDuration()).isCloseTo(Duration.ofHours(233), Duration.ofMinutes(10));
|
||||||
|
executionRepository.save(successExecution);
|
||||||
|
|
||||||
|
var runningExecution = Execution.builder()
|
||||||
|
.id("runningExecution__"+FriendlyId.createFriendlyId())
|
||||||
|
.namespace(NAMESPACE)
|
||||||
|
.tenantId(tenant)
|
||||||
|
.flowId(FLOW)
|
||||||
|
.flowRevision(1)
|
||||||
|
.state(
|
||||||
|
State.of(
|
||||||
|
State.Type.RUNNING,
|
||||||
|
List.of(
|
||||||
|
new State.History(State.Type.CREATED, INSTANT_TWO),
|
||||||
|
new State.History(State.Type.RUNNING, INSTANT_THREE)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).build();
|
||||||
|
assertThat(runningExecution.getState().getDuration()).isCloseTo(Duration.ofDays(1000).minus(Duration.ofHours(11)), Duration.ofMinutes(10));
|
||||||
|
executionRepository.save(runningExecution);
|
||||||
|
|
||||||
|
// when
|
||||||
|
List<QueryFilter> emptyFilters = null;
|
||||||
|
var sortedByShortestDuration = executionRepository.find(Pageable.from(Sort.of(Sort.Order.asc("state_duration"))), tenant, emptyFilters);
|
||||||
|
// then
|
||||||
|
assertThat(sortedByShortestDuration.stream())
|
||||||
|
.as("assert order when finding by shortest duration")
|
||||||
|
.usingRecursiveFieldByFieldElementComparatorOnFields("id")
|
||||||
|
.containsExactly(
|
||||||
|
successExecution,
|
||||||
|
runningExecution,
|
||||||
|
createdExecution
|
||||||
|
);
|
||||||
|
|
||||||
|
// when
|
||||||
|
var findByMoreRecentStartDate = executionRepository.find(Pageable.from(1,1, Sort.of(Sort.Order.desc("start_date"))), tenant, emptyFilters);
|
||||||
|
// then
|
||||||
|
assertThat(findByMoreRecentStartDate.stream())
|
||||||
|
.as("assert order when finding by last start date")
|
||||||
|
.usingRecursiveFieldByFieldElementComparatorOnFields("id")
|
||||||
|
.containsExactly(
|
||||||
|
runningExecution
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,13 @@
|
|||||||
|
ALTER TABLE executions
|
||||||
|
ALTER
|
||||||
|
COLUMN "state_duration" FLOAT NOT NULL GENERATED ALWAYS AS (
|
||||||
|
CASE
|
||||||
|
WHEN JQ_STRING("value", '.state.endDate') IS NULL -- in Execution.java end_date is empty if it is not terminated or paused
|
||||||
|
THEN DATEDIFF('MILLISECOND', PARSEDATETIME(LEFT(JQ_STRING("value", '.state.startDate'), 23) || '+00:00',
|
||||||
|
'yyyy-MM-dd''T''HH:mm:ss.SSSXXX'), CURRENT_TIMESTAMP)
|
||||||
|
ELSE DATEDIFF('MILLISECOND', PARSEDATETIME(LEFT(JQ_STRING("value", '.state.startDate'), 23) || '+00:00',
|
||||||
|
'yyyy-MM-dd''T''HH:mm:ss.SSSXXX'),
|
||||||
|
PARSEDATETIME(LEFT(JQ_STRING("value", '.state.endDate'), 23) || '+00:00',
|
||||||
|
'yyyy-MM-dd''T''HH:mm:ss.SSSXXX'))
|
||||||
|
END
|
||||||
|
);
|
||||||
@@ -0,0 +1,12 @@
|
|||||||
|
ALTER TABLE executions
|
||||||
|
MODIFY COLUMN `state_duration`
|
||||||
|
BIGINT GENERATED ALWAYS AS (
|
||||||
|
TIMESTAMPDIFF(
|
||||||
|
MICROSECOND,
|
||||||
|
CAST(JSON_UNQUOTE(JSON_EXTRACT(value, '$.state.startDate')) AS DATETIME(6)),
|
||||||
|
COALESCE(
|
||||||
|
CAST(JSON_UNQUOTE(JSON_EXTRACT(value, '$.state.endDate')) AS DATETIME(6)),
|
||||||
|
CURRENT_TIMESTAMP(6)
|
||||||
|
)
|
||||||
|
) / 1000
|
||||||
|
) STORED NOT NULL;
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
ALTER TABLE executions
|
||||||
|
ALTER COLUMN "state_duration" TYPE BIGINT
|
||||||
|
GENERATED ALWAYS AS (
|
||||||
|
EXTRACT(EPOCH FROM (
|
||||||
|
COALESCE(
|
||||||
|
PARSE_ISO8601_DATETIME(value #>> '{state,endDate}'),
|
||||||
|
CURRENT_TIMESTAMP
|
||||||
|
)
|
||||||
|
- PARSE_ISO8601_DATETIME(value #>> '{state,startDate}')
|
||||||
|
)) * 1000
|
||||||
|
) STORED;
|
||||||
Reference in New Issue
Block a user