feat(system): purge empty service instances

Purge service instance in EMPTY state after a certain duration, 30 days by default, to avoid never ending groth on the service_instances table.

Fixes #8514
This commit is contained in:
Loïc Mathieu
2025-05-05 11:32:52 +02:00
parent 2b015f8d06
commit 7117ae60f5
9 changed files with 78 additions and 3 deletions

View File

@@ -202,6 +202,11 @@ kestra:
initialDelay: 1m
# The expected time between service heartbeats.
heartbeatInterval: 3s
service:
purge:
initial-delay: 1h
fixed-delay: 1d
retention: 30d
anonymous-usage-report:
enabled: true
uri: https://api.kestra.io/v1/reports/usages

View File

@@ -85,7 +85,14 @@ public interface ServiceInstanceRepositoryInterface {
final Instant to);
/**
* Returns the function to be used for mapping column used to sort result.
* Purge all instances in the EMPTY state older than the until date.
*
* @return the number of purged instances
*/
int purgeEmptyInstances(Instant until);
/**
* Returns the function to be used for mapping column used to sort results.
*
* @return the mapping function.
*/

View File

@@ -16,6 +16,14 @@ flyway:
kestra:
server-type: STANDALONE
server:
liveness:
enabled: false
service:
purge:
initial-delay: 1h
fixed-delay: 1d
retention: 30d
queue:
type: h2
repository:

View File

@@ -17,6 +17,14 @@ flyway:
kestra:
server-type: STANDALONE
server:
liveness:
enabled: false
service:
purge:
initial-delay: 1h
fixed-delay: 1d
retention: 30d
queue:
type: mysql
repository:

View File

@@ -39,4 +39,9 @@ kestra:
allow-parameter-outputs: true
server:
liveness:
enabled: false
enabled: false
service:
purge:
initial-delay: 1h
fixed-delay: 1d
retention: 30d

View File

@@ -182,6 +182,16 @@ public abstract class AbstractJdbcServiceInstanceRepository extends AbstractJdbc
this.jdbcRepository.fetch(query);
}
@Override
public int purgeEmptyInstances(Instant until) {
return jdbcRepository.getDslContextWrapper().transactionResult(
configuration -> using(configuration).delete(table())
.where(STATE.eq(Service.ServiceState.EMPTY.name()))
.and(UPDATED_AT.lessOrEqual(until))
.execute()
);
}
public void transaction(final TransactionalRunnable runnable) {
this.jdbcRepository
.getDslContextWrapper()

View File

@@ -10,11 +10,14 @@ import io.kestra.core.server.ServiceType;
import io.kestra.core.server.WorkerTaskRestartStrategy;
import io.kestra.jdbc.repository.AbstractJdbcServiceInstanceRepository;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@@ -36,6 +39,7 @@ public final class JdbcServiceLivenessCoordinator extends AbstractServiceLivenes
private final AtomicReference<JdbcExecutor> executor = new AtomicReference<>();
private final AbstractJdbcServiceInstanceRepository serviceInstanceRepository;
private final Duration purgeRetention;
/**
* Creates a new {@link JdbcServiceLivenessCoordinator} instance.
@@ -46,9 +50,11 @@ public final class JdbcServiceLivenessCoordinator extends AbstractServiceLivenes
@Inject
public JdbcServiceLivenessCoordinator(final AbstractJdbcServiceInstanceRepository serviceInstanceRepository,
final ServiceRegistry serviceRegistry,
final ServerConfig serverConfig) {
final ServerConfig serverConfig,
@Value("${kestra.server.service.purge.retention}") final Duration purgeRetention) {
super(serviceInstanceRepository, serviceRegistry, serverConfig);
this.serviceInstanceRepository = serviceInstanceRepository;
this.purgeRetention = purgeRetention;
}
/**
@@ -142,6 +148,12 @@ public final class JdbcServiceLivenessCoordinator extends AbstractServiceLivenes
});
}
@Scheduled(initialDelay = "${kestra.server.service.purge.initial-delay}", fixedDelay = "${kestra.server.service.purge.fixed-delay}")
public void purgeEmptyInstances() {
int purged = serviceInstanceRepository.purgeEmptyInstances(Instant.now().minus(purgeRetention));
log.info("Purged {} service instances", purged);
}
synchronized void setExecutor(final JdbcExecutor executor) {
this.executor.set(executor);

View File

@@ -168,6 +168,21 @@ public abstract class AbstractJdbcServiceInstanceRepositoryTest {
Assertions.assertEquals(new ServiceStateTransition.Response(FAILED, instance), response);
}
@Test
void shouldPurgeServiceInstance() {
// Given
ServiceInstance instance = Fixtures.RunningServiceInstance;
repository.update(instance);
instance = Fixtures.EmptyServiceInstance;
repository.update(instance);
// When
int purged = repository.purgeEmptyInstances(Instant.now());
//Then
assertThat(purged).isEqualTo(1);
}
public static final class Fixtures {
public static List<ServiceInstance> all() {

View File

@@ -55,6 +55,11 @@ kestra:
- "/api/v1/executions/webhook/"
liveness:
enabled: false
service:
purge:
initial-delay: 1h
fixed-delay: 1d
retention: 30d
queue:
type: h2
repository: