feat(system): move the indexer in its own module

Part-of: https://github.com/kestra-io/kestra-ee/issues/5751
This commit is contained in:
Loïc Mathieu
2025-12-11 15:43:25 +01:00
parent e23f9df7e5
commit 6f9ae15661
18 changed files with 62 additions and 48 deletions

View File

@@ -38,6 +38,7 @@ dependencies {
implementation project(":scheduler")
implementation project(":webserver")
implementation project(":worker")
implementation project(":indexer")
//test
testImplementation project(':tests')

View File

@@ -77,6 +77,7 @@ dependencies {
testImplementation project(':worker')
testImplementation project(':scheduler')
testImplementation project(':executor')
testImplementation project(':indexer')
testImplementation "io.micronaut:micronaut-http-client"
testImplementation "io.micronaut:micronaut-http-server-netty"

View File

@@ -7,6 +7,7 @@ import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.FlowScope;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.IndexingRepository;
import io.kestra.core.utils.DateUtils;
import io.kestra.plugin.core.dashboard.data.Executions;
import io.micronaut.data.model.Pageable;
@@ -23,7 +24,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Execution>, QueryBuilderInterface<Executions.Fields> {
public interface ExecutionRepositoryInterface extends QueryBuilderInterface<Executions.Fields> {
default Optional<Execution> findById(String tenantId, String id) {
return findById(tenantId, id, false);
}
@@ -35,7 +36,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
ArrayListTotal<Execution> findByFlowId(String tenantId, String namespace, String id, Pageable pageable);
/**
* Finds all the executions that was triggered by the given execution id.
* Finds all the executions that were triggered by the given execution id.
*
* @param tenantId the tenant id.
* @param triggerExecutionId the id of the execution trigger.

View File

@@ -9,7 +9,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Optional;
public interface KvMetadataRepositoryInterface extends SaveRepositoryInterface<PersistedKvMetadata> {
public interface KvMetadataRepositoryInterface {
Optional<PersistedKvMetadata> findByName(
String tenantId,
String namespace,
@@ -35,6 +35,8 @@ public interface KvMetadataRepositoryInterface extends SaveRepositoryInterface<P
FetchVersion fetchBehavior
);
PersistedKvMetadata save(PersistedKvMetadata item);
default PersistedKvMetadata delete(PersistedKvMetadata persistedKvMetadata) throws IOException {
return this.save(persistedKvMetadata.toBuilder().deleted(true).build());
}

View File

@@ -3,6 +3,7 @@ package io.kestra.core.repositories;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.runners.IndexingRepository;
import io.kestra.plugin.core.dashboard.data.Logs;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable;
@@ -12,7 +13,7 @@ import reactor.core.publisher.Flux;
import java.time.ZonedDateTime;
import java.util.List;
public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry>, QueryBuilderInterface<Logs.Fields> {
public interface LogRepositoryInterface extends IndexingRepository<LogEntry>, QueryBuilderInterface<Logs.Fields> {
/**
* Finds all the log entries for the given tenant, execution and min log-level.
* <p>

View File

@@ -3,6 +3,7 @@ package io.kestra.core.repositories;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.executions.metrics.MetricAggregations;
import io.kestra.core.runners.IndexingRepository;
import io.kestra.plugin.core.dashboard.data.Metrics;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.data.model.Pageable;
@@ -12,7 +13,7 @@ import java.time.ZonedDateTime;
import java.util.List;
import java.util.function.Function;
public interface MetricRepositoryInterface extends SaveRepositoryInterface<MetricEntry>, QueryBuilderInterface<Metrics.Fields> {
public interface MetricRepositoryInterface extends IndexingRepository<MetricEntry>, QueryBuilderInterface<Metrics.Fields> {
ArrayListTotal<MetricEntry> findByExecutionId(String tenantId, String id, Pageable pageable);
ArrayListTotal<MetricEntry> findByExecutionIdAndTaskId(String tenantId, String executionId, String taskId, Pageable pageable);

View File

@@ -9,7 +9,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Optional;
public interface NamespaceFileMetadataRepositoryInterface extends SaveRepositoryInterface<NamespaceFileMetadata> {
public interface NamespaceFileMetadataRepositoryInterface {
Optional<NamespaceFileMetadata> findByPath(
String tenantId,
String namespace,
@@ -37,6 +37,8 @@ public interface NamespaceFileMetadataRepositoryInterface extends SaveRepository
return this.save(namespaceFileMetadata.toBuilder().deleted(true).build());
}
NamespaceFileMetadata save(NamespaceFileMetadata namespaceFileMetadata);
/**
* Purge (hard delete) a list of namespace files metadata. If no version is specified, all versions are purged.
* @param namespaceFilesMetadata the list of namespace files metadata to purge

View File

@@ -1,10 +0,0 @@
package io.kestra.core.repositories;
import java.util.List;
// FIXME rename it to something like IndexedRepository and only implement it for indexed entities
public interface SaveRepositoryInterface<T> {
T save(T item);
int saveBatch(List<T> items);
}

View File

@@ -0,0 +1,15 @@
package io.kestra.core.runners;
import java.util.List;
/**
* This interface exposes methods used by the {@link io.kestra.core.runners.Indexer}.
* Only repositories that are indexed should implement this interface.
*
* @param <T> the entity type
*/
public interface IndexingRepository<T> {
T save(T item);
int saveBatch(List<T> items);
}

19
indexer/build.gradle Normal file
View File

@@ -0,0 +1,19 @@
configurations {
implementation.extendsFrom(micronaut)
}
dependencies {
annotationProcessor project(':processor')
implementation project(":core")
// test
testAnnotationProcessor project(':processor')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':storage-local')
testImplementation project(':worker')
testImplementation project(':tests')
testImplementation project(':jdbc')
testImplementation project(':jdbc-h2')
testImplementation("io.micronaut.sql:micronaut-jooq")
}

View File

@@ -1,4 +1,4 @@
package io.kestra.core.runners;
package io.kestra.indexer;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.LogEntry;
@@ -8,7 +8,9 @@ import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.repositories.SaveRepositoryInterface;
import io.kestra.core.runners.IndexingRepository;
import io.kestra.core.runners.Indexer;
import io.kestra.core.runners.QueueIndexer;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.utils.IdUtils;
@@ -28,9 +30,8 @@ import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
// FIXME move that to a new indexer module
/**
* This class is responsible to batch-indexed asynchronously queue messages.<p>
* This class is responsible for batch-indexing asynchronously queue messages.<p>
* Some queue messages are indexed synchronously via the {@link QueueIndexer}.
*/
@SuppressWarnings("this-escape")
@@ -90,7 +91,7 @@ public class DefaultIndexer implements Indexer {
this.sendBatch(metricQueue, metricRepository);
}
protected <T> void sendBatch(QueueInterface<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
protected <T> void sendBatch(QueueInterface<T> queueInterface, IndexingRepository<T> indexingRepository) {
this.receiveCancellations.addFirst(queueInterface.receiveBatch(Indexer.class, eithers -> {
// first, log all deserialization issues
eithers.stream().filter(either -> either.isRight()).forEach(either -> log.error("unable to deserialize an item: {}", either.getRight().getMessage()));
@@ -114,7 +115,7 @@ public class DefaultIndexer implements Indexer {
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_IN_COUNT, MetricRegistry.METRIC_INDEXER_MESSAGE_IN_COUNT_DESCRIPTION, "type", itemClassName).increment(items.size());
this.metricRegistry.timer(MetricRegistry.METRIC_INDEXER_REQUEST_DURATION, MetricRegistry.METRIC_INDEXER_REQUEST_DURATION_DESCRIPTION, "type", itemClassName).record(() -> {
int saved = saveRepositoryInterface.saveBatch(items);
int saved = indexingRepository.saveBatch(items);
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_OUT_COUNT, MetricRegistry.METRIC_INDEXER_MESSAGE_OUT_COUNT_DESCRIPTION, "type", itemClassName).increment(saved);
});
}

View File

@@ -15,5 +15,6 @@ dependencies {
testImplementation project(':jdbc').sourceSets.test.output
testImplementation project(':executor').sourceSets.test.output
testImplementation project(':storage-local')
testImplementation project(':indexer')
testImplementation project(':tests')
}

View File

@@ -17,6 +17,7 @@ dependencies {
testImplementation project(':executor').sourceSets.test.output
testImplementation project(':scheduler').sourceSets.test.output
testImplementation project(':storage-local')
testImplementation project(':indexer')
testImplementation project(':tests')
testImplementation("io.micronaut.validation:micronaut-validation") // MysqlServiceLivenessCoordinatorTest fail to init without that
}

View File

@@ -16,6 +16,7 @@ dependencies {
testImplementation project(':jdbc').sourceSets.test.output
testImplementation project(':executor').sourceSets.test.output
testImplementation project(':storage-local')
testImplementation project(':indexer')
testImplementation project(':tests')
testImplementation("io.micronaut.validation:micronaut-validation") // PostgresServiceLivenessCoordinatorTest fail to init without that
}

View File

@@ -7,7 +7,6 @@ import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.utils.ListUtils;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable;
import org.jooq.*;
@@ -141,21 +140,4 @@ public abstract class AbstractJdbcKvMetadataRepository extends AbstractJdbcCrudR
this.jdbcRepository.persist(kvMetadataToPersist, context, fields);
return kvMetadataToPersist;
}
@Override
public int saveBatch(List<PersistedKvMetadata> items) {
if (ListUtils.isEmpty(items)) {
return 0;
}
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
items.forEach(kvMetadata -> saveKvMetadata(context, kvMetadata));
return items.size();
});
}
}

View File

@@ -6,7 +6,6 @@ import io.kestra.core.models.TenantAndNamespace;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable;
import org.jooq.*;
@@ -179,10 +178,4 @@ public abstract class AbstractJdbcNamespaceFileMetadataRepository extends Abstra
return nsFileMetadataToPersist;
});
}
@Override
public int saveBatch(List<NamespaceFileMetadata> items) {
// FIXME should not be needed as it is not indexed
return items.stream().map(it -> this.save(it)).toList().size();
}
}

View File

@@ -20,6 +20,7 @@ include 'webserver'
include 'executor'
include 'scheduler'
include 'worker'
include 'indexer'
include 'ui'
include 'model'
@@ -27,4 +28,4 @@ include 'processor'
include 'script'
include 'e2e-tests'
include 'jmh-benchmarks'
include 'jmh-benchmarks'

View File

@@ -37,6 +37,7 @@ dependencies {
testImplementation project(':core').sourceSets.test.output
testImplementation project(':storage-local')
testImplementation project(':worker')
testImplementation project(':indexer')
testImplementation "org.wiremock:wiremock-jetty12"
testImplementation "org.awaitility:awaitility"
testImplementation "io.opentelemetry:opentelemetry-sdk-testing"