fix(executions): flow concurrency limit not honors when executions are created at a high rate

This is due to the fact that we now process the execution queue concurrently so there is a race when counting currently running executions. This can be seen easily using a ForEachItem as it could create tens or hundreds of executions almost instantly leading to almost all those executions started as they would all see 0 executions running...

Using a dedicated execution running queue, as done in EE, would serialize the messages and fix the issue.

However, if using multiple executor instances and concurrency limit = 1, there is a theoretical race as no locks will be done if no execution is running. A max surge of executions could be as high as the number of executor but this race is less probable to happen in real world scenario.

Fixes #10167
This commit is contained in:
Loïc Mathieu
2025-07-23 11:43:15 +02:00
parent 6dcba16314
commit ab464fff6e
20 changed files with 397 additions and 70 deletions

View File

@@ -28,6 +28,7 @@ public interface QueueFactoryInterface {
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
QueueInterface<Execution> execution();
@@ -58,4 +59,6 @@ public interface QueueFactoryInterface {
QueueInterface<SubflowExecutionResult> subflowExecutionResult();
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
QueueInterface<ExecutionRunning> executionRunning();
}

View File

@@ -27,8 +27,6 @@ public class QueueService {
return ((Executor) object).getExecution().getId();
} else if (object.getClass() == MetricEntry.class) {
return null;
} else if (object.getClass() == ExecutionRunning.class) {
return ((ExecutionRunning) object).getExecution().getId();
} else if (object.getClass() == SubflowExecutionEnd.class) {
return ((SubflowExecutionEnd) object).getParentExecutionId();
} else {

View File

@@ -1,5 +1,6 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.utils.IdUtils;
import jakarta.validation.constraints.NotNull;
@@ -11,7 +12,7 @@ import lombok.With;
@Value
@AllArgsConstructor
@Builder
public class ExecutionRunning {
public class ExecutionRunning implements HasUID {
String tenantId;
@NotNull
@@ -26,6 +27,7 @@ public class ExecutionRunning {
@With
ConcurrencyState concurrencyState;
@Override
public String uid() {
return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId, this.execution.getId());
}

View File

@@ -102,49 +102,39 @@ public class ExecutorService {
return this.flowExecutorInterface;
}
public Executor checkConcurrencyLimit(Executor executor, FlowInterface flow, Execution execution, long count) {
// if above the limit, handle concurrency limit based on its behavior
if (count >= flow.getConcurrency().getLimit()) {
public ExecutionRunning processExecutionRunning(FlowInterface flow, int runningCount, ExecutionRunning executionRunning) {
// if concurrency was removed, it can be null as we always get the latest flow definition
if (flow.getConcurrency() != null && runningCount >= flow.getConcurrency().getLimit()) {
return switch (flow.getConcurrency().getBehavior()) {
case QUEUE -> {
var newExecution = execution.withState(State.Type.QUEUED);
ExecutionRunning executionRunning = ExecutionRunning.builder()
.tenantId(flow.getTenantId())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.execution(newExecution)
.concurrencyState(ExecutionRunning.ConcurrencyState.QUEUED)
.build();
// when max concurrency is reached, we throttle the execution and stop processing
logService.logExecution(
newExecution,
executionRunning.getExecution(),
Level.INFO,
"Flow is queued due to concurrency limit exceeded, {} running(s)",
count
"Execution is queued due to concurrency limit exceeded, {} running(s)",
runningCount
);
// return the execution queued
yield executor
.withExecutionRunning(executionRunning)
.withExecution(newExecution, "checkConcurrencyLimit");
var newExecution = executionRunning.getExecution().withState(State.Type.QUEUED);
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
yield executionRunning
.withExecution(newExecution)
.withConcurrencyState(ExecutionRunning.ConcurrencyState.QUEUED);
}
case CANCEL ->
executor.withExecution(execution.withState(State.Type.CANCELLED), "checkConcurrencyLimit");
executionRunning
.withExecution(executionRunning.getExecution().withState(State.Type.CANCELLED))
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
case FAIL ->
executor.withException(new IllegalStateException("Flow is FAILED due to concurrency limit exceeded"), "checkConcurrencyLimit");
executionRunning
.withExecution(executionRunning.getExecution().failedExecutionFromExecutor(new IllegalStateException("Execution is FAILED due to concurrency limit exceeded")).getExecution())
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
};
}
// if under the limit, update the executor with a RUNNING ExecutionRunning to track them
var executionRunning = new ExecutionRunning(
flow.getTenantId(),
flow.getNamespace(),
flow.getId(),
executor.getExecution(),
ExecutionRunning.ConcurrencyState.RUNNING
);
return executor.withExecutionRunning(executionRunning);
// if under the limit, run it!
return executionRunning
.withExecution(executionRunning.getExecution().withState(State.Type.RUNNING))
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
}
public Executor process(Executor executor) {

View File

@@ -417,6 +417,12 @@ public abstract class AbstractRunnerTest {
flowConcurrencyCaseTest.flowConcurrencyCancelPause();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"})
protected void flowConcurrencyWithForEachItem() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem();
}
@Test
@ExecuteFlow("flows/valids/executable-fail.yml")
void badExecutable(Execution execution) {

View File

@@ -8,18 +8,28 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import reactor.core.publisher.Flux;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@@ -28,7 +38,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Singleton
public class FlowConcurrencyCaseTest {
@Inject
private RunnerUtils runnerUtils;
private StorageInterface storageInterface;
@Inject
protected RunnerUtils runnerUtils;
@Inject
private FlowInputOutput flowIO;
@Inject
private FlowRepositoryInterface flowRepository;
@@ -237,4 +253,49 @@ public class FlowConcurrencyCaseTest {
assertThat(secondExecutionResult.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(secondExecutionResult.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.CANCELLED);
}
public void flowConcurrencyWithForEachItem() throws TimeoutException, QueueException, InterruptedException, URISyntaxException, IOException {
URI file = storageUpload();
Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
Execution forEachItem = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-for-each-item", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs), Duration.ofSeconds(5));
assertThat(forEachItem.getState().getCurrent()).isEqualTo(Type.RUNNING);
Set<String> executionIds = new HashSet<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if ("flow-concurrency-queue".equals(e.getLeft().getFlowId()) && e.getLeft().getState().isRunning()) {
executionIds.add(e.getLeft().getId());
}
});
// wait a little to be sure there are not too many executions started
Thread.sleep(500);
assertThat(executionIds).hasSize(1);
receive.blockLast();
Execution terminated = runnerUtils.awaitExecution(e -> e.getId().equals(forEachItem.getId()) && e.getState().isTerminated(), () -> {}, Duration.ofSeconds(10));
assertThat(terminated.getState().getCurrent()).isEqualTo(Type.SUCCESS);
}
private URI storageUpload() throws URISyntaxException, IOException {
File tempFile = File.createTempFile("file", ".txt");
Files.write(tempFile.toPath(), content());
return storageInterface.put(
MAIN_TENANT,
null,
new URI("/file/storage/file.txt"),
new FileInputStream(tempFile)
);
}
private List<String> content() {
return IntStream
.range(0, 7)
.mapToObj(value -> StringUtils.leftPad(value + "", 20))
.toList();
}
}

View File

@@ -0,0 +1,21 @@
id: flow-concurrency-for-each-item
namespace: io.kestra.tests
inputs:
- id: file
type: FILE
- id: batch
type: INT
tasks:
- id: each
type: io.kestra.plugin.core.flow.ForEachItem
items: "{{ inputs.file }}"
batch:
rows: "{{inputs.batch}}"
namespace: io.kestra.tests
flowId: flow-concurrency-queue
wait: true
transmitFailed: true
inputs:
items: "{{ taskrun.items }}"

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.h2;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.h2.H2Repository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2QueueEnabled
public class H2ExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
public H2ExecutionRunningStorage(@Named("executionrunning") H2Repository<ExecutionRunning> repository) {
super(repository);
}
}

View File

@@ -144,4 +144,12 @@ public class H2QueueFactory implements QueueFactoryInterface {
public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
return new H2Queue<>(SubflowExecutionEnd.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionRunning> executionRunning() {
return new H2Queue<>(ExecutionRunning.class, applicationContext);
}
}

View File

@@ -0,0 +1,29 @@
CREATE TABLE IF NOT EXISTS execution_running (
"key" VARCHAR(250) NOT NULL PRIMARY KEY,
"value" TEXT NOT NULL,
"tenant_id" VARCHAR(250) GENERATED ALWAYS AS (JQ_STRING("value", '.tenantId')),
"namespace" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.namespace')),
"flow_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.flowId'))
);
CREATE INDEX IF NOT EXISTS execution_running__flow ON execution_running ("tenant_id", "namespace", "flow_id");
ALTER TABLE queues ALTER COLUMN "type" ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.ExecutionRunning'
) NOT NULL

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.mysql;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.mysql.MysqlRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@MysqlQueueEnabled
public class MysqlExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
public MysqlExecutionRunningStorage(@Named("executionrunning") MysqlRepository<ExecutionRunning> repository) {
super(repository);
}
}

View File

@@ -144,4 +144,12 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
return new MysqlQueue<>(SubflowExecutionEnd.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionRunning> executionRunning() {
return new MysqlQueue<>(ExecutionRunning.class, applicationContext);
}
}

View File

@@ -0,0 +1,28 @@
CREATE TABLE IF NOT EXISTS execution_running (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL,
`tenant_id` VARCHAR(250) GENERATED ALWAYS AS (value ->> '$.tenantId') STORED,
`namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
`flow_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.flowId') STORED NOT NULL,
INDEX ix_flow (tenant_id, namespace, flow_id)
);
ALTER TABLE queues MODIFY COLUMN `type` ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.ExecutionRunning'
) NOT NULL;

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.postgres;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.postgres.PostgresRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@PostgresQueueEnabled
public class PostgresExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
public PostgresExecutionRunningStorage(@Named("executionrunning") PostgresRepository<ExecutionRunning> repository) {
super(repository);
}
}

View File

@@ -144,4 +144,12 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
return new PostgresQueue<>(SubflowExecutionEnd.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionRunning> executionRunning() {
return new PostgresQueue<>(ExecutionRunning.class, applicationContext);
}
}

View File

@@ -0,0 +1,11 @@
CREATE TABLE IF NOT EXISTS execution_running (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL,
tenant_id VARCHAR(250) GENERATED ALWAYS AS (value ->> 'tenantId') STORED,
namespace VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'namespace') STORED,
flow_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'flowId') STORED
);
CREATE INDEX IF NOT EXISTS execution_running__flow ON execution_running (tenant_id, namespace, flow_id);
ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.core.runners.ExecutionRunning';

View File

@@ -125,6 +125,12 @@ public class JdbcTableConfigsFactory {
return new InstantiableJdbcTableConfig("dashboards", Dashboard.class, "dashboards");
}
@Bean
@Named("executionrunning")
public InstantiableJdbcTableConfig executionRunning() {
return new InstantiableJdbcTableConfig("executionrunning", ExecutionRunning.class, "execution_running");
}
public static class InstantiableJdbcTableConfig extends JdbcTableConfig {
public InstantiableJdbcTableConfig(String name, @Nullable Class<?> cls, String table) {
super(name, cls, table);

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.impl.DSL;
@@ -19,9 +20,9 @@ public abstract class AbstractJdbcExecutionQueuedStorage extends AbstractJdbcRep
this.jdbcRepository = jdbcRepository;
}
public void save(ExecutionQueued executionQueued) {
public void save(DSLContext dslContext, ExecutionQueued executionQueued) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(executionQueued);
this.jdbcRepository.persist(executionQueued, fields);
this.jdbcRepository.persist(executionQueued, dslContext, fields);
}
public void pop(String tenantId, String namespace, String flowId, Consumer<Execution> consumer) {

View File

@@ -0,0 +1,71 @@
package io.kestra.jdbc.runner;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.impl.DSL;
import java.util.Map;
import java.util.Optional;
import java.util.function.*;
public class AbstractJdbcExecutionRunningStorage extends AbstractJdbcRepository {
protected io.kestra.jdbc.AbstractJdbcRepository<ExecutionRunning> jdbcRepository;
public AbstractJdbcExecutionRunningStorage(io.kestra.jdbc.AbstractJdbcRepository<ExecutionRunning> jdbcRepository) {
this.jdbcRepository = jdbcRepository;
}
public void save(DSLContext dslContext, ExecutionRunning executionRunning) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(executionRunning);
this.jdbcRepository.persist(executionRunning, dslContext, fields);
}
/**
* Count for running executions then process the count using the consumer function.
* It locked the raw and is wrapped in a transaction so the consumer should use the provided dslContext for any database access.
* <p>
* Note: when there is no execution running, there will be no database locks, so multiple calls will return 0.
* This is only potentially an issue with multiple executor instances when the concurrency limit is set to 1.
*/
public ExecutionRunning countThenProcess(FlowInterface flow, BiFunction<DSLContext, Integer, ExecutionRunning> consumer) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
var dslContext = DSL.using(configuration);
var select = dslContext
.select(AbstractJdbcRepository.field("value"))
.from(this.jdbcRepository.getTable())
.where(this.buildTenantCondition(flow.getTenantId()))
.and(field("namespace").eq(flow.getNamespace()))
.and(field("flow_id").eq(flow.getId()));
Integer count = select.forUpdate().fetch().size();
return consumer.apply(dslContext, count);
});
}
/**
* Delete the execution running corresponding to the given execution.
*/
public void remove(Execution execution) {
this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
var select = DSL
.using(configuration)
.select(AbstractJdbcRepository.field("value"))
.from(this.jdbcRepository.getTable())
.where(buildTenantCondition(execution.getTenantId()))
.and(field("key").eq(IdUtils.fromPartsAndSeparator('|', execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId())))
.forUpdate();
Optional<ExecutionRunning> maybeExecution = this.jdbcRepository.fetchOne(select);
maybeExecution.ifPresent(executionRunning -> this.jdbcRepository.delete(executionRunning));
});
}
}

View File

@@ -6,7 +6,6 @@ import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.flows.sla.*;
import io.kestra.core.models.tasks.ExecutableTask;
@@ -113,6 +112,10 @@ public class JdbcExecutor implements ExecutorInterface, Service {
@Named(QueueFactoryInterface.CLUSTER_EVENT_NAMED)
private Optional<QueueInterface<ClusterEvent>> clusterEventQueue;
@Inject
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
private QueueInterface<ExecutionRunning> executionRunningQueue;
@Inject
private RunContextFactory runContextFactory;
@@ -146,6 +149,9 @@ public class JdbcExecutor implements ExecutorInterface, Service {
@Inject
private AbstractJdbcExecutionQueuedStorage executionQueuedStorage;
@Inject
private AbstractJdbcExecutionRunningStorage executionRunningStorage;
@Inject
private AbstractJdbcExecutorStateStorage executorStateStorage;
@@ -303,6 +309,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, this::subflowExecutionEndQueue));
this.receiveCancellations.addFirst(this.executionRunningQueue.receive(Executor.class, this::executionRunningQueue));
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));
ScheduledFuture<?> scheduledDelayFuture = scheduledDelay.scheduleAtFixedRate(
@@ -553,37 +560,22 @@ public class JdbcExecutor implements ExecutorInterface, Service {
monitors.forEach(monitor -> slaMonitorStorage.save(monitor));
}
// queue execution if needed (limit concurrency)
// handle concurrency limit, we need to use a different queue to be sure that execution running
// are processed sequentially so inside a queue with no parallelism
if (execution.getState().getCurrent() == State.Type.CREATED && flow.getConcurrency() != null) {
ExecutionCount count = executionRepository.executionCounts(
flow.getTenantId(),
List.of(new io.kestra.core.models.executions.statistics.Flow(flow.getNamespace(), flow.getId())),
List.of(State.Type.RUNNING, State.Type.PAUSED),
null,
null,
null
).getFirst();
ExecutionRunning executionRunning = ExecutionRunning.builder()
.tenantId(executor.getFlow().getTenantId())
.namespace(executor.getFlow().getNamespace())
.flowId(executor.getFlow().getId())
.execution(executor.getExecution())
.concurrencyState(ExecutionRunning.ConcurrencyState.CREATED)
.build();
executor = executorService.checkConcurrencyLimit(executor, flow, execution, count.getCount());
// the execution has been queued, we save the queued execution and stops here
if (executor.getExecutionRunning() != null && executor.getExecutionRunning().getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
executionQueuedStorage.save(ExecutionQueued.fromExecutionRunning(executor.getExecutionRunning()));
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT_DESCRIPTION, metricRegistry.tags(executor.getExecution())).increment();
return Pair.of(
executor,
executorState
);
}
// the execution has been moved to FAILED or CANCELLED, we stop here
if (executor.getExecution().getState().isTerminated()) {
return Pair.of(
executor,
executorState
);
}
executionRunningQueue.emit(executionRunning);
return Pair.of(
executor,
executorState
);
}
// handle execution changed SLA
@@ -790,7 +782,10 @@ public class JdbcExecutor implements ExecutorInterface, Service {
// move it to the state of the child flow, and merge the outputs.
// This is important to avoid races such as RUNNING that arrives after the first SUCCESS/FAILED.
RunContext runContext = runContextFactory.of(flow, task, current.getExecution(), message.getParentTaskRun());
taskRun = execution.findTaskRunByTaskRunId(message.getParentTaskRun().getId()).withState(message.getState());
taskRun = execution.findTaskRunByTaskRunId(message.getParentTaskRun().getId());
if (taskRun.getState().getCurrent() != message.getState()) {
taskRun = taskRun.withState(message.getState());
}
Map<String, Object> outputs = MapUtils.deepMerge(taskRun.getOutputs(), message.getParentTaskRun().getOutputs());
Variables variables = variablesService.of(StorageContext.forTask(taskRun), outputs);
taskRun = taskRun.withOutputs(variables);
@@ -981,6 +976,37 @@ public class JdbcExecutor implements ExecutorInterface, Service {
}
}
private void executionRunningQueue(Either<ExecutionRunning, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize a running execution: {}", either.getRight().getMessage());
return;
}
ExecutionRunning executionRunning = either.getLeft();
FlowInterface flow = flowMetaStore.findByExecution(executionRunning.getExecution()).orElseThrow();
ExecutionRunning processed = executionRunningStorage.countThenProcess(flow, (dslContext, count) -> {
ExecutionRunning computed = executorService.processExecutionRunning(flow, count, executionRunning);
if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.RUNNING && !computed.getExecution().getState().isTerminated()) {
executionRunningStorage.save(dslContext, computed);
} else if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
executionQueuedStorage.save(dslContext, ExecutionQueued.fromExecutionRunning(computed));
}
return computed;
});
try {
executionQueue.emit(processed.getExecution());
} catch (QueueException e) {
try {
this.executionQueue.emit(
processed.getExecution().failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED)
);
} catch (QueueException ex) {
log.error("Unable to emit the execution {}", processed.getExecution().getId(), ex);
}
}
}
private Executor killingOrAfterKillState(final String executionId, Optional<State.Type> afterKillState) {
return executionRepository.lock(executionId, pair -> {
Execution currentExecution = pair.getLeft();
@@ -1083,6 +1109,11 @@ public class JdbcExecutor implements ExecutorInterface, Service {
slaMonitorStorage.purge(executor.getExecution().getId());
}
// purge execution running
if (executor.getFlow().getConcurrency() != null) {
executionRunningStorage.remove(execution);
}
// check if there exist a queued execution and submit it to the execution queue
if (executor.getFlow().getConcurrency() != null && executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
executionQueuedStorage.pop(executor.getFlow().getTenantId(),