mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
feat(ns-files): add a metadata layer on top for better performance & versioned ns files
part of https://github.com/kestra-io/kestra/issues/5617
This commit is contained in:
committed by
brian-mulier-p
parent
d20f7039c7
commit
682d258e7b
@@ -28,10 +28,7 @@ import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.kestra.core.services.*;
|
||||
import io.kestra.core.storages.InternalNamespace;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.storages.*;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import io.kestra.core.test.flow.TaskFixture;
|
||||
import io.kestra.core.topologies.FlowTopologyService;
|
||||
@@ -202,9 +199,12 @@ public class ExecutionController {
|
||||
@Inject
|
||||
private LocalPathFactory localPathFactory;
|
||||
|
||||
@Inject
|
||||
private NamespaceFactory namespaceFactory;
|
||||
|
||||
@Inject
|
||||
private SecureVariableRendererFactory secureVariableRendererFactory;
|
||||
|
||||
|
||||
@Value("${" + LocalPath.ENABLE_PREVIEW_CONFIG + ":true}")
|
||||
private boolean enableLocalFilePreview;
|
||||
|
||||
@@ -951,9 +951,9 @@ public class ExecutionController {
|
||||
);
|
||||
}
|
||||
|
||||
private URI nsFileToInternalStorageURI(URI path, Execution execution) {
|
||||
InternalNamespace internalNamespace = new InternalNamespace(execution.getTenantId(), execution.getNamespace(), storageInterface);
|
||||
return internalNamespace.get(Path.of(path.getPath())).uri();
|
||||
private URI nsFileToInternalStorageURI(URI path, Execution execution) throws IOException {
|
||||
Namespace namespace = namespaceFactory.of(execution.getTenantId(), execution.getNamespace(), storageInterface);
|
||||
return namespace.get(Path.of(path.getPath())).uri();
|
||||
}
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@@ -1514,7 +1514,7 @@ public class ExecutionController {
|
||||
}
|
||||
|
||||
protected Mono<HttpResponse<?>> resumeFoundExecution(MultipartBody inputs, Execution execution,
|
||||
Flow flow) {
|
||||
Flow flow) {
|
||||
Pause.Resumed resumed = createResumed();
|
||||
|
||||
return this.executionService.resume(execution, flow, State.Type.RUNNING, inputs, resumed)
|
||||
@@ -2587,7 +2587,7 @@ public class ExecutionController {
|
||||
|
||||
return HttpResponse.ok(
|
||||
CSVUtils.toCSVFlux(
|
||||
executionRepository.findAsync(this.tenantService.resolveTenant(), QueryFilterUtils.replaceTimeRangeWithComputedStartDateFilter(filters))
|
||||
executionRepository.findAsync(this.tenantService.resolveTenant(), QueryFilterUtils.replaceTimeRangeWithComputedStartDateFilter(filters))
|
||||
.map(log -> objectMapper.convertValue(log, Map.class))
|
||||
)
|
||||
)
|
||||
|
||||
@@ -1,13 +1,16 @@
|
||||
package io.kestra.webserver.controllers.api;
|
||||
|
||||
import io.kestra.core.exceptions.FlowProcessingException;
|
||||
import io.kestra.core.models.FetchVersion;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
|
||||
import io.kestra.core.repositories.ArrayListTotal;
|
||||
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.NamespaceFile;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.storages.*;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import io.kestra.core.utils.Rethrow;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.http.HttpHeaders;
|
||||
import io.micronaut.http.HttpResponse;
|
||||
import io.micronaut.http.MediaType;
|
||||
@@ -21,6 +24,7 @@ import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.URI;
|
||||
@@ -31,12 +35,15 @@ import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.zip.ZipEntry;
|
||||
import java.util.zip.ZipInputStream;
|
||||
import java.util.zip.ZipOutputStream;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
|
||||
@Slf4j
|
||||
@Validated
|
||||
@Controller("/api/v1/{tenant}/namespaces")
|
||||
@@ -48,23 +55,23 @@ public class NamespaceFileController {
|
||||
private TenantService tenantService;
|
||||
@Inject
|
||||
private FlowService flowService;
|
||||
@Inject
|
||||
private NamespaceFactory namespaceFactory;
|
||||
@Inject
|
||||
private NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository;
|
||||
|
||||
private final List<Pattern> forbiddenPathPatterns = List.of(
|
||||
Pattern.compile("/" + FLOWS_FOLDER + "(/.*)?$")
|
||||
);
|
||||
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Get(uri = "{namespace}/files/search")
|
||||
@Operation(tags = {"Files"}, summary = "Find files which path contain the given string in their URI")
|
||||
public List<String> searchNamespaceFiles(
|
||||
@Parameter(description = "The namespace id") @PathVariable String namespace,
|
||||
@Parameter(description = "The string the file path should contain") @QueryValue String q
|
||||
) throws IOException, URISyntaxException {
|
||||
URI baseNamespaceFilesUri = NamespaceFile.of(namespace).uri();
|
||||
return storageInterface.allByPrefix(tenantService.resolveTenant(), namespace, baseNamespaceFilesUri, false).stream()
|
||||
.map(storageUri -> "/" + baseNamespaceFilesUri.relativize(storageUri).getPath())
|
||||
.filter(path -> path.contains(q)).toList();
|
||||
) throws IOException {
|
||||
return namespaceFactory.of(tenantService.resolveTenant(), namespace, storageInterface).all(q).stream().map(namespaceFile -> namespaceFile.path(true).toString()).toList();
|
||||
}
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@@ -72,7 +79,8 @@ public class NamespaceFileController {
|
||||
@Operation(tags = {"Files"}, summary = "Get namespace file content")
|
||||
public HttpResponse<StreamedFile> getFileContent(
|
||||
@Parameter(description = "The namespace id") @PathVariable String namespace,
|
||||
@Parameter(description = "The internal storage uri") @QueryValue String path
|
||||
@Parameter(description = "The internal storage uri") @QueryValue String path,
|
||||
@Nullable @Parameter(description = "The revision, if not provided, the latest revision will be returned") @QueryValue Integer revision
|
||||
) throws IOException, URISyntaxException {
|
||||
URI encodedPath = null;
|
||||
if (path != null) {
|
||||
@@ -80,8 +88,10 @@ public class NamespaceFileController {
|
||||
}
|
||||
forbiddenPathsGuard(encodedPath);
|
||||
|
||||
InputStream fileHandler = storageInterface.get(tenantService.resolveTenant(), namespace, NamespaceFile.of(namespace, encodedPath).uri());
|
||||
return HttpResponse.ok(new StreamedFile(fileHandler, MediaType.APPLICATION_OCTET_STREAM_TYPE)).header(HttpHeaders.CACHE_CONTROL, "no-cache");
|
||||
Path filePath = Optional.ofNullable(encodedPath).map(URI::getPath).map(Path::of).orElseThrow();
|
||||
InputStream fileContent = namespaceFactory.of(tenantService.resolveTenant(), namespace, storageInterface)
|
||||
.getFileContent(filePath, revision);
|
||||
return HttpResponse.ok(new StreamedFile(fileContent, MediaType.APPLICATION_OCTET_STREAM_TYPE)).header(HttpHeaders.CACHE_CONTROL, "no-cache");
|
||||
}
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@@ -98,14 +108,46 @@ public class NamespaceFileController {
|
||||
forbiddenPathsGuard(encodedPath);
|
||||
|
||||
// if stats is performed upon namespace root, and it doesn't exist yet, we create it
|
||||
Namespace namespaceStorage = namespaceFactory.of(tenantService.resolveTenant(), namespace, storageInterface);
|
||||
Path rootPath = Path.of("/");
|
||||
if (path == null || path.isEmpty()) {
|
||||
if(!storageInterface.exists(tenantService.resolveTenant(), namespace, NamespaceFile.of(namespace).uri())) {
|
||||
storageInterface.createDirectory(tenantService.resolveTenant(), namespace, NamespaceFile.of(namespace).uri());
|
||||
if (!namespaceStorage.exists(rootPath)) {
|
||||
namespaceStorage.createDirectory(rootPath);
|
||||
}
|
||||
return storageInterface.getAttributes(tenantService.resolveTenant(), namespace, NamespaceFile.of(namespace).uri());
|
||||
return namespaceStorage.getFileMetadata(rootPath);
|
||||
}
|
||||
|
||||
return storageInterface.getAttributes(tenantService.resolveTenant(), namespace, NamespaceFile.of(namespace, encodedPath).uri());
|
||||
return namespaceStorage.getFileMetadata(Path.of(encodedPath.getPath()));
|
||||
}
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Get(uri = "{namespace}/files/revisions")
|
||||
@Operation(tags = {"Files"}, summary = "Get namespace file revisions")
|
||||
public List<NamespaceFileRevision> getFileRevisions(
|
||||
@Parameter(description = "The namespace id") @PathVariable String namespace,
|
||||
@Parameter(description = "The internal storage uri") @Nullable @QueryValue String path
|
||||
) throws IOException, URISyntaxException {
|
||||
URI encodedPath = null;
|
||||
if (path != null) {
|
||||
encodedPath = new URI(URLEncoder.encode(path, StandardCharsets.UTF_8));
|
||||
}
|
||||
forbiddenPathsGuard(encodedPath);
|
||||
|
||||
encodedPath = Optional.ofNullable(encodedPath).orElse(URI.create("/"));
|
||||
|
||||
ArrayListTotal<NamespaceFileMetadata> namespaceFileMetadata = namespaceFileMetadataRepository.find(Pageable.UNPAGED, tenantService.resolveTenant(), List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value(namespace).build(),
|
||||
QueryFilter.builder().field(QueryFilter.Field.PATH).operation(QueryFilter.Op.EQUALS).value(encodedPath.getPath()).build()
|
||||
), true, FetchVersion.ALL);
|
||||
|
||||
if (namespaceFileMetadata.stream()
|
||||
.filter(NamespaceFileMetadata::isLast)
|
||||
.map(NamespaceFileMetadata::isDeleted).findFirst()
|
||||
.orElse(true)) {
|
||||
throw new FileNotFoundException("File not found: " + encodedPath.getPath());
|
||||
}
|
||||
|
||||
return namespaceFileMetadata.map(metadata -> new NamespaceFileRevision(metadata.getVersion()));
|
||||
}
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@@ -121,14 +163,18 @@ public class NamespaceFileController {
|
||||
}
|
||||
forbiddenPathsGuard(encodedPath);
|
||||
|
||||
NamespaceFile namespaceFile = NamespaceFile.of(namespace, encodedPath);
|
||||
Namespace namespaceStorage = namespaceFactory.of(tenantService.resolveTenant(), namespace, storageInterface);
|
||||
Path dirPath = Path.of(Optional.ofNullable(encodedPath).map(URI::getPath).orElse("/"));
|
||||
|
||||
if (namespaceFile.isRootDirectory() && !storageInterface.exists(tenantService.resolveTenant(), namespace, NamespaceFile.of(namespace).uri())) {
|
||||
storageInterface.createDirectory(tenantService.resolveTenant(), namespace, NamespaceFile.of(namespace).uri());
|
||||
return Collections.emptyList();
|
||||
if (dirPath.toString().equals("/") && !namespaceStorage.exists(dirPath)) {
|
||||
namespaceStorage.createDirectory(dirPath);
|
||||
} else if (!namespaceStorage.exists(dirPath)) {
|
||||
throw new FileNotFoundException("Directory not found: " + dirPath);
|
||||
}
|
||||
|
||||
return storageInterface.list(tenantService.resolveTenant(), namespace, namespaceFile.uri());
|
||||
return namespaceStorage.children(dirPath.toString(), false).stream()
|
||||
.map(namespaceFileMetadata -> (FileAttributes) new NamespaceFileAttributes(namespaceFileMetadata))
|
||||
.toList();
|
||||
}
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@@ -144,7 +190,8 @@ public class NamespaceFileController {
|
||||
}
|
||||
forbiddenPathsGuard(encodedPath);
|
||||
|
||||
storageInterface.createDirectory(tenantService.resolveTenant(), namespace, NamespaceFile.of(namespace, encodedPath).uri());
|
||||
Namespace namespaceStorage = namespaceFactory.of(tenantService.resolveTenant(), namespace, storageInterface);
|
||||
namespaceStorage.createDirectory(Optional.ofNullable(encodedPath).map(URI::getPath).map(Path::of).orElse(Path.of("/")));
|
||||
}
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@@ -155,7 +202,12 @@ public class NamespaceFileController {
|
||||
@Parameter(description = "The internal storage uri") @QueryValue String path,
|
||||
@Parameter(description = "The file to upload") @Part CompletedFileUpload fileContent
|
||||
) throws Exception {
|
||||
innerCreateNamespaceFile(namespace, path, fileContent);
|
||||
}
|
||||
|
||||
protected List<NamespaceFile> innerCreateNamespaceFile(String namespace, String path, CompletedFileUpload fileContent) throws Exception {
|
||||
String tenantId = tenantService.resolveTenant();
|
||||
List<NamespaceFile> createdFiles = new ArrayList<>();
|
||||
if (fileContent.getFilename().toLowerCase().endsWith(".zip")) {
|
||||
try (ZipInputStream archive = new ZipInputStream(fileContent.getInputStream())) {
|
||||
ZipEntry entry;
|
||||
@@ -165,7 +217,7 @@ public class NamespaceFileController {
|
||||
}
|
||||
|
||||
try (BufferedInputStream inputStream = new BufferedInputStream(new ByteArrayInputStream(archive.readAllBytes()))) {
|
||||
putNamespaceFile(tenantId, namespace, URI.create("/" + entry.getName()), inputStream);
|
||||
createdFiles.addAll(putNamespaceFile(tenantId, namespace, URI.create("/" + entry.getName()), inputStream));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -177,25 +229,29 @@ public class NamespaceFileController {
|
||||
return (int) fileContent.getSize();
|
||||
}
|
||||
}) {
|
||||
putNamespaceFile(tenantId, namespace, new URI(URLEncoder.encode(path, StandardCharsets.UTF_8)), inputStream);
|
||||
createdFiles.addAll(putNamespaceFile(tenantId, namespace, new URI(URLEncoder.encode(path, StandardCharsets.UTF_8)), inputStream));
|
||||
}
|
||||
}
|
||||
|
||||
return createdFiles;
|
||||
}
|
||||
|
||||
private void putNamespaceFile(String tenantId, String namespace, URI path, BufferedInputStream inputStream) throws Exception {
|
||||
private List<NamespaceFile> putNamespaceFile(String tenantId, String namespace, URI path, BufferedInputStream inputStream) throws Exception {
|
||||
String filePath = path.getPath();
|
||||
if(filePath.matches("/" + FLOWS_FOLDER + "/.*")) {
|
||||
if(filePath.split("/").length != 3) {
|
||||
if (filePath.matches("/" + FLOWS_FOLDER + "/.*")) {
|
||||
if (filePath.split("/").length != 3) {
|
||||
throw new IllegalArgumentException("Invalid flow file path: " + filePath);
|
||||
}
|
||||
|
||||
String flowSource = new String(inputStream.readAllBytes());
|
||||
flowSource = flowSource.replaceFirst("(?m)^namespace: .*$", "namespace: " + namespace);
|
||||
this.importFlow(tenantId, flowSource);
|
||||
return;
|
||||
return Collections.emptyList();
|
||||
}
|
||||
forbiddenPathsGuard(path);
|
||||
storageInterface.put(tenantId, namespace, NamespaceFile.of(namespace, path).uri(), inputStream);
|
||||
|
||||
Namespace namespaceStorage = namespaceFactory.of(tenantId, namespace, storageInterface);
|
||||
return namespaceStorage.putFile(Path.of(path.getPath()), inputStream);
|
||||
}
|
||||
|
||||
protected void importFlow(String tenantId, String source) throws FlowProcessingException {
|
||||
@@ -207,21 +263,26 @@ public class NamespaceFileController {
|
||||
@Operation(tags = {"Files"}, summary = "Export namespace files as a ZIP")
|
||||
public HttpResponse<byte[]> exportNamespaceFiles(
|
||||
@Parameter(description = "The namespace id") @PathVariable String namespace
|
||||
) throws IOException {
|
||||
) throws IOException {
|
||||
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
ZipOutputStream archive = new ZipOutputStream(bos)) {
|
||||
|
||||
URI baseNamespaceFilesUri = NamespaceFile.of(namespace).uri();
|
||||
String tenantId = tenantService.resolveTenant();
|
||||
storageInterface.allByPrefix(tenantId, namespace, baseNamespaceFilesUri, false).forEach(Rethrow.throwConsumer(uri -> {
|
||||
try (InputStream inputStream = storageInterface.get(tenantId, namespace, uri)) {
|
||||
archive.putNextEntry(new ZipEntry(baseNamespaceFilesUri.relativize(uri).getPath()));
|
||||
archive.write(inputStream.readAllBytes());
|
||||
archive.closeEntry();
|
||||
}
|
||||
}));
|
||||
|
||||
flowService.findByNamespaceWithSource(tenantId, namespace).forEach(Rethrow.throwConsumer(flowWithSource -> {
|
||||
Namespace namespaceStorage = namespaceFactory.of(tenantId, namespace, storageInterface);
|
||||
List<NamespaceFileMetadata> allNsFiles = namespaceStorage.children("/", true);
|
||||
allNsFiles.stream()
|
||||
.filter(Predicate.not(NamespaceFileMetadata::isDirectory))
|
||||
.map(NamespaceFileMetadata::getPath)
|
||||
.forEach(throwConsumer(path -> {
|
||||
try (InputStream inputStream = namespaceStorage.getFileContent(Path.of(path))) {
|
||||
archive.putNextEntry(new ZipEntry(path.substring(1))); // remove leading slash
|
||||
archive.write(inputStream.readAllBytes());
|
||||
archive.closeEntry();
|
||||
}
|
||||
}));
|
||||
|
||||
flowService.findByNamespaceWithSource(tenantId, namespace).forEach(throwConsumer(flowWithSource -> {
|
||||
try {
|
||||
archive.putNextEntry(new ZipEntry(FLOWS_FOLDER + "/" + flowWithSource.getId() + ".yml"));
|
||||
archive.write(flowWithSource.getSource().getBytes());
|
||||
@@ -244,11 +305,18 @@ public class NamespaceFileController {
|
||||
@Parameter(description = "The namespace id") @PathVariable String namespace,
|
||||
@Parameter(description = "The internal storage uri to move from") @QueryValue URI from,
|
||||
@Parameter(description = "The internal storage uri to move to") @QueryValue URI to
|
||||
) throws IOException, URISyntaxException {
|
||||
) throws Exception {
|
||||
innerMoveFileDirectory(namespace, from, to);
|
||||
}
|
||||
|
||||
protected List<Pair<NamespaceFile, NamespaceFile>> innerMoveFileDirectory(String namespace, URI from, URI to) throws Exception {
|
||||
ensureWritableNamespaceFile(from);
|
||||
ensureWritableNamespaceFile(to);
|
||||
|
||||
storageInterface.move(tenantService.resolveTenant(), namespace, NamespaceFile.of(namespace, from).uri(),NamespaceFile.of(namespace, to).uri());
|
||||
String tenantId = tenantService.resolveTenant();
|
||||
|
||||
Namespace namespaceStorage = namespaceFactory.of(tenantId, namespace, storageInterface);
|
||||
return namespaceStorage.move(Path.of(from.getPath()), Path.of(to.getPath()));
|
||||
}
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@@ -258,7 +326,11 @@ public class NamespaceFileController {
|
||||
@Parameter(description = "The namespace id") @PathVariable String namespace,
|
||||
@Parameter(description = "The internal storage uri of the file / directory to delete") @QueryValue String path
|
||||
) throws IOException, URISyntaxException {
|
||||
URI encodedPath = null;
|
||||
innerDeleteFileDirectory(namespace, path);
|
||||
}
|
||||
|
||||
protected List<NamespaceFile> innerDeleteFileDirectory(String namespace, String path) throws URISyntaxException, IOException {
|
||||
URI encodedPath;
|
||||
if (!path.startsWith("/")) {
|
||||
path = "/" + path;
|
||||
}
|
||||
@@ -267,30 +339,19 @@ public class NamespaceFileController {
|
||||
|
||||
String pathWithoutScheme = encodedPath.getPath();
|
||||
|
||||
List<String> allNamespaceFilesPaths = storageInterface.allByPrefix(tenantService.resolveTenant(), namespace, NamespaceFile.of(namespace).storagePath().toUri(), true)
|
||||
.stream()
|
||||
.map(uri -> NamespaceFile.of(namespace, uri).path(true).toString())
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
String tenantId = tenantService.resolveTenant();
|
||||
|
||||
if (allNamespaceFilesPaths.contains(pathWithoutScheme + "/")) {
|
||||
// the given path to delete is a directory
|
||||
pathWithoutScheme = pathWithoutScheme + "/";
|
||||
String zombieAwarePathToDelete = pathWithoutScheme;
|
||||
String parentPathToCheck = NamespaceFileMetadata.parentPath(zombieAwarePathToDelete);
|
||||
while (parentPathToCheck != null && !parentPathToCheck.equals("/") && namespaceFileMetadataRepository.find(Pageable.from(1, 2), tenantService.resolveTenant(), List.of(
|
||||
QueryFilter.builder().field(QueryFilter.Field.PARENT_PATH).operation(QueryFilter.Op.EQUALS).value(parentPathToCheck).build()
|
||||
), false).size() == 1) {
|
||||
zombieAwarePathToDelete = parentPathToCheck;
|
||||
parentPathToCheck = NamespaceFileMetadata.parentPath(parentPathToCheck);
|
||||
}
|
||||
|
||||
while (!pathWithoutScheme.equals("/")) {
|
||||
String parentFolder = pathWithoutScheme.substring(0, pathWithoutScheme.lastIndexOf('/') + 1);
|
||||
if (parentFolder.equals("/")) {
|
||||
break;
|
||||
}
|
||||
List<String> filesInParentFolder = allNamespaceFilesPaths.stream().filter(p -> p.length() > parentFolder.length() && p.startsWith(parentFolder)).toList();
|
||||
// there is more than one file in this folder so we stop the cascade deletion there
|
||||
if (filesInParentFolder.size() > 1) {
|
||||
break;
|
||||
}
|
||||
allNamespaceFilesPaths.removeIf(filesInParentFolder::contains);
|
||||
pathWithoutScheme = parentFolder.endsWith("/") ? parentFolder.substring(0, parentFolder.length() - 1) : parentFolder;
|
||||
}
|
||||
storageInterface.delete(tenantService.resolveTenant(), namespace, NamespaceFile.of(namespace, Path.of(pathWithoutScheme)).uri());
|
||||
Namespace namespaceStorage = namespaceFactory.of(tenantId, namespace, storageInterface);
|
||||
return namespaceStorage.delete(Path.of(zombieAwarePathToDelete));
|
||||
}
|
||||
|
||||
private void forbiddenPathsGuard(URI path) {
|
||||
|
||||
@@ -2,7 +2,6 @@ package io.kestra.webserver.controllers.api;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.junit.annotations.ExecuteFlow;
|
||||
import io.kestra.core.junit.annotations.FlakyTest;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
@@ -26,7 +25,8 @@ import io.kestra.core.runners.InputsTest;
|
||||
import io.kestra.core.runners.LocalPath;
|
||||
import io.kestra.core.runners.RunnerUtils;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.NamespaceFactory;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
@@ -60,9 +60,11 @@ import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URLEncoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
@@ -126,6 +128,9 @@ class ExecutionControllerRunnerTest {
|
||||
@Inject
|
||||
private StorageInterface storageInterface;
|
||||
|
||||
@Inject
|
||||
private NamespaceFactory namespaceFactory;
|
||||
|
||||
public static final String TESTS_FLOW_NS = "io.kestra.tests";
|
||||
public static final String TENANT_ID = "main";
|
||||
|
||||
@@ -844,7 +849,7 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/inputs.yaml"})
|
||||
void previewNsFileFromExecution() throws TimeoutException, QueueException, IOException {
|
||||
void previewNsFileFromExecution() throws TimeoutException, QueueException, IOException, URISyntaxException {
|
||||
HashMap<String, Object> newInputs = new HashMap<>(InputsTest.inputs);
|
||||
URI file = createNsFile(false);
|
||||
newInputs.put("file", file);
|
||||
@@ -2338,11 +2343,11 @@ class ExecutionControllerRunnerTest {
|
||||
return tempFile.toPath().toUri();
|
||||
}
|
||||
|
||||
private URI createNsFile(boolean nsInAuthority) throws IOException {
|
||||
private URI createNsFile(boolean nsInAuthority) throws IOException, URISyntaxException {
|
||||
String namespace = "io.kestra.tests";
|
||||
String filePath = "file.txt";
|
||||
storageInterface.createDirectory(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace)));
|
||||
storageInterface.put(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + filePath), new ByteArrayInputStream("Hello World".getBytes()));
|
||||
Namespace namespaceStorage = namespaceFactory.of(MAIN_TENANT, namespace, storageInterface);
|
||||
namespaceStorage.putFile(Path.of("/" + filePath), new ByteArrayInputStream("Hello World".getBytes()));
|
||||
return URI.create("nsfile://" + (nsInAuthority ? namespace : "") + "/" + filePath);
|
||||
}
|
||||
|
||||
|
||||
@@ -8,9 +8,9 @@ import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.NamespaceFile;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.storages.*;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.flow.Subflow;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.micronaut.core.type.Argument;
|
||||
@@ -26,7 +26,9 @@ import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import lombok.AllArgsConstructor;
|
||||
@@ -38,9 +40,7 @@ import org.junit.jupiter.api.function.Executable;
|
||||
|
||||
@KestraTest
|
||||
class NamespaceFileControllerTest {
|
||||
|
||||
private static final String NAMESPACE = "io.namespace";
|
||||
public static final String TENANT_ID = "main";
|
||||
public static final String TENANT_ID = TenantService.MAIN_TENANT;
|
||||
|
||||
@Inject
|
||||
@Client("/")
|
||||
@@ -52,94 +52,145 @@ class NamespaceFileControllerTest {
|
||||
@Inject
|
||||
private FlowRepositoryInterface flowRepository;
|
||||
|
||||
@AfterEach
|
||||
public void clean() throws IOException {
|
||||
storageInterface.delete(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, null));
|
||||
}
|
||||
@Inject
|
||||
private NamespaceFactory namespaceFactory;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
void searchNamespaceFiles() throws IOException {
|
||||
storageInterface.put(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/file.txt")), new ByteArrayInputStream(new byte[0]));
|
||||
storageInterface.put(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/another_file.json")), new ByteArrayInputStream(new byte[0]));
|
||||
storageInterface.put(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/folder/file.txt")), new ByteArrayInputStream(new byte[0]));
|
||||
storageInterface.put(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/folder/some.yaml")), new ByteArrayInputStream(new byte[0]));
|
||||
storageInterface.put(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/folder/sub/script.py")), new ByteArrayInputStream(new byte[0]));
|
||||
void searchNamespaceFiles() throws IOException, URISyntaxException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
Namespace namespaceStorage = namespaceFactory.of(TENANT_ID, namespace, storageInterface);
|
||||
|
||||
String res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + NAMESPACE + "/files/search?q=file"));
|
||||
namespaceStorage.putFile(Path.of("/file.txt"), new ByteArrayInputStream(new byte[0]));
|
||||
namespaceStorage.putFile(Path.of("/another_file.json"), new ByteArrayInputStream(new byte[0]));
|
||||
namespaceStorage.putFile(Path.of("/folder/file.txt"), new ByteArrayInputStream(new byte[0]));
|
||||
namespaceStorage.putFile(Path.of("/folder/some.yaml"), new ByteArrayInputStream(new byte[0]));
|
||||
namespaceStorage.putFile(Path.of("/folder/sub/script.py"), new ByteArrayInputStream(new byte[0]));
|
||||
|
||||
String res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files/search?q=file"));
|
||||
assertThat((Iterable<String>) JacksonMapper.toObject(res)).containsExactlyInAnyOrder("/file.txt", "/another_file.json", "/folder/file.txt");
|
||||
|
||||
res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + NAMESPACE + "/files/search?q=file.txt"));
|
||||
res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files/search?q=file.txt"));
|
||||
assertThat((Iterable<String>) JacksonMapper.toObject(res)).containsExactlyInAnyOrder("/file.txt", "/folder/file.txt");
|
||||
|
||||
res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + NAMESPACE + "/files/search?q=folder"));
|
||||
res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files/search?q=folder"));
|
||||
assertThat((Iterable<String>) JacksonMapper.toObject(res)).containsExactlyInAnyOrder("/folder/file.txt", "/folder/some.yaml", "/folder/sub/script.py");
|
||||
|
||||
res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + NAMESPACE + "/files/search?q=.py"));
|
||||
res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files/search?q=.py"));
|
||||
assertThat((Iterable<String>) JacksonMapper.toObject(res)).containsExactlyInAnyOrder("/folder/sub/script.py");
|
||||
}
|
||||
|
||||
@Test
|
||||
void getFileContent() throws IOException {
|
||||
void getFileContent() throws IOException, URISyntaxException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
Namespace namespaceStorage = namespaceFactory.of(TENANT_ID, namespace, storageInterface);
|
||||
String hw = "Hello World";
|
||||
storageInterface.put(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/test.txt")), new ByteArrayInputStream(hw.getBytes()));
|
||||
String res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + NAMESPACE + "/files?path=/test.txt"));
|
||||
namespaceStorage.putFile(Path.of("/test.txt"), new ByteArrayInputStream(hw.getBytes()));
|
||||
String res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files?path=/test.txt"));
|
||||
assertThat(res).isEqualTo(hw);
|
||||
}
|
||||
|
||||
@Test
|
||||
void getFileMetadatas() throws IOException {
|
||||
void getFileContentWithRevision() throws IOException, URISyntaxException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
Namespace namespaceStorage = namespaceFactory.of(TENANT_ID, namespace, storageInterface);
|
||||
String content1 = "Hello World";
|
||||
String content2 = "Hello World 2";
|
||||
namespaceStorage.putFile(Path.of("/test.txt"), new ByteArrayInputStream(content1.getBytes()));
|
||||
namespaceStorage.putFile(Path.of("/test.txt"), new ByteArrayInputStream(content2.getBytes()));
|
||||
|
||||
String res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files?path=/test.txt&revision=1"));
|
||||
assertThat(res).isEqualTo(content1);
|
||||
|
||||
res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files?path=/test.txt&revision=2"));
|
||||
assertThat(res).isEqualTo(content2);
|
||||
|
||||
res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files?path=/test.txt"));
|
||||
assertThat(res).isEqualTo(content2);
|
||||
}
|
||||
|
||||
@Test
|
||||
void getFileMetadatas() throws IOException, URISyntaxException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
Namespace namespaceStorage = namespaceFactory.of(TENANT_ID, namespace, storageInterface);
|
||||
String hw = "Hello World";
|
||||
storageInterface.put(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/test.txt")), new ByteArrayInputStream(hw.getBytes()));
|
||||
FileAttributes res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + NAMESPACE + "/files/stats?path=/test.txt"), TestFileAttributes.class);
|
||||
namespaceStorage.putFile(Path.of("/test.txt"), new ByteArrayInputStream(hw.getBytes()));
|
||||
FileAttributes res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files/stats?path=/test.txt"), TestFileAttributes.class);
|
||||
assertThat(res.getFileName()).isEqualTo("test.txt");
|
||||
assertThat(res.getType()).isEqualTo(FileAttributes.FileType.File);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
void getRevisions() throws IOException, URISyntaxException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
Namespace namespaceStorage = namespaceFactory.of(TENANT_ID, namespace, storageInterface);
|
||||
namespaceStorage.putFile(Path.of("/test.txt"), new ByteArrayInputStream("Hello World".getBytes()));
|
||||
|
||||
List<NamespaceFileRevision> res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files/revisions?path=/test.txt"), Argument.of(List.class, NamespaceFileRevision.class));
|
||||
assertThat(res).containsExactlyInAnyOrder(new NamespaceFileRevision(1));
|
||||
|
||||
namespaceStorage.putFile(Path.of("/test.txt"), new ByteArrayInputStream("Hello World 2".getBytes()));
|
||||
|
||||
res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files/revisions?path=/test.txt"), Argument.of(List.class, NamespaceFileRevision.class));
|
||||
assertThat(res).containsExactlyInAnyOrder(new NamespaceFileRevision(1), new NamespaceFileRevision(2));
|
||||
}
|
||||
|
||||
@Test
|
||||
void namespaceRootGetFileMetadatasWithoutPreCreation() {
|
||||
FileAttributes res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + NAMESPACE + "/files/stats"), TestFileAttributes.class);
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
FileAttributes res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files/stats"), TestFileAttributes.class);
|
||||
assertThat(res.getFileName()).isEqualTo("_files");
|
||||
assertThat(res.getType()).isEqualTo(FileAttributes.FileType.Directory);
|
||||
}
|
||||
|
||||
@Test
|
||||
void listNamespaceDirectoryFiles() throws IOException {
|
||||
void listNamespaceDirectoryFiles() throws IOException, URISyntaxException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
Namespace namespaceStorage = namespaceFactory.of(TENANT_ID, namespace, storageInterface);
|
||||
String hw = "Hello World";
|
||||
storageInterface.put(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/test/test.txt")), new ByteArrayInputStream(hw.getBytes()));
|
||||
storageInterface.put(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/test/test2.txt")), new ByteArrayInputStream(hw.getBytes()));
|
||||
namespaceStorage.putFile(Path.of("/test/test.txt"), new ByteArrayInputStream(hw.getBytes()));
|
||||
namespaceStorage.putFile(Path.of("/test/test2.txt"), new ByteArrayInputStream(hw.getBytes()));
|
||||
|
||||
List<FileAttributes> res = List.of(client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + NAMESPACE + "/files/directory"), TestFileAttributes[].class));
|
||||
List<FileAttributes> res = List.of(client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files/directory"), TestFileAttributes[].class));
|
||||
assertThat(res.stream().map(FileAttributes::getFileName).toList()).containsExactlyInAnyOrder("test");
|
||||
|
||||
res = List.of(client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + NAMESPACE + "/files/directory?path=/test"), TestFileAttributes[].class));
|
||||
res = List.of(client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files/directory?path=/test"), TestFileAttributes[].class));
|
||||
assertThat(res.stream().map(FileAttributes::getFileName).toList()).containsExactlyInAnyOrder("test.txt", "test2.txt");
|
||||
}
|
||||
|
||||
@Test
|
||||
void listNamespaceDirectoryFilesWithoutPreCreation() {
|
||||
void listNamespaceDirectoryFilesNotExisting() {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
// Root directory will be automatically created
|
||||
assertThat(storageInterface.exists(
|
||||
TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, null))).isFalse();
|
||||
List<FileAttributes> res = List.of(client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + NAMESPACE + "/files/directory"), TestFileAttributes[].class));
|
||||
TENANT_ID, namespace, toNamespacedStorageUri(namespace, null))).isFalse();
|
||||
List<FileAttributes> res = List.of(client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files/directory"), TestFileAttributes[].class));
|
||||
assertThat(storageInterface.exists(
|
||||
TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, null))).isTrue();
|
||||
assertThat(res.stream().map(FileAttributes::getFileName).count()).isEqualTo(0L);
|
||||
TENANT_ID, namespace, toNamespacedStorageUri(namespace, null))).isTrue();
|
||||
assertThat(res.size()).isEqualTo(0);
|
||||
|
||||
HttpClientResponseException notFoundException = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files/directory?path=/not_existing_directory"), TestFileAttributes[].class));
|
||||
assertThat(notFoundException.getMessage()).contains("Directory not found: /not_existing_directory");
|
||||
}
|
||||
|
||||
@Test
|
||||
void createNamespaceDirectory() throws IOException {
|
||||
client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/namespaces/" + NAMESPACE + "/files/directory?path=/test", null));
|
||||
client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/namespaces/" + NAMESPACE + "/files/directory?path=/_flows2", null));
|
||||
FileAttributes res = storageInterface.getAttributes(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/test")));
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/namespaces/" + namespace + "/files/directory?path=/test", null));
|
||||
client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/namespaces/" + namespace + "/files/directory?path=/_flows2", null));
|
||||
FileAttributes res = storageInterface.getAttributes(TENANT_ID, namespace, toNamespacedStorageUri(namespace, URI.create("/test")));
|
||||
assertThat(res.getFileName()).isEqualTo("test");
|
||||
assertThat(res.getType()).isEqualTo(FileAttributes.FileType.Directory);
|
||||
FileAttributes flows = storageInterface.getAttributes(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/_flows2")));
|
||||
FileAttributes flows = storageInterface.getAttributes(TENANT_ID, namespace, toNamespacedStorageUri(namespace, URI.create("/_flows2")));
|
||||
assertThat(flows.getFileName()).isEqualTo("_flows2");
|
||||
assertThat(flows.getType()).isEqualTo(FileAttributes.FileType.Directory);
|
||||
}
|
||||
|
||||
@Test
|
||||
void createNamespaceDirectoryException() {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
assertThrows(
|
||||
HttpClientResponseException.class,
|
||||
() ->
|
||||
@@ -147,37 +198,39 @@ class NamespaceFileControllerTest {
|
||||
.toBlocking()
|
||||
.exchange(
|
||||
HttpRequest.POST(
|
||||
"/api/v1/main/namespaces/" + NAMESPACE + "/files/directory?path=/_flows",
|
||||
"/api/v1/main/namespaces/" + namespace + "/files/directory?path=/_flows",
|
||||
null)));
|
||||
}
|
||||
|
||||
@Test
|
||||
void createGetFileContent() throws IOException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
MultipartBody body = MultipartBody.builder()
|
||||
.addPart("fileContent", "test.txt", "Hello".getBytes())
|
||||
.build();
|
||||
client.toBlocking().exchange(
|
||||
HttpRequest.POST("/api/v1/main/namespaces/" + NAMESPACE + "/files?path=/test.txt", body)
|
||||
HttpRequest.POST("/api/v1/main/namespaces/" + namespace + "/files?path=/test.txt", body)
|
||||
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE)
|
||||
);
|
||||
assertNamespaceGetFileContentContent(URI.create("/test.txt"), "Hello");
|
||||
assertNamespaceGetFileContentContent(namespace, URI.create("/test.txt"), "Hello");
|
||||
MultipartBody flowBody = MultipartBody.builder()
|
||||
.addPart("fileContent", "_flowsFile", "Hello".getBytes())
|
||||
.build();
|
||||
client.toBlocking().exchange(
|
||||
HttpRequest.POST("/api/v1/main/namespaces/" + NAMESPACE + "/files?path=/_flowsFile", flowBody)
|
||||
HttpRequest.POST("/api/v1/main/namespaces/" + namespace + "/files?path=/_flowsFile", flowBody)
|
||||
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE)
|
||||
);
|
||||
assertNamespaceGetFileContentContent(URI.create("/_flowsFile"), "Hello");
|
||||
assertNamespaceGetFileContentContent(namespace, URI.create("/_flowsFile"), "Hello");
|
||||
}
|
||||
|
||||
@Test
|
||||
void createGetFileContentFlowException() {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
MultipartBody body = MultipartBody.builder()
|
||||
.addPart("fileContent", "_flows", "Hello".getBytes())
|
||||
.build();
|
||||
assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(
|
||||
HttpRequest.POST("/api/v1/main/namespaces/" + NAMESPACE + "/files?path=/_flows", body)
|
||||
HttpRequest.POST("/api/v1/main/namespaces/" + namespace + "/files?path=/_flows", body)
|
||||
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE)
|
||||
));
|
||||
}
|
||||
@@ -185,117 +238,115 @@ class NamespaceFileControllerTest {
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/task-flow.yaml"})
|
||||
void createGetFileContent_AddFlow() throws IOException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
String flowSource = flowRepository.findByIdWithSource(TENANT_ID, "io.kestra.tests", "task-flow").get().getSource();
|
||||
File temp = File.createTempFile("task-flow", ".yml");
|
||||
Files.write(temp.toPath(), flowSource.getBytes());
|
||||
|
||||
assertThat(flowRepository.findByIdWithSource(TENANT_ID, NAMESPACE, "task-flow").isEmpty()).isTrue();
|
||||
assertThat(flowRepository.findByIdWithSource(TENANT_ID, namespace, "task-flow").isEmpty()).isTrue();
|
||||
|
||||
MultipartBody body = MultipartBody.builder()
|
||||
.addPart("fileContent", "task-flow.yml", temp)
|
||||
.build();
|
||||
client.toBlocking().exchange(
|
||||
HttpRequest.POST("/api/v1/main/namespaces/" + NAMESPACE + "/files?path=/_flows/task-flow.yml", body)
|
||||
HttpRequest.POST("/api/v1/main/namespaces/" + namespace + "/files?path=/_flows/task-flow.yml", body)
|
||||
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE)
|
||||
);
|
||||
|
||||
assertThat(flowRepository.findByIdWithSource(TENANT_ID, NAMESPACE, "task-flow").get().getSource()).isEqualTo(flowSource.replaceFirst("(?m)^namespace: .*$", "namespace: " + NAMESPACE));
|
||||
assertThat(flowRepository.findByIdWithSource(TENANT_ID, namespace, "task-flow").get().getSource()).isEqualTo(flowSource.replaceFirst("(?m)^namespace: .*$", "namespace: " + namespace));
|
||||
|
||||
assertThat(storageInterface.exists(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/_flows/task-flow.yml")))).isFalse();
|
||||
assertThat(storageInterface.exists(TENANT_ID, namespace, toNamespacedStorageUri(namespace, URI.create("/_flows/task-flow.yml")))).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/task-flow.yaml"})
|
||||
void createGetFileContent_ExtractZip() throws IOException {
|
||||
void createGetFileContent_ExtractZip() throws IOException, URISyntaxException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
Namespace namespaceStorage = namespaceFactory.of(TENANT_ID, namespace, storageInterface);
|
||||
String namespaceToExport = "io.kestra.tests";
|
||||
|
||||
storageInterface.put(TENANT_ID, NAMESPACE, toNamespacedStorageUri(namespaceToExport, URI.create("/file.txt")), new ByteArrayInputStream("file".getBytes()));
|
||||
storageInterface.put(TENANT_ID, NAMESPACE, toNamespacedStorageUri(namespaceToExport, URI.create("/another_file.txt")), new ByteArrayInputStream("another_file".getBytes()));
|
||||
storageInterface.put(TENANT_ID, NAMESPACE, toNamespacedStorageUri(namespaceToExport, URI.create("/folder/file.txt")), new ByteArrayInputStream("folder_file".getBytes()));
|
||||
storageInterface.createDirectory(TENANT_ID, NAMESPACE, toNamespacedStorageUri(namespaceToExport, URI.create("/empty_folder")));
|
||||
namespaceStorage.putFile(Path.of("/file.txt"), new ByteArrayInputStream("file".getBytes()));
|
||||
namespaceStorage.putFile(Path.of("/another_file.txt"), new ByteArrayInputStream("another_file".getBytes()));
|
||||
namespaceStorage.putFile(Path.of("/folder/file.txt"), new ByteArrayInputStream("folder_file".getBytes()));
|
||||
storageInterface.createDirectory(TENANT_ID, namespace, toNamespacedStorageUri(namespaceToExport, URI.create("/empty_folder")));
|
||||
|
||||
byte[] zip = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespaceToExport + "/files/export"),
|
||||
Argument.of(byte[].class));
|
||||
File temp = File.createTempFile("files", ".zip");
|
||||
Files.write(temp.toPath(), zip);
|
||||
|
||||
assertThat(flowRepository.findById(TENANT_ID, NAMESPACE, "task-flow").isEmpty()).isTrue();
|
||||
assertThat(flowRepository.findById(TENANT_ID, namespace, "task-flow").isEmpty()).isTrue();
|
||||
|
||||
MultipartBody body = MultipartBody.builder()
|
||||
.addPart("fileContent", "files.zip", temp)
|
||||
.build();
|
||||
client.toBlocking().exchange(
|
||||
HttpRequest.POST("/api/v1/main/namespaces/" + NAMESPACE + "/files?path=/files.zip", body)
|
||||
HttpRequest.POST("/api/v1/main/namespaces/" + namespace + "/files?path=/files.zip", body)
|
||||
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE)
|
||||
);
|
||||
|
||||
assertNamespaceGetFileContentContent(URI.create("/file.txt"), "file");
|
||||
assertNamespaceGetFileContentContent(URI.create("/another_file.txt"), "another_file");
|
||||
assertThat(storageInterface.exists(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/folder")))).isTrue();
|
||||
assertNamespaceGetFileContentContent(URI.create("/folder/file.txt"), "folder_file");
|
||||
assertNamespaceGetFileContentContent(namespace, URI.create("/file.txt"), "file");
|
||||
assertNamespaceGetFileContentContent(namespace, URI.create("/another_file.txt"), "another_file");
|
||||
assertThat(storageInterface.exists(TENANT_ID, namespace, toNamespacedStorageUri(namespace, URI.create("/folder")))).isTrue();
|
||||
assertNamespaceGetFileContentContent(namespace, URI.create("/folder/file.txt"), "folder_file");
|
||||
// Highlights the fact that we currently don't export / import empty folders (would require adding a method to storages to also retrieve folders)
|
||||
assertThat(storageInterface.exists(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/empty_folder")))).isFalse();
|
||||
assertThat(storageInterface.exists(TENANT_ID, namespace, toNamespacedStorageUri(namespace, URI.create("/empty_folder")))).isFalse();
|
||||
|
||||
Flow retrievedFlow = flowRepository.findById(TENANT_ID, NAMESPACE, "task-flow").get();
|
||||
assertThat(retrievedFlow.getNamespace()).isEqualTo(NAMESPACE);
|
||||
Flow retrievedFlow = flowRepository.findById(TENANT_ID, namespace, "task-flow").get();
|
||||
assertThat(retrievedFlow.getNamespace()).isEqualTo(namespace);
|
||||
assertThat(((Subflow) retrievedFlow.getTasks().getFirst()).getNamespace()).isEqualTo(namespaceToExport);
|
||||
}
|
||||
|
||||
private void assertNamespaceGetFileContentContent(URI fileUri, String expectedContent) throws IOException {
|
||||
InputStream inputStream = storageInterface.get(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, fileUri));
|
||||
private void assertNamespaceGetFileContentContent(String namespace, URI fileUri, String expectedContent) throws IOException {
|
||||
InputStream inputStream = storageInterface.get(TENANT_ID, namespace, toNamespacedStorageUri(namespace, fileUri));
|
||||
String content = new String(inputStream.readAllBytes());
|
||||
assertThat(content).isEqualTo(expectedContent);
|
||||
}
|
||||
|
||||
@Test
|
||||
void moveFileDirectory() throws IOException {
|
||||
storageInterface.createDirectory(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/test")));
|
||||
client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + NAMESPACE + "/files?from=/test&to=/foo", null));
|
||||
FileAttributes res = storageInterface.getAttributes(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/foo")));
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
Namespace namespaceStorage = namespaceFactory.of(TENANT_ID, namespace, storageInterface);
|
||||
namespaceStorage.createDirectory(Path.of("/test"));
|
||||
client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + namespace + "/files?from=/test&to=/foo", null));
|
||||
FileAttributes res = namespaceStorage.getFileMetadata(Path.of("/foo"));
|
||||
assertThat(res.getFileName()).isEqualTo("foo");
|
||||
assertThat(res.getType()).isEqualTo(FileAttributes.FileType.Directory);
|
||||
}
|
||||
|
||||
@Test
|
||||
void deleteFileDirectory() throws IOException {
|
||||
storageInterface.put(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/folder/file.txt")), new ByteArrayInputStream("Hello".getBytes()));
|
||||
client.toBlocking().exchange(HttpRequest.DELETE("/api/v1/main/namespaces/" + NAMESPACE + "/files?path=/folder/file.txt", null));
|
||||
assertThat(storageInterface.exists(
|
||||
TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/folder/file.txt")))).isFalse();
|
||||
void deleteFileDirectory() throws IOException, URISyntaxException {
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
Namespace namespaceStorage = namespaceFactory.of(TENANT_ID, namespace, storageInterface);
|
||||
namespaceStorage.putFile(Path.of("/folder/file.txt"), new ByteArrayInputStream("Hello".getBytes()));
|
||||
client.toBlocking().exchange(HttpRequest.DELETE("/api/v1/main/namespaces/" + namespace + "/files?path=/folder/file.txt", null));
|
||||
assertThat(namespaceStorage.exists(Path.of("/folder/file.txt"))).isFalse();
|
||||
// Zombie folders are deleted, but not the root folder
|
||||
assertThat(storageInterface.exists(
|
||||
TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/folder")))).isFalse();
|
||||
assertThat(storageInterface.exists(
|
||||
TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, null))).isTrue();
|
||||
assertThat(namespaceStorage.exists(Path.of("/folder"))).isFalse();
|
||||
assertThat(namespaceStorage.exists(null)).isTrue();
|
||||
|
||||
storageInterface.put(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/folderWithMultipleFiles/file1.txt")), new ByteArrayInputStream("Hello".getBytes()));
|
||||
storageInterface.put(TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/folderWithMultipleFiles/file2.txt")), new ByteArrayInputStream("Hello".getBytes()));
|
||||
client.toBlocking().exchange(HttpRequest.DELETE("/api/v1/main/namespaces/" + NAMESPACE + "/files?path=/folderWithMultipleFiles/file1.txt", null));
|
||||
assertThat(storageInterface.exists(
|
||||
TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/folderWithMultipleFiles/file1.txt")))).isFalse();
|
||||
assertThat(storageInterface.exists(
|
||||
TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/folderWithMultipleFiles/file2.txt")))).isTrue();
|
||||
// Since there is still one file in the folder, it should not be deleted
|
||||
assertThat(storageInterface.exists(
|
||||
TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/folderWithMultipleFiles")))).isTrue();
|
||||
assertThat(storageInterface.exists(
|
||||
TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, null))).isTrue();
|
||||
namespaceStorage.putFile(Path.of("/folderWithMultipleFiles/file1.txt"), new ByteArrayInputStream("Hello".getBytes()));
|
||||
namespaceStorage.putFile(Path.of("/folderWithMultipleFiles/file2.txt"), new ByteArrayInputStream("Hello".getBytes()));
|
||||
client.toBlocking().exchange(HttpRequest.DELETE("/api/v1/main/namespaces/" + namespace + "/files?path=/folderWithMultipleFiles/file1.txt", null));
|
||||
assertThat(namespaceStorage.exists(Path.of("/folderWithMultipleFiles/file1.txt"))).isFalse();
|
||||
assertThat(namespaceStorage.exists(Path.of("/folderWithMultipleFiles/file2.txt"))).isTrue();
|
||||
assertThat(namespaceStorage.exists(Path.of("/folderWithMultipleFiles"))).isTrue();
|
||||
assertThat(namespaceStorage.exists(Path.of("/"))).isTrue();
|
||||
|
||||
client.toBlocking().exchange(HttpRequest.DELETE("/api/v1/main/namespaces/" + NAMESPACE + "/files?path=/folderWithMultipleFiles", null));
|
||||
assertThat(storageInterface.exists(
|
||||
TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, URI.create("/folderWithMultipleFiles/")))).isFalse();
|
||||
assertThat(storageInterface.exists(
|
||||
TENANT_ID, NAMESPACE, toNamespacedStorageUri(NAMESPACE, null))).isTrue();
|
||||
client.toBlocking().exchange(HttpRequest.DELETE("/api/v1/main/namespaces/" + namespace + "/files?path=/folderWithMultipleFiles", null));
|
||||
assertThat(namespaceStorage.exists(Path.of("/folderWithMultipleFiles/"))).isFalse();
|
||||
assertThat(namespaceStorage.exists(null)).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
void forbiddenPaths() {
|
||||
assertForbiddenErrorThrown(() -> client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + NAMESPACE + "/files?path=/_flows/test.yml")));
|
||||
assertForbiddenErrorThrown(() -> client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + NAMESPACE + "/files/stats?path=/_flows/test.yml"), TestFileAttributes.class));
|
||||
assertForbiddenErrorThrown(() -> client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + NAMESPACE + "/files/directory?path=/_flows"), TestFileAttributes[].class));
|
||||
assertForbiddenErrorThrown(() -> client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + NAMESPACE + "/files?from=/_flows/test&to=/foo", null)));
|
||||
assertForbiddenErrorThrown(() -> client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + NAMESPACE + "/files?from=/foo&to=/_flows/test", null)));
|
||||
assertForbiddenErrorThrown(() -> client.toBlocking().exchange(HttpRequest.DELETE("/api/v1/main/namespaces/" + NAMESPACE + "/files?path=/_flows/test.txt", null)));
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
assertForbiddenErrorThrown(() -> client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files?path=/_flows/test.yml")));
|
||||
assertForbiddenErrorThrown(() -> client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files/stats?path=/_flows/test.yml"), TestFileAttributes.class));
|
||||
assertForbiddenErrorThrown(() -> client.toBlocking().retrieve(HttpRequest.GET("/api/v1/main/namespaces/" + namespace + "/files/directory?path=/_flows"), TestFileAttributes[].class));
|
||||
assertForbiddenErrorThrown(() -> client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + namespace + "/files?from=/_flows/test&to=/foo", null)));
|
||||
assertForbiddenErrorThrown(() -> client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + namespace + "/files?from=/foo&to=/_flows/test", null)));
|
||||
assertForbiddenErrorThrown(() -> client.toBlocking().exchange(HttpRequest.DELETE("/api/v1/main/namespaces/" + namespace + "/files?path=/_flows/test.txt", null)));
|
||||
}
|
||||
|
||||
private void assertForbiddenErrorThrown(Executable executable) {
|
||||
@@ -317,4 +368,4 @@ class NamespaceFileControllerTest {
|
||||
long size;
|
||||
Map<String, String> metadata;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user