From 948a5beffac9008845c97bf80b8b3a99b759417c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Mon, 6 Oct 2025 16:12:31 +0200 Subject: [PATCH] fix(executions): set tasks to submitted after sending to the Worker When computing the next tasks to run, all task runs are created in the CREATED state. Then when computed tasks to send to the worker, CREATED task runs are listed and converted into worker task. The issue is that on the next execution message, if tasks sent to the worker are still in CREATED (for ex because the Worker didn't start them yet), they would still be evaluted as to send to the worker. Setting them to a new SUBMITTED state would prevent them to be taken into account again until they are really terminated. This should avoid the deduplicateWorkerTask state but this is kept for now with a warning and would be removed later if it proves to work in all cases. --- .../core/models/executions/Execution.java | 7 +++++++ .../io/kestra/core/models/flows/State.java | 1 + .../io/kestra/core/runners/FlowableUtils.java | 6 ++++++ .../core/runners/ExecutionServiceTest.java | 10 +++++----- .../migrations/h2/V1_45__taskrun_submitted.sql | 18 ++++++++++++++++++ .../mysql/V1_45__taskrun_submitted.sql | 17 +++++++++++++++++ .../postgres/V1_45__taskrun_submitted.sql | 1 + .../io/kestra/jdbc/runner/JdbcExecutor.java | 10 +++++++++- 8 files changed, 64 insertions(+), 6 deletions(-) create mode 100644 jdbc-h2/src/main/resources/migrations/h2/V1_45__taskrun_submitted.sql create mode 100644 jdbc-mysql/src/main/resources/migrations/mysql/V1_45__taskrun_submitted.sql create mode 100644 jdbc-postgres/src/main/resources/migrations/postgres/V1_45__taskrun_submitted.sql diff --git a/core/src/main/java/io/kestra/core/models/executions/Execution.java b/core/src/main/java/io/kestra/core/models/executions/Execution.java index 9b6c5c8627..36db014427 100644 --- a/core/src/main/java/io/kestra/core/models/executions/Execution.java +++ b/core/src/main/java/io/kestra/core/models/executions/Execution.java @@ -584,6 +584,13 @@ public class Execution implements DeletedInterface, TenantInterface { ); } + public Optional findLastSubmitted(List taskRuns) { + return Streams.findLast(taskRuns + .stream() + .filter(t -> t.getState().getCurrent() == State.Type.SUBMITTED) + ); + } + public Optional findLastRunning(List taskRuns) { return Streams.findLast(taskRuns .stream() diff --git a/core/src/main/java/io/kestra/core/models/flows/State.java b/core/src/main/java/io/kestra/core/models/flows/State.java index 38dac9b983..4299c444ff 100644 --- a/core/src/main/java/io/kestra/core/models/flows/State.java +++ b/core/src/main/java/io/kestra/core/models/flows/State.java @@ -222,6 +222,7 @@ public class State { @Introspected public enum Type { CREATED, + SUBMITTED, RUNNING, PAUSED, RESTARTED, diff --git a/core/src/main/java/io/kestra/core/runners/FlowableUtils.java b/core/src/main/java/io/kestra/core/runners/FlowableUtils.java index 3388249a9f..581934091a 100644 --- a/core/src/main/java/io/kestra/core/runners/FlowableUtils.java +++ b/core/src/main/java/io/kestra/core/runners/FlowableUtils.java @@ -84,6 +84,12 @@ public class FlowableUtils { return Collections.emptyList(); } + // have submitted, leave + Optional lastSubmitted = execution.findLastSubmitted(taskRuns); + if (lastSubmitted.isPresent()) { + return Collections.emptyList(); + } + // have running, leave Optional lastRunning = execution.findLastRunning(taskRuns); if (lastRunning.isPresent()) { diff --git a/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java b/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java index 60af4f0ce8..1c9be2c3b4 100644 --- a/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java +++ b/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java @@ -73,7 +73,7 @@ class ExecutionServiceTest { assertThat(restart.getState().getHistories()).hasSize(4); assertThat(restart.getTaskRunList()).hasSize(3); assertThat(restart.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED); - assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(4); + assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(5); assertThat(restart.getId()).isEqualTo(execution.getId()); assertThat(restart.getTaskRunList().get(2).getId()).isEqualTo(execution.getTaskRunList().get(2).getId()); assertThat(restart.getLabels()).contains(new Label(Label.RESTARTED, "true")); @@ -106,7 +106,7 @@ class ExecutionServiceTest { assertThat(restart.getState().getHistories()).hasSize(4); assertThat(restart.getTaskRunList()).hasSize(3); assertThat(restart.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED); - assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(4); + assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(5); assertThat(restart.getId()).isNotEqualTo(execution.getId()); assertThat(restart.getTaskRunList().get(2).getId()).isNotEqualTo(execution.getTaskRunList().get(2).getId()); assertThat(restart.getLabels()).contains(new Label(Label.RESTARTED, "true")); @@ -194,7 +194,7 @@ class ExecutionServiceTest { assertThat(restart.getState().getHistories()).hasSize(4); assertThat(restart.getTaskRunList()).hasSize(2); assertThat(restart.getTaskRunList().get(1).getState().getCurrent()).isEqualTo(State.Type.RESTARTED); - assertThat(restart.getTaskRunList().get(1).getState().getHistories()).hasSize(4); + assertThat(restart.getTaskRunList().get(1).getState().getHistories()).hasSize(5); assertThat(restart.getId()).isNotEqualTo(execution.getId()); assertThat(restart.getTaskRunList().get(1).getId()).isNotEqualTo(execution.getTaskRunList().get(1).getId()); assertThat(restart.getLabels()).contains(new Label(Label.REPLAY, "true")); @@ -290,7 +290,7 @@ class ExecutionServiceTest { assertThat(restart.getState().getHistories()).hasSize(4); assertThat(restart.getTaskRunList()).hasSize(3); assertThat(restart.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED); - assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(4); + assertThat(restart.getTaskRunList().get(2).getState().getHistories()).hasSize(5); assertThat(restart.getId()).isNotEqualTo(execution.getId()); assertThat(restart.getTaskRunList().get(1).getId()).isNotEqualTo(execution.getTaskRunList().get(1).getId()); @@ -345,7 +345,7 @@ class ExecutionServiceTest { assertThat(restart.findTaskRunByTaskIdAndValue("1_each", List.of()).getState().getCurrent()).isEqualTo(State.Type.RUNNING); assertThat(restart.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getState().getCurrent()).isEqualTo(State.Type.RUNNING); assertThat(restart.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getState().getCurrent()).isEqualTo(State.Type.FAILED); - assertThat(restart.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getState().getHistories()).hasSize(4); + assertThat(restart.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getState().getHistories()).hasSize(5); assertThat(restart.findTaskRunByTaskIdAndValue("2-1-2_t2", List.of("value 1")).getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED); } diff --git a/jdbc-h2/src/main/resources/migrations/h2/V1_45__taskrun_submitted.sql b/jdbc-h2/src/main/resources/migrations/h2/V1_45__taskrun_submitted.sql new file mode 100644 index 0000000000..bc2b51e356 --- /dev/null +++ b/jdbc-h2/src/main/resources/migrations/h2/V1_45__taskrun_submitted.sql @@ -0,0 +1,18 @@ +ALTER TABLE executions ALTER COLUMN "state_current" ENUM ( + 'CREATED', + 'RUNNING', + 'PAUSED', + 'RESTARTED', + 'KILLING', + 'SUCCESS', + 'WARNING', + 'FAILED', + 'KILLED', + 'CANCELLED', + 'QUEUED', + 'RETRYING', + 'RETRIED', + 'SKIPPED', + 'BREAKPOINT', + 'SUBMITTED' +) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.state.current')); \ No newline at end of file diff --git a/jdbc-mysql/src/main/resources/migrations/mysql/V1_45__taskrun_submitted.sql b/jdbc-mysql/src/main/resources/migrations/mysql/V1_45__taskrun_submitted.sql new file mode 100644 index 0000000000..58a541274a --- /dev/null +++ b/jdbc-mysql/src/main/resources/migrations/mysql/V1_45__taskrun_submitted.sql @@ -0,0 +1,17 @@ +ALTER TABLE executions MODIFY COLUMN `state_current` ENUM ( + 'CREATED', + 'RUNNING', + 'PAUSED', + 'RESTARTED', + 'KILLING', + 'SUCCESS', + 'WARNING', + 'FAILED', + 'KILLED', + 'CANCELLED', + 'QUEUED', + 'RETRYING', + 'RETRIED', + 'SKIPPED', + 'SUBMITTED' + ) GENERATED ALWAYS AS (value ->> '$.state.current') STORED NOT NULL; \ No newline at end of file diff --git a/jdbc-postgres/src/main/resources/migrations/postgres/V1_45__taskrun_submitted.sql b/jdbc-postgres/src/main/resources/migrations/postgres/V1_45__taskrun_submitted.sql new file mode 100644 index 0000000000..e03845dab8 --- /dev/null +++ b/jdbc-postgres/src/main/resources/migrations/postgres/V1_45__taskrun_submitted.sql @@ -0,0 +1 @@ +ALTER TYPE state_type ADD VALUE IF NOT EXISTS 'SUBMITTED'; \ No newline at end of file diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index 47d4e1c164..70f0ef3b2e 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -37,6 +37,7 @@ import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository; import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository; import io.kestra.plugin.core.flow.ForEachItem; import io.kestra.plugin.core.flow.Template; +import io.kestra.plugin.core.flow.WorkingDirectory; import io.micronaut.context.annotation.Value; import io.micronaut.context.event.ApplicationEventPublisher; import io.micronaut.transaction.exceptions.CannotCreateTransactionException; @@ -648,7 +649,14 @@ public class JdbcExecutor implements ExecutorInterface { Optional maybeWorkerGroup = workerGroupService.resolveGroupFromJob(flow, workerTask); String workerGroupKey = maybeWorkerGroup.map(throwFunction(workerGroup -> workerTask.getRunContext().render(workerGroup.getKey()))) .orElse(null); - workerJobQueue.emit(workerGroupKey, workerTask); + if (workerTask.getTask() instanceof WorkingDirectory) { + // WorkingDirectory is a flowable so it will be moved to RUNNING a few lines under + workerJobQueue.emit(workerGroupKey, workerTask); + } else { + TaskRun taskRun = workerTask.getTaskRun().withState(State.Type.SUBMITTED); + workerJobQueue.emit(workerGroupKey, workerTask.withTaskRun(taskRun)); + workerTaskResults.add(new WorkerTaskResult(taskRun)); + } } if (workerTask.getTask().isFlowable()) { workerTaskResults.add(new WorkerTaskResult(workerTask.getTaskRun().withState(State.Type.RUNNING)));