mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
fix(executions): concurrency limit exceeded for KILLED execution
Fixes #13211
This commit is contained in:
@@ -255,6 +255,10 @@ public class State {
|
||||
return this == Type.RUNNING || this == Type.KILLING;
|
||||
}
|
||||
|
||||
public boolean onlyRunning() {
|
||||
return this == Type.RUNNING;
|
||||
}
|
||||
|
||||
public boolean isFailed() {
|
||||
return this == Type.FAILED;
|
||||
}
|
||||
|
||||
@@ -482,6 +482,18 @@ public abstract class AbstractRunnerTest {
|
||||
flowConcurrencyCaseTest.flowConcurrencyParallelSubflowKill();
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"})
|
||||
void flowConcurrencyKilled() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyKilled();
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"})
|
||||
void flowConcurrencyQueueKilled() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyQueueKilled();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ExecuteFlow("flows/valids/executable-fail.yml")
|
||||
void badExecutable(Execution execution) {
|
||||
|
||||
@@ -70,6 +70,7 @@ public class FlowConcurrencyCaseTest {
|
||||
assertThat(shouldFailExecutions.stream().map(Execution::getState).map(State::getCurrent)).allMatch(Type.CANCELLED::equals);
|
||||
} finally {
|
||||
runnerUtils.killExecution(execution1);
|
||||
runnerUtils.awaitExecution(e -> e.getState().isTerminated(), execution1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,6 +86,7 @@ public class FlowConcurrencyCaseTest {
|
||||
assertThat(shouldFailExecutions.stream().map(Execution::getState).map(State::getCurrent)).allMatch(State.Type.FAILED::equals);
|
||||
} finally {
|
||||
runnerUtils.killExecution(execution1);
|
||||
runnerUtils.awaitExecution(e -> e.getState().isTerminated(), execution1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -240,6 +242,94 @@ public class FlowConcurrencyCaseTest {
|
||||
assertThat(terminated.getTaskRunList()).isNull();
|
||||
}
|
||||
|
||||
public void flowConcurrencyKilled() throws QueueException, InterruptedException {
|
||||
Flow flow = flowRepository
|
||||
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
|
||||
.orElseThrow();
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
|
||||
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
||||
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
||||
|
||||
try {
|
||||
assertThat(execution1.getState().isRunning()).isTrue();
|
||||
assertThat(execution2.getState().getCurrent()).isEqualTo(Type.QUEUED);
|
||||
assertThat(execution3.getState().getCurrent()).isEqualTo(Type.QUEUED);
|
||||
|
||||
// we kill execution 1, execution 2 should run but not execution 3
|
||||
killQueue.emit(ExecutionKilledExecution
|
||||
.builder()
|
||||
.state(ExecutionKilled.State.REQUESTED)
|
||||
.executionId(execution1.getId())
|
||||
.isOnKillCascade(true)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.build()
|
||||
);
|
||||
|
||||
Execution killed = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.KILLED), execution1);
|
||||
assertThat(killed.getState().getCurrent()).isEqualTo(Type.KILLED);
|
||||
assertThat(killed.getState().getHistories().stream().anyMatch(h -> h.getState() == Type.RUNNING)).isTrue();
|
||||
|
||||
// we now check that execution 2 is running
|
||||
Execution running = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.RUNNING), execution2);
|
||||
assertThat(running.getState().getCurrent()).isEqualTo(Type.RUNNING);
|
||||
|
||||
// we check that execution 3 is still queued
|
||||
Thread.sleep(100); // wait a little to be 100% sure
|
||||
Execution queued = runnerUtils.awaitExecution(e -> e.getState().isQueued(), execution3);
|
||||
assertThat(queued.getState().getCurrent()).isEqualTo(Type.QUEUED);
|
||||
} finally {
|
||||
// kill everything to avoid dangling executions
|
||||
runnerUtils.killExecution(execution1);
|
||||
runnerUtils.killExecution(execution2);
|
||||
runnerUtils.killExecution(execution3);
|
||||
|
||||
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
|
||||
runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed");
|
||||
}
|
||||
}
|
||||
|
||||
public void flowConcurrencyQueueKilled() throws QueueException, InterruptedException {
|
||||
Flow flow = flowRepository
|
||||
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
|
||||
.orElseThrow();
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
|
||||
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
||||
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
||||
|
||||
try {
|
||||
assertThat(execution1.getState().isRunning()).isTrue();
|
||||
assertThat(execution2.getState().getCurrent()).isEqualTo(Type.QUEUED);
|
||||
assertThat(execution3.getState().getCurrent()).isEqualTo(Type.QUEUED);
|
||||
|
||||
// we kill execution 2, execution 3 should not run
|
||||
killQueue.emit(ExecutionKilledExecution
|
||||
.builder()
|
||||
.state(ExecutionKilled.State.REQUESTED)
|
||||
.executionId(execution2.getId())
|
||||
.isOnKillCascade(true)
|
||||
.tenantId(MAIN_TENANT)
|
||||
.build()
|
||||
);
|
||||
|
||||
Execution killed = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.KILLED), execution2);
|
||||
assertThat(killed.getState().getCurrent()).isEqualTo(Type.KILLED);
|
||||
assertThat(killed.getState().getHistories().stream().noneMatch(h -> h.getState() == Type.RUNNING)).isTrue();
|
||||
|
||||
// we now check that execution 3 is still queued
|
||||
Thread.sleep(100); // wait a little to be 100% sure
|
||||
Execution queued = runnerUtils.awaitExecution(e -> e.getState().isQueued(), execution3);
|
||||
assertThat(queued.getState().getCurrent()).isEqualTo(Type.QUEUED);
|
||||
} finally {
|
||||
// kill everything to avoid dangling executions
|
||||
runnerUtils.killExecution(execution1);
|
||||
runnerUtils.killExecution(execution2);
|
||||
runnerUtils.killExecution(execution3);
|
||||
|
||||
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
|
||||
runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed");
|
||||
}
|
||||
}
|
||||
|
||||
private URI storageUpload(String tenantId) throws URISyntaxException, IOException {
|
||||
File tempFile = File.createTempFile("file", ".txt");
|
||||
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
id: flow-concurrency-queue-killed
|
||||
namespace: io.kestra.tests
|
||||
|
||||
concurrency:
|
||||
behavior: QUEUE
|
||||
limit: 1
|
||||
|
||||
tasks:
|
||||
- id: sleep
|
||||
type: io.kestra.plugin.core.flow.Sleep
|
||||
duration: PT1M
|
||||
@@ -1206,16 +1206,17 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
|
||||
// check if there exist a queued execution and submit it to the execution queue
|
||||
if (executor.getFlow().getConcurrency() != null) {
|
||||
|
||||
// decrement execution concurrency limit
|
||||
// if an execution was queued but never running, it would have never been counted inside the concurrency limit and should not lead to popping a new queued execution
|
||||
// this could only happen for KILLED execution.
|
||||
boolean queuedThenKilled = execution.getState().getCurrent() == State.Type.KILLED
|
||||
&& execution.getState().getHistories().stream().anyMatch(h -> h.getState().isQueued())
|
||||
&& execution.getState().getHistories().stream().noneMatch(h -> h.getState().isRunning());
|
||||
&& execution.getState().getHistories().stream().noneMatch(h -> h.getState().onlyRunning());
|
||||
// if an execution was FAILED or CANCELLED due to concurrency limit exceeded, it would have never been counter inside the concurrency limit and should not lead to popping a new queued execution
|
||||
boolean concurrencyShortCircuitState = Concurrency.possibleTransitions(execution.getState().getCurrent())
|
||||
&& execution.getState().getHistories().get(execution.getState().getHistories().size() - 2).getState().isCreated();
|
||||
if (!queuedThenKilled && !concurrencyShortCircuitState) {
|
||||
// as we may receive multiple time killed execution (one when we kill it, then one for each running worker task), we limit to the first we receive: when the state transitionned from KILLING to KILLED
|
||||
boolean killingThenKilled = execution.getState().getCurrent().isKilled() && executor.getOriginalState() == State.Type.KILLING;
|
||||
if (!queuedThenKilled && !concurrencyShortCircuitState && (!execution.getState().getCurrent().isKilled() || killingThenKilled)) {
|
||||
// decrement execution concurrency limit and pop a new queued execution if needed
|
||||
concurrencyLimitStorage.decrement(executor.getFlow());
|
||||
|
||||
if (executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
|
||||
|
||||
Reference in New Issue
Block a user