feat(system): set taskrun attempt to resubmitted when a taskrun is resubmitted to a worker

Closes https://github.com/kestra-io/kestra/issues/12481
This commit is contained in:
Loïc Mathieu
2025-10-30 10:31:19 +01:00
parent 7c89eec500
commit c06ffb3063
8 changed files with 54 additions and 11 deletions

View File

@@ -195,17 +195,17 @@ public class TaskRun implements TenantInterface {
taskRunBuilder.attempts = new ArrayList<>();
taskRunBuilder.attempts.add(TaskRunAttempt.builder()
.state(new State(this.state, State.Type.KILLED))
.state(new State(this.state, State.Type.RESUBMITTED))
.build()
);
} else {
ArrayList<TaskRunAttempt> taskRunAttempts = new ArrayList<>(taskRunBuilder.attempts);
TaskRunAttempt lastAttempt = taskRunAttempts.get(taskRunBuilder.attempts.size() - 1);
if (!lastAttempt.getState().isTerminated()) {
taskRunAttempts.set(taskRunBuilder.attempts.size() - 1, lastAttempt.withState(State.Type.KILLED));
taskRunAttempts.set(taskRunBuilder.attempts.size() - 1, lastAttempt.withState(State.Type.RESUBMITTED));
} else {
taskRunAttempts.add(TaskRunAttempt.builder()
.state(new State().withState(State.Type.KILLED))
.state(new State().withState(State.Type.RESUBMITTED))
.build()
);
}

View File

@@ -236,14 +236,15 @@ public class State {
RETRYING,
RETRIED,
SKIPPED,
BREAKPOINT;
BREAKPOINT,
RESUBMITTED;
public boolean isTerminated() {
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED || this == Type.SKIPPED;
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED || this == Type.SKIPPED || this == Type.RESUBMITTED;
}
public boolean isTerminatedNoFail() {
return this == Type.WARNING || this == Type.SUCCESS || this == Type.RETRIED || this == Type.SKIPPED;
return this == Type.WARNING || this == Type.SUCCESS || this == Type.RETRIED || this == Type.SKIPPED || this == Type.RESUBMITTED;
}
public boolean isCreated() {

View File

@@ -17,7 +17,7 @@ class TaskRunTest {
assertThat(taskRun.getAttempts().size()).isEqualTo(1);
assertThat(taskRun.getAttempts().getFirst().getState().getHistories().getFirst()).isEqualTo(taskRun.getState().getHistories().getFirst());
assertThat(taskRun.getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.KILLED);
assertThat(taskRun.getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.RESUBMITTED);
}
@Test
@@ -33,7 +33,7 @@ class TaskRunTest {
assertThat(taskRun.getAttempts().size()).isEqualTo(1);
assertThat(taskRun.getAttempts().getFirst().getState().getHistories().getFirst()).isNotEqualTo(taskRun.getState().getHistories().getFirst());
assertThat(taskRun.getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.KILLED);
assertThat(taskRun.getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.RESUBMITTED);
}
@Test
@@ -49,7 +49,7 @@ class TaskRunTest {
assertThat(taskRun.getAttempts().size()).isEqualTo(2);
assertThat(taskRun.getAttempts().get(1).getState().getHistories().getFirst()).isNotEqualTo(taskRun.getState().getHistories().getFirst());
assertThat(taskRun.getAttempts().get(1).getState().getCurrent()).isEqualTo(State.Type.KILLED);
assertThat(taskRun.getAttempts().get(1).getState().getCurrent()).isEqualTo(State.Type.RESUBMITTED);
}
}

View File

@@ -0,0 +1,19 @@
ALTER TABLE executions ALTER COLUMN "state_current" ENUM (
'CREATED',
'RUNNING',
'PAUSED',
'RESTARTED',
'KILLING',
'SUCCESS',
'WARNING',
'FAILED',
'KILLED',
'CANCELLED',
'QUEUED',
'RETRYING',
'RETRIED',
'SKIPPED',
'BREAKPOINT',
'SUBMITTED',
'RESUBMITTED'
) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.state.current'));

View File

@@ -0,0 +1,19 @@
ALTER TABLE executions MODIFY COLUMN `state_current` ENUM (
'CREATED',
'RUNNING',
'PAUSED',
'RESTARTED',
'KILLING',
'SUCCESS',
'WARNING',
'FAILED',
'KILLED',
'CANCELLED',
'QUEUED',
'RETRYING',
'RETRIED',
'SKIPPED',
'BREAKPOINT',
'SUBMITTED',
'RESUBMITTED'
) GENERATED ALWAYS AS (value ->> '$.state.current') STORED NOT NULL;

View File

@@ -0,0 +1 @@
ALTER TYPE state_type ADD VALUE IF NOT EXISTS 'RESUBMITTED';

View File

@@ -494,13 +494,13 @@ public class JdbcExecutor implements ExecutorInterface {
logService.logTaskRun(
workerTaskRunning.getTaskRun(),
Level.WARN,
"Re-emitting WorkerTask."
"Re-resubmitting WorkerTask."
);
} catch (QueueException e) {
logService.logTaskRun(
workerTaskRunning.getTaskRun(),
Level.ERROR,
"Unable to re-emit WorkerTask.",
"Unable to re-resubmit WorkerTask.",
e
);
}

View File

@@ -8,6 +8,7 @@ import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.State.Type;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.WorkerGroup;
@@ -130,6 +131,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
assertThat(workerTaskResult).isNotNull();
assertThat(workerTaskResult.getTaskRun().getState().getCurrent()).isEqualTo(Type.SUCCESS);
assertThat(workerTaskResult.getTaskRun().getAttempts()).hasSize(2);
assertThat(workerTaskResult.getTaskRun().getAttempts().getFirst().getState().getHistories().stream().anyMatch(it -> it.getState() == Type.RESUBMITTED)).isTrue();
newWorker.close();
}
@@ -170,6 +172,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
assertThat(workerTaskResult).isNotNull();
assertThat(workerTaskResult.getTaskRun().getState().getCurrent()).isEqualTo(Type.SUCCESS);
assertThat(workerTaskResult.getTaskRun().getAttempts()).hasSize(2);
assertThat(workerTaskResult.getTaskRun().getAttempts().getFirst().getState().getHistories().stream().anyMatch(it -> it.getState() == Type.RESUBMITTED)).isTrue();
newWorker.close();
}