From e2789c7a4ad847b7a4cfa272e4dd43a0848dfff5 Mon Sep 17 00:00:00 2001 From: char zheng Date: Tue, 2 Dec 2025 13:46:29 +0800 Subject: [PATCH] fix(executions): concurrency limit exceeded for KILLED execution Fixes #13211 --- .../io/kestra/core/models/flows/State.java | 4 + .../core/runners/AbstractRunnerTest.java | 12 +++ .../core/runners/FlowConcurrencyCaseTest.java | 90 +++++++++++++++++++ .../valids/flow-concurrency-queue-killed.yml | 11 +++ .../io/kestra/jdbc/runner/JdbcExecutor.java | 11 +-- 5 files changed, 123 insertions(+), 5 deletions(-) create mode 100644 core/src/test/resources/flows/valids/flow-concurrency-queue-killed.yml diff --git a/core/src/main/java/io/kestra/core/models/flows/State.java b/core/src/main/java/io/kestra/core/models/flows/State.java index a1a55e5966..955cf38308 100644 --- a/core/src/main/java/io/kestra/core/models/flows/State.java +++ b/core/src/main/java/io/kestra/core/models/flows/State.java @@ -255,6 +255,10 @@ public class State { return this == Type.RUNNING || this == Type.KILLING; } + public boolean onlyRunning() { + return this == Type.RUNNING; + } + public boolean isFailed() { return this == Type.FAILED; } diff --git a/core/src/test/java/io/kestra/core/runners/AbstractRunnerTest.java b/core/src/test/java/io/kestra/core/runners/AbstractRunnerTest.java index 54600b21f1..a795beb235 100644 --- a/core/src/test/java/io/kestra/core/runners/AbstractRunnerTest.java +++ b/core/src/test/java/io/kestra/core/runners/AbstractRunnerTest.java @@ -482,6 +482,18 @@ public abstract class AbstractRunnerTest { flowConcurrencyCaseTest.flowConcurrencyParallelSubflowKill(); } + @Test + @LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"}) + void flowConcurrencyKilled() throws Exception { + flowConcurrencyCaseTest.flowConcurrencyKilled(); + } + + @Test + @LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"}) + void flowConcurrencyQueueKilled() throws Exception { + flowConcurrencyCaseTest.flowConcurrencyQueueKilled(); + } + @Test 
@ExecuteFlow("flows/valids/executable-fail.yml") void badExecutable(Execution execution) { diff --git a/core/src/test/java/io/kestra/core/runners/FlowConcurrencyCaseTest.java b/core/src/test/java/io/kestra/core/runners/FlowConcurrencyCaseTest.java index 3cdf91160e..6c30c42484 100644 --- a/core/src/test/java/io/kestra/core/runners/FlowConcurrencyCaseTest.java +++ b/core/src/test/java/io/kestra/core/runners/FlowConcurrencyCaseTest.java @@ -70,6 +70,7 @@ public class FlowConcurrencyCaseTest { assertThat(shouldFailExecutions.stream().map(Execution::getState).map(State::getCurrent)).allMatch(Type.CANCELLED::equals); } finally { runnerUtils.killExecution(execution1); + runnerUtils.awaitExecution(e -> e.getState().isTerminated(), execution1); } } @@ -85,6 +86,7 @@ public class FlowConcurrencyCaseTest { assertThat(shouldFailExecutions.stream().map(Execution::getState).map(State::getCurrent)).allMatch(State.Type.FAILED::equals); } finally { runnerUtils.killExecution(execution1); + runnerUtils.awaitExecution(e -> e.getState().isTerminated(), execution1); } } @@ -240,6 +242,94 @@ public class FlowConcurrencyCaseTest { assertThat(terminated.getTaskRunList()).isNull(); } + public void flowConcurrencyKilled() throws QueueException, InterruptedException { + Flow flow = flowRepository + .findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty()) + .orElseThrow(); + Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30)); + Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty())); + Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty())); + + try { + assertThat(execution1.getState().isRunning()).isTrue(); + 
assertThat(execution2.getState().getCurrent()).isEqualTo(Type.QUEUED); + assertThat(execution3.getState().getCurrent()).isEqualTo(Type.QUEUED); + + // we kill execution 1, execution 2 should run but not execution 3 + killQueue.emit(ExecutionKilledExecution + .builder() + .state(ExecutionKilled.State.REQUESTED) + .executionId(execution1.getId()) + .isOnKillCascade(true) + .tenantId(MAIN_TENANT) + .build() + ); + + Execution killed = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.KILLED), execution1); + assertThat(killed.getState().getCurrent()).isEqualTo(Type.KILLED); + assertThat(killed.getState().getHistories().stream().anyMatch(h -> h.getState() == Type.RUNNING)).isTrue(); + + // we now check that execution 2 is running + Execution running = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.RUNNING), execution2); + assertThat(running.getState().getCurrent()).isEqualTo(Type.RUNNING); + + // we check that execution 3 is still queued + Thread.sleep(100); // wait a little to be 100% sure + Execution queued = runnerUtils.awaitExecution(e -> e.getState().isQueued(), execution3); + assertThat(queued.getState().getCurrent()).isEqualTo(Type.QUEUED); + } finally { + // kill everything to avoid dangling executions + runnerUtils.killExecution(execution1); + runnerUtils.killExecution(execution2); + runnerUtils.killExecution(execution3); + + // await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do + runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed"); + } + } + + public void flowConcurrencyQueueKilled() throws QueueException, InterruptedException { + Flow flow = flowRepository + .findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty()) + .orElseThrow(); + Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, 
Duration.ofSeconds(30)); + Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty())); + Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty())); + + try { + assertThat(execution1.getState().isRunning()).isTrue(); + assertThat(execution2.getState().getCurrent()).isEqualTo(Type.QUEUED); + assertThat(execution3.getState().getCurrent()).isEqualTo(Type.QUEUED); + + // we kill execution 2, execution 3 should not run + killQueue.emit(ExecutionKilledExecution + .builder() + .state(ExecutionKilled.State.REQUESTED) + .executionId(execution2.getId()) + .isOnKillCascade(true) + .tenantId(MAIN_TENANT) + .build() + ); + + Execution killed = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.KILLED), execution2); + assertThat(killed.getState().getCurrent()).isEqualTo(Type.KILLED); + assertThat(killed.getState().getHistories().stream().noneMatch(h -> h.getState() == Type.RUNNING)).isTrue(); + + // we now check that execution 3 is still queued + Thread.sleep(100); // wait a little to be 100% sure + Execution queued = runnerUtils.awaitExecution(e -> e.getState().isQueued(), execution3); + assertThat(queued.getState().getCurrent()).isEqualTo(Type.QUEUED); + } finally { + // kill everything to avoid dangling executions + runnerUtils.killExecution(execution1); + runnerUtils.killExecution(execution2); + runnerUtils.killExecution(execution3); + + // await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do + runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed"); + } + } + private URI storageUpload(String tenantId) throws URISyntaxException, IOException { File tempFile = File.createTempFile("file", ".txt"); diff --git 
a/core/src/test/resources/flows/valids/flow-concurrency-queue-killed.yml b/core/src/test/resources/flows/valids/flow-concurrency-queue-killed.yml new file mode 100644 index 0000000000..05a3356487 --- /dev/null +++ b/core/src/test/resources/flows/valids/flow-concurrency-queue-killed.yml @@ -0,0 +1,11 @@ +id: flow-concurrency-queue-killed +namespace: io.kestra.tests + +concurrency: + behavior: QUEUE + limit: 1 + +tasks: + - id: sleep + type: io.kestra.plugin.core.flow.Sleep + duration: PT1M \ No newline at end of file diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index f9ddc22333..10d6ba7313 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -1206,16 +1206,17 @@ public class JdbcExecutor implements ExecutorInterface { // check if there exist a queued execution and submit it to the execution queue if (executor.getFlow().getConcurrency() != null) { - - // decrement execution concurrency limit // if an execution was queued but never running, it would have never been counted inside the concurrency limit and should not lead to popping a new queued execution - // this could only happen for KILLED execution. 
boolean queuedThenKilled = execution.getState().getCurrent() == State.Type.KILLED && execution.getState().getHistories().stream().anyMatch(h -> h.getState().isQueued()) - && execution.getState().getHistories().stream().noneMatch(h -> h.getState().isRunning()); + && execution.getState().getHistories().stream().noneMatch(h -> h.getState().onlyRunning()); + // if an execution was FAILED or CANCELLED due to concurrency limit exceeded, it would have never been counter inside the concurrency limit and should not lead to popping a new queued execution boolean concurrencyShortCircuitState = Concurrency.possibleTransitions(execution.getState().getCurrent()) && execution.getState().getHistories().get(execution.getState().getHistories().size() - 2).getState().isCreated(); - if (!queuedThenKilled && !concurrencyShortCircuitState) { + // as we may receive a killed execution multiple times (once when we kill it, then once for each running worker task), we only act on the first one we receive: when the state transitioned from KILLING to KILLED + boolean killingThenKilled = execution.getState().getCurrent().isKilled() && executor.getOriginalState() == State.Type.KILLING; + if (!queuedThenKilled && !concurrencyShortCircuitState && (!execution.getState().getCurrent().isKilled() || killingThenKilled)) { + // decrement execution concurrency limit and pop a new queued execution if needed concurrencyLimitStorage.decrement(executor.getFlow()); if (executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {