feat(core): flow concurrency limit (#2371)

close kestra-io/kestra#1400
This commit is contained in:
Loïc Mathieu
2023-11-06 16:56:30 +01:00
committed by GitHub
parent 8d6fd66dfd
commit c74540b3ae
34 changed files with 596 additions and 20 deletions

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.postgres;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.kestra.repository.postgres.PostgresRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Singleton;
@Singleton
@PostgresQueueEnabled
public class PostgresExecutionQueuedStorage extends AbstractJdbcExecutionQueuedStorage {
public PostgresExecutionQueuedStorage(ApplicationContext applicationContext) {
super(new PostgresRepository<>(ExecutionQueued.class, applicationContext));
}
}

View File

@@ -0,0 +1,10 @@
CREATE TABLE IF NOT EXISTS execution_queued (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL,
tenant_id VARCHAR(250) GENERATED ALWAYS AS (value ->> 'tenantId') STORED,
namespace VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'namespace') STORED,
flow_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'flowId') STORED,
date TIMESTAMPTZ NOT NULL GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value ->> 'date')) STORED
);
CREATE INDEX IF NOT EXISTS execution_queued__flow ON execution_queued (tenant_id, namespace, flow_id);

View File

@@ -0,0 +1 @@
ALTER TYPE state_type ADD VALUE IF NOT EXISTS 'CANCELLED';

View File

@@ -76,6 +76,9 @@ kestra:
workerjobrunning:
cls: io.kestra.core.runners.WorkerJobRunning
table: "worker_job_running"
executionqueued:
table: "execution_queued"
cls: io.kestra.core.runners.ExecutionQueued
queues:
min-poll-interval: 10ms