fix(executions): set tasks to submitted after sending to the Worker

When computing the next tasks to run, all task runs are created in the CREATED state.
Then when computed tasks to send to the worker, CREATED task runs are listed and converted into worker task.
The issue is that on the next execution message, if tasks sent to the worker are still in CREATED (for ex because the Worker didn't start them yet), they would still be evaluted as to send to the worker.
Setting them to a new SUBMITTED state would prevent them to be taken into account again until they are really terminated.

This should avoid the deduplicateWorkerTask state but this is kept for now with a warning and would be removed later if it proves to work in all cases.
This commit is contained in:
Loïc Mathieu
2025-10-06 16:12:31 +02:00
parent 7e410e7b18
commit 948a5beffa
8 changed files with 64 additions and 6 deletions

View File

@@ -584,6 +584,13 @@ public class Execution implements DeletedInterface, TenantInterface {
);
}
public Optional<TaskRun> findLastSubmitted(List<TaskRun> taskRuns) {
return Streams.findLast(taskRuns
.stream()
.filter(t -> t.getState().getCurrent() == State.Type.SUBMITTED)
);
}
public Optional<TaskRun> findLastRunning(List<TaskRun> taskRuns) {
return Streams.findLast(taskRuns
.stream()

View File

@@ -222,6 +222,7 @@ public class State {
@Introspected
public enum Type {
CREATED,
SUBMITTED,
RUNNING,
PAUSED,
RESTARTED,

View File

@@ -84,6 +84,12 @@ public class FlowableUtils {
return Collections.emptyList();
}
// have submitted, leave
Optional<TaskRun> lastSubmitted = execution.findLastSubmitted(taskRuns);
if (lastSubmitted.isPresent()) {
return Collections.emptyList();
}
// have running, leave
Optional<TaskRun> lastRunning = execution.findLastRunning(taskRuns);
if (lastRunning.isPresent()) {

View File

@@ -73,7 +73,7 @@ class ExecutionServiceTest {
assertThat(restart.getState().getHistories()).hasSize(4);
assertThat(restart.getTaskRunList()).hasSize(3);
assertThat(restart.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(4);
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(5);
assertThat(restart.getId()).isEqualTo(execution.getId());
assertThat(restart.getTaskRunList().get(2).getId()).isEqualTo(execution.getTaskRunList().get(2).getId());
assertThat(restart.getLabels()).contains(new Label(Label.RESTARTED, "true"));
@@ -106,7 +106,7 @@ class ExecutionServiceTest {
assertThat(restart.getState().getHistories()).hasSize(4);
assertThat(restart.getTaskRunList()).hasSize(3);
assertThat(restart.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(4);
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(5);
assertThat(restart.getId()).isNotEqualTo(execution.getId());
assertThat(restart.getTaskRunList().get(2).getId()).isNotEqualTo(execution.getTaskRunList().get(2).getId());
assertThat(restart.getLabels()).contains(new Label(Label.RESTARTED, "true"));
@@ -194,7 +194,7 @@ class ExecutionServiceTest {
assertThat(restart.getState().getHistories()).hasSize(4);
assertThat(restart.getTaskRunList()).hasSize(2);
assertThat(restart.getTaskRunList().get(1).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restart.getTaskRunList().get(1).getState().getHistories()).hasSize(4);
assertThat(restart.getTaskRunList().get(1).getState().getHistories()).hasSize(5);
assertThat(restart.getId()).isNotEqualTo(execution.getId());
assertThat(restart.getTaskRunList().get(1).getId()).isNotEqualTo(execution.getTaskRunList().get(1).getId());
assertThat(restart.getLabels()).contains(new Label(Label.REPLAY, "true"));
@@ -290,7 +290,7 @@ class ExecutionServiceTest {
assertThat(restart.getState().getHistories()).hasSize(4);
assertThat(restart.getTaskRunList()).hasSize(3);
assertThat(restart.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(4);
assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(5);
assertThat(restart.getId()).isNotEqualTo(execution.getId());
assertThat(restart.getTaskRunList().get(1).getId()).isNotEqualTo(execution.getTaskRunList().get(1).getId());
@@ -345,7 +345,7 @@ class ExecutionServiceTest {
assertThat(restart.findTaskRunByTaskIdAndValue("1_each", List.of()).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getState().getHistories()).hasSize(4);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getState().getHistories()).hasSize(5);
assertThat(restart.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
}

View File

@@ -0,0 +1,18 @@
ALTER TABLE executions ALTER COLUMN "state_current" ENUM (
'CREATED',
'RUNNING',
'PAUSED',
'RESTARTED',
'KILLING',
'SUCCESS',
'WARNING',
'FAILED',
'KILLED',
'CANCELLED',
'QUEUED',
'RETRYING',
'RETRIED',
'SKIPPED',
'BREAKPOINT',
'SUBMITTED'
) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.state.current'));

View File

@@ -0,0 +1,17 @@
ALTER TABLE executions MODIFY COLUMN `state_current` ENUM (
'CREATED',
'RUNNING',
'PAUSED',
'RESTARTED',
'KILLING',
'SUCCESS',
'WARNING',
'FAILED',
'KILLED',
'CANCELLED',
'QUEUED',
'RETRYING',
'RETRIED',
'SKIPPED',
'SUBMITTED'
) GENERATED ALWAYS AS (value ->> '$.state.current') STORED NOT NULL;

View File

@@ -0,0 +1 @@
ALTER TYPE state_type ADD VALUE IF NOT EXISTS 'SUBMITTED';

View File

@@ -37,6 +37,7 @@ import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.plugin.core.flow.ForEachItem;
import io.kestra.plugin.core.flow.Template;
import io.kestra.plugin.core.flow.WorkingDirectory;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
@@ -648,7 +649,14 @@ public class JdbcExecutor implements ExecutorInterface {
Optional<WorkerGroup> maybeWorkerGroup = workerGroupService.resolveGroupFromJob(flow, workerTask);
String workerGroupKey = maybeWorkerGroup.map(throwFunction(workerGroup -> workerTask.getRunContext().render(workerGroup.getKey())))
.orElse(null);
workerJobQueue.emit(workerGroupKey, workerTask);
if (workerTask.getTask() instanceof WorkingDirectory) {
// WorkingDirectory is a flowable so it will be moved to RUNNING a few lines under
workerJobQueue.emit(workerGroupKey, workerTask);
} else {
TaskRun taskRun = workerTask.getTaskRun().withState(State.Type.SUBMITTED);
workerJobQueue.emit(workerGroupKey, workerTask.withTaskRun(taskRun));
workerTaskResults.add(new WorkerTaskResult(taskRun));
}
}
if (workerTask.getTask().isFlowable()) {
workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.RUNNING)));