Compare commits

...

1 Commits

Author SHA1 Message Date
Roman Acevedo
01af20ad6d fix(executions): make state_duration generated on queries
- fixes https://github.com/kestra-io/kestra/issues/11593
2025-10-06 10:11:44 +02:00
6 changed files with 183 additions and 4 deletions

View File

@@ -86,10 +86,11 @@ public class State {
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
public Duration getDuration() {
return Duration.between(
this.histories.getFirst().getDate(),
this.histories.size() > 1 ? this.histories.get(this.histories.size() - 1).getDate() : Instant.now()
);
if(this.getEndDate().isPresent()){
return Duration.between(this.getStartDate(), this.getEndDate().get());
} else {
return Duration.between(this.getStartDate(), Instant.now());
}
}
@JsonProperty(access = JsonProperty.Access.READ_ONLY)

View File

@@ -0,0 +1,55 @@
package io.kestra.core.models.executions;
import io.kestra.core.models.flows.State;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class StateDurationTest {
private static final Instant NOW = Instant.now();
private static final Instant ONE = NOW.minus(Duration.ofDays(1000));
private static final Instant TWO = ONE.plus(Duration.ofHours(11));
private static final Instant THREE = TWO.plus(Duration.ofHours(222));
@Test
void justCreated() {
var state = State.of(
State.Type.CREATED,
List.of(
new State.History(State.Type.CREATED, ONE)
)
);
assertThat(state.getDuration()).isCloseTo(Duration.between(ONE, NOW), Duration.ofMinutes(10));
}
@Test
void success() {
var state = State.of(
State.Type.SUCCESS,
List.of(
new State.History(State.Type.CREATED, ONE),
new State.History(State.Type.RUNNING, TWO),
new State.History(State.Type.SUCCESS, THREE)
)
);
assertThat(state.getDuration()).isEqualTo(Duration.between(ONE, THREE));
}
@Test
void isRunning() {
var state = State.of(
State.Type.RUNNING,
List.of(
new State.History(State.Type.CREATED, ONE),
new State.History(State.Type.RUNNING, TWO)
)
);
assertThat(state.getDuration()).isCloseTo(Duration.between(ONE, NOW), Duration.ofMinutes(10));
}
}

View File

@@ -693,4 +693,91 @@ inject(tenant);
assertThat(flowIds.size()).isEqualTo(lastExecutions.size());
}
private static final Instant NOW = Instant.now();
private static final Instant INSTANT_ONE = NOW.minus(Duration.ofDays(1000));
private static final Instant INSTANT_TWO = INSTANT_ONE.plus(Duration.ofHours(11));
private static final Instant INSTANT_THREE = INSTANT_TWO.plus(Duration.ofHours(222));
@Test
protected void findShouldSortCorrectlyOnDurationAndDates() {
// given
var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var createdExecution = Execution.builder()
.id("createdExecution__"+FriendlyId.createFriendlyId())
.namespace(NAMESPACE)
.tenantId(tenant)
.flowId(FLOW)
.flowRevision(1)
.state(
State.of(
State.Type.CREATED,
List.of(
new State.History(State.Type.CREATED, INSTANT_ONE)
)
)
).build();
assertThat(createdExecution.getState().getDuration()).isCloseTo(Duration.ofDays(1000), Duration.ofMinutes(10));
executionRepository.save(createdExecution);
var successExecution = Execution.builder()
.id("successExecution__"+FriendlyId.createFriendlyId())
.namespace(NAMESPACE)
.tenantId(tenant)
.flowId(FLOW)
.flowRevision(1)
.state(
State.of(
State.Type.SUCCESS,
List.of(
new State.History(State.Type.CREATED, INSTANT_ONE),
new State.History(State.Type.RUNNING, INSTANT_TWO),
new State.History(State.Type.SUCCESS, INSTANT_THREE)
)
)
).build();
assertThat(successExecution.getState().getDuration()).isCloseTo(Duration.ofHours(233), Duration.ofMinutes(10));
executionRepository.save(successExecution);
var runningExecution = Execution.builder()
.id("runningExecution__"+FriendlyId.createFriendlyId())
.namespace(NAMESPACE)
.tenantId(tenant)
.flowId(FLOW)
.flowRevision(1)
.state(
State.of(
State.Type.RUNNING,
List.of(
new State.History(State.Type.CREATED, INSTANT_TWO),
new State.History(State.Type.RUNNING, INSTANT_THREE)
)
)
).build();
assertThat(runningExecution.getState().getDuration()).isCloseTo(Duration.ofDays(1000).minus(Duration.ofHours(11)), Duration.ofMinutes(10));
executionRepository.save(runningExecution);
// when
List<QueryFilter> emptyFilters = null;
var sortedByShortestDuration = executionRepository.find(Pageable.from(Sort.of(Sort.Order.asc("state_duration"))), tenant, emptyFilters);
// then
assertThat(sortedByShortestDuration.stream())
.as("assert order when finding by shortest duration")
.usingRecursiveFieldByFieldElementComparatorOnFields("id")
.containsExactly(
successExecution,
runningExecution,
createdExecution
);
// when
var findByMoreRecentStartDate = executionRepository.find(Pageable.from(1,1, Sort.of(Sort.Order.desc("start_date"))), tenant, emptyFilters);
// then
assertThat(findByMoreRecentStartDate.stream())
.as("assert order when finding by last start date")
.usingRecursiveFieldByFieldElementComparatorOnFields("id")
.containsExactly(
runningExecution
);
}
}

View File

@@ -0,0 +1,13 @@
ALTER TABLE executions
ALTER
COLUMN "state_duration" FLOAT NOT NULL GENERATED ALWAYS AS (
CASE
WHEN JQ_STRING("value", '.state.endDate') IS NULL -- in Execution.java end_date is empty if it is not terminated or paused
THEN DATEDIFF('MILLISECOND', PARSEDATETIME(LEFT(JQ_STRING("value", '.state.startDate'), 23) || '+00:00',
'yyyy-MM-dd''T''HH:mm:ss.SSSXXX'), CURRENT_TIMESTAMP)
ELSE DATEDIFF('MILLISECOND', PARSEDATETIME(LEFT(JQ_STRING("value", '.state.startDate'), 23) || '+00:00',
'yyyy-MM-dd''T''HH:mm:ss.SSSXXX'),
PARSEDATETIME(LEFT(JQ_STRING("value", '.state.endDate'), 23) || '+00:00',
'yyyy-MM-dd''T''HH:mm:ss.SSSXXX'))
END
);

View File

@@ -0,0 +1,12 @@
ALTER TABLE executions
MODIFY COLUMN `state_duration`
BIGINT GENERATED ALWAYS AS (
TIMESTAMPDIFF(
MICROSECOND,
CAST(JSON_UNQUOTE(JSON_EXTRACT(value, '$.state.startDate')) AS DATETIME(6)),
COALESCE(
CAST(JSON_UNQUOTE(JSON_EXTRACT(value, '$.state.endDate')) AS DATETIME(6)),
CURRENT_TIMESTAMP(6)
)
) / 1000
) STORED NOT NULL;

View File

@@ -0,0 +1,11 @@
ALTER TABLE executions
ALTER COLUMN "state_duration" TYPE BIGINT
GENERATED ALWAYS AS (
EXTRACT(EPOCH FROM (
COALESCE(
PARSE_ISO8601_DATETIME(value #>> '{state,endDate}'),
CURRENT_TIMESTAMP
)
- PARSE_ISO8601_DATETIME(value #>> '{state,startDate}')
)) * 1000
) STORED;