From 1a8a47c8cdbc2d629a9d248b6aa08bd3fcb5edfb Mon Sep 17 00:00:00 2001 From: YannC <37600690+Skraye@users.noreply.github.com> Date: Thu, 18 Dec 2025 11:08:44 +0100 Subject: [PATCH] fix: Make sure parentTaskRun attempts are also set to Killed (#13736) * fix: Make sure parentTaskRun attempts are also set to Killed * test: added a test to check the correct behavior --- .../kestra/core/models/executions/TaskRun.java | 16 +++++++++++----- .../kestra/core/services/ExecutionService.java | 2 +- .../core/runners/ExecutionServiceTest.java | 17 +++++++++++++++++ .../test/resources/flows/valids/each-pause.yaml | 10 ++++++++++ 4 files changed, 39 insertions(+), 6 deletions(-) create mode 100644 core/src/test/resources/flows/valids/each-pause.yaml diff --git a/core/src/main/java/io/kestra/core/models/executions/TaskRun.java b/core/src/main/java/io/kestra/core/models/executions/TaskRun.java index 07b9663a0f..962de5faa0 100644 --- a/core/src/main/java/io/kestra/core/models/executions/TaskRun.java +++ b/core/src/main/java/io/kestra/core/models/executions/TaskRun.java @@ -3,9 +3,7 @@ package io.kestra.core.models.executions; import com.fasterxml.jackson.annotation.JsonInclude; import io.kestra.core.models.TenantInterface; import io.kestra.core.models.flows.State; -import io.kestra.core.models.tasks.FlowableTask; import io.kestra.core.models.tasks.ResolvedTask; -import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.retrys.AbstractRetry; import io.kestra.core.utils.IdUtils; import io.swagger.v3.oas.annotations.Hidden; @@ -95,8 +93,16 @@ public class TaskRun implements TenantInterface { this.forceExecution ); } + public TaskRun withStateAndAttempt(State.Type state) { + List newAttempts = new ArrayList<>(this.attempts != null ? this.attempts : List.of()); + + if (newAttempts.isEmpty()) { + newAttempts.add(TaskRunAttempt.builder().state(new State(state)).build()); + } else { + TaskRunAttempt updatedLast = newAttempts.getLast().withState(state); + newAttempts.set(newAttempts.size() - 1, updatedLast); + } - public TaskRun replaceState(State newState) { return new TaskRun( this.tenantId, this.id, @@ -106,9 +112,9 @@ public class TaskRun implements TenantInterface { this.taskId, this.parentTaskRunId, this.value, - this.attempts, + newAttempts, this.outputs, - newState, + this.state.withState(state), this.iteration, this.dynamic, this.forceExecution diff --git a/core/src/main/java/io/kestra/core/services/ExecutionService.java b/core/src/main/java/io/kestra/core/services/ExecutionService.java index dd15393d3c..7c80a11603 100644 --- a/core/src/main/java/io/kestra/core/services/ExecutionService.java +++ b/core/src/main/java/io/kestra/core/services/ExecutionService.java @@ -754,7 +754,7 @@ public class ExecutionService { var parentTaskRun = execution.findTaskRunByTaskRunId(taskRun.getParentTaskRunId()); Execution newExecution = execution; if (parentTaskRun.getState().getCurrent() != State.Type.KILLED) { - newExecution = newExecution.withTaskRun(parentTaskRun.withState(State.Type.KILLED)); + newExecution = newExecution.withTaskRun(parentTaskRun.withStateAndAttempt(State.Type.KILLED)); } if (parentTaskRun.getParentTaskRunId() != null) { return killParentTaskruns(parentTaskRun, newExecution); diff --git a/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java b/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java index 7c59469633..2afc2dd6fd 100644 --- a/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java +++ b/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java @@ -6,6 +6,7 @@ import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.junit.annotations.LoadFlows; import io.kestra.core.models.Label; import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.FlowWithSource; import io.kestra.core.models.flows.GenericFlow; @@ -466,4 +467,20 @@ class ExecutionServiceTest { assertThat(restart.getTaskRunList()).hasSize(2); assertThat(restart.findTaskRunsByTaskId("make_error").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS); } + + @Test + @LoadFlows({"flows/valids/each-pause.yaml"}) + void killExecutionWithFlowableTask() throws Exception { + Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "each-pause"); + + TaskRun childTaskRun = execution.getTaskRunList().stream().filter(tr -> tr.getTaskId().equals("pause")).toList().getFirst(); + + Execution killed = executionService.killParentTaskruns(childTaskRun,execution); + + TaskRun parentTaskRun = killed.getTaskRunList().stream().filter(tr -> tr.getTaskId().equals("each_task")).toList().getFirst(); + + assertThat(parentTaskRun.getState().getCurrent()).isEqualTo(State.Type.KILLED); + assertThat(parentTaskRun.getAttempts().getLast().getState().getCurrent()).isEqualTo(State.Type.KILLED); + + } } \ No newline at end of file diff --git a/core/src/test/resources/flows/valids/each-pause.yaml b/core/src/test/resources/flows/valids/each-pause.yaml new file mode 100644 index 0000000000..52b90a4dcc --- /dev/null +++ b/core/src/test/resources/flows/valids/each-pause.yaml @@ -0,0 +1,10 @@ +id: each-pause +namespace: io.kestra.tests + +tasks: + - id: each_task + type: io.kestra.plugin.core.flow.ForEach + values: '["a", "b"]' + tasks: + - id: pause + type: io.kestra.plugin.core.flow.Pause \ No newline at end of file