mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 05:00:31 -05:00
chore(core): add new interface for accessing Kestra storage
This commit provides a cleaner way to access kestra storage through RunContext
This commit is contained in:
committed by
Florian Hussonnois
parent
fa1479e3e5
commit
9e1b6601be
@@ -2,6 +2,7 @@ package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.tasks.NamespaceFiles;
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -67,7 +68,7 @@ public class NamespaceFilesService {
|
||||
}
|
||||
|
||||
private URI uri(String namespace, @Nullable URI path) {
|
||||
return URI.create(storageInterface.namespaceFilePrefix(namespace) + Optional.ofNullable(path)
|
||||
return URI.create(StorageContext.namespaceFilePrefix(namespace) + Optional.ofNullable(path)
|
||||
.map(URI::getPath)
|
||||
.orElse("")
|
||||
);
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.CaseFormat;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
@@ -17,9 +18,11 @@ import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.storages.InternalStorage;
|
||||
import io.kestra.core.storages.Storage;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.Slugify;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.inject.qualifiers.Qualifiers;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -27,11 +30,20 @@ import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.FilenameUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.io.*;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.*;
|
||||
import java.util.AbstractMap;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@@ -46,17 +58,13 @@ public class RunContext {
|
||||
private MetricRegistry meterRegistry;
|
||||
private Path tempBasedPath;
|
||||
private RunContextCache runContextCache;
|
||||
|
||||
private URI storageOutputPrefix;
|
||||
private URI storageExecutionPrefix;
|
||||
private Map<String, Object> variables;
|
||||
private List<AbstractMetricEntry<?>> metrics = new ArrayList<>();
|
||||
private RunContextLogger runContextLogger;
|
||||
private final List<WorkerTaskResult> dynamicWorkerTaskResult = new ArrayList<>();
|
||||
|
||||
protected transient Path temporaryDirectory;
|
||||
|
||||
private String triggerExecutionId;
|
||||
private Storage storage;
|
||||
|
||||
/**
|
||||
* Only used by {@link io.kestra.core.models.triggers.types.Flow}
|
||||
@@ -82,8 +90,8 @@ public class RunContext {
|
||||
*/
|
||||
public RunContext(ApplicationContext applicationContext, Flow flow, Task task, Execution execution, TaskRun taskRun) {
|
||||
this.initBean(applicationContext);
|
||||
this.initContext(flow, task, execution, taskRun);
|
||||
this.initLogger(taskRun, task);
|
||||
this.initContext(flow, task, execution, taskRun);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -107,13 +115,21 @@ public class RunContext {
|
||||
@VisibleForTesting
|
||||
public RunContext(ApplicationContext applicationContext, Map<String, Object> variables) {
|
||||
this.initBean(applicationContext);
|
||||
|
||||
this.variables = new HashMap<>();
|
||||
this.variables.putAll(this.variables(null, null, null, null, null));
|
||||
this.variables.putAll(variables);
|
||||
|
||||
this.storageOutputPrefix = URI.create("");
|
||||
this.runContextLogger = new RunContextLogger();
|
||||
this.storage = new InternalStorage(
|
||||
logger(),
|
||||
new StorageContext() {
|
||||
@Override
|
||||
public URI getContextStorageURI() {
|
||||
return URI.create("");
|
||||
}
|
||||
},
|
||||
storageInterface
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
protected void initBean(ApplicationContext applicationContext) {
|
||||
@@ -130,8 +146,13 @@ public class RunContext {
|
||||
|
||||
private void initContext(Flow flow, Task task, Execution execution, TaskRun taskRun) {
|
||||
this.variables = this.variables(flow, task, execution, taskRun, null);
|
||||
|
||||
if (taskRun != null && this.storageInterface != null) {
|
||||
this.storageOutputPrefix = this.storageInterface.outputPrefix(flow, task, execution, taskRun);
|
||||
this.storage = new InternalStorage(
|
||||
logger(),
|
||||
StorageContext.forTask(taskRun),
|
||||
storageInterface
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -196,11 +217,6 @@ public class RunContext {
|
||||
return variables;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public URI getStorageOutputPrefix() {
|
||||
return storageOutputPrefix;
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public ApplicationContext getApplicationContext() {
|
||||
return applicationContext;
|
||||
@@ -374,8 +390,7 @@ public class RunContext {
|
||||
runContext.variableRenderer = this.variableRenderer;
|
||||
runContext.applicationContext = this.applicationContext;
|
||||
runContext.storageInterface = this.storageInterface;
|
||||
runContext.storageOutputPrefix = this.storageOutputPrefix;
|
||||
runContext.storageExecutionPrefix = this.storageExecutionPrefix;
|
||||
runContext.storage = this.storage;
|
||||
runContext.variables = variables;
|
||||
runContext.metrics = new ArrayList<>();
|
||||
runContext.meterRegistry = this.meterRegistry;
|
||||
@@ -388,20 +403,33 @@ public class RunContext {
|
||||
|
||||
public RunContext forScheduler(TriggerContext triggerContext, AbstractTrigger trigger) {
|
||||
this.triggerExecutionId = IdUtils.create();
|
||||
this.storageOutputPrefix = this.storageInterface.outputPrefix(triggerContext, trigger, triggerExecutionId);
|
||||
|
||||
StorageContext context = StorageContext.forTrigger(
|
||||
triggerContext.getTenantId(),
|
||||
triggerContext.getNamespace(),
|
||||
triggerContext.getFlowId(),
|
||||
triggerExecutionId,
|
||||
trigger.getId()
|
||||
);
|
||||
this.storage = new InternalStorage(
|
||||
logger(),
|
||||
context,
|
||||
storageInterface
|
||||
);
|
||||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public RunContext forWorker(ApplicationContext applicationContext, WorkerTask workerTask) {
|
||||
this.initBean(applicationContext);
|
||||
this.initLogger(workerTask.getTaskRun(), workerTask.getTask());
|
||||
|
||||
final TaskRun taskRun = workerTask.getTaskRun();
|
||||
|
||||
this.initLogger(taskRun, workerTask.getTask());
|
||||
|
||||
Map<String, Object> clone = new HashMap<>(this.variables);
|
||||
|
||||
clone.remove("taskrun");
|
||||
clone.put("taskrun", this.variables(workerTask.getTaskRun()));
|
||||
clone.put("taskrun", this.variables(taskRun));
|
||||
|
||||
clone.remove("task");
|
||||
clone.put("task", this.variables(workerTask.getTask()));
|
||||
@@ -417,8 +445,7 @@ public class RunContext {
|
||||
}
|
||||
|
||||
this.variables = ImmutableMap.copyOf(clone);
|
||||
this.storageExecutionPrefix = URI.create("/" + this.storageInterface.executionPrefix(workerTask.getTaskRun()));
|
||||
|
||||
this.storage = new InternalStorage(logger(), StorageContext.forTask(taskRun), storageInterface);
|
||||
return this;
|
||||
}
|
||||
|
||||
@@ -499,31 +526,46 @@ public class RunContext {
|
||||
return runContextLogger.logger();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a {@link InputStream} for the given file URI.
|
||||
*
|
||||
* @param uri the file URI.
|
||||
* @return the {@link InputStream}.
|
||||
* @throws IOException
|
||||
* @deprecated use {@link Storage#getFile(URI)}.
|
||||
*/
|
||||
@Deprecated
|
||||
public InputStream uriToInputStream(URI uri) throws IOException {
|
||||
if (uri == null) {
|
||||
throw new IllegalArgumentException("Invalid internal storage uri, got null");
|
||||
}
|
||||
return this.storage.getFile(uri);
|
||||
}
|
||||
|
||||
if (uri.getScheme() == null) {
|
||||
throw new IllegalArgumentException("Invalid internal storage uri, got uri '" + uri + "'");
|
||||
}
|
||||
// for serialization backward-compatibility
|
||||
@JsonIgnore
|
||||
public URI getStorageOutputPrefix() {
|
||||
return storage.getContextBaseURI();
|
||||
}
|
||||
|
||||
if (uri.getScheme().equals("kestra")) {
|
||||
return this.storageInterface.get(tenantId(), uri);
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("Invalid internal storage scheme, got uri '" + uri + "'");
|
||||
/**
|
||||
* Gets access to the Kestra's storage.
|
||||
*
|
||||
* @return a {@link Storage} object.
|
||||
*/
|
||||
public Storage storage() {
|
||||
return storage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Put the temporary file on storage and delete it after.
|
||||
*
|
||||
* @param file the temporary file to upload to storage
|
||||
* @return the {@code StorageObject} created
|
||||
* @param file the temporary file to upload to storage
|
||||
* @return the {@code StorageObject} created
|
||||
* @throws IOException If the temporary file can't be read
|
||||
*
|
||||
* @deprecated use {@link #storage()} and {@link InternalStorage#putFile(File)}.
|
||||
*/
|
||||
@Deprecated
|
||||
public URI putTempFile(File file) throws IOException {
|
||||
return this.putTempFile(file, this.storageOutputPrefix.toString(), (String) null);
|
||||
return this.storage.putFile(file);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -533,166 +575,52 @@ public class RunContext {
|
||||
* @param name overwrite file name
|
||||
* @return the {@code StorageObject} created
|
||||
* @throws IOException If the temporary file can't be read
|
||||
*
|
||||
* @deprecated use {@link #storage()} and {@link InternalStorage#putFile(File, String)}.
|
||||
*/
|
||||
@Deprecated
|
||||
public URI putTempFile(File file, String name) throws IOException {
|
||||
return this.putTempFile(file, this.storageOutputPrefix.toString(), name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Put the temporary file on storage and delete it after.
|
||||
* This method is meant to be used by polling triggers, the name of the destination file is derived from the
|
||||
* executionId and the trigger passed as parameters.
|
||||
*
|
||||
* @param file the temporary file to upload to storage
|
||||
* @param executionId overwrite file name
|
||||
* @param trigger the trigger
|
||||
* @return the {@code StorageObject} created
|
||||
* @throws IOException If the temporary file can't be read
|
||||
*/
|
||||
public URI putTempFile(File file, String executionId, AbstractTrigger trigger) throws IOException {
|
||||
return this.putTempFile(
|
||||
file,
|
||||
this.storageOutputPrefix.toString() + "/" + String.join(
|
||||
"/",
|
||||
Arrays.asList(
|
||||
"executions",
|
||||
executionId,
|
||||
"trigger",
|
||||
Slugify.of(trigger.getId())
|
||||
)
|
||||
),
|
||||
(String) null
|
||||
);
|
||||
}
|
||||
|
||||
private URI putTempFile(InputStream inputStream, String prefix, String name) throws IOException {
|
||||
URI uri = URI.create(prefix);
|
||||
URI resolve = uri.resolve(uri.getPath() + "/" + name);
|
||||
|
||||
return this.storageInterface.put(tenantId(), resolve, new BufferedInputStream(inputStream));
|
||||
}
|
||||
|
||||
private URI putTempFile(File file, String prefix, String name) throws IOException {
|
||||
try (InputStream fileInput = new FileInputStream(file)) {
|
||||
return this.putTempFile(fileInput, prefix, (name != null ? name : file.getName()));
|
||||
} finally {
|
||||
try {
|
||||
Files.delete(file.toPath());
|
||||
} catch (IOException e) {
|
||||
runContextLogger.logger().warn("Failed to delete temporary file '{}'", file.toPath(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private String taskStateFilePathPrefix(String name, Boolean isNamespace, Boolean useTaskRun) {
|
||||
Map<String, String> taskrun = (Map<String, String>) this.getVariables().get("taskrun");
|
||||
Map<String, String> flow = (Map<String, String>) this.getVariables().get("flow");
|
||||
|
||||
return "/" + this.storageInterface.statePrefix(
|
||||
flow.get("namespace"),
|
||||
isNamespace ? null : flow.get("id"),
|
||||
name,
|
||||
taskrun != null && useTaskRun ? taskrun.getOrDefault("value", null) : null
|
||||
);
|
||||
return this.storage.putFile(file, name);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public InputStream getTaskStateFile(String state, String name) throws IOException {
|
||||
return this.getTaskStateFile(state, name, false, true);
|
||||
return this.storage.getTaskStateFile(state, name);
|
||||
}
|
||||
|
||||
|
||||
@Deprecated
|
||||
public InputStream getTaskStateFile(String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException {
|
||||
URI uri = URI.create(this.taskStateFilePathPrefix(state, isNamespace, useTaskRun));
|
||||
URI resolve = uri.resolve(uri.getPath() + "/" + name);
|
||||
|
||||
return this.storageInterface.get(tenantId(), resolve);
|
||||
return this.storage.getTaskStateFile(state, name, isNamespace, useTaskRun);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public URI putTaskStateFile(byte[] content, String state, String name) throws IOException {
|
||||
return this.putTaskStateFile(content, state, name, false, true);
|
||||
return this.storage.putTaskStateFile(content, state, name);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public URI putTaskStateFile(byte[] content, String state, String name, Boolean namespace, Boolean useTaskRun) throws IOException {
|
||||
try (InputStream inputStream = new ByteArrayInputStream(content)) {
|
||||
return this.putTempFile(
|
||||
inputStream,
|
||||
this.taskStateFilePathPrefix(state, namespace, useTaskRun),
|
||||
name
|
||||
);
|
||||
}
|
||||
return this.storage.putTaskStateFile(content, state, name, namespace, useTaskRun);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public URI putTaskStateFile(File file, String state, String name) throws IOException {
|
||||
return this.putTaskStateFile(file, state, name, false, true);
|
||||
return this.storage.putTaskStateFile(file, state, name);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public URI putTaskStateFile(File file, String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException {
|
||||
return this.putTempFile(
|
||||
file,
|
||||
this.taskStateFilePathPrefix(state, isNamespace, useTaskRun),
|
||||
name
|
||||
);
|
||||
return this.storage.putTaskStateFile(file, state, name, isNamespace, useTaskRun);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public boolean deleteTaskStateFile(String state, String name) throws IOException {
|
||||
return this.deleteTaskStateFile(state, name, false, true);
|
||||
return this.storage.deleteTaskStateFile(state, name);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public boolean deleteTaskStateFile(String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException {
|
||||
URI uri = URI.create(this.taskStateFilePathPrefix(state, isNamespace, useTaskRun));
|
||||
URI resolve = uri.resolve(uri.getPath() + "/" + name);
|
||||
|
||||
return this.storageInterface.delete(tenantId(), resolve);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get from the internal storage the cache file corresponding to this task.
|
||||
* If the cache file didn't exist, an empty Optional is returned.
|
||||
*
|
||||
* @param namespace the flow namespace
|
||||
* @param flowId the flow identifier
|
||||
* @param taskId the task identifier
|
||||
* @param value optional, the task run value
|
||||
*
|
||||
* @return an Optional with the cache input stream or empty.
|
||||
*/
|
||||
public Optional<InputStream> getTaskCacheFile(String namespace, String flowId, String taskId, String value) throws IOException {
|
||||
URI uri = URI.create("/" + this.storageInterface.cachePrefix(namespace, flowId, taskId, value) + "/cache.zip");
|
||||
return this.storageInterface.exists(tenantId(), uri) ? Optional.of(this.storageInterface.get(tenantId(), uri)) : Optional.empty();
|
||||
}
|
||||
|
||||
public Optional<Long> getTaskCacheFileLastModifiedTime(String namespace, String flowId, String taskId, String value) throws IOException {
|
||||
URI uri = URI.create("/" + this.storageInterface.cachePrefix(namespace, flowId, taskId, value) + "/cache.zip");
|
||||
return this.storageInterface.exists(tenantId(), uri) ? Optional.of(this.storageInterface.getAttributes(tenantId(), uri).getLastModifiedTime()) : Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Put into the internal storage the cache file corresponding to this task.
|
||||
*
|
||||
* @param file the cache as a ZIP archive
|
||||
* @param namespace the flow namespace
|
||||
* @param flowId the flow identifier
|
||||
* @param taskId the task identifier
|
||||
* @param value optional, the task run value
|
||||
*
|
||||
* @return the URI of the file inside the internal storage.
|
||||
*/
|
||||
public URI putTaskCacheFile(File file, String namespace, String flowId, String taskId, String value) throws IOException {
|
||||
return this.putTempFile(
|
||||
file,
|
||||
"/" + this.storageInterface.cachePrefix(namespace, flowId, taskId, value),
|
||||
"cache.zip"
|
||||
);
|
||||
}
|
||||
|
||||
public Optional<Boolean> deleteTaskCacheFile(String namespace, String flowId, String taskId, String value) throws IOException {
|
||||
URI uri = URI.create("/" + this.storageInterface.cachePrefix(namespace, flowId, taskId, value) + "/cache.zip");
|
||||
return this.storageInterface.exists(tenantId(), uri) ? Optional.of(this.storageInterface.delete(tenantId(), uri)) : Optional.empty();
|
||||
}
|
||||
|
||||
public List<URI> purgeStorageExecution() throws IOException {
|
||||
return this.storageInterface.deleteByPrefix(tenantId(), this.storageExecutionPrefix);
|
||||
return this.storage.deleteTaskStateFile(state, name, isNamespace, useTaskRun);
|
||||
}
|
||||
|
||||
public List<AbstractMetricEntry<?>> metrics() {
|
||||
|
||||
@@ -74,7 +74,7 @@ public class RunnerUtils {
|
||||
throw new RuntimeException("Can't upload");
|
||||
}
|
||||
|
||||
URI from = storageInterface.from(flow, execution, file.getFilename(), tempFile);
|
||||
URI from = storageInterface.from(execution, file.getFilename(), tempFile);
|
||||
//noinspection ResultOfMethodCallIgnored
|
||||
tempFile.delete();
|
||||
|
||||
@@ -211,7 +211,7 @@ public class RunnerUtils {
|
||||
} else {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(
|
||||
input.getName(),
|
||||
storageInterface.from(flow, execution, input, new File(((String) current)))
|
||||
storageInterface.from(execution, input.getName(), new File(((String) current)))
|
||||
));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.Slugify;
|
||||
import io.pebbletemplates.pebble.error.PebbleException;
|
||||
@@ -47,7 +48,7 @@ public class ReadFileFunction implements Function {
|
||||
@SuppressWarnings("unchecked")
|
||||
private String readFromNamespaceFile(EvaluationContext context, String path) throws IOException {
|
||||
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
|
||||
URI namespaceFile = URI.create(storageInterface.namespaceFilePrefix(flow.get("namespace")) + "/" + path);
|
||||
URI namespaceFile = URI.create(StorageContext.namespaceFilePrefix(flow.get("namespace")) + "/" + path);
|
||||
try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), namespaceFile)) {
|
||||
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.LogRepositoryInterface;
|
||||
import io.kestra.core.repositories.MetricRepositoryInterface;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.tasks.flows.WorkingDirectory;
|
||||
import io.kestra.core.utils.GraphUtils;
|
||||
@@ -271,8 +272,8 @@ public class ExecutionService {
|
||||
}
|
||||
|
||||
if (purgeStorage) {
|
||||
builder.storagesCount(storageInterface.deleteByPrefix(execution.getTenantId(), URI.create("kestra://" + storageInterface.executionPrefix(
|
||||
execution))).size());
|
||||
URI uri = StorageContext.forExecution(execution).getExecutionStorageURI(StorageContext.KESTRA_SCHEME);
|
||||
builder.storagesCount(storageInterface.deleteByPrefix(execution.getTenantId(), uri).size());
|
||||
}
|
||||
|
||||
return (PurgeResult) builder.build();
|
||||
|
||||
308
core/src/main/java/io/kestra/core/storages/InternalStorage.java
Normal file
308
core/src/main/java/io/kestra/core/storages/InternalStorage.java
Normal file
@@ -0,0 +1,308 @@
|
||||
package io.kestra.core.storages;
|
||||
|
||||
import jakarta.annotation.Nullable;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* The default {@link Storage} implementation acting as a facade to the {@link StorageInterface}.
|
||||
*/
|
||||
public class InternalStorage implements Storage {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(InternalStorage.class);
|
||||
|
||||
private static final String PATH_SEPARATOR = "/";
|
||||
|
||||
private final Logger logger;
|
||||
private final StorageContext context;
|
||||
private final StorageInterface storage;
|
||||
|
||||
/**
|
||||
* Creates a new {@link InternalStorage} instance.
|
||||
*
|
||||
* @param context The storage context.
|
||||
* @param storage The storage to delegate operations.
|
||||
*/
|
||||
public InternalStorage(StorageContext context, StorageInterface storage) {
|
||||
this(LOG, context, storage);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link InternalStorage} instance.
|
||||
*
|
||||
* @param logger The logger to be used by this class.
|
||||
* @param context The storage context.
|
||||
* @param storage The storage to delegate operations.
|
||||
*/
|
||||
public InternalStorage(Logger logger, StorageContext context, StorageInterface storage) {
|
||||
this.logger = logger;
|
||||
this.context = context;
|
||||
this.storage = storage;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public boolean isFileExist(URI uri) {
|
||||
return this.storage.exists(context.getTenantId(), uri);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public InputStream getFile(final URI uri) throws IOException {
|
||||
if (uri == null) {
|
||||
throw new IllegalArgumentException("Invalid internal storage uri, got null");
|
||||
}
|
||||
|
||||
String scheme = uri.getScheme();
|
||||
if (scheme == null) {
|
||||
throw new IllegalArgumentException("Invalid internal storage uri, got uri '" + uri + "'");
|
||||
}
|
||||
|
||||
if (!scheme.equals("kestra")) {
|
||||
throw new IllegalArgumentException("Invalid internal storage scheme, got uri '" + uri + "'");
|
||||
}
|
||||
|
||||
return this.storage.get(context.getTenantId(), uri);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public List<URI> deleteExecutionFiles() throws IOException {
|
||||
return this.storage.deleteByPrefix(context.getTenantId(), context.getExecutionStorageURI());
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public URI getContextBaseURI() {
|
||||
return this.context.getContextStorageURI();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public URI putFile(InputStream inputStream, String name) throws IOException {
|
||||
URI uri = context.getContextStorageURI();
|
||||
URI resolved = uri.resolve(uri.getPath() + PATH_SEPARATOR + name);
|
||||
return this.storage.put(context.getTenantId(), resolved, new BufferedInputStream(inputStream));
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public URI putFile(InputStream inputStream, URI uri) throws IOException {
|
||||
return this.storage.put(context.getTenantId(), uri, new BufferedInputStream(inputStream));
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public URI putFile(File file) throws IOException {
|
||||
return putFile(file, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public URI putFile(File file, String name) throws IOException {
|
||||
URI uri = context.getContextStorageURI();
|
||||
URI resolved = uri.resolve(uri.getPath() + PATH_SEPARATOR + (name != null ? name : file.getName()));
|
||||
try (InputStream is = new FileInputStream(file)) {
|
||||
return putFile(is, resolved);
|
||||
} finally {
|
||||
try {
|
||||
Files.delete(file.toPath());
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to delete temporary file '{}'", file.toPath(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public InputStream getTaskStateFile(String state, String name) throws IOException {
|
||||
return getTaskStateFile(state, name, false, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public InputStream getTaskStateFile(String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException {
|
||||
URI uri = URI.create(getStatePrefix(state, isNamespace, useTaskRun));
|
||||
URI resolve = uri.resolve(uri.getPath() + PATH_SEPARATOR + name);
|
||||
return this.storage.get(context.getTenantId(), resolve);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public URI putTaskStateFile(byte[] content, String state, String name) throws IOException {
|
||||
return this.putTaskStateFile(content, state, name, false, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public URI putTaskStateFile(byte[] content, String state, String name, Boolean namespace, Boolean useTaskRun) throws IOException {
|
||||
try (InputStream inputStream = new ByteArrayInputStream(content)) {
|
||||
return this.putFile(inputStream, getStatePrefix(state, namespace, useTaskRun), name);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public URI putTaskStateFile(File file, String state, String name) throws IOException {
|
||||
return this.putTaskStateFile(file, state, name, false, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public URI putTaskStateFile(File file, String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException {
|
||||
return this.putFileAndDelete(file, getStatePrefix(state, isNamespace, useTaskRun), name);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public boolean deleteTaskStateFile(String state, String name) throws IOException {
|
||||
return this.deleteTaskStateFile(state, name, false, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public boolean deleteTaskStateFile(String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException {
|
||||
URI uri = URI.create(getStatePrefix(state, isNamespace, useTaskRun));
|
||||
URI resolve = uri.resolve(uri.getPath() + PATH_SEPARATOR + name);
|
||||
return this.storage.delete(context.getTenantId(), resolve);
|
||||
}
|
||||
|
||||
private String getStatePrefix(String name, Boolean isNamespace, Boolean useTaskRun) {
|
||||
if (useTaskRun) {
|
||||
return getTaskStorageContext()
|
||||
.map(context -> context.getStateStorePrefix(name, isNamespace, context.getTaskRunValue()))
|
||||
.orElseThrow(() -> new IllegalStateException("Cannot get task state from: " + context));
|
||||
}
|
||||
return context.getStateStorePrefix(name, isNamespace, null);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public Optional<InputStream> getCacheFile(final String cacheId,
|
||||
final @Nullable String objectId,
|
||||
final @Nullable Duration ttl) throws IOException {
|
||||
if (ttl != null) {
|
||||
var maybeLastModifiedTime = getCacheFileLastModifiedTime(cacheId, objectId);
|
||||
if (maybeLastModifiedTime.isPresent()) {
|
||||
if (Instant.now().isAfter(Instant.ofEpochMilli(maybeLastModifiedTime.get()).plus(ttl))) {
|
||||
logger.debug("Cache is expired for cache-id={}, object-id={}, and ttl={}, deleting it",
|
||||
cacheId,
|
||||
objectId,
|
||||
ttl.toMillis()
|
||||
);
|
||||
deleteCacheFile(cacheId, objectId);
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
}
|
||||
URI uri = context.getCacheURI(cacheId, objectId);
|
||||
return isFileExist(uri) ?
|
||||
Optional.of(storage.get(context.getTenantId(), uri)) :
|
||||
Optional.empty();
|
||||
}
|
||||
|
||||
private Optional<Long> getCacheFileLastModifiedTime(String cacheId, @Nullable String objectId) throws IOException {
|
||||
URI uri = context.getCacheURI(cacheId, objectId);
|
||||
return isFileExist(uri) ?
|
||||
Optional.of(this.storage.getAttributes(context.getTenantId(), uri).getLastModifiedTime()) :
|
||||
Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public URI putCacheFile(File file, String cacheId, @Nullable String objectId) throws IOException {
|
||||
URI uri = context.getCacheURI(cacheId, objectId);
|
||||
return this.putFileAndDelete(file, uri);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public Optional<Boolean> deleteCacheFile(String cacheId, @Nullable String objectId) throws IOException {
|
||||
URI uri = context.getCacheURI(cacheId, objectId);
|
||||
return isFileExist(uri) ?
|
||||
Optional.of(this.storage.delete(context.getTenantId(), uri)) :
|
||||
Optional.empty();
|
||||
}
|
||||
|
||||
private URI putFileAndDelete(File file, URI uri) throws IOException {
|
||||
try (InputStream is = new FileInputStream(file)) {
|
||||
return this.putFile(is, uri);
|
||||
} finally {
|
||||
try {
|
||||
Files.delete(file.toPath());
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to delete temporary file '{}'", file.toPath(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private URI putFileAndDelete(File file, String prefix, String name) throws IOException {
|
||||
URI uri = URI.create(prefix);
|
||||
URI resolve = uri.resolve(uri.getPath() + PATH_SEPARATOR + (name != null ? name : file.getName()));
|
||||
return putFileAndDelete(file, resolve);
|
||||
}
|
||||
|
||||
private URI putFile(InputStream inputStream, String prefix, String name) throws IOException {
|
||||
URI uri = URI.create(prefix);
|
||||
URI resolve = uri.resolve(uri.getPath() + PATH_SEPARATOR + name);
|
||||
return this.storage.put(context.getTenantId(), resolve, new BufferedInputStream(inputStream));
|
||||
}
|
||||
|
||||
private Optional<StorageContext.Task> getTaskStorageContext() {
|
||||
return Optional.ofNullable((context instanceof StorageContext.Task task) ? task : null);
|
||||
}
|
||||
}
|
||||
159
core/src/main/java/io/kestra/core/storages/Storage.java
Normal file
159
core/src/main/java/io/kestra/core/storages/Storage.java
Normal file
@@ -0,0 +1,159 @@
|
||||
package io.kestra.core.storages;
|
||||
|
||||
import jakarta.annotation.Nullable;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Service interface for accessing the Kestra's storage.
|
||||
*/
|
||||
public interface Storage {
|
||||
|
||||
/**
|
||||
* Checks whether the given URI points to an exiting file/object in the internal storage.
|
||||
*
|
||||
* @param uri the URI of the file/object in the internal storage.
|
||||
* @return {@code true} if the URI points to a file/object that exists in the internal storage.
|
||||
*/
|
||||
boolean isFileExist(URI uri);
|
||||
|
||||
/**
|
||||
* Retrieve an {@link InputStream} for the given file URI.
|
||||
*
|
||||
* @param uri the file URI.
|
||||
* @return the {@link InputStream}.
|
||||
* @throws IllegalArgumentException if the given {@link URI} is {@code null} or invalid.
|
||||
* @throws IOException if an error happens while accessing the file.
|
||||
*/
|
||||
InputStream getFile(URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* Deletes all the files for the current execution.
|
||||
*
|
||||
* @return The URIs of the deleted files.
|
||||
* @throws IOException if an error happened while deleting files.
|
||||
*/
|
||||
List<URI> deleteExecutionFiles() throws IOException;
|
||||
|
||||
/**
|
||||
* Gets the storage base URI for the current context.
|
||||
*
|
||||
* @return the URI.
|
||||
*/
|
||||
URI getContextBaseURI();
|
||||
|
||||
/**
|
||||
* Stores a file with the given name for the given {@link InputStream} into Kestra's storage.
|
||||
*
|
||||
* @param inputStream the {@link InputStream} of the file content.
|
||||
* @param name the name of the file on the Kestra's storage.
|
||||
* @return the URI of the file/object in the internal storage.
|
||||
* @throws IOException if an error occurs while storing the file.
|
||||
*/
|
||||
URI putFile(InputStream inputStream, String name) throws IOException;
|
||||
|
||||
/**
|
||||
* Stores a file with the given name for the given {@link InputStream} into Kestra's storage.
|
||||
*
|
||||
* @param inputStream the {@link InputStream} of the file content.
|
||||
* @param uri the target URI of the file to be stored in the storage.
|
||||
* @return the URI of the file/object in the internal storage.
|
||||
* @throws IOException if an error occurs while storing the file.
|
||||
*/
|
||||
URI putFile(InputStream inputStream, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* Stores a copy of the given file into Kestra's storage, and deletes the original file.
|
||||
*
|
||||
* @param file the file to be store.
|
||||
* @return the URI of the stored object.
|
||||
* @throws IOException if an error occurs while storing the file.
|
||||
*/
|
||||
URI putFile(File file) throws IOException;
|
||||
|
||||
/**
|
||||
* Stores a copy of the given file into Kestra's storage with the specified name, and deletes the original file.
|
||||
*
|
||||
* @param file the file to be store.
|
||||
* @param name the name of the file on the Kestra's storage.
|
||||
* @return the URI of the stored object.
|
||||
* @throws IOException if an error occurs while storing the file.
|
||||
*/
|
||||
URI putFile(File file, String name) throws IOException;
|
||||
|
||||
// ==============================================================
|
||||
// STATE STORE
|
||||
// ==============================================================
|
||||
InputStream getTaskStateFile(String state, String name) throws IOException;
|
||||
|
||||
InputStream getTaskStateFile(String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException;
|
||||
|
||||
URI putTaskStateFile(byte[] content, String state, String name) throws IOException;
|
||||
|
||||
URI putTaskStateFile(byte[] content, String state, String name, Boolean namespace, Boolean useTaskRun) throws IOException;
|
||||
|
||||
URI putTaskStateFile(File file, String state, String name) throws IOException;
|
||||
|
||||
URI putTaskStateFile(File file, String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException;
|
||||
|
||||
boolean deleteTaskStateFile(String state, String name) throws IOException;
|
||||
|
||||
boolean deleteTaskStateFile(String state, String name, Boolean isNamespace, Boolean useTaskRun) throws IOException;
|
||||
|
||||
|
||||
// ==============================================================
|
||||
// CACHING
|
||||
// ==============================================================
|
||||
|
||||
/**
|
||||
* Gets the cache file from the Kestra's storage for the given cacheID and objectID.
|
||||
* If the cache file didn't exist, an empty Optional is returned.
|
||||
*
|
||||
* @param cacheId the ID of the cache.
|
||||
* @param objectId the ID object cached object (optional).
|
||||
* @return an Optional with the cache input stream or empty.
|
||||
* @throws IOException if an error occurs during the operation.
|
||||
*/
|
||||
default Optional<InputStream> getCacheFile(String cacheId, @Nullable String objectId) throws IOException {
|
||||
return getCacheFile(cacheId, objectId, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the cache file from the Kestra's storage for the given cacheID and objectID.
|
||||
* If the cache file didn't exist or has expired based on the given TTL, an empty Optional is returned.
|
||||
*
|
||||
* @param cacheId the ID of the cache.
|
||||
* @param objectId the ID object cached object (optional).
|
||||
* @param ttl the time-to-live duration of the cache.
|
||||
* @return an Optional with the cache input stream or empty.
|
||||
* @throws IOException if an error occurs during the operation.
|
||||
*/
|
||||
Optional<InputStream> getCacheFile(String cacheId, @Nullable String objectId, @Nullable Duration ttl) throws IOException;
|
||||
|
||||
/**
|
||||
* Caches the given file into Kestra's storage with the given cache ID.
|
||||
*
|
||||
* @param file the cache as a ZIP archive
|
||||
* @param cacheId the ID of the cache.
|
||||
* @param objectId the ID object cached object (optional).
|
||||
* @return the URI of the file inside the internal storage.
|
||||
* @throws IOException if an error occurs during the operation.
|
||||
*/
|
||||
URI putCacheFile(File file, String cacheId, @Nullable String objectId) throws IOException;
|
||||
|
||||
/**
|
||||
* Deletes the cache file.
|
||||
*
|
||||
* @param cacheId the ID of the cache.
|
||||
* @param objectId the ID object cached object (optional).
|
||||
* @return {@code true} if the cache file was removed/. Otherwise {@code false}.
|
||||
* @throws IOException if an error occurs during the operation.
|
||||
*/
|
||||
Optional<Boolean> deleteCacheFile(String cacheId, @Nullable String objectId) throws IOException;
|
||||
}
|
||||
444
core/src/main/java/io/kestra/core/storages/StorageContext.java
Normal file
444
core/src/main/java/io/kestra/core/storages/StorageContext.java
Normal file
@@ -0,0 +1,444 @@
|
||||
package io.kestra.core.storages;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.hash.Hashing;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.utils.Slugify;
|
||||
import jakarta.annotation.Nullable;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* Context used for storing and retrieving data from Kestra's storage.
|
||||
*/
|
||||
@Getter
|
||||
public class StorageContext {
|
||||
|
||||
public static final String KESTRA_SCHEME = "kestra";
|
||||
|
||||
// /{namespace}/_files
|
||||
static final String PREFIX_FORMAT_NAMESPACE_FILE = "/%s/_files";
|
||||
// /{namespace}/{flow-id}
|
||||
static final String PREFIX_FORMAT_FLOWS = "/%s/%s";
|
||||
// /{namespace}/{flow-id}/executions/{execution-id}
|
||||
static final String PREFIX_FORMAT_EXECUTIONS = "/%s/%s/executions/%s";
|
||||
// /{namespace}/{flow-id}/executions/{execution-id}/tasks/{task-id}/{task-run-id}
|
||||
static final String PREFIX_FORMAT_TASK = "/%s/%s/executions/%s/tasks/%s/%s";
|
||||
// /{namespace}/{flow-id}/executions/{execution-id}/trigger/{trigger-id}
|
||||
static final String PREFIX_FORMAT_TRIGGER = "/%s/%s/executions/%s/trigger/%s";
|
||||
// /{namespace}/{flow-id}/executions/{execution-id}/inputs/{input-name}/{file-name}
|
||||
static final String PREFIX_FORMAT_INPUTS = "/%s/%s/executions/%s/inputs/%s/%s";
|
||||
// /{namespace}/{flow-id}/{cache-id}/cache/{object-id}/cache.zip
|
||||
static final String PREFIX_FORMAT_CACHE_OBJECT = "/%s/%s/%s/cache/%s/cache.zip";
|
||||
// /{namespace}/{flow-id}/{cache-id}/cache/cache.zip
|
||||
static final String PREFIX_FORMAT_CACHE = "/%s/%s/%s/cache/cache.zip";
|
||||
|
||||
/**
|
||||
* Factory method for constructing a new {@link StorageContext} scoped to a given {@link TaskRun}.
|
||||
*/
|
||||
public static StorageContext forTask(TaskRun taskRun) {
|
||||
return new StorageContext.Task(
|
||||
taskRun.getTenantId(),
|
||||
taskRun.getNamespace(),
|
||||
taskRun.getFlowId(),
|
||||
taskRun.getExecutionId(),
|
||||
taskRun.getTaskId(),
|
||||
taskRun.getId(),
|
||||
taskRun.getValue()
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method for constructing a new {@link StorageContext} scoped to a given {@link Flow}.
|
||||
*/
|
||||
public static StorageContext forFlow(Flow flow) {
|
||||
return new StorageContext(flow.getTenantId(), flow.getNamespace(), flow.getId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method for constructing a new {@link StorageContext} scoped to a given {@link Execution}.
|
||||
*/
|
||||
public static StorageContext forExecution(Execution execution) {
|
||||
return forExecution(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method for constructing a new {@link StorageContext} scoped to a given Execution.
|
||||
*/
|
||||
public static StorageContext forExecution(@Nullable String tenantId,
|
||||
String namespace,
|
||||
String flowId,
|
||||
String executionId) {
|
||||
return new StorageContext(tenantId, namespace, flowId, executionId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method for constructing a new {@link StorageContext} scoped to a given {@link Execution} and input.
|
||||
*/
|
||||
public static StorageContext.Input forInput(Execution execution,
|
||||
String inputName,
|
||||
String fileName) {
|
||||
return new StorageContext.Input(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId(), inputName, fileName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method for constructing a new {@link StorageContext} scoped to a given Task.
|
||||
*/
|
||||
public static StorageContext.Task forTask(@Nullable String tenantId,
|
||||
String namespace,
|
||||
String flowId,
|
||||
String executionId,
|
||||
String taskId,
|
||||
String taskRunId,
|
||||
@Nullable String taskRunValue) {
|
||||
return new StorageContext.Task(tenantId, namespace, flowId, executionId, taskId, taskRunId, taskRunValue);
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method for constructing a new {@link StorageContext} scoped to a given Trigger.
|
||||
*/
|
||||
public static StorageContext.Trigger forTrigger(@Nullable String tenantId,
|
||||
String namespace,
|
||||
String flowId,
|
||||
String executionId,
|
||||
String triggerId) {
|
||||
return new StorageContext.Trigger(tenantId, namespace, flowId, executionId, triggerId);
|
||||
}
|
||||
|
||||
private final String tenantId;
|
||||
private final String namespace;
|
||||
private final String flowId;
|
||||
private final String executionId;
|
||||
|
||||
@VisibleForTesting
|
||||
public StorageContext() {
|
||||
this.tenantId = null;
|
||||
this.namespace = null;
|
||||
this.flowId = null;
|
||||
this.executionId = null;
|
||||
}
|
||||
|
||||
private StorageContext(final @Nullable String tenantId,
|
||||
final String namespace,
|
||||
final String flowId) {
|
||||
this.tenantId = tenantId;
|
||||
this.namespace = Objects.requireNonNull(namespace, "namespace cannot be null");
|
||||
this.flowId = Objects.requireNonNull(flowId, "flowId cannot be null");
|
||||
this.executionId = null;
|
||||
}
|
||||
|
||||
private StorageContext(final @Nullable String tenantId,
|
||||
final String namespace,
|
||||
final String flowId,
|
||||
final String executionId) {
|
||||
this.tenantId = tenantId;
|
||||
this.namespace = Objects.requireNonNull(namespace, "namespace cannot be null");
|
||||
this.flowId = Objects.requireNonNull(flowId, "flowId cannot be null");
|
||||
this.executionId = Objects.requireNonNull(executionId, "executionId cannot be null");
|
||||
}
|
||||
|
||||
public static Optional<String> extractExecutionId(URI path) {
|
||||
Pattern pattern = Pattern.compile("^/(.+)/executions/([^/]+)/", Pattern.CASE_INSENSITIVE);
|
||||
Matcher matcher = pattern.matcher(path.getPath());
|
||||
|
||||
if (!matcher.find() || matcher.group(2).isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
return Optional.of(matcher.group(2));
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the storage URI of the given cacheID, and optionally the given objectID.
|
||||
*
|
||||
* @param cacheId the ID of the cache.
|
||||
* @param objectId the ID object cached object (optional).
|
||||
* @return the URI
|
||||
*/
|
||||
public URI getCacheURI(final String cacheId, @Nullable final String objectId) {
|
||||
Objects.requireNonNull(cacheId, "Cannot create URI with id null");
|
||||
|
||||
final String prefix;
|
||||
if (objectId == null) {
|
||||
prefix = String.format(
|
||||
PREFIX_FORMAT_CACHE,
|
||||
getNamespaceAsPath(),
|
||||
Slugify.of(getFlowId()),
|
||||
Slugify.of(cacheId)
|
||||
);
|
||||
} else {
|
||||
String hashedObjectId = Hashing.goodFastHash(64)
|
||||
.hashString(objectId, Charsets.UTF_8)
|
||||
.toString();
|
||||
prefix = String.format(
|
||||
PREFIX_FORMAT_CACHE_OBJECT,
|
||||
getNamespaceAsPath(),
|
||||
Slugify.of(getFlowId()),
|
||||
Slugify.of(cacheId),
|
||||
hashedObjectId
|
||||
);
|
||||
}
|
||||
return URI.create(prefix);
|
||||
}
|
||||
|
||||
public String getNamespaceAsPath() {
|
||||
return getNamespace().replace(".", "/");
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the storage prefix for the given state store ID.
|
||||
*
|
||||
* @param id the primary ID of the state.
|
||||
* @param isNamespace specify whether the state is on namespace or flow level.
|
||||
* @param value the secondary ID (e.g., the runTaskValue).
|
||||
* @return the storage prefix.
|
||||
*/
|
||||
public String getStateStorePrefix(String id, Boolean isNamespace, String value) {
|
||||
ArrayList<String> paths = new ArrayList<>(List.of(getNamespaceAsPath()));
|
||||
|
||||
if (!isNamespace) {
|
||||
paths.add(Slugify.of(getFlowId()));
|
||||
}
|
||||
|
||||
paths.add("states");
|
||||
|
||||
if (id != null) {
|
||||
paths.add(id);
|
||||
}
|
||||
|
||||
if (value != null) {
|
||||
paths.add(Hashing
|
||||
.goodFastHash(64)
|
||||
.hashString(value, Charsets.UTF_8)
|
||||
.toString()
|
||||
);
|
||||
}
|
||||
|
||||
return "/" + String.join("/", paths);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the base storage URI for the current {@link io.kestra.core.models.flows.Flow}.
|
||||
*
|
||||
* @return the {@link URI}.
|
||||
*/
|
||||
public URI getFlowStorageURI() {
|
||||
try {
|
||||
var prefix = String.format(PREFIX_FORMAT_FLOWS, getNamespaceAsPath(), Slugify.of(getFlowId()));
|
||||
return new URI("//" + prefix);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the base storage URI for the current {@link Execution}.
|
||||
*
|
||||
* @return the {@link URI}.
|
||||
*/
|
||||
public URI getExecutionStorageURI() {
|
||||
return getExecutionStorageURI(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the base storage URI for the current {@link Execution}.
|
||||
*
|
||||
* @param scheme The scheme name.
|
||||
* @return the {@link URI}.
|
||||
*/
|
||||
public URI getExecutionStorageURI(@Nullable String scheme) {
|
||||
try {
|
||||
var schemePrefix = Optional.ofNullable(scheme)
|
||||
.map(s -> s.endsWith("://") ? s : s + "://")
|
||||
.orElse("//");
|
||||
|
||||
var prefix = String.format(PREFIX_FORMAT_EXECUTIONS,
|
||||
getNamespaceAsPath(),
|
||||
Slugify.of(flowId),
|
||||
executionId
|
||||
);
|
||||
return new URI(schemePrefix + prefix);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the base storage URI for this context.
|
||||
*
|
||||
* @return the {@link URI}.
|
||||
*/
|
||||
public URI getContextStorageURI() {
|
||||
return getExecutionStorageURI();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public String toString() {
|
||||
return "StorageContext::Execution";
|
||||
}
|
||||
|
||||
|
||||
public static String namespaceFilePrefix(String namespace) {
|
||||
return String.format(PREFIX_FORMAT_NAMESPACE_FILE, namespace.replace(".", "/"));
|
||||
}
|
||||
|
||||
/**
|
||||
* A storage context scoped to a Task.
|
||||
*/
|
||||
@Getter
|
||||
public static class Task extends StorageContext {
|
||||
|
||||
private final String taskId;
|
||||
private final String taskRunId;
|
||||
private final String taskRunValue;
|
||||
|
||||
private Task(final String tenantId,
|
||||
final String namespace,
|
||||
final String flowId,
|
||||
final String executionId,
|
||||
final String taskId,
|
||||
final String taskRunId,
|
||||
@Nullable final String taskRunValue) {
|
||||
super(tenantId, namespace, flowId, executionId);
|
||||
this.taskId = Objects.requireNonNull(taskId, "taskID cannot be null");
|
||||
this.taskRunId = Objects.requireNonNull(taskRunId, "taskRunID cannot be null");
|
||||
this.taskRunValue = taskRunValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public URI getContextStorageURI() {
|
||||
try {
|
||||
var prefix = String.format(
|
||||
PREFIX_FORMAT_TASK,
|
||||
getNamespaceAsPath(),
|
||||
Slugify.of(getFlowId()),
|
||||
getExecutionId(),
|
||||
Slugify.of(getTaskId()),
|
||||
getTaskRunId()
|
||||
);
|
||||
return new URI("//" + prefix);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public String toString() {
|
||||
return "StorageContext::Task";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A storage context scoped to a Trigger.
|
||||
*/
|
||||
@Getter
|
||||
public static class Trigger extends StorageContext {
|
||||
|
||||
|
||||
private final String triggerId;
|
||||
|
||||
private Trigger(final String tenantId,
|
||||
final String namespace,
|
||||
final String flowId,
|
||||
final String executionId,
|
||||
final String triggerId) {
|
||||
super(tenantId, namespace, flowId, executionId);
|
||||
this.triggerId = Objects.requireNonNull(triggerId, "triggerId cannot be null");
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public URI getContextStorageURI() {
|
||||
try {
|
||||
String prefix = String.format(PREFIX_FORMAT_TRIGGER,
|
||||
getNamespaceAsPath(),
|
||||
Slugify.of(getFlowId()),
|
||||
getExecutionId(),
|
||||
Slugify.of(getTriggerId())
|
||||
);
|
||||
return new URI("//" + prefix);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public String toString() {
|
||||
return "StorageContext::Trigger";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A storage context scoped to a Trigger.
|
||||
*/
|
||||
@Getter
|
||||
public static class Input extends StorageContext {
|
||||
|
||||
private final String inputName;
|
||||
private final String fileName;
|
||||
|
||||
private Input(final String tenantId,
|
||||
final String namespace,
|
||||
final String flowId,
|
||||
final String executionId,
|
||||
final String inputName,
|
||||
final String fileName) {
|
||||
super(tenantId, namespace, flowId, executionId);
|
||||
this.inputName = Objects.requireNonNull(inputName, "inputName cannot be null");
|
||||
this.fileName = Objects.requireNonNull(fileName, "fileName cannot be null");
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public URI getContextStorageURI() {
|
||||
try {
|
||||
var prefix = String.format(
|
||||
PREFIX_FORMAT_INPUTS,
|
||||
getNamespaceAsPath(),
|
||||
Slugify.of(getFlowId()),
|
||||
getExecutionId(),
|
||||
inputName,
|
||||
fileName
|
||||
);
|
||||
return new URI("//" + prefix);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@Override
|
||||
public String toString() {
|
||||
return "StorageContext::Input";
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,29 +1,17 @@
|
||||
package io.kestra.core.storages;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.hash.Hashing;
|
||||
import io.kestra.core.annotations.Retryable;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.utils.Slugify;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
|
||||
import java.io.*;
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Introspected
|
||||
public interface StorageInterface {
|
||||
@@ -32,6 +20,7 @@ public interface StorageInterface {
|
||||
|
||||
/**
|
||||
* Returns all objects that start with the given prefix
|
||||
*
|
||||
* @param includeDirectories whether to include directories in the given results or not. If true, directories' uri will have a trailing '/'
|
||||
* @return Kestra's internal storage uris of the found objects
|
||||
*/
|
||||
@@ -41,7 +30,6 @@ public interface StorageInterface {
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
List<FileAttributes> list(String tenantId, URI uri) throws IOException;
|
||||
|
||||
|
||||
/**
|
||||
* Whether the uri points to a file/object that exist in the internal storage.
|
||||
*
|
||||
@@ -51,7 +39,7 @@ public interface StorageInterface {
|
||||
*/
|
||||
@SuppressWarnings("try")
|
||||
default boolean exists(String tenantId, URI uri) {
|
||||
try (InputStream ignored = get(tenantId, uri)){
|
||||
try (InputStream ignored = get(tenantId, uri)) {
|
||||
return true;
|
||||
} catch (IOException ieo) {
|
||||
return false;
|
||||
@@ -76,176 +64,9 @@ public interface StorageInterface {
|
||||
@Retryable(includes = {IOException.class})
|
||||
List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOException;
|
||||
|
||||
default String executionPrefix(Flow flow, Execution execution) {
|
||||
return fromParts(
|
||||
flow.getNamespace().replace(".", "/"),
|
||||
Slugify.of(flow.getId()),
|
||||
"executions",
|
||||
execution.getId()
|
||||
);
|
||||
}
|
||||
|
||||
default String executionPrefix(Execution execution) {
|
||||
return fromParts(
|
||||
execution.getNamespace().replace(".", "/"),
|
||||
Slugify.of(execution.getFlowId()),
|
||||
"executions",
|
||||
execution.getId()
|
||||
);
|
||||
}
|
||||
|
||||
default String executionPrefix(TaskRun taskRun) {
|
||||
return fromParts(
|
||||
taskRun.getNamespace().replace(".", "/"),
|
||||
Slugify.of(taskRun.getFlowId()),
|
||||
"executions",
|
||||
taskRun.getExecutionId()
|
||||
);
|
||||
}
|
||||
|
||||
default String statePrefix(String namespace, @Nullable String flowId, @Nullable String name, @Nullable String value) {
|
||||
String namespacePrefix = namespace.replace(".", "/");
|
||||
|
||||
ArrayList<String> paths = new ArrayList<>(
|
||||
List.of(
|
||||
namespacePrefix
|
||||
)
|
||||
);
|
||||
|
||||
if (flowId != null) {
|
||||
paths.add(Slugify.of(flowId));
|
||||
}
|
||||
|
||||
paths.add("states");
|
||||
|
||||
if (name != null) {
|
||||
paths.add(name);
|
||||
}
|
||||
|
||||
if (value != null) {
|
||||
paths.add(Hashing
|
||||
.goodFastHash(64)
|
||||
.hashString(value, Charsets.UTF_8)
|
||||
.toString()
|
||||
);
|
||||
}
|
||||
|
||||
return String.join("/", paths);
|
||||
}
|
||||
|
||||
default String cachePrefix(String namespace, String flowId, String taskId, @Nullable String value) {
|
||||
String namespacePrefix = namespace.replace(".", "/");
|
||||
|
||||
ArrayList<String> paths = new ArrayList<>(
|
||||
List.of(
|
||||
namespacePrefix,
|
||||
Slugify.of(flowId),
|
||||
Slugify.of(taskId),
|
||||
"cache"
|
||||
)
|
||||
);
|
||||
|
||||
if (value != null) {
|
||||
paths.add(Hashing
|
||||
.goodFastHash(64)
|
||||
.hashString(value, Charsets.UTF_8)
|
||||
.toString()
|
||||
);
|
||||
}
|
||||
|
||||
return String.join("/", paths);
|
||||
}
|
||||
|
||||
default String namespaceFilePrefix(String namespace) {
|
||||
return fromParts(
|
||||
namespace.replace(".", "/"),
|
||||
"_files"
|
||||
);
|
||||
}
|
||||
|
||||
default Optional<String> extractExecutionId(URI path) {
|
||||
Pattern pattern = Pattern.compile("^/(.+)/executions/([^/]+)/", Pattern.CASE_INSENSITIVE);
|
||||
Matcher matcher = pattern.matcher(path.getPath());
|
||||
|
||||
if (!matcher.find() || matcher.group(2).isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
return Optional.of(matcher.group(2));
|
||||
}
|
||||
|
||||
default URI uri(Flow flow, Execution execution, String inputName, String file) throws URISyntaxException {
|
||||
return new URI("/" + fromParts(
|
||||
executionPrefix(flow, execution),
|
||||
"inputs",
|
||||
inputName,
|
||||
file
|
||||
));
|
||||
}
|
||||
|
||||
@Retryable(includes = {IOException.class})
|
||||
default URI from(Flow flow, Execution execution, String input, File file) throws IOException {
|
||||
try {
|
||||
return this.put(
|
||||
flow.getTenantId(),
|
||||
this.uri(flow, execution, input, file.getName()),
|
||||
new BufferedInputStream(new FileInputStream(file))
|
||||
);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Retryable(includes = {IOException.class})
|
||||
default URI from(Flow flow, Execution execution, Input<?> input, File file) throws IOException {
|
||||
return this.from(flow, execution, input.getName(), file);
|
||||
}
|
||||
|
||||
default URI outputPrefix(Flow flow) {
|
||||
try {
|
||||
return new URI("//" + fromParts(
|
||||
flow.getNamespace().replace(".", "/"),
|
||||
Slugify.of(flow.getId())
|
||||
));
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
default URI outputPrefix(Flow flow, Task task, Execution execution, TaskRun taskRun) {
|
||||
try {
|
||||
return new URI("//" + fromParts(
|
||||
flow.getNamespace().replace(".", "/"),
|
||||
Slugify.of(flow.getId()),
|
||||
"executions",
|
||||
execution.getId(),
|
||||
"tasks",
|
||||
Slugify.of(taskRun.getTaskId()),
|
||||
taskRun.getId()
|
||||
));
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
default URI outputPrefix(TriggerContext triggerContext, AbstractTrigger trigger, String triggerExecutionId) {
|
||||
try {
|
||||
return new URI("//" + fromParts(
|
||||
triggerContext.getNamespace().replace(".", "/"),
|
||||
Slugify.of(triggerContext.getFlowId()),
|
||||
"executions",
|
||||
triggerExecutionId,
|
||||
"trigger",
|
||||
Slugify.of(trigger.getId())
|
||||
));
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private String fromParts(String... parts) {
|
||||
return "/" + Arrays.stream(parts)
|
||||
.filter(part -> part != null)
|
||||
.collect(Collectors.joining("/"));
|
||||
default URI from(Execution execution, String input, File file) throws IOException {
|
||||
URI uri = StorageContext.forInput(execution, input, file.getName()).getContextStorageURI();
|
||||
return this.put(execution.getTenantId(), uri, new BufferedInputStream(new FileInputStream(file)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,19 +234,8 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
|
||||
|
||||
public void preExecuteTasks(RunContext runContext, TaskRun taskRun) throws Exception {
|
||||
if (cache != null) {
|
||||
// first, check if we need to delete the file
|
||||
if (cache.ttl != null) {
|
||||
var maybeLastModifiedTime = runContext.getTaskCacheFileLastModifiedTime(taskRun.getNamespace(), taskRun.getFlowId(), this.getId(), taskRun.getValue());
|
||||
if (maybeLastModifiedTime.isPresent()) {
|
||||
if (Instant.now().isAfter(Instant.ofEpochMilli(maybeLastModifiedTime.get()).plus(cache.ttl))) {
|
||||
runContext.logger().debug("Cache is expired, deleting it");
|
||||
runContext.deleteTaskCacheFile(taskRun.getNamespace(), taskRun.getFlowId(), this.getId(), taskRun.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// then download it and extract its content
|
||||
var maybeCacheFile = runContext.getTaskCacheFile(taskRun.getNamespace(), taskRun.getFlowId(), this.getId(), taskRun.getValue());
|
||||
// May download cached file if it exists and is not expired, and extract its content
|
||||
var maybeCacheFile = runContext.storage().getCacheFile(getId(), taskRun.getValue(), cache.ttl);
|
||||
if (maybeCacheFile.isPresent()) {
|
||||
runContext.logger().debug("Cache exist, downloading it");
|
||||
// download the cache if exist and unzip all entries
|
||||
@@ -331,7 +320,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
|
||||
archive.finish();
|
||||
File archiveFile = File.createTempFile("archive", ".zip");
|
||||
Files.write(archiveFile.toPath(), bos.toByteArray());
|
||||
URI uri = runContext.putTaskCacheFile(archiveFile, taskRun.getNamespace(), taskRun.getFlowId(), this.getId(), taskRun.getValue());
|
||||
URI uri = runContext.storage().putCacheFile(archiveFile, getId(), taskRun.getValue());
|
||||
runContext.logger().debug("Caching in {}", uri);
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -70,7 +70,7 @@ public abstract class AbstractState extends Task {
|
||||
|
||||
Map<String, Object> merge = MapUtils.merge(current, runContext.render(map));
|
||||
|
||||
URI uri = runContext.putTaskStateFile(
|
||||
URI uri = runContext.storage().putTaskStateFile(
|
||||
JacksonMapper.ofJson(false).writeValueAsBytes(merge),
|
||||
"tasks-states",
|
||||
runContext.render(this.name),
|
||||
@@ -82,6 +82,6 @@ public abstract class AbstractState extends Task {
|
||||
}
|
||||
|
||||
protected boolean delete(RunContext runContext) throws IllegalVariableEvaluationException, IOException {
|
||||
return runContext.deleteTaskStateFile("tasks-states", runContext.render(this.name), this.namespace, this.taskrunValue);
|
||||
return runContext.storage().deleteTaskStateFile("tasks-states", runContext.render(this.name), this.namespace, this.taskrunValue);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ public class PurgeExecution extends Task implements RunnableTask<PurgeExecution.
|
||||
@Override
|
||||
public PurgeExecution.Output run(RunContext runContext) throws Exception {
|
||||
return Output.builder()
|
||||
.uris(runContext.purgeStorageExecution())
|
||||
.uris(runContext.storage().deleteExecutionFiles())
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@@ -60,7 +60,8 @@ public abstract class AbstractExecutionServiceTest {
|
||||
.revision(1)
|
||||
.build();
|
||||
|
||||
Execution execution = Execution.builder()
|
||||
Execution execution = Execution
|
||||
.builder()
|
||||
.id(IdUtils.create())
|
||||
.state(state)
|
||||
.flowId(flow.getId())
|
||||
@@ -70,14 +71,24 @@ public abstract class AbstractExecutionServiceTest {
|
||||
|
||||
Return task = Return.builder().id(IdUtils.create()).type(Return.class.getName()).build();
|
||||
|
||||
TaskRun taskRun = TaskRun
|
||||
.builder()
|
||||
.namespace(flow.getNamespace())
|
||||
.id(IdUtils.create())
|
||||
.executionId(execution.getId())
|
||||
.flowId(flow.getId())
|
||||
.taskId(task.getId())
|
||||
.state(state)
|
||||
.build();
|
||||
|
||||
RunContext runContext = runContextFactory.of(
|
||||
flow,
|
||||
task,
|
||||
execution,
|
||||
TaskRun.builder().id(IdUtils.create()).taskId(task.getId()).state(state).build()
|
||||
taskRun
|
||||
);
|
||||
|
||||
execution.withInputs(Map.of("test", runContext.putTempFile(tempFile)));
|
||||
execution.withInputs(Map.of("test", runContext.storage().putFile(tempFile)));
|
||||
|
||||
executionRepository.save(execution);
|
||||
|
||||
|
||||
@@ -71,8 +71,9 @@ public class InputsTest extends AbstractMemoryRunnerTest {
|
||||
flow,
|
||||
Execution.builder()
|
||||
.id("test")
|
||||
.namespace("test")
|
||||
.namespace(flow.getNamespace())
|
||||
.flowRevision(1)
|
||||
.flowId(flow.getId())
|
||||
.build(),
|
||||
map
|
||||
);
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.tasks.NamespaceFiles;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
@@ -170,7 +171,7 @@ class NamespaceFilesServiceTest {
|
||||
private void put(@Nullable String tenantId, String namespace, String path, String content) throws IOException {
|
||||
storageInterface.put(
|
||||
tenantId,
|
||||
URI.create(storageInterface.namespaceFilePrefix(namespace) + path),
|
||||
URI.create(StorageContext.namespaceFilePrefix(namespace) + path),
|
||||
new ByteArrayInputStream(content.getBytes())
|
||||
);
|
||||
}
|
||||
|
||||
@@ -2,11 +2,11 @@ package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
@@ -31,8 +31,8 @@ class ReadFileFunctionTest {
|
||||
void readNamespaceFile() throws IllegalVariableEvaluationException, IOException {
|
||||
String namespace = "io.kestra.tests";
|
||||
String filePath = "file.txt";
|
||||
storageInterface.createDirectory(null, URI.create(storageInterface.namespaceFilePrefix(namespace)));
|
||||
storageInterface.put(null, URI.create(storageInterface.namespaceFilePrefix(namespace) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes()));
|
||||
storageInterface.createDirectory(null, URI.create(StorageContext.namespaceFilePrefix(namespace)));
|
||||
storageInterface.put(null, URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes()));
|
||||
|
||||
String render = variableRenderer.render("{{ render(read('" + filePath + "')) }}", Map.of("flow", Map.of("namespace", namespace)));
|
||||
assertThat(render, is("Hello from " + namespace));
|
||||
|
||||
@@ -1,91 +0,0 @@
|
||||
package io.kestra.core.storage;
|
||||
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.models.triggers.types.Schedule;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.tasks.log.Log;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
@MicronautTest
|
||||
public class StorageInterfaceTest {
|
||||
|
||||
@Inject
|
||||
StorageInterface storageInterface;
|
||||
|
||||
@Test
|
||||
void executionPrefix() {
|
||||
var flow = Flow.builder().id("flow").namespace("namespace").build();
|
||||
var execution = Execution.builder().id("execution").namespace("namespace").flowId("flow").build();
|
||||
var taskRun = TaskRun.builder().id("taskrun").namespace("namespace").flowId("flow").executionId("execution").build();
|
||||
|
||||
var prefix = storageInterface.executionPrefix(flow, execution);
|
||||
assertThat(prefix, notNullValue());
|
||||
assertThat(prefix, is("/namespace/flow/executions/execution"));
|
||||
|
||||
prefix = storageInterface.executionPrefix(execution);
|
||||
assertThat(prefix, notNullValue());
|
||||
assertThat(prefix, is("/namespace/flow/executions/execution"));
|
||||
|
||||
prefix = storageInterface.executionPrefix(taskRun);
|
||||
assertThat(prefix, notNullValue());
|
||||
assertThat(prefix, is("/namespace/flow/executions/execution"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void cachePrefix() {
|
||||
var prefix = storageInterface.cachePrefix("namespace", "flow", "task", null);
|
||||
assertThat(prefix, notNullValue());
|
||||
assertThat(prefix, is("namespace/flow/task/cache"));
|
||||
|
||||
prefix = storageInterface.cachePrefix("namespace", "flow", "task", "value");
|
||||
assertThat(prefix, notNullValue());
|
||||
assertThat(prefix, startsWith("namespace/flow/task/cache/"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void statePrefix() {
|
||||
var prefix = storageInterface.statePrefix("namespace", "flow", "name", null);
|
||||
assertThat(prefix, notNullValue());
|
||||
assertThat(prefix, is("namespace/flow/states/name"));
|
||||
|
||||
prefix = storageInterface.statePrefix("namespace", "flow", "name", "value");
|
||||
assertThat(prefix, notNullValue());
|
||||
assertThat(prefix, startsWith("namespace/flow/states/name/"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void outputPrefix() {
|
||||
var flow = Flow.builder().id("flow").namespace("namespace").build();
|
||||
var prefix = storageInterface.outputPrefix(flow);
|
||||
assertThat(prefix, notNullValue());
|
||||
assertThat(prefix.toString(), is("///namespace/flow"));
|
||||
|
||||
var log = Log.builder().id("log").type(Log.class.getName()).message("Hello").build();
|
||||
var execution = Execution.builder().id("execution").build();
|
||||
var taskRun = TaskRun.builder().id("taskrun").namespace("namespace").flowId("flow").executionId("execution").taskId("taskid").build();
|
||||
prefix = storageInterface.outputPrefix(flow, log, execution, taskRun);
|
||||
assertThat(prefix, notNullValue());
|
||||
assertThat(prefix.toString(), is("///namespace/flow/executions/execution/tasks/taskid/taskrun"));
|
||||
|
||||
var triggerContext = TriggerContext.builder().namespace("namespace").flowId("flow").triggerId("trigger").build();
|
||||
var trigger = Schedule.builder().id("trigger").build();
|
||||
prefix = storageInterface.outputPrefix(triggerContext, trigger, "execution");
|
||||
assertThat(prefix, notNullValue());
|
||||
assertThat(prefix.toString(), is("///namespace/flow/executions/execution/trigger/trigger"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void namespaceFilePrefix() {
|
||||
var prefix = storageInterface.namespaceFilePrefix("io.namespace");
|
||||
assertThat(prefix, notNullValue());
|
||||
assertThat(prefix, is("/io/namespace/_files"));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,121 @@
|
||||
package io.kestra.core.storages;
|
||||
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
class StorageContextTest {
|
||||
|
||||
@Test
|
||||
void shouldGetValidUriForFlowContext() {
|
||||
StorageContext context = StorageContext.forExecution(Execution
|
||||
.builder()
|
||||
.tenantId("tenantId")
|
||||
.id("executionid")
|
||||
.namespace("namespace")
|
||||
.flowId("flowid")
|
||||
.build()
|
||||
);
|
||||
assertThat(context.getFlowStorageURI(), is(URI.create("///namespace/flowid")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetValidUriForExecutionContext() {
|
||||
StorageContext context = StorageContext.forExecution(Execution
|
||||
.builder()
|
||||
.tenantId("tenantId")
|
||||
.id("executionid")
|
||||
.namespace("namespace")
|
||||
.flowId("flowid")
|
||||
.build()
|
||||
);
|
||||
assertThat(context.getExecutionStorageURI(), is(URI.create("///namespace/flowid/executions/executionid")));
|
||||
assertThat(context.getContextStorageURI(), is(URI.create("///namespace/flowid/executions/executionid")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetValidUriForExecutionContextWithScheme() {
|
||||
StorageContext context = StorageContext.forExecution(Execution
|
||||
.builder()
|
||||
.tenantId("tenantId")
|
||||
.id("executionid")
|
||||
.namespace("namespace")
|
||||
.flowId("flowid")
|
||||
.build()
|
||||
);
|
||||
assertThat(context.getExecutionStorageURI("kestra"), is(URI.create("kestra:///namespace/flowid/executions/executionid")));
|
||||
assertThat(context.getExecutionStorageURI("kestra://"), is(URI.create("kestra:///namespace/flowid/executions/executionid")));
|
||||
assertThat(context.getContextStorageURI(), is(URI.create("///namespace/flowid/executions/executionid")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetValidURIForTaskContext() {
|
||||
StorageContext context = StorageContext.forTask(
|
||||
"???",
|
||||
"namespace",
|
||||
"flowid",
|
||||
"executionid",
|
||||
"taskid",
|
||||
"taskrun",
|
||||
null
|
||||
);
|
||||
|
||||
assertThat(context.getExecutionStorageURI(), is(URI.create("///namespace/flowid/executions/executionid")));
|
||||
assertThat(context.getContextStorageURI(), is(URI.create("///namespace/flowid/executions/executionid/tasks/taskid/taskrun")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetValidStatePrefixForTaskContext() {
|
||||
StorageContext context = StorageContext.forFlow(Flow.builder().namespace("namespace").id("flowid").build());
|
||||
assertThat(context.getStateStorePrefix("name", false, null), is("/namespace/flowid/states/name"));
|
||||
assertThat(context.getStateStorePrefix("name", false, "aaa"), startsWith("/namespace/flowid/states/name/"));
|
||||
assertThat(context.getStateStorePrefix("name", true, null), is("/namespace/states/name"));
|
||||
assertThat(context.getStateStorePrefix("name", true, "aaa"), startsWith("/namespace/states/name/"));
|
||||
assertThat(context.getStateStorePrefix(null, true, null), is("/namespace/states"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetValidURIForTriggerContext() {
|
||||
StorageContext context = StorageContext.forTrigger(
|
||||
"???",
|
||||
"namespace",
|
||||
"flowid",
|
||||
"executionid",
|
||||
"triggerid"
|
||||
);
|
||||
|
||||
assertThat(context.getExecutionStorageURI(), is(URI.create("///namespace/flowid/executions/executionid")));
|
||||
assertThat(context.getContextStorageURI(), is(URI.create("///namespace/flowid/executions/executionid/trigger/triggerid")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetNamespaceFilePrefix() {
|
||||
assertThat(StorageContext.namespaceFilePrefix("io.namespace"), is("/io/namespace/_files"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetTaskCachePrefix() {
|
||||
assertThat(StorageContext.forFlow(Flow
|
||||
.builder()
|
||||
.tenantId(null)
|
||||
.namespace("namespace")
|
||||
.id("flowid")
|
||||
.build()
|
||||
).getCacheURI("taskid", null), is(URI.create("/namespace/flowid/taskid/cache/cache.zip")));
|
||||
|
||||
assertThat(StorageContext.forFlow(Flow
|
||||
.builder()
|
||||
.tenantId(null)
|
||||
.namespace("namespace")
|
||||
.id("flowid")
|
||||
.build()
|
||||
).getCacheURI("taskid", "value").toString(), startsWith("/namespace/flowid/taskid/cache/"));
|
||||
}
|
||||
}
|
||||
@@ -3,9 +3,12 @@ package io.kestra.core.tasks.flows;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.runners.AbstractMemoryRunnerTest;
|
||||
import io.kestra.core.runners.RunnerUtils;
|
||||
import io.kestra.core.storages.InternalStorage;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
@@ -21,6 +24,7 @@ import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
|
||||
@@ -98,8 +102,21 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
|
||||
@SuppressWarnings("unchecked")
|
||||
public void cache(RunnerUtils runnerUtils) throws TimeoutException, IOException {
|
||||
// make sure the cache didn't exist
|
||||
URI cache = URI.create(storageInterface.cachePrefix("io.kestra.tests", "working-directory-cache", "workingDir", null) + "/cache.zip");
|
||||
storageInterface.delete(null, cache);
|
||||
StorageContext storageContext = StorageContext.forFlow(Flow
|
||||
.builder()
|
||||
.namespace("io.kestra.tests")
|
||||
.id("working-directory-cache")
|
||||
.build()
|
||||
);
|
||||
InternalStorage storage = new InternalStorage(
|
||||
null,
|
||||
storageContext
|
||||
, storageInterface
|
||||
);
|
||||
storage.deleteCacheFile("workingDir", null);
|
||||
|
||||
URI cacheURI = storageContext.getCacheURI("workingdir", null);
|
||||
assertFalse(storageInterface.exists(null, cacheURI));
|
||||
|
||||
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "working-directory-cache");
|
||||
|
||||
@@ -111,7 +128,7 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
|
||||
nullValue()
|
||||
);
|
||||
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
|
||||
assertTrue(storageInterface.exists(null, cache));
|
||||
assertTrue(storageInterface.exists(null, cacheURI));
|
||||
|
||||
// a second run should use the cache so the task `exists` should output the cached file
|
||||
execution = runnerUtils.runOne(null, "io.kestra.tests", "working-directory-cache");
|
||||
@@ -163,7 +180,7 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
|
||||
private void put(String path, String content) throws IOException {
|
||||
storageInterface.put(
|
||||
null,
|
||||
URI.create(storageInterface.namespaceFilePrefix("io.kestra.tests") + path),
|
||||
URI.create(StorageContext.namespaceFilePrefix("io.kestra.tests") + path),
|
||||
new ByteArrayInputStream(content.getBytes())
|
||||
);
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.runners.RunnerUtils;
|
||||
import io.kestra.core.services.ConditionService;
|
||||
import io.kestra.core.services.ExecutionService;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import io.kestra.core.utils.Await;
|
||||
@@ -44,7 +45,6 @@ import io.micronaut.core.convert.format.Format;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.http.*;
|
||||
import io.micronaut.http.annotation.*;
|
||||
import io.micronaut.http.exceptions.HttpStatusException;
|
||||
import io.micronaut.http.multipart.StreamingFileUpload;
|
||||
import io.micronaut.http.server.types.files.StreamedFile;
|
||||
import io.micronaut.http.sse.Event;
|
||||
@@ -543,24 +543,28 @@ public class ExecutionController {
|
||||
throw new NoSuchElementException("Unable to find flow id '" + executionId + "'");
|
||||
}
|
||||
|
||||
String prefix = storageInterface.executionPrefix(flow.get(), execution.get());
|
||||
String prefix = StorageContext
|
||||
.forExecution(execution.get())
|
||||
.getExecutionStorageURI().getPath();
|
||||
|
||||
if (path.getPath().startsWith(prefix)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// maybe state
|
||||
prefix = storageInterface.statePrefix(flow.get().getNamespace(), flow.get().getId(), null, null);
|
||||
StorageContext context = StorageContext.forFlow(flow.get());
|
||||
prefix = context.getStateStorePrefix(null, false, null);
|
||||
if (path.getPath().startsWith(prefix)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
prefix = storageInterface.statePrefix(flow.get().getNamespace(), null, null, null);
|
||||
prefix = context.getStateStorePrefix(null, true, null);
|
||||
if (path.getPath().startsWith(prefix)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// maybe redirect to correct execution
|
||||
Optional<String> redirectedExecution = storageInterface.extractExecutionId(path);
|
||||
Optional<String> redirectedExecution = StorageContext.extractExecutionId(path);
|
||||
|
||||
if (redirectedExecution.isPresent()) {
|
||||
return HttpResponse.redirect(URI.create((basePath != null ? basePath : "") +
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
package io.kestra.webserver.controllers;
|
||||
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.serializers.YamlFlowParser;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.ImmutableFileAttributes;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import io.kestra.core.utils.Rethrow;
|
||||
@@ -276,7 +276,7 @@ public class NamespaceFileController {
|
||||
}
|
||||
|
||||
private URI toNamespacedStorageUri(String namespace, @Nullable URI relativePath) {
|
||||
return URI.create("kestra://" + storageInterface.namespaceFilePrefix(namespace) + Optional.ofNullable(relativePath).map(URI::getPath).orElse("/"));
|
||||
return URI.create("kestra://" + StorageContext.namespaceFilePrefix(namespace) + Optional.ofNullable(relativePath).map(URI::getPath).orElse("/"));
|
||||
}
|
||||
|
||||
private void ensureWritableNamespaceFile(URI path) {
|
||||
|
||||
@@ -4,6 +4,7 @@ import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.webserver.controllers.h2.JdbcH2ControllerTest;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
@@ -310,7 +311,7 @@ class NamespaceFileControllerTest extends JdbcH2ControllerTest {
|
||||
}
|
||||
|
||||
private URI toNamespacedStorageUri(String namespace, @Nullable URI relativePath) {
|
||||
return URI.create(storageInterface.namespaceFilePrefix(namespace) + Optional.ofNullable(relativePath).map(URI::getPath).orElse(""));
|
||||
return URI.create(StorageContext.namespaceFilePrefix(namespace) + Optional.ofNullable(relativePath).map(URI::getPath).orElse(""));
|
||||
}
|
||||
|
||||
@Getter
|
||||
|
||||
Reference in New Issue
Block a user