feat(core): introduce KV Metadata in-repository storing (#12342)

part of https://github.com/kestra-io/kestra/issues/12341
This commit is contained in:
brian.mulier
2025-10-24 14:01:50 +02:00
committed by brian-mulier-p
parent b18d304b77
commit d9144c8c4f
51 changed files with 1957 additions and 483 deletions

View File

@@ -0,0 +1,30 @@
package io.kestra.repository.postgres;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.jdbc.repository.AbstractJdbcKvMetadataRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.impl.DSL;
import java.util.ArrayList;
import java.util.List;
@Singleton
@PostgresRepositoryEnabled
public class PostgresKvMetadataRepository extends AbstractJdbcKvMetadataRepository {
@Inject
public PostgresKvMetadataRepository(
@Named("kvMetadata") PostgresRepository<PersistedKvMetadata> repository,
ApplicationContext applicationContext
) {
super(repository, applicationContext);
}
@Override
protected Condition findCondition(String query) {
return PostgresKvMetadataRepositoryService.findCondition(jdbcRepository, query);
}
}

View File

@@ -0,0 +1,21 @@
package io.kestra.repository.postgres;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.jdbc.AbstractJdbcRepository;
import org.jooq.Condition;
import org.jooq.impl.DSL;
import java.util.ArrayList;
import java.util.List;
public abstract class PostgresKvMetadataRepositoryService {
public static Condition findCondition(AbstractJdbcRepository<PersistedKvMetadata> jdbcRepository, String query) {
List<Condition> conditions = new ArrayList<>();
if (query != null) {
conditions.add(jdbcRepository.fullTextCondition(List.of("fulltext"), query));
}
return conditions.isEmpty() ? DSL.trueCondition() : DSL.and(conditions);
}
}

View File

@@ -0,0 +1,25 @@
CREATE TABLE IF NOT EXISTS kv_metadata (
"key" VARCHAR(768) NOT NULL PRIMARY KEY,
"value" JSONB NOT NULL,
"tenant_id" VARCHAR(250) NOT NULL GENERATED ALWAYS AS (value ->> 'tenantId') STORED,
"namespace" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'namespace') STORED,
"name" VARCHAR(350) NOT NULL GENERATED ALWAYS AS (value ->> 'name') STORED,
"description" TEXT GENERATED ALWAYS AS (value ->> 'description') STORED,
"version" INT NOT NULL GENERATED ALWAYS AS (CAST(value ->> 'version' AS INTEGER)) STORED,
"last" BOOL NOT NULL GENERATED ALWAYS AS (CAST(value ->> 'last' AS BOOL)) STORED,
"expiration_date" TIMESTAMPTZ GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value ->> 'expirationDate')) STORED,
"updated" TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
"deleted" BOOL NOT NULL GENERATED ALWAYS AS (CAST(value ->> 'deleted' AS BOOL)) STORED,
fulltext TSVECTOR GENERATED ALWAYS AS (
FULLTEXT_INDEX(CAST(value ->> 'name' AS varchar))
) STORED
);
CREATE INDEX IF NOT EXISTS ix_last_deleted_tenant_namespace_name_version ON kv_metadata ("last", "deleted", "tenant_id", "namespace", "name", "version");
CREATE INDEX IF NOT EXISTS ix_last_deleted_tenant_namespace_name ON kv_metadata ("last", "deleted", "tenant_id", "namespace", "name");
CREATE INDEX IF NOT EXISTS ix_last_deleted_tenant_namespace_version ON kv_metadata ("last", "deleted", "tenant_id", "namespace", "version");
CREATE INDEX IF NOT EXISTS ix_last_deleted_tenant_name_version ON kv_metadata ("last", "deleted", "tenant_id", "name", "version");
CREATE OR REPLACE TRIGGER kv_metadata_updated BEFORE UPDATE
ON kv_metadata FOR EACH ROW EXECUTE PROCEDURE
UPDATE_UPDATED_DATETIME();

View File

@@ -0,0 +1,6 @@
package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcKvMetadataRepositoryTest;
public class PostgresKvMetadataRepositoryTest extends AbstractJdbcKvMetadataRepositoryTest {
}