fix(system): refactor concurrency limit to use a counter

A counter allows locking per flow, which solves the race that occurs when two executions are created at the same time and the execution_running table is empty.

Evaluating the concurrency limit in the main executionQueue method also avoids an unexpected behavior where the CREATED execution was processed twice because its status didn't change immediately when QUEUED.

Closes https://github.com/kestra-io/kestra-ee/issues/4877
This commit is contained in:
Loïc Mathieu
2025-10-07 16:50:09 +02:00
parent 6dea3d2a56
commit 4a9564be3c
23 changed files with 271 additions and 250 deletions

View File

@@ -6,6 +6,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.services.ConcurrencyLimitService;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -15,8 +16,6 @@ import picocli.CommandLine;
import java.util.Optional;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@CommandLine.Command(
name = "submit-queued-execution",
description = {"Submit all queued execution to the executor",
@@ -49,9 +48,11 @@ public class SubmitQueuedCommand extends AbstractCommand {
}
else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) {
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class);
var concurrencyLimitService = applicationContext.getBean(ConcurrencyLimitService.class);
for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) {
executionQueuedStorage.pop(queued.getTenantId(), queued.getNamespace(), queued.getFlowId(), throwConsumer(execution -> executionQueue.emit(execution.withState(State.Type.CREATED))));
Execution restart = concurrencyLimitService.unqueue(queued.getExecution(), State.Type.RUNNING);
executionQueue.emit(restart);
cpt++;
}
}

View File

@@ -26,7 +26,6 @@ public interface QueueFactoryInterface {
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
String MULTIPLE_CONDITION_EVENT_NAMED = "multipleConditionEventQueue";
QueueInterface<Execution> execution();
@@ -59,7 +58,5 @@ public interface QueueFactoryInterface {
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
QueueInterface<ExecutionRunning> executionRunning();
QueueInterface<MultipleConditionEvent> multipleConditionEvent();
}

View File

@@ -0,0 +1,31 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.utils.IdUtils;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;
import lombok.With;
/**
 * Immutable value object holding a counter ({@code running}) for one flow,
 * identified by tenant, namespace and flow id.
 * <p>
 * Lombok generates the all-args constructor, the builder and the getters;
 * {@code running} additionally gets a {@code withRunning(...)} copy method.
 */
@Value
@AllArgsConstructor
@Builder
public class ConcurrencyLimit implements HasUID {
// Tenant owning the flow.
@NotNull
String tenantId;
// Namespace of the flow.
@NotNull
String namespace;
// Identifier of the flow inside its namespace.
@NotNull
String flowId;
// Counter value; presumably the number of executions currently counted as running
// for this flow — confirm against the callers that increment/decrement it.
@With
Integer running;
/**
 * Builds the unique identifier of this counter.
 *
 * @return tenantId, namespace and flowId combined by {@code IdUtils.fromPartsAndSeparator} with the {@code '|'} separator
 */
@Override
public String uid() {
return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId);
}
}

View File

@@ -0,0 +1,16 @@
package io.kestra.runner.h2;
import io.kestra.core.runners.ConcurrencyLimit;
import io.kestra.jdbc.runner.AbstractJdbcConcurrencyLimitStorage;
import io.kestra.repository.h2.H2Repository;
import io.kestra.repository.h2.H2RepositoryEnabled;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
/**
 * H2 flavour of {@link AbstractJdbcConcurrencyLimitStorage}.
 * <p>
 * Adds no behaviour of its own: it only wires the H2 repository named
 * {@code "concurrencylimit"} into the parent class.
 */
// NOTE(review): this bean is guarded by @H2RepositoryEnabled whereas the MySQL and Postgres
// counterparts use their *QueueEnabled annotation — confirm the difference is intentional.
@Singleton
@H2RepositoryEnabled
public class H2ConcurrencyLimitStorage extends AbstractJdbcConcurrencyLimitStorage {
/**
 * @param repository the H2 repository registered under the name {@code "concurrencylimit"}
 */
public H2ConcurrencyLimitStorage(@Named("concurrencylimit") H2Repository<ConcurrencyLimit> repository) {
super(repository);
}
}

View File

@@ -1,15 +0,0 @@
package io.kestra.runner.h2;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.h2.H2Repository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
/**
 * H2 flavour of {@link AbstractJdbcExecutionRunningStorage}.
 * <p>
 * Adds no behaviour of its own: it only wires the H2 repository named
 * {@code "executionrunning"} into the parent class.
 */
@Singleton
@H2QueueEnabled
public class H2ExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
/**
 * @param repository the H2 repository registered under the name {@code "executionrunning"}
 */
public H2ExecutionRunningStorage(@Named("executionrunning") H2Repository<ExecutionRunning> repository) {
super(repository);
}
}

View File

@@ -145,14 +145,6 @@ public class H2QueueFactory implements QueueFactoryInterface {
return new H2Queue<>(SubflowExecutionEnd.class, applicationContext);
}
/**
 * Creates the H2-backed queue for {@link ExecutionRunning} messages.
 * The bean is registered under {@link QueueFactoryInterface#EXECUTION_RUNNING_NAMED}
 * and its {@code close} method is declared as the pre-destroy hook.
 *
 * @return a new {@code H2Queue} typed on {@link ExecutionRunning}
 */
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionRunning> executionRunning() {
return new H2Queue<>(ExecutionRunning.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)

View File

@@ -1,2 +0,0 @@
-- We must truncate the table as in 0.24 there was a bug that lead to records not purged in this table
truncate table execution_running;

View File

@@ -1,12 +1,17 @@
CREATE TABLE IF NOT EXISTS execution_running (
CREATE TABLE IF NOT EXISTS concurrency_limit (
"key" VARCHAR(250) NOT NULL PRIMARY KEY,
"value" TEXT NOT NULL,
"tenant_id" VARCHAR(250) GENERATED ALWAYS AS (JQ_STRING("value", '.tenantId')),
"namespace" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.namespace')),
"flow_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.flowId'))
);
"flow_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.flowId')),
"running" INT NOT NULL GENERATED ALWAYS AS (JQ_INTEGER("value", '.running'))
);
CREATE INDEX IF NOT EXISTS execution_running__flow ON execution_running ("tenant_id", "namespace", "flow_id");
CREATE INDEX IF NOT EXISTS concurrency_limit__flow ON concurrency_limit ("tenant_id", "namespace", "flow_id");
DROP TABLE IF EXISTS execution_running;
DELETE FROM queues WHERE "type" = 'io.kestra.core.runners.ExecutionRunning';
ALTER TABLE queues ALTER COLUMN "type" ENUM(
'io.kestra.core.models.executions.Execution',
@@ -25,5 +30,5 @@ ALTER TABLE queues ALTER COLUMN "type" ENUM(
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.ExecutionRunning'
'io.kestra.core.runners.MultipleConditionEvent'
) NOT NULL

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.mysql;
import io.kestra.core.runners.ConcurrencyLimit;
import io.kestra.jdbc.runner.AbstractJdbcConcurrencyLimitStorage;
import io.kestra.repository.mysql.MysqlRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
/**
 * MySQL flavour of {@link AbstractJdbcConcurrencyLimitStorage}.
 * <p>
 * Adds no behaviour of its own: it only wires the MySQL repository named
 * {@code "concurrencylimit"} into the parent class.
 */
@Singleton
@MysqlQueueEnabled
public class MysqlConcurrencyLimitStorage extends AbstractJdbcConcurrencyLimitStorage {
/**
 * @param repository the MySQL repository registered under the name {@code "concurrencylimit"}
 */
public MysqlConcurrencyLimitStorage(@Named("concurrencylimit") MysqlRepository<ConcurrencyLimit> repository) {
super(repository);
}
}

View File

@@ -1,15 +0,0 @@
package io.kestra.runner.mysql;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.mysql.MysqlRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
/**
 * MySQL flavour of {@link AbstractJdbcExecutionRunningStorage}.
 * <p>
 * Adds no behaviour of its own: it only wires the MySQL repository named
 * {@code "executionrunning"} into the parent class.
 */
@Singleton
@MysqlQueueEnabled
public class MysqlExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
/**
 * @param repository the MySQL repository registered under the name {@code "executionrunning"}
 */
public MysqlExecutionRunningStorage(@Named("executionrunning") MysqlRepository<ExecutionRunning> repository) {
super(repository);
}
}

View File

@@ -145,14 +145,6 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
return new MysqlQueue<>(SubflowExecutionEnd.class, applicationContext);
}
/**
 * Creates the MySQL-backed queue for {@link ExecutionRunning} messages.
 * The bean is registered under {@link QueueFactoryInterface#EXECUTION_RUNNING_NAMED}
 * and its {@code close} method is declared as the pre-destroy hook.
 *
 * @return a new {@code MysqlQueue} typed on {@link ExecutionRunning}
 */
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionRunning> executionRunning() {
return new MysqlQueue<>(ExecutionRunning.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)

View File

@@ -1,2 +0,0 @@
-- We must truncate the table as in 0.24 there was a bug that lead to records not purged in this table
truncate table execution_running;

View File

@@ -1,12 +1,17 @@
CREATE TABLE IF NOT EXISTS execution_running (
CREATE TABLE IF NOT EXISTS concurrency_limit (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL,
`tenant_id` VARCHAR(250) GENERATED ALWAYS AS (value ->> '$.tenantId') STORED,
`namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
`flow_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.flowId') STORED NOT NULL,
`running` INT GENERATED ALWAYS AS (value ->> '$.running') STORED NOT NULL,
INDEX ix_flow (tenant_id, namespace, flow_id)
);
DROP TABLE IF EXISTS execution_running;
DELETE FROM queues WHERE type = 'io.kestra.core.runners.ExecutionRunning';
ALTER TABLE queues MODIFY COLUMN `type` ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
@@ -24,5 +29,5 @@ ALTER TABLE queues MODIFY COLUMN `type` ENUM(
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.ExecutionRunning'
) NOT NULL;
'io.kestra.core.runners.MultipleConditionEvent'
) NOT NULL;

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.postgres;
import io.kestra.core.runners.ConcurrencyLimit;
import io.kestra.jdbc.runner.AbstractJdbcConcurrencyLimitStorage;
import io.kestra.repository.postgres.PostgresRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
/**
 * Postgres flavour of {@link AbstractJdbcConcurrencyLimitStorage}.
 * <p>
 * Adds no behaviour of its own: it only wires the Postgres repository named
 * {@code "concurrencylimit"} into the parent class.
 */
@Singleton
@PostgresQueueEnabled
public class PostgresConcurrencyLimitStorage extends AbstractJdbcConcurrencyLimitStorage {
/**
 * @param repository the Postgres repository registered under the name {@code "concurrencylimit"}
 */
public PostgresConcurrencyLimitStorage(@Named("concurrencylimit") PostgresRepository<ConcurrencyLimit> repository) {
super(repository);
}
}

View File

@@ -1,15 +0,0 @@
package io.kestra.runner.postgres;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.postgres.PostgresRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
/**
 * Postgres flavour of {@link AbstractJdbcExecutionRunningStorage}.
 * <p>
 * Adds no behaviour of its own: it only wires the Postgres repository named
 * {@code "executionrunning"} into the parent class.
 */
@Singleton
@PostgresQueueEnabled
public class PostgresExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
/**
 * @param repository the Postgres repository registered under the name {@code "executionrunning"}
 */
public PostgresExecutionRunningStorage(@Named("executionrunning") PostgresRepository<ExecutionRunning> repository) {
super(repository);
}
}

View File

@@ -145,14 +145,6 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
return new PostgresQueue<>(SubflowExecutionEnd.class, applicationContext);
}
/**
 * Creates the Postgres-backed queue for {@link ExecutionRunning} messages.
 * The bean is registered under {@link QueueFactoryInterface#EXECUTION_RUNNING_NAMED}
 * and its {@code close} method is declared as the pre-destroy hook.
 *
 * @return a new {@code PostgresQueue} typed on {@link ExecutionRunning}
 */
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionRunning> executionRunning() {
return new PostgresQueue<>(ExecutionRunning.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)

View File

@@ -1,2 +0,0 @@
-- We must truncate the table as in 0.24 there was a bug that lead to records not purged in this table
truncate table execution_running;

View File

@@ -1,11 +1,12 @@
CREATE TABLE IF NOT EXISTS execution_running (
CREATE TABLE IF NOT EXISTS concurrency_limit (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL,
tenant_id VARCHAR(250) GENERATED ALWAYS AS (value ->> 'tenantId') STORED,
namespace VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'namespace') STORED,
flow_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'flowId') STORED
flow_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'flowId') STORED,
running INT NOT NULL GENERATED ALWAYS AS (CAST(value ->> 'running' AS INTEGER)) STORED
);
CREATE INDEX IF NOT EXISTS execution_running__flow ON execution_running (tenant_id, namespace, flow_id);
CREATE INDEX IF NOT EXISTS concurrency_limit__flow ON concurrency_limit (tenant_id, namespace, flow_id);
ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.core.runners.ExecutionRunning';
DROP TABLE IF EXISTS execution_running;

View File

@@ -126,9 +126,9 @@ public class JdbcTableConfigsFactory {
}
@Bean
@Named("executionrunning")
public InstantiableJdbcTableConfig executionRunning() {
return new InstantiableJdbcTableConfig("executionrunning", ExecutionRunning.class, "execution_running");
@Named("concurrencylimit")
public InstantiableJdbcTableConfig concurrencyLimit() {
return new InstantiableJdbcTableConfig("concurrencylimit", ConcurrencyLimit.class, "concurrency_limit");
}
public static class InstantiableJdbcTableConfig extends JdbcTableConfig {

View File

@@ -0,0 +1,117 @@
package io.kestra.jdbc.runner;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.runners.ConcurrencyLimit;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import org.apache.commons.lang3.tuple.Pair;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Insert;
import org.jooq.SQLDialect;
import org.jooq.exception.DataAccessException;
import org.jooq.impl.DSL;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
/**
 * JDBC storage of the per-flow {@link ConcurrencyLimit} counter.
 * <p>
 * Every read of the counter is issued as a {@code SELECT ... FOR UPDATE}, so all operations
 * must run inside a transaction and reuse that transaction's {@link DSLContext}.
 */
public class AbstractJdbcConcurrencyLimitStorage extends AbstractJdbcRepository {
    protected io.kestra.jdbc.AbstractJdbcRepository<ConcurrencyLimit> jdbcRepository;

    /**
     * @param jdbcRepository the repository bound to the concurrency limit table
     */
    public AbstractJdbcConcurrencyLimitStorage(io.kestra.jdbc.AbstractJdbcRepository<ConcurrencyLimit> jdbcRepository) {
        this.jdbcRepository = jdbcRepository;
    }

    /**
     * Fetch the concurrency limit counter then process it using the consumer function.
     * It locks the row and is wrapped in a transaction, so the consumer should use the provided dslContext for any database access.
     * <p>
     * Note that to avoid a race when no concurrency limit counter exists yet, a counter set to 0 is inserted first
     * whenever the initial fetch finds nothing.
     *
     * @param flow     the flow whose counter must be fetched
     * @param consumer receives the transaction's dslContext and the current counter; returns the
     *                 {@link ExecutionRunning} to hand back to the caller (left) and the counter to persist (right)
     * @return the left element of the pair produced by the consumer
     */
    public ExecutionRunning countThenProcess(FlowInterface flow, BiFunction<DSLContext, ConcurrencyLimit, Pair<ExecutionRunning, ConcurrencyLimit>> consumer) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                var dslContext = DSL.using(configuration);

                var selected = fetchOne(dslContext, flow)
                    .orElseGet(() -> insertZeroThenFetch(dslContext, flow));

                var pair = consumer.apply(dslContext, selected);
                save(dslContext, pair.getRight());
                return pair.getLeft();
            });
    }

    /**
     * Decrement the concurrency limit counter; the stored value never goes below 0.
     * Must only be called when a flow having a concurrency limit ends.
     *
     * @param flow the flow whose counter must be decremented; nothing happens when no counter exists
     */
    public void decrement(FlowInterface flow) {
        this.jdbcRepository
            .getDslContextWrapper()
            .transaction(configuration -> {
                var dslContext = DSL.using(configuration);

                fetchOne(dslContext, flow).ifPresent(
                    // clamp at 0 so that a counter that already drifted can never move further below zero
                    concurrencyLimit -> save(dslContext, concurrencyLimit.withRunning(Math.max(0, concurrencyLimit.getRunning() - 1)))
                );
            });
    }

    /**
     * Increment the concurrency limit counter.
     * Must only be called when a queued execution is popped; other use cases must pass through the standard process of creating an execution.
     *
     * @param dslContext the context of the surrounding transaction
     * @param flow       the flow whose counter must be incremented; nothing happens when no counter exists
     */
    public void increment(DSLContext dslContext, FlowInterface flow) {
        fetchOne(dslContext, flow).ifPresent(
            concurrencyLimit -> save(dslContext, concurrencyLimit.withRunning(concurrencyLimit.getRunning() + 1))
        );
    }

    /**
     * Inserts a counter set to 0 for the flow, then fetches it again to obtain the row lock.
     * <p>
     * Ideally an INSERT IGNORE / ON CONFLICT DO NOTHING would always be emitted, but H2 doesn't support it.
     * So, to cover the case where no counter exists and two executors start the same flow concurrently,
     * a plain insert is attempted and its failure is tolerated before selecting again.
     * This only occurs once in a flow lifecycle. As this pattern didn't work with Postgres,
     * an insert that ignores duplicates is emitted there instead.
     */
    private ConcurrencyLimit insertZeroThenFetch(DSLContext dslContext, FlowInterface flow) {
        DataAccessException insertFailure = null;
        try {
            var zeroConcurrencyLimit = ConcurrencyLimit.builder()
                .tenantId(flow.getTenantId())
                .namespace(flow.getNamespace())
                .flowId(flow.getId())
                .running(0)
                .build();

            Map<Field<Object>, Object> finalFields = this.jdbcRepository.persistFields(zeroConcurrencyLimit);
            var insert = dslContext
                .insertInto(this.jdbcRepository.getTable())
                .set(field("key"), this.jdbcRepository.key(zeroConcurrencyLimit))
                .set(finalFields);

            if (dslContext.configuration().dialect().supports(SQLDialect.POSTGRES)) {
                insert.onDuplicateKeyIgnore().execute();
            } else {
                insert.execute();
            }
        } catch (DataAccessException e) {
            // a constraint violation only means another executor inserted the counter first;
            // the exception is kept so it can be reported if the counter is still missing below
            insertFailure = e;
        }

        // refetch to have a lock on it; the record is expected to exist at this point
        final DataAccessException cause = insertFailure;
        return fetchOne(dslContext, flow).orElseThrow(() -> new java.util.NoSuchElementException(
            "No concurrency limit counter found for flow '" + flow.getNamespace() + "." + flow.getId() + "' after trying to insert it",
            cause
        ));
    }

    /**
     * Selects the counter of the given flow with {@code FOR UPDATE}.
     */
    private Optional<ConcurrencyLimit> fetchOne(DSLContext dslContext, FlowInterface flow) {
        var select = dslContext
            .select()
            .from(this.jdbcRepository.getTable())
            .where(this.buildTenantCondition(flow.getTenantId()))
            .and(field("namespace").eq(flow.getNamespace()))
            .and(field("flow_id").eq(flow.getId()));

        return Optional.ofNullable(select.forUpdate().fetchOne())
            .map(record -> this.jdbcRepository.map(record));
    }

    /**
     * Persists the counter using the given transaction context.
     */
    private void save(DSLContext dslContext, ConcurrencyLimit concurrencyLimit) {
        Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(concurrencyLimit);
        this.jdbcRepository.persist(concurrencyLimit, dslContext, fields);
    }
}

View File

@@ -11,6 +11,7 @@ import org.jooq.impl.DSL;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public abstract class AbstractJdbcExecutionQueuedStorage extends AbstractJdbcRepository {
@@ -25,12 +26,12 @@ public abstract class AbstractJdbcExecutionQueuedStorage extends AbstractJdbcRep
this.jdbcRepository.persist(executionQueued, dslContext, fields);
}
public void pop(String tenantId, String namespace, String flowId, Consumer<Execution> consumer) {
public void pop(String tenantId, String namespace, String flowId, BiConsumer<DSLContext, Execution> consumer) {
this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
var select = DSL
.using(configuration)
var dslContext = DSL.using(configuration);
var select = dslContext
.select(AbstractJdbcRepository.field("value"))
.from(this.jdbcRepository.getTable())
.where(buildTenantCondition(tenantId))
@@ -43,7 +44,7 @@ public abstract class AbstractJdbcExecutionQueuedStorage extends AbstractJdbcRep
Optional<ExecutionQueued> maybeExecution = this.jdbcRepository.fetchOne(select);
if (maybeExecution.isPresent()) {
consumer.accept(maybeExecution.get().getExecution());
consumer.accept(dslContext, maybeExecution.get().getExecution());
this.jdbcRepository.delete(maybeExecution.get());
}
});

View File

@@ -1,83 +0,0 @@
package io.kestra.jdbc.runner;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.impl.DSL;
import java.util.Map;
import java.util.Optional;
import java.util.function.*;
/**
 * JDBC storage of {@link ExecutionRunning} records.
 * <p>
 * Offers saving a record, counting the records of a flow under a {@code FOR UPDATE} select,
 * and removing the record that belongs to a given execution.
 */
public class AbstractJdbcExecutionRunningStorage extends AbstractJdbcRepository {
protected io.kestra.jdbc.AbstractJdbcRepository<ExecutionRunning> jdbcRepository;
/**
 * @param jdbcRepository the repository bound to the execution running table
 */
public AbstractJdbcExecutionRunningStorage(io.kestra.jdbc.AbstractJdbcRepository<ExecutionRunning> jdbcRepository) {
this.jdbcRepository = jdbcRepository;
}
/**
 * Persists the record inside a transaction opened by this method.
 *
 * @param executionRunning the record to persist
 */
public void save(ExecutionRunning executionRunning) {
jdbcRepository.getDslContextWrapper().transaction(
configuration -> save(DSL.using(configuration), executionRunning)
);
}
/**
 * Persists the record using the caller's transaction context.
 *
 * @param dslContext       the context of the surrounding transaction
 * @param executionRunning the record to persist
 */
public void save(DSLContext dslContext, ExecutionRunning executionRunning) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(executionRunning);
this.jdbcRepository.persist(executionRunning, dslContext, fields);
}
/**
 * Count the running executions then process the count using the consumer function.
 * It locks the rows and is wrapped in a transaction, so the consumer should use the provided dslContext for any database access.
 * <p>
 * Note: when there is no execution running, there will be no database locks, so multiple calls will return 0.
 * This is only potentially an issue with multiple executor instances when the concurrency limit is set to 1.
 *
 * @param flow     the flow whose records are counted (matched on tenant, namespace and flow id)
 * @param consumer receives the transaction's dslContext and the number of fetched records
 * @return the value returned by the consumer
 */
public ExecutionRunning countThenProcess(FlowInterface flow, BiFunction<DSLContext, Integer, ExecutionRunning> consumer) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
var dslContext = DSL.using(configuration);
var select = dslContext
.select(AbstractJdbcRepository.field("value"))
.from(this.jdbcRepository.getTable())
.where(this.buildTenantCondition(flow.getTenantId()))
.and(field("namespace").eq(flow.getNamespace()))
.and(field("flow_id").eq(flow.getId()));
// the count is the size of the fetched result, not a SQL COUNT, so that the select can carry FOR UPDATE
Integer count = select.forUpdate().fetch().size();
return consumer.apply(dslContext, count);
});
}
/**
 * Delete the execution running corresponding to the given execution.
 * The record is looked up by a key built from tenant id, namespace, flow id and execution id joined with '|'.
 *
 * @param execution the execution whose record must be removed
 * @return true if the execution was deleted, false if it was not existing
 */
public boolean remove(Execution execution) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
var select = DSL
.using(configuration)
.select(AbstractJdbcRepository.field("value"))
.from(this.jdbcRepository.getTable())
.where(buildTenantCondition(execution.getTenantId()))
.and(field("key").eq(IdUtils.fromPartsAndSeparator('|', execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId())))
.forUpdate();
Optional<ExecutionRunning> maybeExecution = this.jdbcRepository.fetchOne(select);
return maybeExecution
.map(executionRunning -> {
this.jdbcRepository.delete(executionRunning);
return true;
})
.orElse(false);
});
}
}

View File

@@ -64,8 +64,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
import static io.kestra.core.utils.Rethrow.*;
@SuppressWarnings("deprecation")
@Singleton
@@ -117,10 +116,6 @@ public class JdbcExecutor implements ExecutorInterface {
@Named(QueueFactoryInterface.CLUSTER_EVENT_NAMED)
private Optional<QueueInterface<ClusterEvent>> clusterEventQueue;
@Inject
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
private QueueInterface<ExecutionRunning> executionRunningQueue;
@Inject
@Named(QueueFactoryInterface.MULTIPLE_CONDITION_EVENT_NAMED)
private QueueInterface<MultipleConditionEvent> multipleConditionEventQueue;
@@ -159,7 +154,7 @@ public class JdbcExecutor implements ExecutorInterface {
private AbstractJdbcExecutionQueuedStorage executionQueuedStorage;
@Inject
private AbstractJdbcExecutionRunningStorage executionRunningStorage;
private AbstractJdbcConcurrencyLimitStorage concurrencyLimitStorage;
@Inject
private AbstractJdbcExecutorStateStorage executorStateStorage;
@@ -318,7 +313,6 @@ public class JdbcExecutor implements ExecutorInterface {
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, this::subflowExecutionEndQueue));
this.receiveCancellations.addFirst(this.executionRunningQueue.receive(Executor.class, this::executionRunningQueue));
this.receiveCancellations.addFirst(this.multipleConditionEventQueue.receive(Executor.class, this::multipleConditionEventQueue));
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));
@@ -603,11 +597,23 @@ public class JdbcExecutor implements ExecutorInterface {
.concurrencyState(ExecutionRunning.ConcurrencyState.CREATED)
.build();
executionRunningQueue.emit(executionRunning);
return Pair.of(
executor,
executorState
);
ExecutionRunning processed = concurrencyLimitStorage.countThenProcess(flow, (dslContext, concurrencyLimit) -> {
ExecutionRunning computed = executorService.processExecutionRunning(flow, concurrencyLimit.getRunning(), executionRunning.withExecution(execution)); // be sure that the execution running contains the latest value of the execution
if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.RUNNING && !computed.getExecution().getState().isTerminated()) {
return Pair.of(computed, concurrencyLimit.withRunning(concurrencyLimit.getRunning() + 1));
} else if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
executionQueuedStorage.save(dslContext, ExecutionQueued.fromExecutionRunning(computed));
}
return Pair.of(computed, concurrencyLimit);
});
// if the execution is queued or terminated due to concurrency limit, we stop here
if (processed.getExecution().getState().isTerminated() || processed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
return Pair.of(
executor.withExecution(processed.getExecution(), "handleConcurrencyLimit"),
executorState
);
}
}
// handle execution changed SLA
@@ -1017,37 +1023,6 @@ public class JdbcExecutor implements ExecutorInterface {
}
}
/**
 * Handles a message received from the execution running queue.
 * <p>
 * A message that failed deserialization is logged and dropped. Otherwise the execution is locked,
 * the concurrency state is computed from the current count of running executions, the resulting
 * record is stored (running or queued), and the updated executor is forwarded to {@code toExecution}.
 *
 * @param either the deserialized {@link ExecutionRunning} on the left, or the deserialization error on the right
 */
private void executionRunningQueue(Either<ExecutionRunning, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize a running execution: {}", either.getRight().getMessage());
return;
}
ExecutionRunning executionRunning = either.getLeft();
// we need to update the execution after applying concurrency limit so we use the lock for that
Executor executor = executionRepository.lock(executionRunning.getExecution().getId(), pair -> {
Execution execution = pair.getLeft();
Executor newExecutor = new Executor(execution, null);
FlowInterface flow = flowMetaStore.findByExecution(execution).orElseThrow();
ExecutionRunning processed = executionRunningStorage.countThenProcess(flow, (dslContext, count) -> {
ExecutionRunning computed = executorService.processExecutionRunning(flow, count, executionRunning.withExecution(execution)); // be sure that the execution running contains the latest value of the execution
// only a RUNNING, non-terminated execution is stored as running; a QUEUED one goes to the queued storage;
// any other outcome is returned without being stored
if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.RUNNING && !computed.getExecution().getState().isTerminated()) {
executionRunningStorage.save(dslContext, computed);
} else if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
executionQueuedStorage.save(dslContext, ExecutionQueued.fromExecutionRunning(computed));
}
return computed;
});
return Pair.of(
newExecutor.withExecution(processed.getExecution(), "handleExecutionRunning"),
pair.getRight()
);
});
toExecution(executor);
}
private Executor killingOrAfterKillState(final String executionId, Optional<State.Type> afterKillState) {
return executionRepository.lock(executionId, pair -> {
Execution currentExecution = pair.getLeft();
@@ -1161,31 +1136,31 @@ public class JdbcExecutor implements ExecutorInterface {
// check if there exist a queued execution and submit it to the execution queue
if (executor.getFlow().getConcurrency() != null) {
// purge execution running
boolean hasExecutionRunning = executionRunningStorage.remove(execution);
// decrement execution concurrency limit
// if an execution was queued but never running, it would have never been counted inside the concurrency limit and should not lead to popping a new queued execution
// this could only happen for KILLED execution.
boolean queuedThenKilled = execution.getState().getCurrent() == State.Type.KILLED
&& execution.getState().getHistories().stream().anyMatch(h -> h.getState().isQueued())
&& execution.getState().getHistories().stream().noneMatch(h -> h.getState().isRunning());
if (!queuedThenKilled) {
concurrencyLimitStorage.decrement(executor.getFlow());
// some execution may have concurrency limit but no execution running: for ex QUEUED -> KILLED, in this case we should not pop any execution
if (hasExecutionRunning && executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
executionQueuedStorage.pop(executor.getFlow().getTenantId(),
executor.getFlow().getNamespace(),
executor.getFlow().getId(),
throwConsumer(queued -> {
var newExecution = queued.withState(State.Type.RUNNING);
ExecutionRunning executionRunning = ExecutionRunning.builder()
.tenantId(newExecution.getTenantId())
.namespace(newExecution.getNamespace())
.flowId(newExecution.getFlowId())
.execution(newExecution)
.concurrencyState(ExecutionRunning.ConcurrencyState.RUNNING)
.build();
executionRunningStorage.save(executionRunning);
executionQueue.emit(newExecution);
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
if (executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
var finalFlow = executor.getFlow();
executionQueuedStorage.pop(executor.getFlow().getTenantId(),
executor.getFlow().getNamespace(),
executor.getFlow().getId(),
throwBiConsumer((dslContext, queued) -> {
var newExecution = queued.withState(State.Type.RUNNING);
concurrencyLimitStorage.increment(dslContext, finalFlow);
executionQueue.emit(newExecution);
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
// process flow triggers to allow listening on RUNNING state after a QUEUED state
processFlowTriggers(newExecution);
})
);
// process flow triggers to allow listening on RUNNING state after a QUEUED state
processFlowTriggers(newExecution);
})
);
}
}
}