feat(core): #5467 add inheritance for KV in pebble file functions

This commit is contained in:
nKwiatkowski
2025-03-25 10:32:07 +01:00
parent 8f9b2fc0db
commit 688859e831
5 changed files with 128 additions and 5 deletions

View File

@@ -14,7 +14,6 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern; import java.util.regex.Pattern;
abstract class AbstractFileFunction implements Function { abstract class AbstractFileFunction implements Function {
@@ -66,8 +65,23 @@ abstract class AbstractFileFunction implements Function {
fileUri = URI.create(str); fileUri = URI.create(str);
namespace = checkAllowedFileAndReturnNamespace(context, fileUri); namespace = checkAllowedFileAndReturnNamespace(context, fileUri);
} else { } else {
namespace = (String) Optional.ofNullable(args.get(NAMESPACE)).orElse(flow.get(NAMESPACE)); if (args.get(NAMESPACE) != null){
fileUri = URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + str); namespace = (String) args.get(NAMESPACE);
fileUri = URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + str);
} else {
namespace = flow.get(NAMESPACE);
fileUri = URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + str);
String inheritedNamespace = namespace;
URI inheritedFilePath = fileUri;
while (!storageInterface.exists(tenantId, inheritedNamespace, inheritedFilePath) && inheritedNamespace.contains(".")){
inheritedNamespace = inheritedNamespace.substring(0, inheritedNamespace.lastIndexOf('.'));
inheritedFilePath = URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace) + "/" + str);
}
if (storageInterface.exists(tenantId, inheritedNamespace, inheritedFilePath)){
namespace = inheritedNamespace;
fileUri = inheritedFilePath;
}
}
flowService.checkAllowedNamespace(tenantId, namespace, tenantId, flow.get(NAMESPACE)); flowService.checkAllowedNamespace(tenantId, namespace, tenantId, flow.get(NAMESPACE));
} }
} else { } else {

View File

@@ -66,6 +66,35 @@ class FileExistsFunctionTest {
assertTrue(render); assertTrue(render);
} }
@Test
void findNamespaceFileWhitInheritance() throws IllegalVariableEvaluationException, IOException {
String namespace = "my.parent.namespace";
String inheritedNamespace = "my.parent";
String firstLevelNamespace = "my";
String filePath = "file.txt";
storageInterface.createDirectory(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace)));
storageInterface.put(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace) + "/" + filePath), new ByteArrayInputStream("Hello from inherited namespace".getBytes()));
storageInterface.createDirectory(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(firstLevelNamespace)));
storageInterface.put(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(firstLevelNamespace) + "/" + filePath), new ByteArrayInputStream("Hello from first level".getBytes()));
boolean render = Boolean.parseBoolean(variableRenderer.render("{{ fileExists('" + filePath + "') }}", Map.of("flow", Map.of("namespace", namespace))));
assertTrue(render);
}
@Test
void shouldNotFindNamespaceFileWhitInheritanceWhenNamespaceSpecified()
throws IOException, IllegalVariableEvaluationException {
String namespace = "my.specified.namespace";
String inheritedNamespace = "my.specified";
String filePath = "file.txt";
storageInterface.createDirectory(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace)));
storageInterface.put(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace) + "/" + filePath), new ByteArrayInputStream("Hello from inherited specified namespace".getBytes()));
boolean render = Boolean.parseBoolean(variableRenderer.render("{{ fileExists('" + filePath + "', namespace='" + namespace + "') }}", Map.of("flow", Map.of("namespace", "my.current.namespace"))));
assertFalse(render);
}
@Test @Test
void shouldReturnFalseForNonExistentFile() throws IllegalVariableEvaluationException { void shouldReturnFalseForNonExistentFile() throws IllegalVariableEvaluationException {
String executionId = IdUtils.create(); String executionId = IdUtils.create();

View File

@@ -52,7 +52,7 @@ public class FileSizeFunctionTest {
} }
@Test @Test
void readNamespaceFileWithNamespace() throws IllegalVariableEvaluationException, IOException { void readSizeNamespaceFileWithNamespace() throws IllegalVariableEvaluationException, IOException {
String namespace = "io.kestra.tests"; String namespace = "io.kestra.tests";
String filePath = "file.txt"; String filePath = "file.txt";
storageInterface.createDirectory(null, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace))); storageInterface.createDirectory(null, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace)));
@@ -62,6 +62,33 @@ public class FileSizeFunctionTest {
assertThat(render, is(FILE_SIZE)); assertThat(render, is(FILE_SIZE));
} }
@Test
void readNamespaceFileWhitInheritance() throws IllegalVariableEvaluationException, IOException {
String namespace = "my.parent.namespace";
String inheritedNamespace = "my.parent";
String firstLevelNamespace = "my";
String filePath = "file.txt";
storageInterface.createDirectory(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace)));
storageInterface.put(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace) + "/" + filePath), new ByteArrayInputStream("Hello from inherited namespace".getBytes()));
storageInterface.createDirectory(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(firstLevelNamespace)));
storageInterface.put(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(firstLevelNamespace) + "/" + filePath), new ByteArrayInputStream("Hello from first level".getBytes()));
String render = variableRenderer.render("{{ fileSize('" + filePath + "') }}", Map.of("flow", Map.of("namespace", namespace)));
assertThat(render, is(String.valueOf("Hello from inherited namespace".getBytes().length)));
}
@Test
void shouldNotReadSizeNamespaceFileWhitInheritanceWhenNamespaceSpecified() throws IOException {
String namespace = "my.specified.namespace";
String inheritedNamespace = "my.specified";
String filePath = "file.txt";
storageInterface.createDirectory(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace)));
storageInterface.put(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace) + "/" + filePath), new ByteArrayInputStream("Hello from inherited specified namespace".getBytes()));
assertThrows(IllegalVariableEvaluationException.class, () -> variableRenderer.render("{{ fileSize('" + filePath + "', namespace='" + namespace + "') }}", Map.of("flow", Map.of("namespace", "my.current.namespace"))));
}
@Test @Test
void returnsCorrectSize_givenStringUri_andParentExecution() throws IOException, IllegalVariableEvaluationException { void returnsCorrectSize_givenStringUri_andParentExecution() throws IOException, IllegalVariableEvaluationException {
String executionId = IdUtils.create(); String executionId = IdUtils.create();

View File

@@ -66,6 +66,33 @@ class IsFileEmptyFunctionTest {
assertFalse(render); assertFalse(render);
} }
@Test
void findFileNamespaceFileWhitInheritance() throws IllegalVariableEvaluationException, IOException {
String namespace = "my.parent.namespace";
String inheritedNamespace = "my.parent";
String firstLevelNamespace = "my";
String filePath = "file.txt";
storageInterface.createDirectory(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace)));
storageInterface.put(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace) + "/" + filePath), new ByteArrayInputStream("Hello from inherited namespace".getBytes()));
storageInterface.createDirectory(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(firstLevelNamespace)));
storageInterface.put(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(firstLevelNamespace) + "/" + filePath), new ByteArrayInputStream("".getBytes()));
boolean render = Boolean.parseBoolean(variableRenderer.render("{{ isFileEmpty('" + filePath + "') }}", Map.of("flow", Map.of("namespace", namespace))));
assertFalse(render);
}
@Test
void shouldNotFindFileNamespaceFileWhitInheritanceWhenNamespaceSpecified() throws IOException {
String namespace = "my.specified.namespace";
String inheritedNamespace = "my.specified";
String filePath = "file.txt";
storageInterface.createDirectory(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace)));
storageInterface.put(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace) + "/" + filePath), new ByteArrayInputStream("Hello from inherited specified namespace".getBytes()));
assertThrows(IllegalVariableEvaluationException.class, () -> variableRenderer.render("{{ isFileEmpty('" + filePath + "', namespace='" + namespace + "') }}", Map.of("flow", Map.of("namespace", "my.current.namespace"))));
}
@Test @Test
void shouldReturnTrueForEmpty() throws IOException, IllegalVariableEvaluationException { void shouldReturnTrueForEmpty() throws IOException, IllegalVariableEvaluationException {
String executionId = IdUtils.create(); String executionId = IdUtils.create();

View File

@@ -7,7 +7,6 @@ import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
import io.micronaut.context.annotation.Property; import io.micronaut.context.annotation.Property;
import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.junit.annotations.KestraTest;
import io.pebbletemplates.pebble.error.PebbleException;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -43,6 +42,33 @@ class ReadFileFunctionTest {
assertThat(render, is("Hello from " + namespace)); assertThat(render, is("Hello from " + namespace));
} }
@Test
void readNamespaceFileWhitInheritance() throws IllegalVariableEvaluationException, IOException {
String namespace = "my.parent.namespace";
String inheritedNamespace = "my.parent";
String firstLevelNamespace = "my";
String filePath = "file.txt";
storageInterface.createDirectory(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace)));
storageInterface.put(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace) + "/" + filePath), new ByteArrayInputStream("Hello from inherited namespace".getBytes()));
storageInterface.createDirectory(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(firstLevelNamespace)));
storageInterface.put(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(firstLevelNamespace) + "/" + filePath), new ByteArrayInputStream("Hello from first level".getBytes()));
String render = variableRenderer.render("{{ read('" + filePath + "') }}", Map.of("flow", Map.of("namespace", namespace)));
assertThat(render, is("Hello from inherited namespace"));
}
@Test
void shouldNotReadNamespaceFileWhitInheritanceWhenNamespaceSpecified() throws IOException {
String namespace = "my.specified.namespace";
String inheritedNamespace = "my.specified";
String filePath = "file.txt";
storageInterface.createDirectory(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace)));
storageInterface.put(null, inheritedNamespace, URI.create(StorageContext.namespaceFilePrefix(inheritedNamespace) + "/" + filePath), new ByteArrayInputStream("Hello from inherited specified namespace".getBytes()));
assertThrows(IllegalVariableEvaluationException.class, () -> variableRenderer.render("{{ read('" + filePath + "', namespace='" + namespace + "') }}", Map.of("flow", Map.of("namespace", "my.current.namespace"))));
}
@Test @Test
void readNamespaceFileWithNamespace() throws IllegalVariableEvaluationException, IOException { void readNamespaceFileWithNamespace() throws IllegalVariableEvaluationException, IOException {
String namespace = "io.kestra.tests"; String namespace = "io.kestra.tests";