wip(storages): add non tenant dependant method to storage interface (#10637)

* wip(storages): add non tenant dependant method to storage interface

* feat(storages): #10636 add instance method to retrieve resources without the tenant id

* fix(stores): #4353 failing unit tests after now that tenant id can't be null

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
This commit is contained in:
Nicolas K.
2025-08-13 11:00:25 +02:00
committed by GitHub
parent e461e46a1c
commit cc5f73ae06
20 changed files with 492 additions and 222 deletions

View File

@@ -54,6 +54,18 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream get(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Retrieves an input stream of a instance resource for the given storage URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace of the object (may be null)
* @param uri the URI of the object to retrieve
* @return an InputStream to read the object's contents
* @throws IOException if the object cannot be read
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream getInstanceResource(@Nullable String namespace, URI uri) throws IOException;
/**
* Retrieves a storage object along with its metadata.
*
@@ -91,6 +103,18 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
List<FileAttributes> list(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Lists the attributes of all instance files and instance directories under the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the URI to list
* @return a list of file attributes
* @throws IOException if the listing fails
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
List<FileAttributes> listInstanceResource(@Nullable String namespace, URI uri) throws IOException;
/**
* Checks whether the given URI exists in the internal storage.
*
@@ -108,6 +132,23 @@ public interface StorageInterface extends AutoCloseable, Plugin {
}
}
/**
* Checks whether the given URI exists in the instance internal storage.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the URI to check
* @return true if the URI exists, false otherwise
*/
@SuppressWarnings("try")
default boolean existsInstanceResource(@Nullable String namespace, URI uri) {
try (InputStream ignored = getInstanceResource(namespace, uri)) {
return true;
} catch (IOException ieo) {
return false;
}
}
/**
* Retrieves the metadata attributes for the given URI.
*
@@ -120,6 +161,18 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
FileAttributes getAttributes(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Retrieves the metadata attributes for the given URI.
* n instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the URI of the object
* @return the file attributes
* @throws IOException if the attributes cannot be retrieved
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
FileAttributes getInstanceAttributes(@Nullable String namespace, URI uri) throws IOException;
/**
* Stores data at the given URI.
*
@@ -148,34 +201,86 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class})
URI put(String tenantId, @Nullable String namespace, URI uri, StorageObject storageObject) throws IOException;
/**
* Stores instance data at the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the target URI
* @param data the input stream containing the data to store
* @return the URI of the stored object
* @throws IOException if storing fails
*/
@Retryable(includes = {IOException.class})
default URI putInstanceResource(@Nullable String namespace, URI uri, InputStream data) throws IOException {
return this.putInstanceResource(namespace, uri, new StorageObject(null, data));
}
/**
* Stores a instance storage object at the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the target URI
* @param storageObject the storage object to store
* @return the URI of the stored object
* @throws IOException if storing fails
*/
@Retryable(includes = {IOException.class})
URI putInstanceResource(@Nullable String namespace, URI uri, StorageObject storageObject) throws IOException;
/**
* Deletes the object at the given URI.
*
* @param tenantId the tenant identifier (may be null for global deletion)
* @param tenantId the tenant identifier
* @param namespace the namespace (may be null)
* @param uri the URI of the object to delete
* @return true if deletion was successful
* @throws IOException if deletion fails
*/
@Retryable(includes = {IOException.class})
boolean delete(@Nullable String tenantId, @Nullable String namespace, URI uri) throws IOException;
boolean delete(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Deletes the instance object at the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the URI of the object to delete
* @return true if deletion was successful
* @throws IOException if deletion fails
*/
@Retryable(includes = {IOException.class})
boolean deleteInstanceResource(@Nullable String namespace, URI uri) throws IOException;
/**
* Creates a new directory at the given URI.
*
* @param tenantId the tenant identifier (optional)
* @param tenantId the tenant identifier
* @param namespace the namespace (optional)
* @param uri the URI of the directory to create
* @return the URI of the created directory
* @throws IOException if creation fails
*/
@Retryable(includes = {IOException.class})
URI createDirectory(@Nullable String tenantId, @Nullable String namespace, URI uri) throws IOException;
URI createDirectory(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Creates a new instance directory at the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace
* @param uri the URI of the directory to create
* @return the URI of the created directory
* @throws IOException if creation fails
*/
@Retryable(includes = {IOException.class})
URI createInstanceDirectory(String namespace, URI uri) throws IOException;
/**
* Moves an object from one URI to another.
*
* @param tenantId the tenant identifier (optional)
* @param tenantId the tenant identifier
* @param namespace the namespace (optional)
* @param from the source URI
* @param to the destination URI
@@ -183,7 +288,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
* @throws IOException if moving fails
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
URI move(@Nullable String tenantId, @Nullable String namespace, URI from, URI to) throws IOException;
URI move(String tenantId, @Nullable String namespace, URI from, URI to) throws IOException;
/**
* Deletes all objects that match the given URI prefix.
@@ -228,11 +333,11 @@ public interface StorageInterface extends AutoCloseable, Plugin {
/**
* Builds the internal storage path based on tenant ID and URI.
*
* @param tenantId the tenant identifier (maybe null)
* @param tenantId the tenant identifier
* @param uri the URI of the object
* @return a normalized internal path
*/
default String getPath(@Nullable String tenantId, URI uri) {
default String getPath(String tenantId, URI uri) {
if (uri == null) {
uri = URI.create("/");
}
@@ -240,9 +345,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
parentTraversalGuard(uri);
String path = uri.getPath();
if (tenantId != null) {
path = tenantId + (path.startsWith("/") ? path : "/" + path);
}
return path;
}

View File

@@ -10,6 +10,7 @@ import org.junit.jupiter.api.Test;
import java.net.URI;
import java.util.Map;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -37,9 +38,9 @@ class VariablesTest {
@Test
@SuppressWarnings("unchecked")
void inStorage() {
var storageContext = StorageContext.forTask(null, "namespace", "flow", "execution", "task", "taskRun", null);
var storageContext = StorageContext.forTask(MAIN_TENANT, "namespace", "flow", "execution", "task", "taskRun", null);
var internalStorage = new InternalStorage(storageContext, storageInterface);
Variables.StorageContext variablesContext = new Variables.StorageContext(null, "namespace");
Variables.StorageContext variablesContext = new Variables.StorageContext(MAIN_TENANT, "namespace");
// simple
Map<String, Object> outputs = Map.of("key", "value");

View File

@@ -8,7 +8,6 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.plugin.core.debug.Return;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.junit.annotations.KestraTest;
@@ -26,6 +25,7 @@ import java.time.ZonedDateTime;
import java.util.Map;
import java.util.Objects;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -39,9 +39,6 @@ public abstract class AbstractExecutionServiceTest {
@Inject
LogRepositoryInterface logRepository;
@Inject
StorageInterface storageInterface;
@Inject
RunContextFactory runContextFactory;
@@ -56,12 +53,14 @@ public abstract class AbstractExecutionServiceTest {
Flow flow = Flow.builder()
.namespace("io.kestra.test")
.id("abc")
.tenantId(MAIN_TENANT)
.revision(1)
.build();
Execution execution = Execution
.builder()
.id(IdUtils.create())
.tenantId(MAIN_TENANT)
.state(state)
.flowId(flow.getId())
.namespace(flow.getNamespace())
@@ -74,6 +73,7 @@ public abstract class AbstractExecutionServiceTest {
.builder()
.namespace(flow.getNamespace())
.id(IdUtils.create())
.tenantId(MAIN_TENANT)
.executionId(execution.getId())
.flowId(flow.getId())
.taskId(task.getId())
@@ -94,6 +94,7 @@ public abstract class AbstractExecutionServiceTest {
for (int i = 0; i < 10; i++) {
logRepository.save(LogEntry.builder()
.executionId(execution.getId())
.tenantId(MAIN_TENANT)
.timestamp(Instant.now())
.message("Message " + i)
.flowId(flow.getId())
@@ -108,7 +109,7 @@ public abstract class AbstractExecutionServiceTest {
true,
true,
true,
null,
MAIN_TENANT,
flow.getNamespace(),
flow.getId(),
null,
@@ -126,7 +127,7 @@ public abstract class AbstractExecutionServiceTest {
true,
true,
true,
null,
MAIN_TENANT,
flow.getNamespace(),
flow.getId(),
null,

View File

@@ -1,5 +1,7 @@
package io.kestra.core.runners;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.DependsOn;
@@ -21,7 +23,6 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -200,7 +201,7 @@ class FlowInputOutputTest {
}
@Test
void shouldNotUploadFileInputAfterValidation() throws IOException {
void shouldNotUploadFileInputAfterValidation() {
// Given
FileInput input = FileInput
.builder()
@@ -215,7 +216,7 @@ class FlowInputOutputTest {
// Then
Assertions.assertNull(values.getFirst().exception());
Assertions.assertFalse(storageInterface.exists(null, null, URI.create(values.getFirst().value().toString())));
Assertions.assertFalse(storageInterface.exists(MAIN_TENANT, null, URI.create(values.getFirst().value().toString())));
}
@Test

View File

@@ -0,0 +1,39 @@
package io.kestra.core.runners.pebble.functions;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import io.kestra.core.utils.IdUtils;
import java.util.Map;
public class FunctionTestUtils {
public static final String NAMESPACE = "io.kestra.tests";
public static Map<String, Object> getVariables() {
return getVariables(NAMESPACE);
}
public static Map<String, Object> getVariables(String namespace) {
return Map.of(
"flow", Map.of(
"id", "kv",
"tenantId", MAIN_TENANT,
"namespace", namespace)
);
}
public static Map<String, Object> getVariablesWithExecution(String namespace) {
return getVariablesWithExecution(namespace, IdUtils.create());
}
public static Map<String, Object> getVariablesWithExecution(String namespace, String executionId) {
return Map.of(
"flow", Map.of(
"id", "flow",
"namespace", namespace,
"tenantId", MAIN_TENANT),
"execution", Map.of("id", executionId)
);
}
}

View File

@@ -1,39 +1,24 @@
package io.kestra.core.runners.pebble.functions;
import static io.kestra.core.runners.pebble.functions.FunctionTestUtils.getVariables;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.VariableRenderer;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.debug.Return;
import io.pebbletemplates.pebble.error.PebbleException;
import jakarta.inject.Inject;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
@KestraTest(startRunner = true)
public class KvFunctionTest {
@@ -46,20 +31,16 @@ public class KvFunctionTest {
@BeforeEach
void reset() throws IOException {
storageInterface.deleteByPrefix(null, null, URI.create(StorageContext.kvPrefix("io.kestra.tests")));
storageInterface.deleteByPrefix(MAIN_TENANT, null, URI.create(StorageContext.kvPrefix("io.kestra.tests")));
}
@Test
void shouldGetValueFromKVGivenExistingKey() throws IllegalVariableEvaluationException, IOException {
// Given
KVStore kv = new InternalKVStore(null, "io.kestra.tests", storageInterface);
KVStore kv = new InternalKVStore(MAIN_TENANT, "io.kestra.tests", storageInterface);
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "io.kestra.tests")
);
Map<String, Object> variables = getVariables("io.kestra.tests");
// When
String rendered = variableRenderer.render("{{ kv('my-key') }}", variables);
@@ -71,17 +52,13 @@ public class KvFunctionTest {
@Test
void shouldGetValueFromKVGivenExistingKeyWithInheritance() throws IllegalVariableEvaluationException, IOException {
// Given
KVStore kv = new InternalKVStore(null, "my.company", storageInterface);
KVStore kv = new InternalKVStore(MAIN_TENANT, "my.company", storageInterface);
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
KVStore firstKv = new InternalKVStore(null, "my", storageInterface);
KVStore firstKv = new InternalKVStore(MAIN_TENANT, "my", storageInterface);
firstKv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "firstValue")));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "my.company.team")
);
Map<String, Object> variables = getVariables("my.company.team");
// When
String rendered = variableRenderer.render("{{ kv('my-key') }}", variables);
@@ -93,14 +70,10 @@ public class KvFunctionTest {
@Test
void shouldNotGetValueFromKVWithGivenNamespaceAndInheritance() throws IOException {
// Given
KVStore kv = new InternalKVStore(null, "kv", storageInterface);
KVStore kv = new InternalKVStore(MAIN_TENANT, "kv", storageInterface);
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "my.company.team")
);
Map<String, Object> variables = getVariables("my.company.team");
// When
Assertions.assertThrows(IllegalVariableEvaluationException.class, () ->
@@ -110,14 +83,10 @@ public class KvFunctionTest {
@Test
void shouldGetValueFromKVGivenExistingAndNamespace() throws IllegalVariableEvaluationException, IOException {
// Given
KVStore kv = new InternalKVStore(null, "kv", storageInterface);
KVStore kv = new InternalKVStore(MAIN_TENANT, "kv", storageInterface);
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "io.kestra.tests")
);
Map<String, Object> variables = getVariables("io.kestra.tests");
// When
String rendered = variableRenderer.render("{{ kv('my-key', namespace='kv') }}", variables);
@@ -129,11 +98,7 @@ public class KvFunctionTest {
@Test
void shouldGetEmptyGivenNonExistingKeyAndErrorOnMissingFalse() throws IllegalVariableEvaluationException {
// Given
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "io.kestra.tests")
);
Map<String, Object> variables = getVariables("io.kestra.tests");
// When
String rendered = variableRenderer.render("{{ kv('my-key', errorOnMissing=false) }}", variables);
@@ -145,11 +110,7 @@ public class KvFunctionTest {
@Test
void shouldFailGivenNonExistingKeyAndErrorOnMissingTrue() {
// Given
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "io.kestra.tests")
);
Map<String, Object> variables = getVariables("io.kestra.tests");
// When
IllegalVariableEvaluationException exception = Assertions.assertThrows(IllegalVariableEvaluationException.class, () -> {
@@ -163,11 +124,7 @@ public class KvFunctionTest {
@Test
void shouldFailGivenNonExistingKeyUsingDefaults() {
// Given
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "io.kestra.tests")
);
Map<String, Object> variables = getVariables("io.kestra.tests");
// When
IllegalVariableEvaluationException exception = Assertions.assertThrows(IllegalVariableEvaluationException.class, () -> {
variableRenderer.render("{{ kv('my-key') }}", variables);
@@ -176,4 +133,5 @@ public class KvFunctionTest {
// Then
assertThat(exception.getMessage()).isEqualTo("io.pebbletemplates.pebble.error.PebbleException: The key 'my-key' does not exist in the namespace 'io.kestra.tests'. ({{ kv('my-key') }}:1)");
}
}

View File

@@ -20,6 +20,9 @@ import java.net.URI;
import java.nio.file.Files;
import java.util.Map;
import static io.kestra.core.runners.pebble.functions.FunctionTestUtils.NAMESPACE;
import static io.kestra.core.runners.pebble.functions.FunctionTestUtils.getVariables;
import static io.kestra.core.runners.pebble.functions.FunctionTestUtils.getVariablesWithExecution;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -35,48 +38,39 @@ class ReadFileFunctionTest {
@Test
void readNamespaceFile() throws IllegalVariableEvaluationException, IOException {
String namespace = "io.kestra.tests";
String filePath = "file.txt";
storageInterface.createDirectory(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace)));
storageInterface.put(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes()));
storageInterface.createDirectory(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE)));
storageInterface.put(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes()));
String render = variableRenderer.render("{{ render(read('" + filePath + "')) }}", Map.of("flow", Map.of("namespace", namespace, "tenantId", MAIN_TENANT)));
assertThat(render).isEqualTo("Hello from " + namespace);
String render = variableRenderer.render("{{ render(read('" + filePath + "')) }}", getVariables());
assertThat(render).isEqualTo("Hello from " + NAMESPACE);
}
@Test
void readNamespaceFileFromURI() throws IllegalVariableEvaluationException, IOException {
String namespace = "io.kestra.tests";
String filePath = "file.txt";
storageInterface.createDirectory(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace)));
storageInterface.put(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes()));
storageInterface.createDirectory(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE)));
storageInterface.put(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes()));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "flow",
"namespace", namespace,
"tenantId", MAIN_TENANT),
"execution", Map.of("id", IdUtils.create())
);
Map<String, Object> variables = getVariablesWithExecution(NAMESPACE);
String render = variableRenderer.render("{{ render(read(fileURI('" + filePath + "'))) }}", variables);
assertThat(render).isEqualTo("Hello from " + namespace);
assertThat(render).isEqualTo("Hello from " + NAMESPACE);
}
@Test
void readNamespaceFileWithNamespace() throws IllegalVariableEvaluationException, IOException {
String namespace = "io.kestra.tests";
String filePath = "file.txt";
storageInterface.createDirectory(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace)));
storageInterface.put(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + filePath), new ByteArrayInputStream("Hello but not from flow.namespace".getBytes()));
storageInterface.createDirectory(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE)));
storageInterface.put(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + "/" + filePath), new ByteArrayInputStream("Hello but not from flow.namespace".getBytes()));
String render = variableRenderer.render("{{ read('" + filePath + "', namespace='" + namespace + "') }}", Map.of("flow", Map.of("namespace", "flow.namespace", "tenantId", MAIN_TENANT)));
String render = variableRenderer.render("{{ read('" + filePath + "', namespace='" + NAMESPACE + "') }}", getVariables("different.namespace"));
assertThat(render).isEqualTo("Hello but not from flow.namespace");
}
@Test
void readUnknownNamespaceFile() {
IllegalVariableEvaluationException illegalVariableEvaluationException = assertThrows(IllegalVariableEvaluationException.class, () -> variableRenderer.render("{{ read('unknown.txt') }}", Map.of("flow", Map.of("namespace", "io.kestra.tests"))));
IllegalVariableEvaluationException illegalVariableEvaluationException = assertThrows(IllegalVariableEvaluationException.class, () -> variableRenderer.render("{{ read('unknown.txt') }}", getVariables()));
assertThat(illegalVariableEvaluationException.getCause().getCause().getClass()).isEqualTo(FileNotFoundException.class);
}
@@ -90,13 +84,7 @@ class ReadFileFunctionTest {
URI internalStorageFile = storageInterface.put(MAIN_TENANT, namespace, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes()));
// test for an authorized execution
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", flowId,
"namespace", namespace,
"tenantId", MAIN_TENANT),
"execution", Map.of("id", executionId)
);
Map<String, Object> variables = getVariablesWithExecution(namespace, executionId);
String render = variableRenderer.render("{{ read('" + internalStorageFile + "') }}", variables);
assertThat(render).isEqualTo("Hello from a task output");
@@ -169,13 +157,7 @@ class ReadFileFunctionTest {
URI internalStorageURI = URI.create("/" + namespace.replace(".", "/") + "/" + flowId + "/executions/" + executionId + "/tasks/task/" + IdUtils.create() + "/123456.ion");
URI internalStorageFile = storageInterface.put(MAIN_TENANT, namespace, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes()));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "notme",
"namespace", "notme",
"tenantId", MAIN_TENANT),
"execution", Map.of("id", "notme")
);
Map<String, Object> variables = getVariablesWithExecution("notme", "notme");
String render = variableRenderer.render("{{ read('" + internalStorageFile + "') }}", variables);
assertThat(render).isEqualTo("Hello from a task output");
@@ -191,13 +173,7 @@ class ReadFileFunctionTest {
@Test
void shouldFailProcessingUnsupportedScheme() {
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "notme",
"namespace", "notme",
"tenantId", MAIN_TENANT),
"execution", Map.of("id", "notme")
);
Map<String, Object> variables = getVariablesWithExecution("notme", "notme");
assertThrows(IllegalArgumentException.class, () -> variableRenderer.render("{{ read('unsupported://path-to/file.txt') }}", variables));
}

View File

@@ -1,7 +1,8 @@
package io.kestra.core.services;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.*;
import io.micronaut.test.annotation.MockBean;
@@ -26,25 +27,25 @@ class KVStoreServiceTest {
@Test
void shouldGetKVStoreForExistingNamespaceGivenFromNull() {
Assertions.assertNotNull(storeService.get(null, TEST_EXISTING_NAMESPACE, null));
Assertions.assertNotNull(storeService.get(MAIN_TENANT, TEST_EXISTING_NAMESPACE, null));
}
@Test
void shouldThrowExceptionWhenAccessingKVStoreForNonExistingNamespace() {
KVStoreException exception = Assertions.assertThrows(KVStoreException.class, () -> storeService.get(null, "io.kestra.unittest.unknown", null));
KVStoreException exception = Assertions.assertThrows(KVStoreException.class, () -> storeService.get(MAIN_TENANT, "io.kestra.unittest.unknown", null));
Assertions.assertTrue(exception.getMessage().contains("namespace 'io.kestra.unittest.unknown' does not exist"));
}
@Test
void shouldGetKVStoreForAnyNamespaceWhenAccessingFromChildNamespace() {
Assertions.assertNotNull(storeService.get(null, "io.kestra", TEST_EXISTING_NAMESPACE));
Assertions.assertNotNull(storeService.get(MAIN_TENANT, "io.kestra", TEST_EXISTING_NAMESPACE));
}
@Test
void shouldGetKVStoreFromNonExistingNamespaceWithAKV() throws IOException {
KVStore kvStore = new InternalKVStore(null, "system", storageInterface);
KVStore kvStore = new InternalKVStore(MAIN_TENANT, "system", storageInterface);
kvStore.put("key", new KVValueAndMetadata(new KVMetadata("myDescription", Duration.ofHours(1)), "value"));
Assertions.assertNotNull(storeService.get(null, "system", null));
Assertions.assertNotNull(storeService.get(MAIN_TENANT, "system", null));
}
@MockBean(NamespaceService.class)

View File

@@ -27,6 +27,7 @@ import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.within;
@@ -91,7 +92,7 @@ class InternalKVStoreTest {
kv.put(TEST_KV_KEY, new KVValueAndMetadata(new KVMetadata(description, Duration.ofMinutes(5)), complexValue));
// Then
StorageObject withMetadata = storageInterface.getWithMetadata(null, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
StorageObject withMetadata = storageInterface.getWithMetadata(MAIN_TENANT, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
String valueFile = new String(withMetadata.inputStream().readAllBytes());
Instant expirationDate = Instant.parse(withMetadata.metadata().get("expirationDate"));
assertThat(expirationDate.isAfter(before.plus(Duration.ofMinutes(4))) && expirationDate.isBefore(before.plus(Duration.ofMinutes(6)))).isTrue();
@@ -102,7 +103,7 @@ class InternalKVStoreTest {
kv.put(TEST_KV_KEY, new KVValueAndMetadata(new KVMetadata(null, Duration.ofMinutes(10)), "some-value"));
// Then
withMetadata = storageInterface.getWithMetadata(null, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
withMetadata = storageInterface.getWithMetadata(MAIN_TENANT, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
valueFile = new String(withMetadata.inputStream().readAllBytes());
expirationDate = Instant.parse(withMetadata.metadata().get("expirationDate"));
assertThat(expirationDate.isAfter(before.plus(Duration.ofMinutes(9))) && expirationDate.isBefore(before.plus(Duration.ofMinutes(11)))).isTrue();
@@ -176,6 +177,6 @@ class InternalKVStoreTest {
private InternalKVStore kv() {
final String namespaceId = "io.kestra." + IdUtils.create();
return new InternalKVStore(null, namespaceId, storageInterface);
return new InternalKVStore(MAIN_TENANT, namespaceId, storageInterface);
}
}

View File

@@ -17,8 +17,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.is;
class InternalNamespaceTest {
@@ -38,7 +38,7 @@ class InternalNamespaceTest {
void shouldGetAllNamespaceFiles() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
// When
namespace.putFile(Path.of("/sub/dir/file1.txt"), new ByteArrayInputStream("1".getBytes()));
@@ -56,7 +56,7 @@ class InternalNamespaceTest {
void shouldPutFileGivenNoTenant() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
// When
NamespaceFile namespaceFile = namespace.putFile(Path.of("/sub/dir/file.txt"), new ByteArrayInputStream("1".getBytes()));
@@ -73,7 +73,7 @@ class InternalNamespaceTest {
void shouldSucceedPutFileGivenExistingFileForConflictOverwrite() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt"));
@@ -92,7 +92,7 @@ class InternalNamespaceTest {
void shouldFailPutFileGivenExistingFileForError() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt"));
@@ -109,7 +109,7 @@ class InternalNamespaceTest {
void shouldIgnorePutFileGivenExistingFileForSkip() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt"));
@@ -128,7 +128,7 @@ class InternalNamespaceTest {
void shouldFindAllMatchingGivenNoTenant() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
// When
namespace.putFile(Path.of("/a/b/c/1.sql"), new ByteArrayInputStream("1".getBytes()));
@@ -171,7 +171,7 @@ class InternalNamespaceTest {
void shouldReturnNoNamespaceFileForEmptyNamespace() throws IOException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
List<NamespaceFile> namespaceFiles = namespace.findAllFilesMatching((unused) -> true);
assertThat(namespaceFiles.size()).isZero();
}

View File

@@ -12,8 +12,8 @@ import org.junit.jupiter.api.Test;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -33,6 +33,7 @@ class PurgeExecutionsTest {
.id(IdUtils.create())
.namespace(namespace)
.flowId(flowId)
.tenantId(MAIN_TENANT)
.state(new State().withState(State.Type.SUCCESS))
.build();
executionRepository.save(execution);
@@ -42,7 +43,7 @@ class PurgeExecutionsTest {
.namespace(Property.ofValue(namespace))
.endDate(Property.ofValue(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)))
.build();
var runContext = runContextFactory.of(Map.of("flow", Map.of("namespace", namespace, "id", flowId)));
var runContext = runContextFactory.of(flowId, namespace);
var output = purge.run(runContext);
assertThat(output.getExecutionsCount()).isEqualTo(1);
@@ -58,6 +59,7 @@ class PurgeExecutionsTest {
.namespace(namespace)
.flowId(flowId)
.id(IdUtils.create())
.tenantId(MAIN_TENANT)
.state(new State().withState(State.Type.SUCCESS))
.build();
executionRepository.save(execution);
@@ -68,7 +70,7 @@ class PurgeExecutionsTest {
.flowId(Property.ofValue(flowId))
.endDate(Property.ofValue(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)))
.build();
var runContext = runContextFactory.of(Map.of("flow", Map.of("namespace", namespace, "id", flowId)));
var runContext = runContextFactory.of(flowId, namespace);
var output = purge.run(runContext);
assertThat(output.getExecutionsCount()).isEqualTo(1);

View File

@@ -1,9 +1,9 @@
package io.kestra.plugin.core.kv;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.core.utils.IdUtils;
@@ -21,19 +21,16 @@ class DeleteTest {
static final String TEST_KV_KEY = "test-key";
@Inject
RunContextFactory runContextFactory;
TestRunContextFactory runContextFactory;
@Test
void shouldOutputTrueGivenExistingKey() throws Exception {
// Given
String namespaceId = "io.kestra." + IdUtils.create();
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", namespaceId),
"inputs", Map.of(
RunContext runContext = this.runContextFactory.of(namespaceId, Map.of("inputs", Map.of(
"key", TEST_KV_KEY,
"namespace", namespaceId
)
));
)));
Delete delete = Delete.builder()
.id(Delete.class.getSimpleName())
@@ -56,13 +53,10 @@ class DeleteTest {
void shouldOutputFalseGivenNonExistingKey() throws Exception {
// Given
String namespaceId = "io.kestra." + IdUtils.create();
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", namespaceId),
"inputs", Map.of(
RunContext runContext = this.runContextFactory.of(namespaceId, Map.of("inputs", Map.of(
"key", TEST_KV_KEY,
"namespace", namespaceId
)
));
)));
Delete delete = Delete.builder()
.id(Delete.class.getSimpleName())

View File

@@ -1,9 +1,9 @@
package io.kestra.plugin.core.kv;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.core.utils.IdUtils;
@@ -19,15 +19,13 @@ class GetKeysTest {
static final String TEST_KEY_PREFIX_TEST = "test";
@Inject
RunContextFactory runContextFactory;
TestRunContextFactory runContextFactory;
@Test
void shouldGetAllKeys() throws Exception {
// Given
String namespace = IdUtils.create();
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", namespace)
));
RunContext runContext = this.runContextFactory.of(namespace);
GetKeys getKeys = GetKeys.builder()
.id(GetKeys.class.getSimpleName())
@@ -50,12 +48,8 @@ class GetKeysTest {
void shouldGetKeysGivenMatchingPrefix() throws Exception {
// Given
String namespace = IdUtils.create();
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", namespace),
"inputs", Map.of(
"prefix", TEST_KEY_PREFIX_TEST
)
));
RunContext runContext = this.runContextFactory.of(namespace,
Map.of("inputs", Map.of("prefix", TEST_KEY_PREFIX_TEST)));
GetKeys getKeys = GetKeys.builder()
.id(GetKeys.class.getSimpleName())
@@ -79,12 +73,8 @@ class GetKeysTest {
void shouldGetNoKeysGivenEmptyKeyStore() throws Exception {
// Given
String namespace = IdUtils.create();
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", namespace),
"inputs", Map.of(
"prefix", TEST_KEY_PREFIX_TEST
)
));
RunContext runContext = this.runContextFactory.of(namespace,
Map.of("inputs", Map.of("prefix", TEST_KEY_PREFIX_TEST)));
GetKeys getKeys = GetKeys.builder()
.id(GetKeys.class.getSimpleName())

View File

@@ -1,9 +1,9 @@
package io.kestra.plugin.core.kv;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.core.utils.IdUtils;
@@ -24,15 +24,13 @@ class GetTest {
static final String TEST_KV_KEY = "test-key";
@Inject
RunContextFactory runContextFactory;
TestRunContextFactory runContextFactory;
@Test
void shouldGetGivenExistingKey() throws Exception {
// Given
String namespaceId = "io.kestra." + IdUtils.create();
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", namespaceId),
"inputs", Map.of(
RunContext runContext = this.runContextFactory.of(namespaceId, Map.of("inputs", Map.of(
"key", TEST_KV_KEY,
"namespace", namespaceId
)
@@ -62,8 +60,7 @@ class GetTest {
void shouldGetGivenExistingKeyWithInheritance() throws Exception {
// Given
String namespaceId = "io.kestra." + IdUtils.create();
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", namespaceId),
RunContext runContext = this.runContextFactory.of(namespaceId, Map.of(
"inputs", Map.of(
"key", TEST_KV_KEY
)
@@ -92,8 +89,7 @@ class GetTest {
void shouldGetGivenNonExistingKey() throws Exception {
// Given
String namespaceId = "io.kestra." + IdUtils.create();
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", namespaceId),
RunContext runContext = this.runContextFactory.of(namespaceId, Map.of(
"inputs", Map.of(
"key", TEST_KV_KEY,
"namespace", namespaceId

View File

@@ -1,10 +1,10 @@
package io.kestra.plugin.core.kv;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.kv.KVType;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVStoreException;
@@ -31,7 +31,7 @@ class SetTest {
StorageInterface storageInterface;
@Inject
RunContextFactory runContextFactory;
TestRunContextFactory runContextFactory;
@Test
void shouldSetKVGivenNoNamespace() throws Exception {
@@ -65,8 +65,7 @@ class SetTest {
@Test
void shouldSetKVGivenSameNamespace() throws Exception {
// Given
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", "io.kestra.test"),
RunContext runContext = this.runContextFactory.of("io.kestra.test", Map.of(
"inputs", Map.of(
"key", TEST_KEY,
"value", "test-value"
@@ -93,8 +92,7 @@ class SetTest {
@Test
void shouldSetKVGivenChildNamespace() throws Exception {
// Given
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", "io.kestra.test"),
RunContext runContext = this.runContextFactory.of("io.kestra.test", Map.of(
"inputs", Map.of(
"key", TEST_KEY,
"value", "test-value"
@@ -120,8 +118,7 @@ class SetTest {
@Test
void shouldFailGivenNonExistingNamespace() {
// Given
RunContext runContext = this.runContextFactory.of(Map.of(
"flow", Map.of("namespace", "io.kestra.test"),
RunContext runContext = this.runContextFactory.of("io.kestra.test", Map.of(
"inputs", Map.of(
"key", TEST_KEY,
"value", "test-value"

View File

@@ -3,12 +3,12 @@ package io.kestra.plugin.core.storage;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.runners.RunContextFactory;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.util.Map;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -22,6 +22,7 @@ class PurgeCurrentExecutionFilesTest {
var flow = Flow.builder()
.namespace("namespace")
.id("flowId")
.tenantId(MAIN_TENANT)
.build();
var runContext = runContextFactory.of(flow, Map.of(
"execution", Map.of("id", "executionId"),

View File

@@ -50,8 +50,16 @@ public class LocalStorage implements StorageInterface {
}
protected Path getLocalPath(String tenantId, URI uri) {
Path basePath = tenantId == null ? this.basePath.toAbsolutePath()
: Paths.get(this.basePath.toAbsolutePath().toString(), tenantId);
Path basePath = Paths.get(this.basePath.toAbsolutePath().toString(), tenantId);
return getPath(uri, basePath);
}
protected Path getInstancePath(URI uri) {
Path basePath = this.basePath.toAbsolutePath();
return getPath(uri, basePath);
}
protected Path getPath(URI uri, Path basePath) {
if(uri == null) {
return basePath;
}
@@ -62,10 +70,12 @@ public class LocalStorage implements StorageInterface {
@Override
public InputStream get(String tenantId, @Nullable String namespace, URI uri) throws IOException {
return new BufferedInputStream(new FileInputStream(getLocalPath(tenantId, uri)
.toAbsolutePath()
.toString())
);
return new BufferedInputStream(new FileInputStream(getLocalPath(tenantId, uri).toAbsolutePath().toString()));
}
@Override
public InputStream getInstanceResource(@Nullable String namespace, URI uri) throws IOException {
return new BufferedInputStream(new FileInputStream(getInstancePath(uri).toAbsolutePath().toString()));
}
@Override
@@ -139,9 +149,38 @@ public class LocalStorage implements StorageInterface {
}
}
@Override
public List<FileAttributes> listInstanceResource(@Nullable String namespace, URI uri) throws IOException{
try (Stream<Path> stream = Files.list(getInstancePath(uri))) {
return stream
.filter(path -> !path.getFileName().toString().endsWith(".metadata"))
.map(throwFunction(file -> {
URI relative = URI.create(
getInstancePath(null).relativize(
Path.of(file.toUri())
).toString().replace("\\", "/")
);
return getInstanceAttributes(namespace, relative);
}))
.toList();
} catch (NoSuchFileException e) {
throw new FileNotFoundException(e.getMessage());
}
}
@Override
public URI put(String tenantId, @Nullable String namespace, URI uri, StorageObject storageObject) throws IOException {
File file = getLocalPath(tenantId, uri).toFile();
return putFile(uri, storageObject, file);
}
@Override
public URI putInstanceResource(@Nullable String namespace, URI uri, StorageObject storageObject) throws IOException {
File file = getInstancePath(uri).toFile();
return putFile(uri, storageObject, file);
}
private static URI putFile(URI uri, StorageObject storageObject, File file) throws IOException {
File parent = file.getParentFile();
if (!parent.exists()) {
parent.mkdirs();
@@ -167,7 +206,15 @@ public class LocalStorage implements StorageInterface {
@Override
public FileAttributes getAttributes(String tenantId, @Nullable String namespace, URI uri) throws IOException {
Path path = getLocalPath(tenantId, uri);
return getAttributeFromPath(getLocalPath(tenantId, uri));
}
@Override
public FileAttributes getInstanceAttributes(@Nullable String namespace, URI uri) throws IOException{
return getAttributeFromPath(getInstancePath(uri));
}
private static LocalFileAttributes getAttributeFromPath(Path path) throws IOException {
try {
return LocalFileAttributes.builder()
.filePath(path)
@@ -180,10 +227,19 @@ public class LocalStorage implements StorageInterface {
@Override
public URI createDirectory(String tenantId, @Nullable String namespace, URI uri) {
return createDirectoryFromPath(getLocalPath(tenantId, uri), uri);
}
@Override
public URI createInstanceDirectory(String namespace, URI uri) {
return createDirectoryFromPath(getInstancePath(uri), uri);
}
private static URI createDirectoryFromPath(Path path, URI uri) {
if (uri == null || uri.getPath().isEmpty()) {
throw new IllegalArgumentException("Unable to create a directory with empty url.");
}
File file = getLocalPath(tenantId, uri).toFile();
File file = path.toFile();
if (!file.exists() && !file.mkdirs()) {
throw new RuntimeException("Cannot create directory: " + file.getAbsolutePath());
}
@@ -205,7 +261,15 @@ public class LocalStorage implements StorageInterface {
@Override
public boolean delete(String tenantId, @Nullable String namespace, URI uri) throws IOException {
Path path = getLocalPath(tenantId, uri);
return deleteFromPath(getLocalPath(tenantId, uri));
}
@Override
public boolean deleteInstanceResource(@Nullable String namespace, URI uri) throws IOException{
return deleteFromPath(getInstancePath(uri));
}
private static boolean deleteFromPath(Path path) throws IOException {
File file = path.toFile();
if (file.isDirectory()) {
@@ -235,9 +299,7 @@ public class LocalStorage implements StorageInterface {
}
private URI getKestraUri(String tenantId, Path path) {
Path prefix = (tenantId == null) ?
basePath.toAbsolutePath():
basePath.toAbsolutePath().resolve(tenantId);
Path prefix = basePath.toAbsolutePath().resolve(tenantId);
subPathParentGuard(path, prefix);
return URI.create("kestra:///" + prefix.relativize(path).toString().replace("\\", "/"));
}

View File

@@ -13,6 +13,24 @@ public class TestRunContextFactory extends RunContextFactory {
@VisibleForTesting
public RunContext of() {
return of(Map.of("flow", Map.of("id", "id", "namespace", "namespace", "tenantId", MAIN_TENANT)));
return of("id", "namespace");
}
@VisibleForTesting
public RunContext of(String namespace) {
return of("id", namespace);
}
@VisibleForTesting
public RunContext of(String id, String namespace) {
return of(Map.of("flow", Map.of("id", id, "namespace", namespace, "tenantId", MAIN_TENANT)));
}
@VisibleForTesting
public RunContext of(String namespace, Map<String, Object> inputs) {
Map<String, Object> variables = new java.util.HashMap<>(Map.of("flow",
Map.of("id", "id", "namespace", namespace, "tenantId", MAIN_TENANT)));
variables.putAll(inputs);
return of(variables);
}
}

View File

@@ -22,6 +22,7 @@ import java.util.Map;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.within;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.*;
@@ -120,6 +121,20 @@ public abstract class StorageTestSuite {
}
//endregion
@Test
void getInstanceResource() throws Exception {
String prefix = IdUtils.create();
putInstanceFile("/" + prefix + "/storage/get.yml");
putInstanceFile("/" + prefix + "/storage/level2/2.yml");
URI item = new URI("/" + prefix + "/storage/get.yml");
InputStream get = storageInterface.getInstanceResource(prefix, item);
assertThat(CharStreams.toString(new InputStreamReader(get))).isEqualTo(CONTENT_STRING);
assertTrue(storageInterface.existsInstanceResource(prefix, item));
}
@Test
void filesByPrefix() throws IOException {
var namespaceName ="filesByPrefix_test_namespace";
@@ -267,6 +282,27 @@ public abstract class StorageTestSuite {
}
//endregion
@Test
void listInstanceResouces() throws Exception {
String prefix = IdUtils.create();
List<String> path = Arrays.asList(
"/" + prefix + "/storage/root.yml",
"/" + prefix + "/storage/level1/1.yml",
"/" + prefix + "/storage/level1/level2/1.yml",
"/" + prefix + "/storage/another/1.yml"
);
path.forEach(throwConsumer(s -> putInstanceFile(s, Map.of("someMetadata", "someValue"))));
List<FileAttributes> list = storageInterface.listInstanceResource(prefix, null);
assertThat(list.stream().map(FileAttributes::getFileName).toList()).contains(prefix);
list = storageInterface.listInstanceResource(prefix, new URI("/" + prefix + "/storage"));
assertThat(list.stream().map(FileAttributes::getFileName).toList()).containsExactlyInAnyOrder("root.yml", "level1", "another");
assertThat(list.stream().filter(f -> f.getFileName().equals("root.yml")).findFirst().get().getMetadata()).containsEntry("someMetadata", "someValue");
}
//region test EXISTS
@Test
void exists() throws Exception {
@@ -329,6 +365,15 @@ public abstract class StorageTestSuite {
putFile(tenantId, "/" + prefix + "/storage/get.yml");
assertTrue(storageInterface.exists(tenantId, prefix, new URI("kestra:///" + prefix + "/storage/get.yml")));
}
@Test
void existsInstanceResource() throws Exception {
String prefix = IdUtils.create();
putInstanceFile("/" + prefix + "/storage/put.yml");
assertThat(storageInterface.existsInstanceResource(prefix, new URI("/" + prefix + "/storage/put.yml"))).isTrue();
assertThat(storageInterface.existsInstanceResource(prefix, new URI("/" + prefix + "/storage/notfound.yml"))).isFalse();
}
//endregion
//region test SIZE
@@ -498,25 +543,10 @@ public abstract class StorageTestSuite {
path.forEach(throwConsumer(s -> this.putFile(tenantId, s)));
FileAttributes attr = storageInterface.getAttributes(tenantId, prefix, new URI("/" + prefix + "/storage/root.yml"));
assertThat(attr.getFileName()).isEqualTo("root.yml");
assertThat(attr.getType()).isEqualTo(FileAttributes.FileType.File);
assertThat(attr.getSize()).isEqualTo((long) CONTENT_STRING.length());
Instant lastModifiedInstant = Instant.ofEpochMilli(attr.getLastModifiedTime());
assertThat(lastModifiedInstant.isAfter(Instant.now().minus(Duration.ofMinutes(1)))).isTrue();
assertThat(lastModifiedInstant.isBefore(Instant.now())).isTrue();
Instant creationInstant = Instant.ofEpochMilli(attr.getCreationTime());
assertThat(creationInstant.isAfter(Instant.now().minus(Duration.ofMinutes(1)))).isTrue();
assertThat(creationInstant.isBefore(Instant.now())).isTrue();
compareFileAttribute(attr);
attr = storageInterface.getAttributes(tenantId, prefix, new URI("/" + prefix + "/storage/level1"));
assertThat(attr.getFileName()).isEqualTo("level1");
assertThat(attr.getType()).isEqualTo(FileAttributes.FileType.Directory);
lastModifiedInstant = Instant.ofEpochMilli(attr.getLastModifiedTime());
assertThat(lastModifiedInstant.isAfter(Instant.now().minus(Duration.ofMinutes(1)))).isTrue();
assertThat(lastModifiedInstant.isBefore(Instant.now())).isTrue();
creationInstant = Instant.ofEpochMilli(attr.getCreationTime());
assertThat(creationInstant.isAfter(Instant.now().minus(Duration.ofMinutes(1)))).isTrue();
assertThat(creationInstant.isBefore(Instant.now())).isTrue();
compareDirectory(attr);
}
@Test
@@ -580,6 +610,42 @@ public abstract class StorageTestSuite {
FileAttributes attr = storageInterface.getAttributes(tenantId, prefix, new URI("kestra:///" + prefix + "/storage/get.yml"));
assertThat(attr.getFileName()).isEqualTo("get.yml");
}
@Test
void getInstanceAttributes() throws Exception {
String prefix = IdUtils.create();
List<String> path = Arrays.asList(
"/" + prefix + "/storage/root.yml",
"/" + prefix + "/storage/level1/1.yml"
);
path.forEach(throwConsumer(this::putInstanceFile));
FileAttributes attr = storageInterface.getInstanceAttributes(prefix, new URI("/" + prefix + "/storage/root.yml"));
compareFileAttribute(attr);
attr = storageInterface.getInstanceAttributes(prefix, new URI("/" + prefix + "/storage/level1"));
compareDirectory(attr);
}
private static void compareDirectory(FileAttributes attr) {
assertThat(attr.getFileName()).isEqualTo("level1");
assertThat(attr.getType()).isEqualTo(FileAttributes.FileType.Directory);
Instant lastModifiedInstant = Instant.ofEpochMilli(attr.getLastModifiedTime());
assertThat(lastModifiedInstant).isCloseTo(Instant.now(), within(Duration.ofSeconds(10)));
Instant creationInstant = Instant.ofEpochMilli(attr.getCreationTime());
assertThat(creationInstant).isCloseTo(Instant.now(), within(Duration.ofSeconds(10)));
}
private static void compareFileAttribute(FileAttributes attr) {
assertThat(attr.getFileName()).isEqualTo("root.yml");
assertThat(attr.getType()).isEqualTo(FileAttributes.FileType.File);
assertThat(attr.getSize()).isEqualTo((long) CONTENT_STRING.length());
Instant lastModifiedInstant = Instant.ofEpochMilli(attr.getLastModifiedTime());
assertThat(lastModifiedInstant).isCloseTo(Instant.now(), within(Duration.ofSeconds(10)));
Instant creationInstant = Instant.ofEpochMilli(attr.getCreationTime());
assertThat(creationInstant).isCloseTo(Instant.now(), within(Duration.ofSeconds(10)));
}
//endregion
//region test PUT
@@ -651,6 +717,17 @@ public abstract class StorageTestSuite {
assertThat(put.toString()).isEqualTo(new URI("kestra:///" + prefix + "/storage/put.yml").toString());
assertThat(CharStreams.toString(new InputStreamReader(get))).isEqualTo(CONTENT_STRING);
}
@Test
void putInstanceResource() throws Exception {
String prefix = IdUtils.create();
URI put = putInstanceFile("/" + prefix + "/storage/put.yml");
InputStream get = storageInterface.getInstanceResource(prefix, new URI("/" + prefix + "/storage/put.yml"));
assertThat(put.toString()).isEqualTo(new URI("kestra:///" + prefix + "/storage/put.yml").toString());
assertThat(CharStreams.toString(new InputStreamReader(get))).isEqualTo(CONTENT_STRING);
}
//endregion
//region test DELETE
@@ -726,6 +803,28 @@ public abstract class StorageTestSuite {
assertTrue(storageInterface.delete(tenantId, prefix, new URI("kestra:///" + prefix + "/storage/get.yml")));
assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/get.yml"))).isFalse();
}
@Test
void deleteInstanceResource() throws Exception {
String prefix = IdUtils.create();
List<String> paths = Arrays.asList(
"/" + prefix + "/storage/root.yml",
"/" + prefix + "/storage/level1/1.yml",
"/" + prefix + "/storage/level1/level2/1.yml"
);
paths.forEach(throwConsumer(this::putInstanceFile));
assertThat(storageInterface.existsInstanceResource(prefix, new URI("/" + prefix + "/storage/root.yml"))).isTrue();
assertThat(storageInterface.existsInstanceResource(prefix, new URI("/" + prefix + "/storage/level1/1.yml"))).isTrue();
assertThat(storageInterface.existsInstanceResource(prefix, new URI("/" + prefix + "/storage/level1/level2/1.yml"))).isTrue();
boolean deleted = storageInterface.deleteInstanceResource(prefix, new URI("/" + prefix + "/storage/level1"));
assertThat(deleted).isTrue();
assertThat(storageInterface.existsInstanceResource(prefix, new URI("/" + prefix + "/storage/root.yml"))).isTrue();
assertThat(storageInterface.existsInstanceResource(prefix, new URI("/" + prefix + "/storage/level1/1.yml"))).isFalse();
assertThat(storageInterface.existsInstanceResource(prefix, new URI("/" + prefix + "/storage/level1/level2/1.yml"))).isFalse();
}
//endregion
//region test CREATEDIRECTORY
@@ -784,6 +883,17 @@ public abstract class StorageTestSuite {
hasProperty("fileName", is("first"))
));
}
@Test
void createInstanceDirectory() throws Exception {
String prefix = IdUtils.create();
storageInterface.createInstanceDirectory(prefix, new URI("/" + prefix + "/storage/level1"));
FileAttributes attr = storageInterface.getInstanceAttributes(prefix, new URI("/" + prefix + "/storage/level1"));
assertThat(attr.getFileName()).isEqualTo("level1");
assertThat(attr.getType()).isEqualTo(FileAttributes.FileType.Directory);
assertThat(attr.getLastModifiedTime()).isNotNull();
}
//endregion
//region test MOVE
@@ -992,4 +1102,23 @@ public abstract class StorageTestSuite {
)
);
}
private URI putInstanceFile(String path) throws Exception {
return storageInterface.putInstanceResource(
null,
new URI(path),
new ByteArrayInputStream(CONTENT_STRING.getBytes())
);
}
private URI putInstanceFile(String path, Map<String, String> metadata) throws Exception {
return storageInterface.putInstanceResource(
null,
new URI(path),
new StorageObject(
metadata,
new ByteArrayInputStream(CONTENT_STRING.getBytes())
)
);
}
}

View File

@@ -246,7 +246,7 @@ class KVControllerTest {
// Then
Assertions.assertEquals(HttpStatus.OK, response.getStatus());
Assertions.assertEquals(new ApiDeleteBulkResponse(List.of()), response.body());
assertThat(storageInterface.exists(null, NAMESPACE, toKVUri(NAMESPACE, "my-key"))).isFalse();
assertThat(storageInterface.exists(MAIN_TENANT, NAMESPACE, toKVUri(NAMESPACE, "my-key"))).isFalse();
}
@Test