mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
feat(core,webserver): multi file editor (#2234)
Add endpoint and method to storage interface to allow the new file editor to create/move/delete and edit files and directories in a new storage prefix.
This commit is contained in:
@@ -0,0 +1,21 @@
|
||||
package io.kestra.core.storages;
|
||||
|
||||
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
||||
|
||||
@JsonSerialize(as = FileAttributes.class)
|
||||
public interface FileAttributes {
|
||||
String getFileName();
|
||||
|
||||
long getLastModifiedTime();
|
||||
|
||||
long getCreationTime();
|
||||
|
||||
FileType getType();
|
||||
|
||||
long getSize();
|
||||
|
||||
enum FileType {
|
||||
File,
|
||||
Directory
|
||||
}
|
||||
}
|
||||
@@ -30,6 +30,10 @@ public interface StorageInterface {
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
InputStream get(String tenantId, URI uri) throws IOException;
|
||||
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
List<FileAttributes> list(String tenantId, URI uri) throws IOException;
|
||||
|
||||
|
||||
/**
|
||||
* Whether the uri points to a file/object that exist in the internal storage.
|
||||
*
|
||||
@@ -46,18 +50,42 @@ public interface StorageInterface {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
* Use {@link #getAttributes(URI)} instead of individual call for every attribute
|
||||
* @param uri
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
@Deprecated
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
Long size(String tenantId, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
* Use {@link #getAttributes(URI)} instead of individual call for every attribute
|
||||
* @param uri
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
@Deprecated
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
Long lastModifiedTime(String tenantId, URI uri) throws IOException;
|
||||
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
FileAttributes getAttributes(String tenantId, URI uri) throws IOException;
|
||||
|
||||
@Retryable(includes = {IOException.class})
|
||||
URI put(String tenantId, URI uri, InputStream data) throws IOException;
|
||||
|
||||
@Retryable(includes = {IOException.class})
|
||||
boolean delete(String tenantId, URI uri) throws IOException;
|
||||
|
||||
URI createDirectory(String tenantId, URI uri) throws IOException;
|
||||
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
URI move(String tenantId, URI from, URI to) throws IOException;
|
||||
|
||||
@Retryable(includes = {IOException.class})
|
||||
List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOException;
|
||||
|
||||
@@ -141,6 +169,13 @@ public interface StorageInterface {
|
||||
return String.join("/", paths);
|
||||
}
|
||||
|
||||
default String namespaceFilePrefix(String namespace) {
|
||||
return String.join("/", List.of(
|
||||
namespace,
|
||||
"files"
|
||||
));
|
||||
}
|
||||
|
||||
default Optional<String> extractExecutionId(URI path) {
|
||||
Pattern pattern = Pattern.compile("^/(.+)/executions/([^/]+)/", Pattern.CASE_INSENSITIVE);
|
||||
Matcher matcher = pattern.matcher(path.getPath());
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
package io.kestra.storage.local;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import lombok.Builder;
|
||||
import lombok.Value;
|
||||
|
||||
import javax.naming.directory.InvalidAttributesException;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
|
||||
import static io.kestra.core.storages.FileAttributes.FileType.*;
|
||||
|
||||
@Value
|
||||
@Builder
|
||||
public class LocalFileAttributes implements FileAttributes {
|
||||
String fileName;
|
||||
|
||||
BasicFileAttributes basicFileAttributes;
|
||||
|
||||
@Override
|
||||
public long getLastModifiedTime() {
|
||||
return basicFileAttributes.lastModifiedTime().toMillis();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCreationTime() {
|
||||
return basicFileAttributes.creationTime().toMillis();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileType getType() {
|
||||
if (basicFileAttributes.isRegularFile()) {
|
||||
return File;
|
||||
} else if (basicFileAttributes.isDirectory()) {
|
||||
return Directory;
|
||||
} else {
|
||||
throw new RuntimeException("Unknown type for file %s".formatted(fileName));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSize() {
|
||||
return basicFileAttributes.size();
|
||||
}
|
||||
}
|
||||
@@ -1,21 +1,23 @@
|
||||
package io.kestra.storage.local;
|
||||
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.*;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.nio.file.attribute.FileTime;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@Singleton
|
||||
@LocalStorageEnabled
|
||||
@@ -32,21 +34,11 @@ public class LocalStorage implements StorageInterface {
|
||||
}
|
||||
|
||||
private Path getPath(String tenantId, URI uri) {
|
||||
parentTraversalGuard(uri);
|
||||
return tenantId == null ? Paths.get(config.getBasePath().toAbsolutePath().toString(), uri.toString())
|
||||
: Paths.get(config.getBasePath().toAbsolutePath().toString(), tenantId, uri.toString());
|
||||
}
|
||||
|
||||
private void createDirectory(String tenantId, URI append) {
|
||||
Path path = getPath(tenantId, append);
|
||||
File directory = path.getParent().toFile();
|
||||
|
||||
if (!directory.exists()) {
|
||||
if (!directory.mkdirs()) {
|
||||
throw new RuntimeException("Cannot create directory: " + directory.getAbsolutePath());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream get(String tenantId, URI uri) throws IOException {
|
||||
return new BufferedInputStream(new FileInputStream(getPath(tenantId, URI.create(uri.getPath()))
|
||||
@@ -60,6 +52,24 @@ public class LocalStorage implements StorageInterface {
|
||||
return Files.exists(getPath(tenantId, uri));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<FileAttributes> list(String tenantId, URI uri) throws IOException {
|
||||
try (Stream<Path> stream = Files.list(getPath(tenantId, URI.create(uri.getPath())))) {
|
||||
return stream
|
||||
.map(throwFunction(file -> {
|
||||
URI relative = URI.create(
|
||||
getPath(tenantId, URI.create("")).relativize(
|
||||
Path.of(file.toUri())
|
||||
).toString()
|
||||
);
|
||||
return getAttributes(tenantId, relative);
|
||||
}))
|
||||
.toList();
|
||||
} catch (NoSuchFileException e) {
|
||||
throw new FileNotFoundException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long size(String tenantId, URI uri) throws IOException {
|
||||
try {
|
||||
@@ -73,33 +83,84 @@ public class LocalStorage implements StorageInterface {
|
||||
|
||||
@Override
|
||||
public Long lastModifiedTime(String tenantId, URI uri) throws IOException {
|
||||
FileTime lastModifiedTime = Files.getLastModifiedTime(getPath(tenantId, uri));
|
||||
FileTime lastModifiedTime;
|
||||
try {
|
||||
lastModifiedTime = Files.getLastModifiedTime(getPath(tenantId, uri));
|
||||
} catch (NoSuchFileException e) {
|
||||
throw new FileNotFoundException(e.getMessage());
|
||||
}
|
||||
return lastModifiedTime.toMillis();
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI put(String tenantId, URI uri, InputStream data) throws IOException {
|
||||
this.createDirectory(tenantId, uri);
|
||||
File file = getPath(tenantId, uri).toFile();
|
||||
File parent = file.getParentFile();
|
||||
if (!parent.exists() && !parent.mkdirs()) {
|
||||
throw new RuntimeException("Cannot create directory: " + parent.getAbsolutePath());
|
||||
}
|
||||
|
||||
try (data; OutputStream outStream = new FileOutputStream(getPath(tenantId, uri).toFile())) {
|
||||
try (data; OutputStream outStream = new FileOutputStream(file)) {
|
||||
byte[] buffer = new byte[8 * 1024];
|
||||
int bytesRead;
|
||||
while ((bytesRead = data.read(buffer)) != -1) {
|
||||
outStream.write(buffer, 0, bytesRead);
|
||||
}
|
||||
}
|
||||
|
||||
return URI.create("kestra://" + uri.getPath());
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileAttributes getAttributes(String tenantId, URI uri) throws IOException {
|
||||
BasicFileAttributes basicFileAttributes;
|
||||
Path path = getPath(tenantId, uri);
|
||||
try {
|
||||
basicFileAttributes = Files.readAttributes(path, BasicFileAttributes.class);
|
||||
} catch (NoSuchFileException e) {
|
||||
throw new FileNotFoundException(e.getMessage());
|
||||
}
|
||||
return LocalFileAttributes.builder()
|
||||
.fileName(path.getFileName().toString())
|
||||
.basicFileAttributes(basicFileAttributes)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI createDirectory(String tenantId, URI uri) {
|
||||
if (uri == null || uri.getPath().isEmpty()) {
|
||||
throw new IllegalArgumentException("Unable to create a directory with empty url.");
|
||||
}
|
||||
File file = getPath(tenantId, uri).toFile();
|
||||
if (!file.exists() && !file.mkdirs()) {
|
||||
throw new RuntimeException("Cannot create directory: " + file.getAbsolutePath());
|
||||
}
|
||||
return URI.create("kestra://" + uri.getPath());
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI move(String tenantId, URI from, URI to) throws IOException {
|
||||
try {
|
||||
Files.move(
|
||||
getPath(tenantId, from),
|
||||
getPath(tenantId, to),
|
||||
StandardCopyOption.ATOMIC_MOVE);
|
||||
} catch (NoSuchFileException e) {
|
||||
throw new FileNotFoundException(e.getMessage());
|
||||
}
|
||||
return URI.create("kestra://" + to.getPath());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean delete(String tenantId, URI uri) throws IOException {
|
||||
File file = getPath(tenantId, URI.create(uri.getPath())).toFile();
|
||||
if (!file.exists()) {
|
||||
return false;
|
||||
Path path = getPath(tenantId, URI.create(uri.getPath()));
|
||||
File file = path.toFile();
|
||||
|
||||
if(file.isDirectory()) {
|
||||
FileUtils.deleteDirectory(file);
|
||||
return true;
|
||||
}
|
||||
|
||||
return file.delete();
|
||||
return Files.deleteIfExists(path);
|
||||
}
|
||||
|
||||
@SuppressWarnings("ResultOfMethodCallIgnored")
|
||||
@@ -120,4 +181,10 @@ public class LocalStorage implements StorageInterface {
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
private void parentTraversalGuard(URI uri) {
|
||||
if (uri.toString().contains("..")) {
|
||||
throw new IllegalArgumentException("File should be accessed with their full path and not using relative '..' path.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,22 +6,21 @@ import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.models.triggers.types.Schedule;
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.tasks.log.Log;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.*;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
@@ -62,6 +61,11 @@ class LocalStorageTest {
|
||||
assertThat(CharStreams.toString(new InputStreamReader(getScheme)), is(content));
|
||||
}
|
||||
|
||||
@Test
|
||||
void getNoTraversal() throws Exception {
|
||||
assertThrows(IllegalArgumentException.class, () -> storageInterface.get(null, new URI("/storage/level1/..")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void missing() {
|
||||
String prefix = IdUtils.create();
|
||||
@@ -101,6 +105,10 @@ class LocalStorageTest {
|
||||
storageInterface.get(null, new URI("/" + prefix + "/storage/put.yml"));
|
||||
});
|
||||
}
|
||||
@Test
|
||||
void putNoTraversal() throws Exception {
|
||||
assertThrows(IllegalArgumentException.class, () -> storageInterface.put(null, new URI("/storage/level1/.."), null));
|
||||
}
|
||||
|
||||
@Test
|
||||
void deleteByPrefix() throws Exception {
|
||||
@@ -140,6 +148,11 @@ class LocalStorageTest {
|
||||
assertThat(deleted.size(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
void deleteByPrefixNoTraversal() throws Exception {
|
||||
assertThrows(IllegalArgumentException.class, () -> storageInterface.deleteByPrefix(null, new URI("/storage/level1/..")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void executionPrefix() {
|
||||
var flow = Flow.builder().id("flow").namespace("namespace").build();
|
||||
@@ -201,4 +214,154 @@ class LocalStorageTest {
|
||||
assertThat(prefix, notNullValue());
|
||||
assertThat(prefix.toString(), is("///namespace/flow/executions/execution/trigger/trigger"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void list() throws Exception {
|
||||
String prefix = IdUtils.create();
|
||||
|
||||
URL resource = LocalStorageTest.class.getClassLoader().getResource("application.yml");
|
||||
|
||||
List<String> path = Arrays.asList(
|
||||
"/" + prefix + "/storage/root.yml",
|
||||
"/" + prefix + "/storage/root2.yml",
|
||||
"/" + prefix + "/storage/level1/1.yml",
|
||||
"/" + prefix + "/storage/level1/level2/1.yml"
|
||||
);
|
||||
path.forEach(throwConsumer(s -> this.putFile(resource, s)));
|
||||
|
||||
List<FileAttributes> files = storageInterface.list(null, new URI("/" + prefix + "/storage/"));
|
||||
|
||||
assertThat(files,
|
||||
containsInAnyOrder(
|
||||
allOf(
|
||||
hasProperty("fileName", is("root.yml")),
|
||||
hasProperty("type", is(FileAttributes.FileType.File))
|
||||
),
|
||||
allOf(
|
||||
hasProperty("fileName", is("root2.yml")),
|
||||
hasProperty("type", is(FileAttributes.FileType.File))
|
||||
),
|
||||
allOf(
|
||||
hasProperty("fileName", is("level1")),
|
||||
hasProperty("type", is(FileAttributes.FileType.Directory))
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
void listNoTraversal() throws Exception {
|
||||
assertThrows(IllegalArgumentException.class, () -> storageInterface.list(null, new URI("/storage/level1/..")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void list_NotFound() {
|
||||
assertThrows(FileNotFoundException.class, () -> storageInterface.list(null, new URI("/unknown.yml")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void getAttributes() throws Exception {
|
||||
String prefix = IdUtils.create();
|
||||
|
||||
URL resource = LocalStorageTest.class.getClassLoader().getResource("application.yml");
|
||||
|
||||
List<String> path = Arrays.asList(
|
||||
"/" + prefix + "/storage/root.yml",
|
||||
"/" + prefix + "/storage/level1/1.yml"
|
||||
);
|
||||
path.forEach(throwConsumer(s -> this.putFile(resource, s)));
|
||||
FileAttributes rootyml = storageInterface.getAttributes(null, new URI("/" + prefix + "/storage/root.yml"));
|
||||
assertThat(rootyml.getFileName(), is("root.yml"));
|
||||
assertThat(rootyml.getType(), is(FileAttributes.FileType.File));
|
||||
|
||||
FileAttributes level1 = storageInterface.getAttributes(null, new URI("/" + prefix + "/storage/level1"));
|
||||
assertThat(level1.getFileName(), is("level1"));
|
||||
assertThat(level1.getType(), is(FileAttributes.FileType.Directory));
|
||||
}
|
||||
|
||||
@Test
|
||||
void getAttributesNoTraversal() throws Exception {
|
||||
assertThrows(IllegalArgumentException.class, () -> storageInterface.getAttributes(null, new URI("/storage/level1/..")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void getAttributes_NotFound() {
|
||||
assertThrows(FileNotFoundException.class, () -> storageInterface.getAttributes(null, new URI("/unknown.yml")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void createDirectory() throws URISyntaxException, IOException {
|
||||
String prefix = IdUtils.create();
|
||||
storageInterface.createDirectory(null, new URI("/" + prefix + "/storage"));
|
||||
FileAttributes level1 = storageInterface.getAttributes(null, new URI("/" + prefix + "/storage"));
|
||||
assertThat(level1.getFileName(), is("storage"));
|
||||
assertThat(level1.getType(), is(FileAttributes.FileType.Directory));
|
||||
}
|
||||
|
||||
@Test
|
||||
void createDirectoryNoTraversal() throws Exception {
|
||||
assertThrows(IllegalArgumentException.class, () -> storageInterface.createDirectory(null, new URI("/storage/level1/..")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void move() throws Exception {
|
||||
String prefix = IdUtils.create();
|
||||
|
||||
URL resource = LocalStorageTest.class.getClassLoader().getResource("application.yml");
|
||||
|
||||
List<String> path = Arrays.asList(
|
||||
"/" + prefix + "/storage/root.yml",
|
||||
"/" + prefix + "/storage/root2.yml",
|
||||
"/" + prefix + "/storage/level1/1.yml",
|
||||
"/" + prefix + "/storage/level1/level2/1.yml"
|
||||
);
|
||||
path.forEach(throwConsumer(s -> this.putFile(resource, s)));
|
||||
|
||||
storageInterface.move(null, new URI("/" + prefix + "/storage/level1"), new URI("/" + prefix + "/storage/lvl1"));
|
||||
FileAttributes level1 = storageInterface.getAttributes(null, new URI("/" + prefix + "/storage/lvl1"));
|
||||
assertThat(level1.getFileName(), is("lvl1"));
|
||||
assertThat(level1.getType(), is(FileAttributes.FileType.Directory));
|
||||
|
||||
assertThat(storageInterface.exists(null, new URI("/" + prefix + "/storage/lvl1/1.yml")), is(true));
|
||||
assertThat(storageInterface.exists(null, new URI("/" + prefix + "/storage/level1")), is(false));
|
||||
assertThat(storageInterface.exists(null, new URI("/" + prefix + "/storage/level1/1.yml")), is(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
void moveNoTraversal() throws Exception {
|
||||
assertThrows(IllegalArgumentException.class, () -> storageInterface.move(null, new URI("/storage/level1/.."), new URI("/storage/level1/")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void move_NotFound() {
|
||||
assertThrows(FileNotFoundException.class, () -> storageInterface.move(null, new URI("/unknown.yml"), new URI("/unknown2.yml")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void delete() throws Exception {
|
||||
String prefix = IdUtils.create();
|
||||
|
||||
URL resource = LocalStorageTest.class.getClassLoader().getResource("application.yml");
|
||||
|
||||
List<String> path = Arrays.asList(
|
||||
"/" + prefix + "/storage/level1/1.yml",
|
||||
"/" + prefix + "/storage/level1/level2/1.yml"
|
||||
);
|
||||
path.forEach(throwConsumer(s -> this.putFile(resource, s)));
|
||||
storageInterface.delete(null, new URI("/" + prefix + "/storage/level1"));
|
||||
|
||||
assertThat(storageInterface.exists(null, new URI("/" + prefix + "/storage/level1")), is(false));
|
||||
assertThat(storageInterface.exists(null, new URI("/" + prefix + "/storage/level1/1.yml")), is(false));
|
||||
assertThat(storageInterface.exists(null, new URI("/" + prefix + "/storage/level1/level2")), is(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
void deleteNoTraversal() throws Exception {
|
||||
assertThrows(IllegalArgumentException.class, () -> storageInterface.delete(null, new URI("/storage/level1/..")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void delete_NotFound_DoesNotThrow() throws URISyntaxException, IOException {
|
||||
storageInterface.delete(null, new URI("/unknown.yml"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,115 @@
|
||||
package io.kestra.webserver.controllers;
|
||||
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.annotation.*;
|
||||
import io.micronaut.http.multipart.CompletedFileUpload;
|
||||
import io.micronaut.http.server.types.files.StreamedFile;
|
||||
import io.micronaut.scheduling.TaskExecutors;
|
||||
import io.micronaut.scheduling.annotation.ExecuteOn;
|
||||
import io.micronaut.validation.Validated;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@Slf4j
|
||||
@Validated
|
||||
@Controller("/api/v1/files/namespaces")
|
||||
public class NamespaceFileController {
|
||||
@Inject
|
||||
private StorageInterface storageInterface;
|
||||
@Inject
|
||||
private TenantService tenantService;
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Get(uri = "{namespace}", produces = MediaType.APPLICATION_OCTET_STREAM)
|
||||
@Operation(tags = {"Files"}, summary = "Get namespace file content")
|
||||
public StreamedFile file(
|
||||
@Parameter(description = "The namespace id") @PathVariable String namespace,
|
||||
@Parameter(description = "The internal storage uri") @QueryValue URI path
|
||||
) throws IOException, URISyntaxException {
|
||||
InputStream fileHandler = storageInterface.get(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, path));
|
||||
return new StreamedFile(fileHandler, MediaType.APPLICATION_OCTET_STREAM_TYPE);
|
||||
}
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Get(uri = "{namespace}/stats", produces = MediaType.TEXT_JSON)
|
||||
@Operation(tags = {"Files"}, summary = "Get namespace file stats such as size, creation & modification dates and type")
|
||||
public FileAttributes stats(
|
||||
@Parameter(description = "The namespace id") @PathVariable String namespace,
|
||||
@Parameter(description = "The internal storage uri") @Nullable @QueryValue URI path
|
||||
) throws IOException, URISyntaxException {
|
||||
try {
|
||||
return storageInterface.getAttributes(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, path));
|
||||
} catch (NoSuchFileException e) {
|
||||
throw new FileNotFoundException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Get(uri = "{namespace}/directory", produces = MediaType.TEXT_JSON)
|
||||
@Operation(tags = {"Files"}, summary = "List directory content")
|
||||
public List<FileAttributes> list(
|
||||
@Parameter(description = "The namespace id") @PathVariable String namespace,
|
||||
@Parameter(description = "The internal storage uri") @Nullable @QueryValue URI path
|
||||
) throws IOException, URISyntaxException {
|
||||
return storageInterface.list(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, path));
|
||||
}
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Post(uri = "{namespace}/directory")
|
||||
@Operation(tags = {"Files"}, summary = "Create a directory")
|
||||
public void createDirectory(
|
||||
@Parameter(description = "The namespace id") @PathVariable String namespace,
|
||||
@Parameter(description = "The internal storage uri") @Nullable @QueryValue URI path
|
||||
) throws IOException, URISyntaxException {
|
||||
storageInterface.createDirectory(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, path));
|
||||
}
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Post(uri = "{namespace}", consumes = MediaType.MULTIPART_FORM_DATA)
|
||||
@Operation(tags = {"Files"}, summary = "Create a file")
|
||||
public void createFile(
|
||||
@Parameter(description = "The namespace id") @PathVariable String namespace,
|
||||
@Parameter(description = "The internal storage uri") @QueryValue URI path,
|
||||
@Part CompletedFileUpload fileContent
|
||||
) throws IOException, URISyntaxException {
|
||||
storageInterface.put(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, path), new BufferedInputStream(fileContent.getInputStream()));
|
||||
}
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Put(uri = "{namespace}")
|
||||
@Operation(tags = {"Files"}, summary = "Move a file or directory")
|
||||
public void move(
|
||||
@Parameter(description = "The namespace id") @PathVariable String namespace,
|
||||
@Parameter(description = "The internal storage uri to move from") @QueryValue URI from,
|
||||
@Parameter(description = "The internal storage uri to move to") @QueryValue URI to
|
||||
) throws IOException, URISyntaxException {
|
||||
storageInterface.move(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, from), toNamespacedStorageUri(namespace, to));
|
||||
}
|
||||
|
||||
@ExecuteOn(TaskExecutors.IO)
|
||||
@Delete(uri = "{namespace}")
|
||||
@Operation(tags = {"Files"}, summary = "Delete a file or directory")
|
||||
public void delete(
|
||||
@Parameter(description = "The namespace id") @PathVariable String namespace,
|
||||
@Parameter(description = "The internal storage uri of the file / directory to delete") @QueryValue URI path
|
||||
) throws IOException, URISyntaxException {
|
||||
storageInterface.delete(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, path));
|
||||
}
|
||||
|
||||
private URI toNamespacedStorageUri(String namespace, @Nullable URI relativePath) {
|
||||
return URI.create(storageInterface.namespaceFilePrefix(namespace) + Optional.ofNullable(relativePath).map(URI::getPath).orElse(""));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,136 @@
|
||||
package io.kestra.webserver.controllers;
|
||||
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.client.annotation.Client;
|
||||
import io.micronaut.http.client.multipart.MultipartBody;
|
||||
import io.micronaut.rxjava2.http.client.RxHttpClient;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
|
||||
@MicronautTest
|
||||
class NamespaceFileControllerTest {
|
||||
@Inject
|
||||
@Client("/")
|
||||
RxHttpClient client;
|
||||
|
||||
@Inject
|
||||
private StorageInterface storageInterface;
|
||||
|
||||
@Test
|
||||
void file() throws IOException {
|
||||
String prefix = "/" + IdUtils.create();
|
||||
storageInterface.createDirectory(null, toNamespacedStorageUri("namespace", URI.create(prefix)));
|
||||
String hw = "Hello World";
|
||||
storageInterface.put(null, toNamespacedStorageUri("io.namespace", URI.create(prefix + "/test.txt")), new ByteArrayInputStream(hw.getBytes()));
|
||||
String res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/files/namespaces/io.namespace/?path=" + prefix + "/test.txt"));
|
||||
assertThat(res, is(hw));
|
||||
}
|
||||
|
||||
@Test
|
||||
void stats() throws IOException {
|
||||
String prefix = "/" + IdUtils.create();
|
||||
storageInterface.createDirectory(null, toNamespacedStorageUri("namespace", URI.create(prefix)));
|
||||
|
||||
String hw = "Hello World";
|
||||
storageInterface.put(null, toNamespacedStorageUri("io.namespace", URI.create(prefix + "/test.txt")), new ByteArrayInputStream(hw.getBytes()));
|
||||
FileAttributes res = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/files/namespaces/io.namespace/stats?path=" + prefix + "/test.txt"), TestFileAttributes.class);
|
||||
assertThat(res.getFileName(), is("test.txt"));
|
||||
assertThat(res.getType(), is(FileAttributes.FileType.File));
|
||||
}
|
||||
|
||||
@Test
|
||||
void list() throws IOException {
|
||||
String prefix = "/" + IdUtils.create();
|
||||
storageInterface.createDirectory(null, toNamespacedStorageUri("io.namespace", URI.create(prefix)));
|
||||
|
||||
String hw = "Hello World";
|
||||
storageInterface.createDirectory(null, toNamespacedStorageUri("io.namespace", URI.create(prefix + "/test")));
|
||||
storageInterface.put(null, toNamespacedStorageUri("io.namespace", URI.create(prefix + "/test/test.txt")), new ByteArrayInputStream(hw.getBytes()));
|
||||
storageInterface.put(null, toNamespacedStorageUri("io.namespace", URI.create(prefix + "/test/test2.txt")), new ByteArrayInputStream(hw.getBytes()));
|
||||
List<FileAttributes> res = List.of(client.toBlocking().retrieve(HttpRequest.GET("/api/v1/files/namespaces/io.namespace/directory?path=" + prefix + "/test"), TestFileAttributes[].class));
|
||||
assertThat(res.stream().map(FileAttributes::getFileName).toList(), Matchers.containsInAnyOrder("test.txt", "test2.txt"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void createDirectory() throws IOException {
|
||||
String prefix = "/" + IdUtils.create();
|
||||
storageInterface.createDirectory(null, toNamespacedStorageUri("io.namespace", URI.create(prefix)));
|
||||
|
||||
client.toBlocking().exchange(HttpRequest.POST("/api/v1/files/namespaces/io.namespace/directory?path=" + prefix + "/test", null));
|
||||
FileAttributes res = storageInterface.getAttributes(null, toNamespacedStorageUri("io.namespace", URI.create(prefix + "/test")));
|
||||
assertThat(res.getFileName(), is("test"));
|
||||
assertThat(res.getType(), is(FileAttributes.FileType.Directory));
|
||||
}
|
||||
|
||||
@Test
|
||||
void createFile() throws IOException {
|
||||
String prefix = "/" + IdUtils.create();
|
||||
storageInterface.createDirectory(null, toNamespacedStorageUri("io.namespace", URI.create(prefix)));
|
||||
MultipartBody body = MultipartBody.builder()
|
||||
.addPart("fileContent", "test.txt", "Hello".getBytes())
|
||||
.build();
|
||||
client.toBlocking().exchange(
|
||||
HttpRequest.POST("/api/v1/files/namespaces/io.namespace?path=" + prefix + "/test.txt", body)
|
||||
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE)
|
||||
);
|
||||
InputStream inputStream = storageInterface.get(null, toNamespacedStorageUri("io.namespace", URI.create(prefix + "/test.txt")));
|
||||
String content = new String(inputStream.readAllBytes());
|
||||
assertThat(content, is("Hello"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void move() throws IOException {
|
||||
String prefix = "/" + IdUtils.create();
|
||||
storageInterface.createDirectory(null, toNamespacedStorageUri("io.namespace", URI.create(prefix)));
|
||||
|
||||
storageInterface.createDirectory(null, toNamespacedStorageUri("io.namespace", URI.create(prefix + "/test")));
|
||||
client.toBlocking().exchange(HttpRequest.PUT("/api/v1/files/namespaces/io.namespace?from=" + prefix + "/test&to=" + prefix + "/foo", null));
|
||||
FileAttributes res = storageInterface.getAttributes(null, toNamespacedStorageUri("io.namespace", URI.create(prefix + "/foo")));
|
||||
assertThat(res.getFileName(), is("foo"));
|
||||
assertThat(res.getType(), is(FileAttributes.FileType.Directory));
|
||||
}
|
||||
|
||||
@Test
|
||||
void delete() throws IOException {
|
||||
String prefix = "/" + IdUtils.create();
|
||||
storageInterface.createDirectory(null, toNamespacedStorageUri("namespace", URI.create(prefix)));
|
||||
|
||||
storageInterface.createDirectory(null, toNamespacedStorageUri("namespace", URI.create(prefix + "/test")));
|
||||
client.toBlocking().exchange(HttpRequest.DELETE("/api/v1/files/namespaces/io.namespace?path=" + prefix + "/test", null));
|
||||
boolean res = storageInterface.exists(null, toNamespacedStorageUri("io.namespace", URI.create(prefix + "/test")));
|
||||
assertThat(res, is(false));
|
||||
}
|
||||
|
||||
private URI toNamespacedStorageUri(String namespace, @Nullable URI relativePath) {
|
||||
return URI.create(storageInterface.namespaceFilePrefix(namespace) + Optional.ofNullable(relativePath).map(URI::getPath).orElse(""));
|
||||
}
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public static class TestFileAttributes implements FileAttributes {
|
||||
String fileName;
|
||||
long lastModifiedTime;
|
||||
long creationTime;
|
||||
FileType type;
|
||||
long size;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user