fix(executions): don't remove worker task result for killed execution

As killing an execution is asynchronous, it is inherently racy.
When we kill an execution, it moves to the terminal state, which then removes any worker task result in order to eagerly purge the queue table.

But if such a worker task result arrives late and has not yet been processed by the executor, it is purged before it can ever be processed, so the task is never updated to KILLED.
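
A minimal sketch of the guard described above, assuming a simplified state model. The names `PurgeGuardSketch`, `shouldPurgeResults` and the reduced `State` enum are hypothetical and only illustrate the condition; the real check lives in `JdbcExecutor`.

```java
import java.util.List;

// Hypothetical, simplified model of the purge guard; not Kestra's actual classes.
public class PurgeGuardSketch {
    // Assumed subset of terminal states, for illustration only.
    public enum State {
        SUCCESS, FAILED, KILLED;

        public boolean isKilled() {
            return this == KILLED;
        }
    }

    // Returns true when worker task results may be purged eagerly.
    // Killed executions are excluded because their KILLED results may still be in flight.
    public static boolean shouldPurgeResults(boolean cleanWorkerJobQueue, List<String> taskRunIds, State current) {
        return cleanWorkerJobQueue
            && taskRunIds != null
            && !taskRunIds.isEmpty()
            && !current.isKilled();
    }

    public static void main(String[] args) {
        List<String> taskRunIds = List.of("taskrun-1");
        System.out.println(shouldPurgeResults(true, taskRunIds, State.SUCCESS)); // prints "true"
        System.out.println(shouldPurgeResults(true, taskRunIds, State.KILLED));  // prints "false"
    }
}
```

With this condition, a late KILLED result stays in the queue table until the executor processes it, at the cost of not purging eagerly for killed executions.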

Note: this may fix some flaky unit tests.

Fixes https://github.com/kestra-io/kestra-ee/issues/6040
Loïc Mathieu
2025-12-10 16:21:05 +01:00
parent 925b8c6954
commit 56febfb415
2 changed files with 3 additions and 4 deletions

@@ -1246,7 +1246,8 @@ public class JdbcExecutor implements ExecutorInterface {
// IMPORTANT: this is safe as only the executor is listening to WorkerTaskResult,
// and we are sure at this stage that all WorkerJob has been listened and processed by the Worker.
// If any of these assumptions changed, this code would not be safe anymore.
-if (cleanWorkerJobQueue && !ListUtils.isEmpty(executor.getExecution().getTaskRunList())) {
+// One notable exception is for killed flows, as the KILLED worker task result may arrive late, so removing them is racy: we may remove them before they are processed
+if (cleanWorkerJobQueue && !ListUtils.isEmpty(executor.getExecution().getTaskRunList()) && !execution.getState().getCurrent().isKilled()) {
List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
.map(taskRun -> taskRun.getId())
.toList();

@@ -1293,7 +1293,6 @@ class ExecutionControllerRunnerTest {
assertThat(executions.getTotal()).isEqualTo(4L);
}
-@FlakyTest
@Test
@LoadFlows({"flows/valids/pause-test.yaml"})
void killExecutionPaused() throws TimeoutException, QueueException {
@@ -1311,8 +1310,7 @@ class ExecutionControllerRunnerTest {
assertThat(killedExecution.getTaskRunList()).hasSize(1);
}
-// This test is flaky on CI as the flow may be already SUCCESS when we kill it if CI is super slow
-@RetryingTest(5)
+@Test
@LoadFlows({"flows/valids/sleep-long.yml"})
void killExecution() throws TimeoutException, InterruptedException, QueueException {
// listen to the execution queue