chore(core): add new interface for accessing Kestra storage

This commit provides a cleaner way to access kestra storage through
RunContext
This commit is contained in:
Florian Hussonnois
2024-01-25 14:03:50 +01:00
committed by Florian Hussonnois
parent fa1479e3e5
commit 9e1b6601be
22 changed files with 1215 additions and 498 deletions

View File

@@ -2,6 +2,7 @@ package io.kestra.core.runners;
import io.kestra.core.models.tasks.NamespaceFiles;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
@@ -67,7 +68,7 @@ public class NamespaceFilesService {
}
private URI uri(String namespace, @Nullable URI path) {
return URI.create(storageInterface.namespaceFilePrefix(namespace) + Optional.ofNullable(path)
return URI.create(StorageContext.namespaceFilePrefix(namespace) + Optional.ofNullable(path)
.map(URI::getPath)
.orElse("")
);

View File

@@ -1,6 +1,7 @@
package io.kestra.core.runners;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.collect.ImmutableMap;
@@ -17,9 +18,11 @@ import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.storages.InternalStorage;
import io.kestra.core.storages.Storage;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.Slugify;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import lombok.NoArgsConstructor;
@@ -27,11 +30,20 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.*;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -46,17 +58,13 @@ public class RunContext {
private MetricRegistry meterRegistry;
private Path tempBasedPath;
private RunContextCache runContextCache;
private URI storageOutputPrefix;
private URI storageExecutionPrefix;
private Map<String, Object> variables;
private List<AbstractMetricEntry<?>> metrics = new ArrayList<>();
private RunContextLogger runContextLogger;
private final List<WorkerTaskResult> dynamicWorkerTaskResult = new ArrayList<>();
protected transient Path temporaryDirectory;
private String triggerExecutionId;
private Storage storage;
/**
* Only used by {@link io.kestra.core.models.triggers.types.Flow}
@@ -82,8 +90,8 @@ public class RunContext {
*/
public RunContext(ApplicationContext applicationContext, Flow flow, Task task, Execution execution, TaskRun taskRun) {
this.initBean(applicationContext);
this.initContext(flow, task, execution, taskRun);
this.initLogger(taskRun, task);
this.initContext(flow, task, execution, taskRun);
}
/**
@@ -107,13 +115,21 @@ public class RunContext {
@VisibleForTesting
public RunContext(ApplicationContext applicationContext, Map<String, Object> variables) {
this.initBean(applicationContext);
this.variables = new HashMap<>();
this.variables.putAll(this.variables(null, null, null, null, null));
this.variables.putAll(variables);
this.storageOutputPrefix = URI.create("");
this.runContextLogger = new RunContextLogger();
this.storage = new InternalStorage(
logger(),
new StorageContext() {
@Override
public URI getContextStorageURI() {
return URI.create("");
}
},
storageInterface
);
}
protected void initBean(ApplicationContext applicationContext) {
@@ -130,8 +146,13 @@ public class RunContext {
private void initContext(Flow flow, Task task, Execution execution, TaskRun taskRun) {
this.variables = this.variables(flow, task, execution, taskRun, null);
if (taskRun != null && this.storageInterface != null) {
this.storageOutputPrefix = this.storageInterface.outputPrefix(flow, task, execution, taskRun);
this.storage = new InternalStorage(
logger(),
StorageContext.forTask(taskRun),
storageInterface
);
}
}
@@ -196,11 +217,6 @@ public class RunContext {
return variables;
}
@SuppressWarnings("unused")
public URI getStorageOutputPrefix() {
return storageOutputPrefix;
}
@JsonIgnore
public ApplicationContext getApplicationContext() {
return applicationContext;
@@ -374,8 +390,7 @@ public class RunContext {
runContext.variableRenderer = this.variableRenderer;
runContext.applicationContext = this.applicationContext;
runContext.storageInterface = this.storageInterface;
runContext.storageOutputPrefix = this.storageOutputPrefix;
runContext.storageExecutionPrefix = this.storageExecutionPrefix;
runContext.storage = this.storage;
runContext.variables = variables;
runContext.metrics = new ArrayList<>();
runContext.meterRegistry = this.meterRegistry;
@@ -388,20 +403,33 @@ public class RunContext {
public RunContext forScheduler(TriggerContext triggerContext, AbstractTrigger trigger) {
this.triggerExecutionId = IdUtils.create();
this.storageOutputPrefix = this.storageInterface.outputPrefix(triggerContext, trigger, triggerExecutionId);
StorageContext context = StorageContext.forTrigger(
triggerContext.getTenantId(),
triggerContext.getNamespace(),
triggerContext.getFlowId(),
triggerExecutionId,
trigger.getId()
);
this.storage = new InternalStorage(
logger(),
context,
storageInterface
);
return this;
}
@SuppressWarnings("unchecked")
public RunContext forWorker(ApplicationContext applicationContext, WorkerTask workerTask) {
this.initBean(applicationContext);
this.initLogger(workerTask.getTaskRun(), workerTask.getTask());
final TaskRun taskRun = workerTask.getTaskRun();
this.initLogger(taskRun, workerTask.getTask());
Map<String, Object> clone = new HashMap<>(this.variables);
clone.remove("taskrun");
clone.put("taskrun", this.variables(workerTask.getTaskRun()));
clone.put("taskrun", this.variables(taskRun));
clone.remove("task");
clone.put("task", this.variables(workerTask.getTask()));
@@ -417,8 +445,7 @@ public class RunContext {
}
this.variables = ImmutableMap.copyOf(clone);
this.storageExecutionPrefix = URI.create("/" + this.storageInterface.executionPrefix(workerTask.getTaskRun()));
this.storage = new InternalStorage(logger(), StorageContext.forTask(taskRun), storageInterface);
return this;
}
@@ -499,31 +526,46 @@ public class RunContext {
return runContextLogger.logger();
}
/**
* Gets a {@link InputStream} for the given file URI.
*
* @param uri the file URI.
* @return the {@link InputStream}.
* @throws IOException
* @deprecated use {@link Storage#getFile(URI)}.
*/
@Deprecated
public InputStream uriToInputStream(URI uri) throws IOException {
if (uri == null) {
throw new IllegalArgumentException("Invalid internal storage uri, got null");
}
return this.storage.getFile(uri);
}
if (uri.getScheme() == null) {
throw new IllegalArgumentException("Invalid internal storage uri, got uri '" + uri + "'");
}
// for serialization backward-compatibility
@JsonIgnore
public URI getStorageOutputPrefix() {
return storage.getContextBaseURI();
}
if (uri.getScheme().equals("kestra")) {
return this.storageInterface.get(tenantId(), uri);
}
throw new IllegalArgumentException("Invalid internal storage scheme, got uri '" + uri + "'");
/**
* Gets access to the Kestra's storage.
*
* @return a {@link Storage} object.
*/
public Storage storage() {
return storage;
}
/**
* Put the temporary file on storage and delete it after.
*
* @param file the temporary file to upload to storage
* @return the {@code StorageObject} created
* @param file the temporary file to upload to storage
* @return the {@code StorageObject} created
* @throws IOException If the temporary file can't be read
*
* @deprecated use {@link #storage()} and {@link InternalStorage#putFile(File)}.
*/
@Deprecated
public URI putTempFile(File file) throws IOException {
return this.putTempFile(file, this.storageOutputPrefix.toString(), (String) null);
return this.storage.putFile(file);
}
/**
@@ -533,166 +575,52 @@ public class RunContext {
* @param name overwrite file name
* @return the {@code StorageObject} created
* @throws IOException If the temporary file can't be read
*
* @deprecated use {@link #storage()} and {@link InternalStorage#putFile(File, String)}.
*/
@Deprecated
public URI putTempFile(File file, String name) throws IOException {
return this.putTempFile(file, this.storageOutputPrefix.toString(), name);
}
/**
* Put the temporary file on storage and delete it after.
* This method is meant to be used by polling triggers, the name of the destination file is derived from the
* executionId and the trigger passed as parameters.
*
* @param file the temporary file to upload to storage
* @param executionId overwrite file name
* @param trigger the trigger
* @return the {@code StorageObject} created
* @throws IOException If the temporary file can't be read
*/
public URI putTempFile(File file, String executionId, AbstractTrigger trigger) throws IOException {
return this.putTempFile(
file,
this.storageOutputPrefix.toString() + "/" + String.join(
"/",
Arrays.asList(
"executions",
executionId,
"trigger",
Slugify.of(trigger.getId())
)
),
(String) null
);
}
private URI putTempFile(InputStream inputStream, String prefix, String name) throws IOException {
URI uri = URI.create(prefix);
URI resolve = uri.resolve(uri.getPath() + "/" + name);
return this.storageInterface.put(tenantId(), resolve, new BufferedInputStream(inputStream));
}
private URI putTempFile(File file, String prefix, String name) throws IOException {
try (InputStream fileInput = new FileInputStream(file)) {
return this.putTempFile(fileInput, prefix, (name != null ? name : file.getName()));
} finally {
try {
Files.delete(file.toPath());
} catch (IOException e) {
runContextLogger.logger().warn("Failed to delete temporary file '{}'", file.toPath(), e);
}
}
}
@SuppressWarnings("unchecked")
private String taskStateFilePathPrefix(String name, Boolean isNamespace, Boolean useTaskRun) {
Map<String, String> taskrun = (Map<String, String>) this.getVariables().get("taskrun");
Map<String, String> flow = (Map<String, String>) this.getVariables().get("flow");
return "/" + this.storageInterface.statePrefix(
flow.get("namespace"),
isNamespace ? null : flow.get("id"),
name,
taskrun != null && useTaskRun ? taskrun.getOrDefault("value", null) : null
);
return this.storage.putFile(file, name);
}
@Deprecated
public InputStream getTaskStateFile(String state, String name) throws IOException {
return this.getTaskStateFile(state, name, false, true);
return this.storage.getTaskStateFile(state, name);
}
@Deprecated
public InputStream getTaskStateFile(String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException {
URI uri = URI.create(this.taskStateFilePathPrefix(state, isNamespace, useTaskRun));
URI resolve = uri.resolve(uri.getPath() + "/" + name);
return this.storageInterface.get(tenantId(), resolve);
return this.storage.getTaskStateFile(state, name, isNamespace, useTaskRun);
}
@Deprecated
public URI putTaskStateFile(byte[] content, String state, String name) throws IOException {
return this.putTaskStateFile(content, state, name, false, true);
return this.storage.putTaskStateFile(content, state, name);
}
@Deprecated
public URI putTaskStateFile(byte[] content, String state, String name, Boolean namespace, Boolean useTaskRun) throws IOException {
try (InputStream inputStream = new ByteArrayInputStream(content)) {
return this.putTempFile(
inputStream,
this.taskStateFilePathPrefix(state, namespace, useTaskRun),
name
);
}
return this.storage.putTaskStateFile(content, state, name, namespace, useTaskRun);
}
@Deprecated
public URI putTaskStateFile(File file, String state, String name) throws IOException {
return this.putTaskStateFile(file, state, name, false, true);
return this.storage.putTaskStateFile(file, state, name);
}
@Deprecated
public URI putTaskStateFile(File file, String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException {
return this.putTempFile(
file,
this.taskStateFilePathPrefix(state, isNamespace, useTaskRun),
name
);
return this.storage.putTaskStateFile(file, state, name, isNamespace, useTaskRun);
}
@Deprecated
public boolean deleteTaskStateFile(String state, String name) throws IOException {
return this.deleteTaskStateFile(state, name, false, true);
return this.storage.deleteTaskStateFile(state, name);
}
@Deprecated
public boolean deleteTaskStateFile(String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException {
URI uri = URI.create(this.taskStateFilePathPrefix(state, isNamespace, useTaskRun));
URI resolve = uri.resolve(uri.getPath() + "/" + name);
return this.storageInterface.delete(tenantId(), resolve);
}
/**
* Get from the internal storage the cache file corresponding to this task.
* If the cache file didn't exist, an empty Optional is returned.
*
* @param namespace the flow namespace
* @param flowId the flow identifier
* @param taskId the task identifier
* @param value optional, the task run value
*
* @return an Optional with the cache input stream or empty.
*/
public Optional<InputStream> getTaskCacheFile(String namespace, String flowId, String taskId, String value) throws IOException {
URI uri = URI.create("/" + this.storageInterface.cachePrefix(namespace, flowId, taskId, value) + "/cache.zip");
return this.storageInterface.exists(tenantId(), uri) ? Optional.of(this.storageInterface.get(tenantId(), uri)) : Optional.empty();
}
public Optional<Long> getTaskCacheFileLastModifiedTime(String namespace, String flowId, String taskId, String value) throws IOException {
URI uri = URI.create("/" + this.storageInterface.cachePrefix(namespace, flowId, taskId, value) + "/cache.zip");
return this.storageInterface.exists(tenantId(), uri) ? Optional.of(this.storageInterface.getAttributes(tenantId(), uri).getLastModifiedTime()) : Optional.empty();
}
/**
* Put into the internal storage the cache file corresponding to this task.
*
* @param file the cache as a ZIP archive
* @param namespace the flow namespace
* @param flowId the flow identifier
* @param taskId the task identifier
* @param value optional, the task run value
*
* @return the URI of the file inside the internal storage.
*/
public URI putTaskCacheFile(File file, String namespace, String flowId, String taskId, String value) throws IOException {
return this.putTempFile(
file,
"/" + this.storageInterface.cachePrefix(namespace, flowId, taskId, value),
"cache.zip"
);
}
public Optional<Boolean> deleteTaskCacheFile(String namespace, String flowId, String taskId, String value) throws IOException {
URI uri = URI.create("/" + this.storageInterface.cachePrefix(namespace, flowId, taskId, value) + "/cache.zip");
return this.storageInterface.exists(tenantId(), uri) ? Optional.of(this.storageInterface.delete(tenantId(), uri)) : Optional.empty();
}
public List<URI> purgeStorageExecution() throws IOException {
return this.storageInterface.deleteByPrefix(tenantId(), this.storageExecutionPrefix);
return this.storage.deleteTaskStateFile(state, name, isNamespace, useTaskRun);
}
public List<AbstractMetricEntry<?>> metrics() {

View File

@@ -74,7 +74,7 @@ public class RunnerUtils {
throw new RuntimeException("Can't upload");
}
URI from = storageInterface.from(flow, execution, file.getFilename(), tempFile);
URI from = storageInterface.from(execution, file.getFilename(), tempFile);
//noinspection ResultOfMethodCallIgnored
tempFile.delete();
@@ -211,7 +211,7 @@ public class RunnerUtils {
} else {
return Optional.of(new AbstractMap.SimpleEntry<>(
input.getName(),
storageInterface.from(flow, execution, input, new File(((String) current)))
storageInterface.from(execution, input.getName(), new File(((String) current)))
));
}
} catch (Exception e) {

View File

@@ -1,5 +1,6 @@
package io.kestra.core.runners.pebble.functions;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Slugify;
import io.pebbletemplates.pebble.error.PebbleException;
@@ -47,7 +48,7 @@ public class ReadFileFunction implements Function {
@SuppressWarnings("unchecked")
private String readFromNamespaceFile(EvaluationContext context, String path) throws IOException {
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
URI namespaceFile = URI.create(storageInterface.namespaceFilePrefix(flow.get("namespace")) + "/" + path);
URI namespaceFile = URI.create(StorageContext.namespaceFilePrefix(flow.get("namespace")) + "/" + path);
try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), namespaceFile)) {
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}

View File

@@ -17,6 +17,7 @@ import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tasks.flows.WorkingDirectory;
import io.kestra.core.utils.GraphUtils;
@@ -271,8 +272,8 @@ public class ExecutionService {
}
if (purgeStorage) {
builder.storagesCount(storageInterface.deleteByPrefix(execution.getTenantId(), URI.create("kestra://" + storageInterface.executionPrefix(
execution))).size());
URI uri = StorageContext.forExecution(execution).getExecutionStorageURI(StorageContext.KESTRA_SCHEME);
builder.storagesCount(storageInterface.deleteByPrefix(execution.getTenantId(), uri).size());
}
return (PurgeResult) builder.build();

View File

@@ -0,0 +1,308 @@
package io.kestra.core.storages;
import jakarta.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
/**
* The default {@link Storage} implementation acting as a facade to the {@link StorageInterface}.
*/
public class InternalStorage implements Storage {
private static final Logger LOG = LoggerFactory.getLogger(InternalStorage.class);
private static final String PATH_SEPARATOR = "/";
private final Logger logger;
private final StorageContext context;
private final StorageInterface storage;
/**
* Creates a new {@link InternalStorage} instance.
*
* @param context The storage context.
* @param storage The storage to delegate operations.
*/
public InternalStorage(StorageContext context, StorageInterface storage) {
this(LOG, context, storage);
}
/**
* Creates a new {@link InternalStorage} instance.
*
* @param logger The logger to be used by this class.
* @param context The storage context.
* @param storage The storage to delegate operations.
*/
public InternalStorage(Logger logger, StorageContext context, StorageInterface storage) {
this.logger = logger;
this.context = context;
this.storage = storage;
}
/**
* {@inheritDoc}
**/
@Override
public boolean isFileExist(URI uri) {
return this.storage.exists(context.getTenantId(), uri);
}
/**
* {@inheritDoc}
**/
@Override
public InputStream getFile(final URI uri) throws IOException {
if (uri == null) {
throw new IllegalArgumentException("Invalid internal storage uri, got null");
}
String scheme = uri.getScheme();
if (scheme == null) {
throw new IllegalArgumentException("Invalid internal storage uri, got uri '" + uri + "'");
}
if (!scheme.equals("kestra")) {
throw new IllegalArgumentException("Invalid internal storage scheme, got uri '" + uri + "'");
}
return this.storage.get(context.getTenantId(), uri);
}
/**
* {@inheritDoc}
**/
@Override
public List<URI> deleteExecutionFiles() throws IOException {
return this.storage.deleteByPrefix(context.getTenantId(), context.getExecutionStorageURI());
}
/**
* {@inheritDoc}
**/
@Override
public URI getContextBaseURI() {
return this.context.getContextStorageURI();
}
/**
* {@inheritDoc}
**/
@Override
public URI putFile(InputStream inputStream, String name) throws IOException {
URI uri = context.getContextStorageURI();
URI resolved = uri.resolve(uri.getPath() + PATH_SEPARATOR + name);
return this.storage.put(context.getTenantId(), resolved, new BufferedInputStream(inputStream));
}
/**
* {@inheritDoc}
**/
@Override
public URI putFile(InputStream inputStream, URI uri) throws IOException {
return this.storage.put(context.getTenantId(), uri, new BufferedInputStream(inputStream));
}
/**
* {@inheritDoc}
**/
@Override
public URI putFile(File file) throws IOException {
return putFile(file, null);
}
/**
* {@inheritDoc}
**/
@Override
public URI putFile(File file, String name) throws IOException {
URI uri = context.getContextStorageURI();
URI resolved = uri.resolve(uri.getPath() + PATH_SEPARATOR + (name != null ? name : file.getName()));
try (InputStream is = new FileInputStream(file)) {
return putFile(is, resolved);
} finally {
try {
Files.delete(file.toPath());
} catch (IOException e) {
logger.warn("Failed to delete temporary file '{}'", file.toPath(), e);
}
}
}
/**
* {@inheritDoc}
**/
@Override
public InputStream getTaskStateFile(String state, String name) throws IOException {
return getTaskStateFile(state, name, false, true);
}
/**
* {@inheritDoc}
**/
@Override
public InputStream getTaskStateFile(String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException {
URI uri = URI.create(getStatePrefix(state, isNamespace, useTaskRun));
URI resolve = uri.resolve(uri.getPath() + PATH_SEPARATOR + name);
return this.storage.get(context.getTenantId(), resolve);
}
/**
* {@inheritDoc}
**/
@Override
public URI putTaskStateFile(byte[] content, String state, String name) throws IOException {
return this.putTaskStateFile(content, state, name, false, true);
}
/**
* {@inheritDoc}
**/
@Override
public URI putTaskStateFile(byte[] content, String state, String name, Boolean namespace, Boolean useTaskRun) throws IOException {
try (InputStream inputStream = new ByteArrayInputStream(content)) {
return this.putFile(inputStream, getStatePrefix(state, namespace, useTaskRun), name);
}
}
/**
* {@inheritDoc}
**/
@Override
public URI putTaskStateFile(File file, String state, String name) throws IOException {
return this.putTaskStateFile(file, state, name, false, true);
}
/**
* {@inheritDoc}
**/
@Override
public URI putTaskStateFile(File file, String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException {
return this.putFileAndDelete(file, getStatePrefix(state, isNamespace, useTaskRun), name);
}
/**
* {@inheritDoc}
**/
@Override
public boolean deleteTaskStateFile(String state, String name) throws IOException {
return this.deleteTaskStateFile(state, name, false, true);
}
/**
* {@inheritDoc}
**/
@Override
public boolean deleteTaskStateFile(String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException {
URI uri = URI.create(getStatePrefix(state, isNamespace, useTaskRun));
URI resolve = uri.resolve(uri.getPath() + PATH_SEPARATOR + name);
return this.storage.delete(context.getTenantId(), resolve);
}
private String getStatePrefix(String name, Boolean isNamespace, Boolean useTaskRun) {
if (useTaskRun) {
return getTaskStorageContext()
.map(context -> context.getStateStorePrefix(name, isNamespace, context.getTaskRunValue()))
.orElseThrow(() -> new IllegalStateException("Cannot get task state from: " + context));
}
return context.getStateStorePrefix(name, isNamespace, null);
}
/**
* {@inheritDoc}
**/
@Override
public Optional<InputStream> getCacheFile(final String cacheId,
final @Nullable String objectId,
final @Nullable Duration ttl) throws IOException {
if (ttl != null) {
var maybeLastModifiedTime = getCacheFileLastModifiedTime(cacheId, objectId);
if (maybeLastModifiedTime.isPresent()) {
if (Instant.now().isAfter(Instant.ofEpochMilli(maybeLastModifiedTime.get()).plus(ttl))) {
logger.debug("Cache is expired for cache-id={}, object-id={}, and ttl={}, deleting it",
cacheId,
objectId,
ttl.toMillis()
);
deleteCacheFile(cacheId, objectId);
return Optional.empty();
}
}
}
URI uri = context.getCacheURI(cacheId, objectId);
return isFileExist(uri) ?
Optional.of(storage.get(context.getTenantId(), uri)) :
Optional.empty();
}
private Optional<Long> getCacheFileLastModifiedTime(String cacheId, @Nullable String objectId) throws IOException {
URI uri = context.getCacheURI(cacheId, objectId);
return isFileExist(uri) ?
Optional.of(this.storage.getAttributes(context.getTenantId(), uri).getLastModifiedTime()) :
Optional.empty();
}
/**
* {@inheritDoc}
**/
@Override
public URI putCacheFile(File file, String cacheId, @Nullable String objectId) throws IOException {
URI uri = context.getCacheURI(cacheId, objectId);
return this.putFileAndDelete(file, uri);
}
/**
* {@inheritDoc}
**/
@Override
public Optional<Boolean> deleteCacheFile(String cacheId, @Nullable String objectId) throws IOException {
URI uri = context.getCacheURI(cacheId, objectId);
return isFileExist(uri) ?
Optional.of(this.storage.delete(context.getTenantId(), uri)) :
Optional.empty();
}
private URI putFileAndDelete(File file, URI uri) throws IOException {
try (InputStream is = new FileInputStream(file)) {
return this.putFile(is, uri);
} finally {
try {
Files.delete(file.toPath());
} catch (IOException e) {
logger.warn("Failed to delete temporary file '{}'", file.toPath(), e);
}
}
}
private URI putFileAndDelete(File file, String prefix, String name) throws IOException {
URI uri = URI.create(prefix);
URI resolve = uri.resolve(uri.getPath() + PATH_SEPARATOR + (name != null ? name : file.getName()));
return putFileAndDelete(file, resolve);
}
private URI putFile(InputStream inputStream, String prefix, String name) throws IOException {
URI uri = URI.create(prefix);
URI resolve = uri.resolve(uri.getPath() + PATH_SEPARATOR + name);
return this.storage.put(context.getTenantId(), resolve, new BufferedInputStream(inputStream));
}
private Optional<StorageContext.Task> getTaskStorageContext() {
return Optional.ofNullable((context instanceof StorageContext.Task task) ? task : null);
}
}

View File

@@ -0,0 +1,159 @@
package io.kestra.core.storages;
import jakarta.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
/**
* Service interface for accessing the Kestra's storage.
*/
public interface Storage {
/**
* Checks whether the given URI points to an exiting file/object in the internal storage.
*
* @param uri the URI of the file/object in the internal storage.
* @return {@code true} if the URI points to a file/object that exists in the internal storage.
*/
boolean isFileExist(URI uri);
/**
* Retrieve an {@link InputStream} for the given file URI.
*
* @param uri the file URI.
* @return the {@link InputStream}.
* @throws IllegalArgumentException if the given {@link URI} is {@code null} or invalid.
* @throws IOException if an error happens while accessing the file.
*/
InputStream getFile(URI uri) throws IOException;
/**
* Deletes all the files for the current execution.
*
* @return The URIs of the deleted files.
* @throws IOException if an error happened while deleting files.
*/
List<URI> deleteExecutionFiles() throws IOException;
/**
* Gets the storage base URI for the current context.
*
* @return the URI.
*/
URI getContextBaseURI();
/**
* Stores a file with the given name for the given {@link InputStream} into Kestra's storage.
*
* @param inputStream the {@link InputStream} of the file content.
* @param name the name of the file on the Kestra's storage.
* @return the URI of the file/object in the internal storage.
* @throws IOException if an error occurs while storing the file.
*/
URI putFile(InputStream inputStream, String name) throws IOException;
/**
* Stores a file with the given name for the given {@link InputStream} into Kestra's storage.
*
* @param inputStream the {@link InputStream} of the file content.
* @param uri the target URI of the file to be stored in the storage.
* @return the URI of the file/object in the internal storage.
* @throws IOException if an error occurs while storing the file.
*/
URI putFile(InputStream inputStream, URI uri) throws IOException;
/**
* Stores a copy of the given file into Kestra's storage, and deletes the original file.
*
* @param file the file to be store.
* @return the URI of the stored object.
* @throws IOException if an error occurs while storing the file.
*/
URI putFile(File file) throws IOException;
/**
* Stores a copy of the given file into Kestra's storage with the specified name, and deletes the original file.
*
* @param file the file to be store.
* @param name the name of the file on the Kestra's storage.
* @return the URI of the stored object.
* @throws IOException if an error occurs while storing the file.
*/
URI putFile(File file, String name) throws IOException;
// ==============================================================
// STATE STORE
// ==============================================================
InputStream getTaskStateFile(String state, String name) throws IOException;
InputStream getTaskStateFile(String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException;
URI putTaskStateFile(byte[] content, String state, String name) throws IOException;
URI putTaskStateFile(byte[] content, String state, String name, Boolean namespace, Boolean useTaskRun) throws IOException;
URI putTaskStateFile(File file, String state, String name) throws IOException;
URI putTaskStateFile(File file, String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException;
boolean deleteTaskStateFile(String state, String name) throws IOException;
boolean deleteTaskStateFile(String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException;
// ==============================================================
// CACHING
// ==============================================================
/**
* Gets the cache file from the Kestra's storage for the given cacheID and objectID.
* If the cache file didn't exist, an empty Optional is returned.
*
* @param cacheId the ID of the cache.
* @param objectId the ID object cached object (optional).
* @return an Optional with the cache input stream or empty.
* @throws IOException if an error occurs during the operation.
*/
default Optional<InputStream> getCacheFile(String cacheId, @Nullable String objectId) throws IOException {
return getCacheFile(cacheId, objectId, null);
}
/**
* Gets the cache file from the Kestra's storage for the given cacheID and objectID.
* If the cache file didn't exist or has expired based on the given TTL, an empty Optional is returned.
*
* @param cacheId the ID of the cache.
* @param objectId the ID object cached object (optional).
* @param ttl the time-to-live duration of the cache.
* @return an Optional with the cache input stream or empty.
* @throws IOException if an error occurs during the operation.
*/
Optional<InputStream> getCacheFile(String cacheId, @Nullable String objectId, @Nullable Duration ttl) throws IOException;
/**
* Caches the given file into Kestra's storage with the given cache ID.
*
* @param file the cache as a ZIP archive
* @param cacheId the ID of the cache.
* @param objectId the ID object cached object (optional).
* @return the URI of the file inside the internal storage.
* @throws IOException if an error occurs during the operation.
*/
URI putCacheFile(File file, String cacheId, @Nullable String objectId) throws IOException;
/**
* Deletes the cache file.
*
* @param cacheId the ID of the cache.
* @param objectId the ID object cached object (optional).
* @return {@code true} if the cache file was removed/. Otherwise {@code false}.
* @throws IOException if an error occurs during the operation.
*/
Optional<Boolean> deleteCacheFile(String cacheId, @Nullable String objectId) throws IOException;
}

View File

@@ -0,0 +1,444 @@
package io.kestra.core.storages;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.hash.Hashing;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.utils.Slugify;
import jakarta.annotation.Nullable;
import lombok.Getter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Context used for storing and retrieving data from Kestra's storage.
*/
@Getter
public class StorageContext {
public static final String KESTRA_SCHEME = "kestra";
// /{namespace}/_files
static final String PREFIX_FORMAT_NAMESPACE_FILE = "/%s/_files";
// /{namespace}/{flow-id}
static final String PREFIX_FORMAT_FLOWS = "/%s/%s";
// /{namespace}/{flow-id}/executions/{execution-id}
static final String PREFIX_FORMAT_EXECUTIONS = "/%s/%s/executions/%s";
// /{namespace}/{flow-id}/executions/{execution-id}/tasks/{task-id}/{task-run-id}
static final String PREFIX_FORMAT_TASK = "/%s/%s/executions/%s/tasks/%s/%s";
// /{namespace}/{flow-id}/executions/{execution-id}/trigger/{trigger-id}
static final String PREFIX_FORMAT_TRIGGER = "/%s/%s/executions/%s/trigger/%s";
// /{namespace}/{flow-id}/executions/{execution-id}/inputs/{input-name}/{file-name}
static final String PREFIX_FORMAT_INPUTS = "/%s/%s/executions/%s/inputs/%s/%s";
// /{namespace}/{flow-id}/{cache-id}/cache/{object-id}/cache.zip
static final String PREFIX_FORMAT_CACHE_OBJECT = "/%s/%s/%s/cache/%s/cache.zip";
// /{namespace}/{flow-id}/{cache-id}/cache/cache.zip
static final String PREFIX_FORMAT_CACHE = "/%s/%s/%s/cache/cache.zip";
/**
* Factory method for constructing a new {@link StorageContext} scoped to a given {@link TaskRun}.
*/
public static StorageContext forTask(TaskRun taskRun) {
return new StorageContext.Task(
taskRun.getTenantId(),
taskRun.getNamespace(),
taskRun.getFlowId(),
taskRun.getExecutionId(),
taskRun.getTaskId(),
taskRun.getId(),
taskRun.getValue()
);
}
/**
* Factory method for constructing a new {@link StorageContext} scoped to a given {@link Flow}.
*/
public static StorageContext forFlow(Flow flow) {
return new StorageContext(flow.getTenantId(), flow.getNamespace(), flow.getId());
}
/**
* Factory method for constructing a new {@link StorageContext} scoped to a given {@link Execution}.
*/
public static StorageContext forExecution(Execution execution) {
return forExecution(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId());
}
/**
* Factory method for constructing a new {@link StorageContext} scoped to a given Execution.
*/
public static StorageContext forExecution(@Nullable String tenantId,
String namespace,
String flowId,
String executionId) {
return new StorageContext(tenantId, namespace, flowId, executionId);
}
/**
* Factory method for constructing a new {@link StorageContext} scoped to a given {@link Execution} and input.
*/
public static StorageContext.Input forInput(Execution execution,
String inputName,
String fileName) {
return new StorageContext.Input(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId(), inputName, fileName);
}
/**
* Factory method for constructing a new {@link StorageContext} scoped to a given Task.
*/
public static StorageContext.Task forTask(@Nullable String tenantId,
String namespace,
String flowId,
String executionId,
String taskId,
String taskRunId,
@Nullable String taskRunValue) {
return new StorageContext.Task(tenantId, namespace, flowId, executionId, taskId, taskRunId, taskRunValue);
}
/**
* Factory method for constructing a new {@link StorageContext} scoped to a given Trigger.
*/
public static StorageContext.Trigger forTrigger(@Nullable String tenantId,
String namespace,
String flowId,
String executionId,
String triggerId) {
return new StorageContext.Trigger(tenantId, namespace, flowId, executionId, triggerId);
}
private final String tenantId;
private final String namespace;
private final String flowId;
private final String executionId;
@VisibleForTesting
public StorageContext() {
this.tenantId = null;
this.namespace = null;
this.flowId = null;
this.executionId = null;
}
private StorageContext(final @Nullable String tenantId,
final String namespace,
final String flowId) {
this.tenantId = tenantId;
this.namespace = Objects.requireNonNull(namespace, "namespace cannot be null");
this.flowId = Objects.requireNonNull(flowId, "flowId cannot be null");
this.executionId = null;
}
private StorageContext(final @Nullable String tenantId,
final String namespace,
final String flowId,
final String executionId) {
this.tenantId = tenantId;
this.namespace = Objects.requireNonNull(namespace, "namespace cannot be null");
this.flowId = Objects.requireNonNull(flowId, "flowId cannot be null");
this.executionId = Objects.requireNonNull(executionId, "executionId cannot be null");
}
public static Optional<String> extractExecutionId(URI path) {
Pattern pattern = Pattern.compile("^/(.+)/executions/([^/]+)/", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(path.getPath());
if (!matcher.find() || matcher.group(2).isEmpty()) {
return Optional.empty();
}
return Optional.of(matcher.group(2));
}
/**
* Gets the storage URI of the given cacheID, and optionally the given objectID.
*
* @param cacheId the ID of the cache.
* @param objectId the ID object cached object (optional).
* @return the URI
*/
public URI getCacheURI(final String cacheId, @Nullable final String objectId) {
Objects.requireNonNull(cacheId, "Cannot create URI with id null");
final String prefix;
if (objectId == null) {
prefix = String.format(
PREFIX_FORMAT_CACHE,
getNamespaceAsPath(),
Slugify.of(getFlowId()),
Slugify.of(cacheId)
);
} else {
String hashedObjectId = Hashing.goodFastHash(64)
.hashString(objectId, Charsets.UTF_8)
.toString();
prefix = String.format(
PREFIX_FORMAT_CACHE_OBJECT,
getNamespaceAsPath(),
Slugify.of(getFlowId()),
Slugify.of(cacheId),
hashedObjectId
);
}
return URI.create(prefix);
}
public String getNamespaceAsPath() {
return getNamespace().replace(".", "/");
}
/**
* Gets the storage prefix for the given state store ID.
*
* @param id the primary ID of the state.
* @param isNamespace specify whether the state is on namespace or flow level.
* @param value the secondary ID (e.g., the runTaskValue).
* @return the storage prefix.
*/
public String getStateStorePrefix(String id, Boolean isNamespace, String value) {
ArrayList<String> paths = new ArrayList<>(List.of(getNamespaceAsPath()));
if (!isNamespace) {
paths.add(Slugify.of(getFlowId()));
}
paths.add("states");
if (id != null) {
paths.add(id);
}
if (value != null) {
paths.add(Hashing
.goodFastHash(64)
.hashString(value, Charsets.UTF_8)
.toString()
);
}
return "/" + String.join("/", paths);
}
/**
* Gets the base storage URI for the current {@link io.kestra.core.models.flows.Flow}.
*
* @return the {@link URI}.
*/
public URI getFlowStorageURI() {
try {
var prefix = String.format(PREFIX_FORMAT_FLOWS, getNamespaceAsPath(), Slugify.of(getFlowId()));
return new URI("//" + prefix);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
/**
* Gets the base storage URI for the current {@link Execution}.
*
* @return the {@link URI}.
*/
public URI getExecutionStorageURI() {
return getExecutionStorageURI(null);
}
/**
* Gets the base storage URI for the current {@link Execution}.
*
* @param scheme The scheme name.
* @return the {@link URI}.
*/
public URI getExecutionStorageURI(@Nullable String scheme) {
try {
var schemePrefix = Optional.ofNullable(scheme)
.map(s -> s.endsWith("://") ? s : s + "://")
.orElse("//");
var prefix = String.format(PREFIX_FORMAT_EXECUTIONS,
getNamespaceAsPath(),
Slugify.of(flowId),
executionId
);
return new URI(schemePrefix + prefix);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
/**
* Gets the base storage URI for this context.
*
* @return the {@link URI}.
*/
public URI getContextStorageURI() {
return getExecutionStorageURI();
}
/**
* {@inheritDoc}
**/
@Override
public String toString() {
return "StorageContext::Execution";
}
public static String namespaceFilePrefix(String namespace) {
return String.format(PREFIX_FORMAT_NAMESPACE_FILE, namespace.replace(".", "/"));
}
/**
* A storage context scoped to a Task.
*/
@Getter
public static class Task extends StorageContext {
private final String taskId;
private final String taskRunId;
private final String taskRunValue;
private Task(final String tenantId,
final String namespace,
final String flowId,
final String executionId,
final String taskId,
final String taskRunId,
@Nullable final String taskRunValue) {
super(tenantId, namespace, flowId, executionId);
this.taskId = Objects.requireNonNull(taskId, "taskID cannot be null");
this.taskRunId = Objects.requireNonNull(taskRunId, "taskRunID cannot be null");
this.taskRunValue = taskRunValue;
}
/**
* {@inheritDoc}
**/
@Override
public URI getContextStorageURI() {
try {
var prefix = String.format(
PREFIX_FORMAT_TASK,
getNamespaceAsPath(),
Slugify.of(getFlowId()),
getExecutionId(),
Slugify.of(getTaskId()),
getTaskRunId()
);
return new URI("//" + prefix);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
/**
* {@inheritDoc}
**/
@Override
public String toString() {
return "StorageContext::Task";
}
}
/**
* A storage context scoped to a Trigger.
*/
@Getter
public static class Trigger extends StorageContext {
private final String triggerId;
private Trigger(final String tenantId,
final String namespace,
final String flowId,
final String executionId,
final String triggerId) {
super(tenantId, namespace, flowId, executionId);
this.triggerId = Objects.requireNonNull(triggerId, "triggerId cannot be null");
}
/**
* {@inheritDoc}
**/
@Override
public URI getContextStorageURI() {
try {
String prefix = String.format(PREFIX_FORMAT_TRIGGER,
getNamespaceAsPath(),
Slugify.of(getFlowId()),
getExecutionId(),
Slugify.of(getTriggerId())
);
return new URI("//" + prefix);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
/**
* {@inheritDoc}
**/
@Override
public String toString() {
return "StorageContext::Trigger";
}
}
/**
* A storage context scoped to a Trigger.
*/
@Getter
public static class Input extends StorageContext {
private final String inputName;
private final String fileName;
private Input(final String tenantId,
final String namespace,
final String flowId,
final String executionId,
final String inputName,
final String fileName) {
super(tenantId, namespace, flowId, executionId);
this.inputName = Objects.requireNonNull(inputName, "inputName cannot be null");
this.fileName = Objects.requireNonNull(fileName, "fileName cannot be null");
}
/**
* {@inheritDoc}
**/
@Override
public URI getContextStorageURI() {
try {
var prefix = String.format(
PREFIX_FORMAT_INPUTS,
getNamespaceAsPath(),
Slugify.of(getFlowId()),
getExecutionId(),
inputName,
fileName
);
return new URI("//" + prefix);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
/**
* {@inheritDoc}
**/
@Override
public String toString() {
return "StorageContext::Input";
}
}
}

View File

@@ -1,29 +1,17 @@
package io.kestra.core.storages;
import com.google.common.base.Charsets;
import com.google.common.hash.Hashing;
import io.kestra.core.annotations.Retryable;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.utils.Slugify;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;
import java.io.*;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@Introspected
public interface StorageInterface {
@@ -32,6 +20,7 @@ public interface StorageInterface {
/**
* Returns all objects that start with the given prefix
*
* @param includeDirectories whether to include directories in the given results or not. If true, directories' uri will have a trailing '/'
* @return Kestra's internal storage uris of the found objects
*/
@@ -41,7 +30,6 @@ public interface StorageInterface {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
List<FileAttributes> list(String tenantId, URI uri) throws IOException;
/**
* Whether the uri points to a file/object that exist in the internal storage.
*
@@ -51,7 +39,7 @@ public interface StorageInterface {
*/
@SuppressWarnings("try")
default boolean exists(String tenantId, URI uri) {
try (InputStream ignored = get(tenantId, uri)){
try (InputStream ignored = get(tenantId, uri)) {
return true;
} catch (IOException ieo) {
return false;
@@ -76,176 +64,9 @@ public interface StorageInterface {
@Retryable(includes = {IOException.class})
List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOException;
default String executionPrefix(Flow flow, Execution execution) {
return fromParts(
flow.getNamespace().replace(".", "/"),
Slugify.of(flow.getId()),
"executions",
execution.getId()
);
}
default String executionPrefix(Execution execution) {
return fromParts(
execution.getNamespace().replace(".", "/"),
Slugify.of(execution.getFlowId()),
"executions",
execution.getId()
);
}
default String executionPrefix(TaskRun taskRun) {
return fromParts(
taskRun.getNamespace().replace(".", "/"),
Slugify.of(taskRun.getFlowId()),
"executions",
taskRun.getExecutionId()
);
}
default String statePrefix(String namespace, @Nullable String flowId, @Nullable String name, @Nullable String value) {
String namespacePrefix = namespace.replace(".", "/");
ArrayList<String> paths = new ArrayList<>(
List.of(
namespacePrefix
)
);
if (flowId != null) {
paths.add(Slugify.of(flowId));
}
paths.add("states");
if (name != null) {
paths.add(name);
}
if (value != null) {
paths.add(Hashing
.goodFastHash(64)
.hashString(value, Charsets.UTF_8)
.toString()
);
}
return String.join("/", paths);
}
default String cachePrefix(String namespace, String flowId, String taskId, @Nullable String value) {
String namespacePrefix = namespace.replace(".", "/");
ArrayList<String> paths = new ArrayList<>(
List.of(
namespacePrefix,
Slugify.of(flowId),
Slugify.of(taskId),
"cache"
)
);
if (value != null) {
paths.add(Hashing
.goodFastHash(64)
.hashString(value, Charsets.UTF_8)
.toString()
);
}
return String.join("/", paths);
}
default String namespaceFilePrefix(String namespace) {
return fromParts(
namespace.replace(".", "/"),
"_files"
);
}
default Optional<String> extractExecutionId(URI path) {
Pattern pattern = Pattern.compile("^/(.+)/executions/([^/]+)/", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(path.getPath());
if (!matcher.find() || matcher.group(2).isEmpty()) {
return Optional.empty();
}
return Optional.of(matcher.group(2));
}
default URI uri(Flow flow, Execution execution, String inputName, String file) throws URISyntaxException {
return new URI("/" + fromParts(
executionPrefix(flow, execution),
"inputs",
inputName,
file
));
}
@Retryable(includes = {IOException.class})
default URI from(Flow flow, Execution execution, String input, File file) throws IOException {
try {
return this.put(
flow.getTenantId(),
this.uri(flow, execution, input, file.getName()),
new BufferedInputStream(new FileInputStream(file))
);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
@Retryable(includes = {IOException.class})
default URI from(Flow flow, Execution execution, Input<?> input, File file) throws IOException {
return this.from(flow, execution, input.getName(), file);
}
default URI outputPrefix(Flow flow) {
try {
return new URI("//" + fromParts(
flow.getNamespace().replace(".", "/"),
Slugify.of(flow.getId())
));
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
default URI outputPrefix(Flow flow, Task task, Execution execution, TaskRun taskRun) {
try {
return new URI("//" + fromParts(
flow.getNamespace().replace(".", "/"),
Slugify.of(flow.getId()),
"executions",
execution.getId(),
"tasks",
Slugify.of(taskRun.getTaskId()),
taskRun.getId()
));
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
default URI outputPrefix(TriggerContext triggerContext, AbstractTrigger trigger, String triggerExecutionId) {
try {
return new URI("//" + fromParts(
triggerContext.getNamespace().replace(".", "/"),
Slugify.of(triggerContext.getFlowId()),
"executions",
triggerExecutionId,
"trigger",
Slugify.of(trigger.getId())
));
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
private String fromParts(String... parts) {
return "/" + Arrays.stream(parts)
.filter(part -> part != null)
.collect(Collectors.joining("/"));
default URI from(Execution execution, String input, File file) throws IOException {
URI uri = StorageContext.forInput(execution, input, file.getName()).getContextStorageURI();
return this.put(execution.getTenantId(), uri, new BufferedInputStream(new FileInputStream(file)));
}
}

View File

@@ -234,19 +234,8 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
public void preExecuteTasks(RunContext runContext, TaskRun taskRun) throws Exception {
if (cache != null) {
// first, check if we need to delete the file
if (cache.ttl != null) {
var maybeLastModifiedTime = runContext.getTaskCacheFileLastModifiedTime(taskRun.getNamespace(), taskRun.getFlowId(), this.getId(), taskRun.getValue());
if (maybeLastModifiedTime.isPresent()) {
if (Instant.now().isAfter(Instant.ofEpochMilli(maybeLastModifiedTime.get()).plus(cache.ttl))) {
runContext.logger().debug("Cache is expired, deleting it");
runContext.deleteTaskCacheFile(taskRun.getNamespace(), taskRun.getFlowId(), this.getId(), taskRun.getValue());
}
}
}
// then download it and extract its content
var maybeCacheFile = runContext.getTaskCacheFile(taskRun.getNamespace(), taskRun.getFlowId(), this.getId(), taskRun.getValue());
// May download cached file if it exists and is not expired, and extract its content
var maybeCacheFile = runContext.storage().getCacheFile(getId(), taskRun.getValue(), cache.ttl);
if (maybeCacheFile.isPresent()) {
runContext.logger().debug("Cache exist, downloading it");
// download the cache if exist and unzip all entries
@@ -331,7 +320,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
archive.finish();
File archiveFile = File.createTempFile("archive", ".zip");
Files.write(archiveFile.toPath(), bos.toByteArray());
URI uri = runContext.putTaskCacheFile(archiveFile, taskRun.getNamespace(), taskRun.getFlowId(), this.getId(), taskRun.getValue());
URI uri = runContext.storage().putCacheFile(archiveFile, getId(), taskRun.getValue());
runContext.logger().debug("Caching in {}", uri);
}
} else {

View File

@@ -70,7 +70,7 @@ public abstract class AbstractState extends Task {
Map<String, Object> merge = MapUtils.merge(current, runContext.render(map));
URI uri = runContext.putTaskStateFile(
URI uri = runContext.storage().putTaskStateFile(
JacksonMapper.ofJson(false).writeValueAsBytes(merge),
"tasks-states",
runContext.render(this.name),
@@ -82,6 +82,6 @@ public abstract class AbstractState extends Task {
}
protected boolean delete(RunContext runContext) throws IllegalVariableEvaluationException, IOException {
return runContext.deleteTaskStateFile("tasks-states", runContext.render(this.name), this.namespace, this.taskrunValue);
return runContext.storage().deleteTaskStateFile("tasks-states", runContext.render(this.name), this.namespace, this.taskrunValue);
}
}

View File

@@ -37,7 +37,7 @@ public class PurgeExecution extends Task implements RunnableTask<PurgeExecution.
@Override
public PurgeExecution.Output run(RunContext runContext) throws Exception {
return Output.builder()
.uris(runContext.purgeStorageExecution())
.uris(runContext.storage().deleteExecutionFiles())
.build();
}

View File

@@ -60,7 +60,8 @@ public abstract class AbstractExecutionServiceTest {
.revision(1)
.build();
Execution execution = Execution.builder()
Execution execution = Execution
.builder()
.id(IdUtils.create())
.state(state)
.flowId(flow.getId())
@@ -70,14 +71,24 @@ public abstract class AbstractExecutionServiceTest {
Return task = Return.builder().id(IdUtils.create()).type(Return.class.getName()).build();
TaskRun taskRun = TaskRun
.builder()
.namespace(flow.getNamespace())
.id(IdUtils.create())
.executionId(execution.getId())
.flowId(flow.getId())
.taskId(task.getId())
.state(state)
.build();
RunContext runContext = runContextFactory.of(
flow,
task,
execution,
TaskRun.builder().id(IdUtils.create()).taskId(task.getId()).state(state).build()
taskRun
);
execution.withInputs(Map.of("test", runContext.putTempFile(tempFile)));
execution.withInputs(Map.of("test", runContext.storage().putFile(tempFile)));
executionRepository.save(execution);

View File

@@ -71,8 +71,9 @@ public class InputsTest extends AbstractMemoryRunnerTest {
flow,
Execution.builder()
.id("test")
.namespace("test")
.namespace(flow.getNamespace())
.flowRevision(1)
.flowId(flow.getId())
.build(),
map
);

View File

@@ -1,6 +1,7 @@
package io.kestra.core.runners;
import io.kestra.core.models.tasks.NamespaceFiles;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Nullable;
@@ -170,7 +171,7 @@ class NamespaceFilesServiceTest {
private void put(@Nullable String tenantId, String namespace, String path, String content) throws IOException {
storageInterface.put(
tenantId,
URI.create(storageInterface.namespaceFilePrefix(namespace) + path),
URI.create(StorageContext.namespaceFilePrefix(namespace) + path),
new ByteArrayInputStream(content.getBytes())
);
}

View File

@@ -2,11 +2,11 @@ package io.kestra.core.runners.pebble.functions;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.VariableRenderer;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
@@ -31,8 +31,8 @@ class ReadFileFunctionTest {
void readNamespaceFile() throws IllegalVariableEvaluationException, IOException {
String namespace = "io.kestra.tests";
String filePath = "file.txt";
storageInterface.createDirectory(null, URI.create(storageInterface.namespaceFilePrefix(namespace)));
storageInterface.put(null, URI.create(storageInterface.namespaceFilePrefix(namespace) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes()));
storageInterface.createDirectory(null, URI.create(StorageContext.namespaceFilePrefix(namespace)));
storageInterface.put(null, URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes()));
String render = variableRenderer.render("{{ render(read('" + filePath + "')) }}", Map.of("flow", Map.of("namespace", namespace)));
assertThat(render, is("Hello from " + namespace));

View File

@@ -1,91 +0,0 @@
package io.kestra.core.storage;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.models.triggers.types.Schedule;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tasks.log.Log;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
@MicronautTest
public class StorageInterfaceTest {
@Inject
StorageInterface storageInterface;
@Test
void executionPrefix() {
var flow = Flow.builder().id("flow").namespace("namespace").build();
var execution = Execution.builder().id("execution").namespace("namespace").flowId("flow").build();
var taskRun = TaskRun.builder().id("taskrun").namespace("namespace").flowId("flow").executionId("execution").build();
var prefix = storageInterface.executionPrefix(flow, execution);
assertThat(prefix, notNullValue());
assertThat(prefix, is("/namespace/flow/executions/execution"));
prefix = storageInterface.executionPrefix(execution);
assertThat(prefix, notNullValue());
assertThat(prefix, is("/namespace/flow/executions/execution"));
prefix = storageInterface.executionPrefix(taskRun);
assertThat(prefix, notNullValue());
assertThat(prefix, is("/namespace/flow/executions/execution"));
}
@Test
void cachePrefix() {
var prefix = storageInterface.cachePrefix("namespace", "flow", "task", null);
assertThat(prefix, notNullValue());
assertThat(prefix, is("namespace/flow/task/cache"));
prefix = storageInterface.cachePrefix("namespace", "flow", "task", "value");
assertThat(prefix, notNullValue());
assertThat(prefix, startsWith("namespace/flow/task/cache/"));
}
@Test
void statePrefix() {
var prefix = storageInterface.statePrefix("namespace", "flow", "name", null);
assertThat(prefix, notNullValue());
assertThat(prefix, is("namespace/flow/states/name"));
prefix = storageInterface.statePrefix("namespace", "flow", "name", "value");
assertThat(prefix, notNullValue());
assertThat(prefix, startsWith("namespace/flow/states/name/"));
}
@Test
void outputPrefix() {
var flow = Flow.builder().id("flow").namespace("namespace").build();
var prefix = storageInterface.outputPrefix(flow);
assertThat(prefix, notNullValue());
assertThat(prefix.toString(), is("///namespace/flow"));
var log = Log.builder().id("log").type(Log.class.getName()).message("Hello").build();
var execution = Execution.builder().id("execution").build();
var taskRun = TaskRun.builder().id("taskrun").namespace("namespace").flowId("flow").executionId("execution").taskId("taskid").build();
prefix = storageInterface.outputPrefix(flow, log, execution, taskRun);
assertThat(prefix, notNullValue());
assertThat(prefix.toString(), is("///namespace/flow/executions/execution/tasks/taskid/taskrun"));
var triggerContext = TriggerContext.builder().namespace("namespace").flowId("flow").triggerId("trigger").build();
var trigger = Schedule.builder().id("trigger").build();
prefix = storageInterface.outputPrefix(triggerContext, trigger, "execution");
assertThat(prefix, notNullValue());
assertThat(prefix.toString(), is("///namespace/flow/executions/execution/trigger/trigger"));
}
@Test
void namespaceFilePrefix() {
var prefix = storageInterface.namespaceFilePrefix("io.namespace");
assertThat(prefix, notNullValue());
assertThat(prefix, is("/io/namespace/_files"));
}
}

View File

@@ -0,0 +1,121 @@
package io.kestra.core.storages;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import org.junit.jupiter.api.Test;
import java.net.URI;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
class StorageContextTest {
@Test
void shouldGetValidUriForFlowContext() {
StorageContext context = StorageContext.forExecution(Execution
.builder()
.tenantId("tenantId")
.id("executionid")
.namespace("namespace")
.flowId("flowid")
.build()
);
assertThat(context.getFlowStorageURI(), is(URI.create("///namespace/flowid")));
}
@Test
void shouldGetValidUriForExecutionContext() {
StorageContext context = StorageContext.forExecution(Execution
.builder()
.tenantId("tenantId")
.id("executionid")
.namespace("namespace")
.flowId("flowid")
.build()
);
assertThat(context.getExecutionStorageURI(), is(URI.create("///namespace/flowid/executions/executionid")));
assertThat(context.getContextStorageURI(), is(URI.create("///namespace/flowid/executions/executionid")));
}
@Test
void shouldGetValidUriForExecutionContextWithScheme() {
StorageContext context = StorageContext.forExecution(Execution
.builder()
.tenantId("tenantId")
.id("executionid")
.namespace("namespace")
.flowId("flowid")
.build()
);
assertThat(context.getExecutionStorageURI("kestra"), is(URI.create("kestra:///namespace/flowid/executions/executionid")));
assertThat(context.getExecutionStorageURI("kestra://"), is(URI.create("kestra:///namespace/flowid/executions/executionid")));
assertThat(context.getContextStorageURI(), is(URI.create("///namespace/flowid/executions/executionid")));
}
@Test
void shouldGetValidURIForTaskContext() {
StorageContext context = StorageContext.forTask(
"???",
"namespace",
"flowid",
"executionid",
"taskid",
"taskrun",
null
);
assertThat(context.getExecutionStorageURI(), is(URI.create("///namespace/flowid/executions/executionid")));
assertThat(context.getContextStorageURI(), is(URI.create("///namespace/flowid/executions/executionid/tasks/taskid/taskrun")));
}
@Test
void shouldGetValidStatePrefixForTaskContext() {
StorageContext context = StorageContext.forFlow(Flow.builder().namespace("namespace").id("flowid").build());
assertThat(context.getStateStorePrefix("name", false, null), is("/namespace/flowid/states/name"));
assertThat(context.getStateStorePrefix("name", false, "aaa"), startsWith("/namespace/flowid/states/name/"));
assertThat(context.getStateStorePrefix("name", true, null), is("/namespace/states/name"));
assertThat(context.getStateStorePrefix("name", true, "aaa"), startsWith("/namespace/states/name/"));
assertThat(context.getStateStorePrefix(null, true, null), is("/namespace/states"));
}
@Test
void shouldGetValidURIForTriggerContext() {
StorageContext context = StorageContext.forTrigger(
"???",
"namespace",
"flowid",
"executionid",
"triggerid"
);
assertThat(context.getExecutionStorageURI(), is(URI.create("///namespace/flowid/executions/executionid")));
assertThat(context.getContextStorageURI(), is(URI.create("///namespace/flowid/executions/executionid/trigger/triggerid")));
}
@Test
void shouldGetNamespaceFilePrefix() {
assertThat(StorageContext.namespaceFilePrefix("io.namespace"), is("/io/namespace/_files"));
}
@Test
void shouldGetTaskCachePrefix() {
assertThat(StorageContext.forFlow(Flow
.builder()
.tenantId(null)
.namespace("namespace")
.id("flowid")
.build()
).getCacheURI("taskid", null), is(URI.create("/namespace/flowid/taskid/cache/cache.zip")));
assertThat(StorageContext.forFlow(Flow
.builder()
.tenantId(null)
.namespace("namespace")
.id("flowid")
.build()
).getCacheURI("taskid", "value").toString(), startsWith("/namespace/flowid/taskid/cache/"));
}
}

View File

@@ -3,9 +3,12 @@ package io.kestra.core.tasks.flows;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.storages.InternalStorage;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@@ -21,6 +24,7 @@ import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
@@ -98,8 +102,21 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
@SuppressWarnings("unchecked")
public void cache(RunnerUtils runnerUtils) throws TimeoutException, IOException {
// make sure the cache didn't exist
URI cache = URI.create(storageInterface.cachePrefix("io.kestra.tests", "working-directory-cache", "workingDir", null) + "/cache.zip");
storageInterface.delete(null, cache);
StorageContext storageContext = StorageContext.forFlow(Flow
.builder()
.namespace("io.kestra.tests")
.id("working-directory-cache")
.build()
);
InternalStorage storage = new InternalStorage(
null,
storageContext
, storageInterface
);
storage.deleteCacheFile("workingDir", null);
URI cacheURI = storageContext.getCacheURI("workingdir", null);
assertFalse(storageInterface.exists(null, cacheURI));
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "working-directory-cache");
@@ -111,7 +128,7 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
nullValue()
);
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertTrue(storageInterface.exists(null, cache));
assertTrue(storageInterface.exists(null, cacheURI));
// a second run should use the cache so the task `exists` should output the cached file
execution = runnerUtils.runOne(null, "io.kestra.tests", "working-directory-cache");
@@ -163,7 +180,7 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
private void put(String path, String content) throws IOException {
storageInterface.put(
null,
URI.create(storageInterface.namespaceFilePrefix("io.kestra.tests") + path),
URI.create(StorageContext.namespaceFilePrefix("io.kestra.tests") + path),
new ByteArrayInputStream(content.getBytes())
);
}

View File

@@ -26,6 +26,7 @@ import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.Await;
@@ -44,7 +45,6 @@ import io.micronaut.core.convert.format.Format;
import io.micronaut.data.model.Pageable;
import io.micronaut.http.*;
import io.micronaut.http.annotation.*;
import io.micronaut.http.exceptions.HttpStatusException;
import io.micronaut.http.multipart.StreamingFileUpload;
import io.micronaut.http.server.types.files.StreamedFile;
import io.micronaut.http.sse.Event;
@@ -543,24 +543,28 @@ public class ExecutionController {
throw new NoSuchElementException("Unable to find flow id '" + executionId + "'");
}
String prefix = storageInterface.executionPrefix(flow.get(), execution.get());
String prefix = StorageContext
.forExecution(execution.get())
.getExecutionStorageURI().getPath();
if (path.getPath().startsWith(prefix)) {
return null;
}
// maybe state
prefix = storageInterface.statePrefix(flow.get().getNamespace(), flow.get().getId(), null, null);
StorageContext context = StorageContext.forFlow(flow.get());
prefix = context.getStateStorePrefix(null, false, null);
if (path.getPath().startsWith(prefix)) {
return null;
}
prefix = storageInterface.statePrefix(flow.get().getNamespace(), null, null, null);
prefix = context.getStateStorePrefix(null, true, null);
if (path.getPath().startsWith(prefix)) {
return null;
}
// maybe redirect to correct execution
Optional<String> redirectedExecution = storageInterface.extractExecutionId(path);
Optional<String> redirectedExecution = StorageContext.extractExecutionId(path);
if (redirectedExecution.isPresent()) {
return HttpResponse.redirect(URI.create((basePath != null ? basePath : "") +

View File

@@ -1,10 +1,10 @@
package io.kestra.webserver.controllers;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.services.FlowService;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.ImmutableFileAttributes;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.Rethrow;
@@ -276,7 +276,7 @@ public class NamespaceFileController {
}
private URI toNamespacedStorageUri(String namespace, @Nullable URI relativePath) {
return URI.create("kestra://" + storageInterface.namespaceFilePrefix(namespace) + Optional.ofNullable(relativePath).map(URI::getPath).orElse("/"));
return URI.create("kestra://" + StorageContext.namespaceFilePrefix(namespace) + Optional.ofNullable(relativePath).map(URI::getPath).orElse("/"));
}
private void ensureWritableNamespaceFile(URI path) {

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.webserver.controllers.h2.JdbcH2ControllerTest;
import io.micronaut.core.annotation.Nullable;
@@ -310,7 +311,7 @@ class NamespaceFileControllerTest extends JdbcH2ControllerTest {
}
private URI toNamespacedStorageUri(String namespace, @Nullable URI relativePath) {
return URI.create(storageInterface.namespaceFilePrefix(namespace) + Optional.ofNullable(relativePath).map(URI::getPath).orElse(""));
return URI.create(StorageContext.namespaceFilePrefix(namespace) + Optional.ofNullable(relativePath).map(URI::getPath).orElse(""));
}
@Getter