fix(kv): get value doesn't need metadata migration

Also purge expired kv in metadata migrate command
This commit is contained in:
brian.mulier
2025-10-30 15:01:58 +01:00
committed by brian-mulier-p
parent 07e90de835
commit 2b29a36850
4 changed files with 76 additions and 14 deletions

View File

@@ -6,6 +6,7 @@ import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVEntry;
import io.kestra.core.tenant.TenantService;
import jakarta.inject.Inject;
@@ -19,6 +20,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -57,11 +59,25 @@ public class MetadataMigrationService {
this.namespacesPerTenant().entrySet().stream()
.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
.flatMap(throwFunction(namespaceForTenant -> {
InternalKVStore kvStore = new InternalKVStore(namespaceForTenant.getKey(), namespaceForTenant.getValue(), storageInterface, kvMetadataRepository);
List<FileAttributes> list = listAllFromStorage(storageInterface, namespaceForTenant.getKey(), namespaceForTenant.getValue());
return list.stream()
Map<Boolean, List<KVEntry>> entriesByIsExpired = list.stream()
.map(throwFunction(fileAttributes -> KVEntry.from(namespaceForTenant.getValue(), fileAttributes)))
.filter(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isBefore(expirationDate)).orElse(true))
.map(kvEntry -> PersistedKvMetadata.from(namespaceForTenant.getKey(), kvEntry));
.collect(Collectors.partitioningBy(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isAfter(expirationDate)).orElse(false)));
entriesByIsExpired.get(true).forEach(kvEntry -> {
try {
storageInterface.delete(
namespaceForTenant.getKey(),
namespaceForTenant.getValue(),
kvStore.storageUri(kvEntry.key())
);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return entriesByIsExpired.get(false).stream().map(kvEntry -> PersistedKvMetadata.from(namespaceForTenant.getKey(), kvEntry));
}))
.forEach(kvMetadataRepository::save);
}

View File

@@ -18,6 +18,7 @@ import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.NonNull;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
@@ -26,6 +27,7 @@ import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
@@ -52,8 +54,14 @@ public class MetadataMigrationCommandTest {
String anotherDescription = "another description";
putOldKv(storage, anotherNamespace, anotherKey, anotherDescription, "anotherValue");
KvMetadataRepositoryInterface kvMetadataRepository = ctx.getBean(KvMetadataRepositoryInterface.class);
String tenantId = TenantService.MAIN_TENANT;
// Expired KV should not be migrated + should be purged from the storage
String expiredKey = "expiredKey";
putOldKv(storage, namespace, expiredKey, Instant.now().minus(Duration.ofMinutes(5)), "some expired description", "expiredValue");
assertThat(storage.exists(tenantId, null, getKvStorageUri(namespace, expiredKey))).isTrue();
KvMetadataRepositoryInterface kvMetadataRepository = ctx.getBean(KvMetadataRepositoryInterface.class);
assertThat(kvMetadataRepository.findByName(tenantId, namespace, key).isPresent()).isFalse();
String[] kvMetadataMigrationCommand = {
@@ -92,15 +100,26 @@ public class MetadataMigrationCommandTest {
Optional<KVValue> actualValue = kvStore.getValue(key);
assertThat(actualValue.isPresent()).isTrue();
assertThat(actualValue.get().value()).isEqualTo(value);
assertThat(kvMetadataRepository.findByName(tenantId, namespace, expiredKey).isPresent()).isFalse();
assertThat(storage.exists(tenantId, null, getKvStorageUri(namespace, expiredKey))).isFalse();
}
}
private static void putOldKv(StorageInterface storage, String namespace, String key, String description, String value) throws IOException {
URI kvStorageUri = URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace) + "/" + key + ".ion");
KVValueAndMetadata kvValueAndMetadata = new KVValueAndMetadata(new KVMetadata(description, Duration.ofMinutes(5)), value);
putOldKv(storage, namespace, key, Instant.now().plus(Duration.ofMinutes(5)), description, value);
}
private static void putOldKv(StorageInterface storage, String namespace, String key, Instant expirationDate, String description, String value) throws IOException {
URI kvStorageUri = getKvStorageUri(namespace, key);
KVValueAndMetadata kvValueAndMetadata = new KVValueAndMetadata(new KVMetadata(description, expirationDate), value);
storage.put(TenantService.MAIN_TENANT, namespace, kvStorageUri, new StorageObject(
kvValueAndMetadata.metadataAsMap(),
new ByteArrayInputStream(JacksonMapper.ofIon().writeValueAsBytes(kvValueAndMetadata.value()))
));
}
private static @NonNull URI getKvStorageUri(String namespace, String key) {
return URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace) + "/" + key + ".ion");
}
}

View File

@@ -113,19 +113,23 @@ public class InternalKVStore implements KVStore {
KVStore.validateKey(key);
Optional<PersistedKvMetadata> maybeMetadata = this.kvMetadataRepository.findByName(this.tenant, this.namespace, key);
if (maybeMetadata.isEmpty() || maybeMetadata.get().isDeleted()) {
return Optional.empty();
}
PersistedKvMetadata metadata = maybeMetadata.get();
if (Optional.ofNullable(metadata.getExpirationDate()).map(Instant.now()::isAfter).orElse(false)) {
this.delete(key);
throw new ResourceExpiredException("The requested value has expired");
int version = maybeMetadata.map(PersistedKvMetadata::getVersion).orElse(1);
if (maybeMetadata.isPresent()) {
PersistedKvMetadata metadata = maybeMetadata.get();
if (metadata.isDeleted()) {
return Optional.empty();
}
if (Optional.ofNullable(metadata.getExpirationDate()).map(Instant.now()::isAfter).orElse(false)) {
this.delete(key);
throw new ResourceExpiredException("The requested value has expired");
}
}
StorageObject withMetadata;
try {
withMetadata = this.storage.getWithMetadata(this.tenant, this.namespace, this.storageUri(key, metadata.getVersion()));
withMetadata = this.storage.getWithMetadata(this.tenant, this.namespace, this.storageUri(key, version));
} catch (FileNotFoundException e) {
return Optional.empty();
}

View File

@@ -10,11 +10,13 @@ import io.kestra.core.storages.kv.KVMetadata;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.core.storages.kv.KVValue;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
@@ -227,6 +229,27 @@ class InternalKVStoreTest {
Assertions.assertEquals(val.metadata().getExpirationDate().truncatedTo(ChronoUnit.MILLIS), result.get().metadata().getExpirationDate().truncatedTo(ChronoUnit.MILLIS));
}
@Test
void getShouldStillWorkWithoutMetadata() throws IOException, ResourceExpiredException {
// Given
InternalKVStore kv = kv();
String key = IdUtils.create();
URI kvStorageUri = URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(kv.namespace()) + "/" + key + ".ion");
String value = "someValue";
KVValueAndMetadata kvValueAndMetadata = new KVValueAndMetadata(new KVMetadata("some description", Instant.now().plus(Duration.ofMinutes(5))), value);
storageInterface.put(TenantService.MAIN_TENANT, kv.namespace(), kvStorageUri, new StorageObject(
kvValueAndMetadata.metadataAsMap(),
new ByteArrayInputStream(JacksonMapper.ofIon().writeValueAsBytes(kvValueAndMetadata.value()))
));
// When
Optional<KVValue> result = kv.getValue(key);
// Then
assertThat(result.isPresent()).isTrue();
assertThat(result.get().value()).isEqualTo(value);
}
@Test
void illegalKey() {
InternalKVStore kv = kv();