fix(system): WorkerTask should not end in FAILED when interrupted so that it can be resubmitted

When a Worker is stopping, it first waits for all running tasks to stop, then kills them. For tasks that do not implement kill, their thread is interrupted.

But if the task is properly killed, or supports interrupts (like the Sleep task), it would end in FAILED, and a WorkerTaskResult would then be sent that would fail the flow, preventing the WorkerTask from being resubmitted.

We now check whether the worker is terminating and the task should be resubmitted; in this case we do not emit any WorkerTaskResult.

Fixes #13108
Part-of: https://github.com/kestra-io/kestra-ee/issues/5556
This commit is contained in:
Loïc Mathieu
2025-11-21 15:44:14 +01:00
parent eb51c5be37
commit 9d6694f807
2 changed files with 13 additions and 1 deletions

View File

@@ -115,6 +115,10 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
if (either.getLeft().getTaskRun().getState().getCurrent() == Type.RUNNING) {
runningLatch.countDown();
}
if (either.getLeft().getTaskRun().getState().getCurrent() == Type.FAILED) {
fail("Worker task result should not be in FAILED state as it should be resubmitted");
}
});
workerJobQueue.emit(workerTask(Duration.ofSeconds(5)));

View File

@@ -736,6 +736,14 @@ public class DefaultWorker implements Worker {
}
io.kestra.core.models.flows.State.Type state = lastAttempt.getState().getCurrent();
if (shutdown.get() && serverConfig.workerTaskRestartStrategy() != WorkerTaskRestartStrategy.NEVER && state.isFailed()) {
// if the Worker is terminating and the task is not in success, it may have been terminated by the worker;
// in this case, we return immediately without emitting any result as it would be resubmitted (except if WorkerTaskRestartStrategy is NEVER)
List<WorkerTaskResult> dynamicWorkerResults = workerTask.getRunContext().dynamicWorkerResults();
List<TaskRun> dynamicTaskRuns = dynamicWorkerResults(dynamicWorkerResults);
return new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);
}
if (workerTask.getTask().getRetry() != null &&
workerTask.getTask().getRetry().getWarningOnRetry() &&
workerTask.getTaskRun().attemptNumber() > 1 &&
@@ -971,7 +979,7 @@ public class DefaultWorker implements Worker {
Attributes.of(TraceUtils.ATTR_UID, workerJobCallable.getUid()),
() -> workerSecurityService.callInSecurityContext(workerJobCallable)
);
} catch(Exception e) {
} catch (Exception e) {
// should only occur if it fails in the tracing code which should be unexpected
// we add the exception to have some log in that case
workerJobCallable.exception = e;