fix: Make sure parentTaskRun attempts are also set to Killed (#13736)

* fix: Make sure parentTaskRun attempts are also set to Killed

* test: added a test to check the correct behavior
This commit is contained in:
YannC
2025-12-18 11:08:44 +01:00
committed by GitHub
parent 7ea95f393e
commit 1a8a47c8cd
4 changed files with 39 additions and 6 deletions

View File

@@ -3,9 +3,7 @@ package io.kestra.core.models.executions;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.TenantInterface; import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.State; import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask; import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry; import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden; import io.swagger.v3.oas.annotations.Hidden;
@@ -95,8 +93,16 @@ public class TaskRun implements TenantInterface {
this.forceExecution this.forceExecution
); );
} }
public TaskRun withStateAndAttempt(State.Type state) {
List<TaskRunAttempt> newAttempts = new ArrayList<>(this.attempts != null ? this.attempts : List.of());
if (newAttempts.isEmpty()) {
newAttempts.add(TaskRunAttempt.builder().state(new State(state)).build());
} else {
TaskRunAttempt updatedLast = newAttempts.getLast().withState(state);
newAttempts.set(newAttempts.size() - 1, updatedLast);
}
public TaskRun replaceState(State newState) {
return new TaskRun( return new TaskRun(
this.tenantId, this.tenantId,
this.id, this.id,
@@ -106,9 +112,9 @@ public class TaskRun implements TenantInterface {
this.taskId, this.taskId,
this.parentTaskRunId, this.parentTaskRunId,
this.value, this.value,
this.attempts, newAttempts,
this.outputs, this.outputs,
newState, this.state.withState(state),
this.iteration, this.iteration,
this.dynamic, this.dynamic,
this.forceExecution this.forceExecution

View File

@@ -754,7 +754,7 @@ public class ExecutionService {
var parentTaskRun = execution.findTaskRunByTaskRunId(taskRun.getParentTaskRunId()); var parentTaskRun = execution.findTaskRunByTaskRunId(taskRun.getParentTaskRunId());
Execution newExecution = execution; Execution newExecution = execution;
if (parentTaskRun.getState().getCurrent() != State.Type.KILLED) { if (parentTaskRun.getState().getCurrent() != State.Type.KILLED) {
newExecution = newExecution.withTaskRun(parentTaskRun.withState(State.Type.KILLED)); newExecution = newExecution.withTaskRun(parentTaskRun.withStateAndAttempt(State.Type.KILLED));
} }
if (parentTaskRun.getParentTaskRunId() != null) { if (parentTaskRun.getParentTaskRunId() != null) {
return killParentTaskruns(parentTaskRun, newExecution); return killParentTaskruns(parentTaskRun, newExecution);

View File

@@ -6,6 +6,7 @@ import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows; import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.Label; import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource; import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow; import io.kestra.core.models.flows.GenericFlow;
@@ -466,4 +467,20 @@ class ExecutionServiceTest {
assertThat(restart.getTaskRunList()).hasSize(2); assertThat(restart.getTaskRunList()).hasSize(2);
assertThat(restart.findTaskRunsByTaskId("make_error").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS); assertThat(restart.findTaskRunsByTaskId("make_error").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
} }
@Test
@LoadFlows({"flows/valids/each-pause.yaml"})
void killExecutionWithFlowableTask() throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "each-pause");
TaskRun childTaskRun = execution.getTaskRunList().stream().filter(tr -> tr.getTaskId().equals("pause")).toList().getFirst();
Execution killed = executionService.killParentTaskruns(childTaskRun,execution);
TaskRun parentTaskRun = killed.getTaskRunList().stream().filter(tr -> tr.getTaskId().equals("each_task")).toList().getFirst();
assertThat(parentTaskRun.getState().getCurrent()).isEqualTo(State.Type.KILLED);
assertThat(parentTaskRun.getAttempts().getLast().getState().getCurrent()).isEqualTo(State.Type.KILLED);
}
} }

View File

@@ -0,0 +1,10 @@
id: each-pause
namespace: io.kestra.tests
tasks:
- id: each_task
type: io.kestra.plugin.core.flow.ForEach
values: '["a", "b"]'
tasks:
- id: pause
type: io.kestra.plugin.core.flow.Pause