Compare commits

..

3 Commits

Author SHA1 Message Date
AJ Emerich
d69fe49961 Merge branch 'develop' into docs/return-example 2025-11-17 04:52:32 -06:00
AJ Emerich
f33a6e5f16 Merge branch 'develop' into docs/return-example 2025-11-17 04:05:30 -06:00
AJ Emerich
840ef10b54 docs(return): fix example 2025-11-14 12:51:18 +01:00
563 changed files with 9772 additions and 18879 deletions

View File

@@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
name: Checkout
with:
fetch-depth: 0

View File

@@ -27,7 +27,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v6
uses: actions/checkout@v5
with:
# We must fetch at least the immediate parents so that if this is
# a pull request then we can checkout the head.

View File

@@ -33,7 +33,7 @@ jobs:
exit 1;
fi
# Checkout
- uses: actions/checkout@v6
- uses: actions/checkout@v5
with:
fetch-depth: 0
path: kestra

View File

@@ -39,7 +39,7 @@ jobs:
# Checkout
- name: Checkout
uses: actions/checkout@v6
uses: actions/checkout@v5
with:
fetch-depth: 0
token: ${{ secrets.GH_PERSONAL_TOKEN }}

View File

@@ -28,7 +28,7 @@ jobs:
steps:
# Targeting develop branch from develop
- name: Trigger EE Workflow (develop push, no payload)
uses: peter-evans/repository-dispatch@28959ce8df70de7be546dd1250a005dd32156697
uses: peter-evans/repository-dispatch@5fc4efd1a4797ddb68ffd0714a238564e4cc0e6f
if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/develop' }}
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}

View File

@@ -16,7 +16,7 @@ jobs:
- name: Check EE repo for branch with same name
if: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.repo.fork == false }}
id: check-ee-branch
uses: actions/github-script@v8
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GH_PERSONAL_TOKEN }}
script: |
@@ -40,7 +40,7 @@ jobs:
# Targeting pull request (only if not from a fork and EE has no branch with same name)
- name: Trigger EE Workflow (pull request, with payload)
uses: peter-evans/repository-dispatch@28959ce8df70de7be546dd1250a005dd32156697
uses: peter-evans/repository-dispatch@5fc4efd1a4797ddb68ffd0714a238564e4cc0e6f
if: ${{ github.event_name == 'pull_request'
&& github.event.pull_request.number != ''
&& github.event.pull_request.head.repo.fork == false
@@ -50,7 +50,7 @@ jobs:
repository: kestra-io/kestra-ee
event-type: "oss-updated"
client-payload: >-
{"commit_sha":"${{ github.event.pull_request.head.sha }}","pr_repo":"${{ github.repository }}"}
{"commit_sha":"${{ github.sha }}","pr_repo":"${{ github.repository }}"}
file-changes:
if: ${{ github.event.pull_request.draft == false }}

View File

@@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v6
- uses: actions/checkout@v5
with:
fetch-depth: 0
@@ -58,7 +58,7 @@ jobs:
actions: read
steps:
# Checkout
- uses: actions/checkout@v6
- uses: actions/checkout@v5
with:
fetch-depth: 0
@@ -95,7 +95,7 @@ jobs:
actions: read
steps:
# Checkout
- uses: actions/checkout@v6
- uses: actions/checkout@v5
with:
fetch-depth: 0

View File

@@ -7,7 +7,7 @@ buildscript {
}
dependencies {
classpath "net.e175.klaus:zip-prefixer:0.4.0"
classpath "net.e175.klaus:zip-prefixer:0.3.1"
}
}
@@ -21,7 +21,7 @@ plugins {
// test
id "com.adarshr.test-logger" version "4.0.0"
id "org.sonarqube" version "7.1.0.6387"
id "org.sonarqube" version "7.0.1.6134"
id 'jacoco-report-aggregation'
// helper
@@ -32,7 +32,7 @@ plugins {
// release
id 'net.researchgate.release' version '3.1.0'
id "com.gorylenko.gradle-git-properties" version "2.5.4"
id "com.gorylenko.gradle-git-properties" version "2.5.3"
id 'signing'
id "com.vanniktech.maven.publish" version "0.35.0"

View File

@@ -8,10 +8,11 @@ import io.kestra.cli.commands.plugins.PluginCommand;
import io.kestra.cli.commands.servers.ServerCommand;
import io.kestra.cli.commands.sys.SysCommand;
import io.kestra.cli.commands.templates.TemplateCommand;
import io.kestra.cli.services.EnvironmentProvider;
import io.micronaut.configuration.picocli.MicronautFactory;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.ApplicationContextBuilder;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.Introspected;
import org.slf4j.bridge.SLF4JBridgeHandler;
import picocli.CommandLine;
@@ -19,9 +20,11 @@ import picocli.CommandLine;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.stream.Stream;
@CommandLine.Command(
name = "kestra",
@@ -46,77 +49,35 @@ import java.util.stream.Stream;
@Introspected
public class App implements Callable<Integer> {
public static void main(String[] args) {
System.exit(runCli(args));
}
public static int runCli(String[] args, String... extraEnvironments) {
return runCli(App.class, args, extraEnvironments);
}
public static int runCli(Class<?> cls, String[] args, String... extraEnvironments) {
ServiceLoader<EnvironmentProvider> environmentProviders = ServiceLoader.load(EnvironmentProvider.class);
String[] baseEnvironments = environmentProviders.findFirst().map(EnvironmentProvider::getCliEnvironments).orElseGet(() -> new String[0]);
return execute(
cls,
Stream.concat(
Arrays.stream(baseEnvironments),
Arrays.stream(extraEnvironments)
).toArray(String[]::new),
args
);
execute(App.class, new String [] { Environment.CLI }, args);
}
@Override
public Integer call() throws Exception {
return runCli(new String[0]);
return PicocliRunner.call(App.class, "--help");
}
protected static int execute(Class<?> cls, String[] environments, String... args) {
protected static void execute(Class<?> cls, String[] environments, String... args) {
// Log Bridge
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
// Init ApplicationContext
CommandLine commandLine = getCommandLine(cls, args);
ApplicationContext applicationContext = App.applicationContext(cls, commandLine, environments);
Class<?> targetCommand = commandLine.getCommandSpec().userObject().getClass();
if (!AbstractCommand.class.isAssignableFrom(targetCommand) && args.length == 0) {
// if no command provided, show help
args = new String[]{"--help"};
}
ApplicationContext applicationContext = App.applicationContext(cls, environments, args);
// Call Picocli command
int exitCode;
int exitCode = 0;
try {
exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args);
} catch (CommandLine.InitializationException e){
System.err.println("Could not initialize picocli CommandLine, err: " + e.getMessage());
System.err.println("Could not initialize picoli ComandLine, err: " + e.getMessage());
e.printStackTrace();
exitCode = 1;
}
applicationContext.close();
// exit code
return exitCode;
}
private static CommandLine getCommandLine(Class<?> cls, String[] args) {
CommandLine cmd = new CommandLine(cls, CommandLine.defaultFactory());
continueOnParsingErrors(cmd);
CommandLine.ParseResult parseResult = cmd.parseArgs(args);
List<CommandLine> parsedCommands = parseResult.asCommandLineList();
return parsedCommands.getLast();
}
public static ApplicationContext applicationContext(Class<?> mainClass,
String[] environments,
String... args) {
return App.applicationContext(mainClass, getCommandLine(mainClass, args), environments);
System.exit(Objects.requireNonNullElse(exitCode, 0));
}
@@ -124,17 +85,25 @@ public class App implements Callable<Integer> {
* Create an {@link ApplicationContext} with additional properties based on configuration files (--config) and
* forced Properties from current command.
*
* @param args args passed to java app
* @return the application context created
*/
protected static ApplicationContext applicationContext(Class<?> mainClass,
CommandLine commandLine,
String[] environments) {
String[] environments,
String[] args) {
ApplicationContextBuilder builder = ApplicationContext
.builder()
.mainClass(mainClass)
.environments(environments);
CommandLine cmd = new CommandLine(mainClass, CommandLine.defaultFactory());
continueOnParsingErrors(cmd);
CommandLine.ParseResult parseResult = cmd.parseArgs(args);
List<CommandLine> parsedCommands = parseResult.asCommandLineList();
CommandLine commandLine = parsedCommands.getLast();
Class<?> cls = commandLine.getCommandSpec().userObject().getClass();
if (AbstractCommand.class.isAssignableFrom(cls)) {

View File

@@ -1,5 +1,6 @@
package io.kestra.cli.commands.configs.sys;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
@@ -19,6 +20,8 @@ public class ConfigCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"configs", "--help"});
PicocliRunner.call(App.class, "configs", "--help");
return 0;
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
@@ -28,6 +29,8 @@ public class FlowCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"flow", "--help"});
PicocliRunner.call(App.class, "flow", "--help");
return 0;
}
}

View File

@@ -1,6 +1,7 @@
package io.kestra.cli.commands.flows.namespaces;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
@@ -21,6 +22,8 @@ public class FlowNamespaceCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"flow", "namespace", "--help"});
PicocliRunner.call(App.class, "flow", "namespace", "--help");
return 0;
}
}

View File

@@ -3,6 +3,7 @@ package io.kestra.cli.commands.migrations;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.migrations.metadata.MetadataMigrationCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -23,6 +24,8 @@ public class MigrationCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"migrate", "--help"});
PicocliRunner.call(App.class, "migrate", "--help");
return 0;
}
}

View File

@@ -10,8 +10,7 @@ import picocli.CommandLine;
description = "populate metadata for entities",
subcommands = {
KvMetadataMigrationCommand.class,
SecretsMetadataMigrationCommand.class,
NsFilesMetadataMigrationCommand.class
SecretsMetadataMigrationCommand.class
}
)
@Slf4j

View File

@@ -1,10 +1,8 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
@@ -17,13 +15,11 @@ import jakarta.inject.Singleton;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@@ -40,9 +36,6 @@ public class MetadataMigrationService {
@Inject
private KvMetadataRepositoryInterface kvMetadataRepository;
@Inject
private NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository;
@Inject
private StorageInterface storageInterface;
@@ -56,9 +49,7 @@ public class MetadataMigrationService {
.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
.flatMap(throwFunction(namespaceForTenant -> {
InternalKVStore kvStore = new InternalKVStore(namespaceForTenant.getKey(), namespaceForTenant.getValue(), storageInterface, kvMetadataRepository);
List<FileAttributes> list = listAllFromStorage(storageInterface, StorageContext::kvPrefix, namespaceForTenant.getKey(), namespaceForTenant.getValue()).stream()
.map(PathAndAttributes::attributes)
.toList();
List<FileAttributes> list = listAllFromStorage(storageInterface, namespaceForTenant.getKey(), namespaceForTenant.getValue());
Map<Boolean, List<KVEntry>> entriesByIsExpired = list.stream()
.map(throwFunction(fileAttributes -> KVEntry.from(namespaceForTenant.getValue(), fileAttributes)))
.collect(Collectors.partitioningBy(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isAfter(expirationDate)).orElse(false)));
@@ -84,35 +75,15 @@ public class MetadataMigrationService {
}));
}
public void nsFilesMigration() throws IOException {
this.namespacesPerTenant().entrySet().stream()
.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
.flatMap(throwFunction(namespaceForTenant -> {
List<PathAndAttributes> list = listAllFromStorage(storageInterface, StorageContext::namespaceFilePrefix, namespaceForTenant.getKey(), namespaceForTenant.getValue());
return list.stream()
.map(pathAndAttributes -> NamespaceFileMetadata.of(namespaceForTenant.getKey(), namespaceForTenant.getValue(), pathAndAttributes.path(), pathAndAttributes.attributes()));
}))
.forEach(throwConsumer(nsFileMetadata -> {
if (namespaceFileMetadataRepository.findByPath(nsFileMetadata.getTenantId(), nsFileMetadata.getNamespace(), nsFileMetadata.getPath()).isEmpty()) {
namespaceFileMetadataRepository.save(nsFileMetadata);
}
}));
}
public void secretMigration() throws Exception {
throw new UnsupportedOperationException("Secret migration is not needed in the OSS version");
}
private static List<PathAndAttributes> listAllFromStorage(StorageInterface storage, Function<String, String> prefixFunction, String tenant, String namespace) throws IOException {
private static List<FileAttributes> listAllFromStorage(StorageInterface storage, String tenant, String namespace) throws IOException {
try {
String prefix = prefixFunction.apply(namespace);
return storage.allByPrefix(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + prefix + "/"), true).stream()
.map(throwFunction(uri -> new PathAndAttributes(uri.getPath().substring(prefix.length()), storage.getAttributes(tenant, namespace, uri))))
.toList();
return storage.list(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace)));
} catch (FileNotFoundException e) {
return Collections.emptyList();
}
}
public record PathAndAttributes(String path, FileAttributes attributes) {}
}

View File

@@ -1,31 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "nsfiles",
description = "populate metadata for Namespace Files"
)
@Slf4j
public class NsFilesMetadataMigrationCommand extends AbstractCommand {
@Inject
private Provider<MetadataMigrationService> metadataMigrationServiceProvider;
@Override
public Integer call() throws Exception {
super.call();
try {
metadataMigrationServiceProvider.get().nsFilesMigration();
} catch (Exception e) {
System.err.println("❌ Namespace Files Metadata migration failed: " + e.getMessage());
e.printStackTrace();
return 1;
}
System.out.println("✅ Namespace Files Metadata migration complete.");
return 0;
}
}

View File

@@ -4,6 +4,7 @@ import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.namespaces.files.NamespaceFilesCommand;
import io.kestra.cli.commands.namespaces.kv.KvCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -24,6 +25,8 @@ public class NamespaceCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"namespace", "--help"});
PicocliRunner.call(App.class, "namespace", "--help");
return 0;
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.namespaces.files;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -21,6 +22,8 @@ public class NamespaceFilesCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"namespace", "files", "--help"});
PicocliRunner.call(App.class, "namespace", "files", "--help");
return 0;
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.namespaces.kv;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -21,6 +22,8 @@ public class KvCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"namespace", "kv", "--help"});
PicocliRunner.call(App.class, "namespace", "kv", "--help");
return 0;
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.plugins;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import picocli.CommandLine.Command;
@@ -24,7 +25,9 @@ public class PluginCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"plugins", "--help"});
PicocliRunner.call(App.class, "plugins", "--help");
return 0;
}
@Override

View File

@@ -1,5 +1,6 @@
package io.kestra.cli.commands.servers;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
@@ -27,6 +28,8 @@ public class ServerCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"server", "--help"});
PicocliRunner.call(App.class, "server", "--help");
return 0;
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.sys;
import io.kestra.cli.commands.sys.database.DatabaseCommand;
import io.kestra.cli.commands.sys.statestore.StateStoreCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
@@ -24,6 +25,8 @@ public class SysCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"sys", "--help"});
PicocliRunner.call(App.class, "sys", "--help");
return 0;
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.sys.database;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import picocli.CommandLine;
@@ -19,6 +20,8 @@ public class DatabaseCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"sys", "database", "--help"});
PicocliRunner.call(App.class, "sys", "database", "--help");
return 0;
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import picocli.CommandLine;
@@ -19,6 +20,8 @@ public class StateStoreCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"sys", "state-store", "--help"});
PicocliRunner.call(App.class, "sys", "state-store", "--help");
return 0;
}
}

View File

@@ -4,6 +4,7 @@ import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceCommand;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -26,6 +27,8 @@ public class TemplateCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"template", "--help"});
PicocliRunner.call(App.class, "template", "--help");
return 0;
}
}

View File

@@ -3,6 +3,7 @@ package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -23,6 +24,8 @@ public class TemplateNamespaceCommand extends AbstractCommand {
public Integer call() throws Exception {
super.call();
return App.runCli(new String[]{"template", "namespace", "--help"});
PicocliRunner.call(App.class, "template", "namespace", "--help");
return 0;
}
}

View File

@@ -1,16 +0,0 @@
package io.kestra.cli.services;
import io.micronaut.context.env.Environment;
import java.util.Arrays;
import java.util.stream.Stream;
public class DefaultEnvironmentProvider implements EnvironmentProvider {
@Override
public String[] getCliEnvironments(String... extraEnvironments) {
return Stream.concat(
Stream.of(Environment.CLI),
Arrays.stream(extraEnvironments)
).toArray(String[]::new);
}
}

View File

@@ -1,5 +0,0 @@
package io.kestra.cli.services;
public interface EnvironmentProvider {
String[] getCliEnvironments(String... extraEnvironments);
}

View File

@@ -1 +0,0 @@
io.kestra.cli.services.DefaultEnvironmentProvider

View File

@@ -1,11 +1,14 @@
package io.kestra.cli;
import io.kestra.core.models.ServerType;
import io.micronaut.configuration.picocli.MicronautFactory;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import picocli.CommandLine;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
@@ -19,15 +22,11 @@ class AppTest {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
// No arg will print help
assertThat(App.runCli(new String[0])).isZero();
assertThat(out.toString()).contains("kestra");
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
PicocliRunner.call(App.class, ctx, "--help");
out.reset();
// Explicit help command
assertThat(App.runCli(new String[]{"--help"})).isZero();
assertThat(out.toString()).contains("kestra");
assertThat(out.toString()).contains("kestra");
}
}
@ParameterizedTest
@@ -39,12 +38,11 @@ class AppTest {
final String[] args = new String[]{"server", serverType, "--help"};
try (ApplicationContext ctx = App.applicationContext(App.class, new String [] { Environment.CLI }, args)) {
new CommandLine(App.class, new MicronautFactory(ctx)).execute(args);
assertTrue(ctx.getProperty("kestra.server-type", ServerType.class).isEmpty());
assertThat(out.toString()).startsWith("Usage: kestra server " + serverType);
}
assertThat(App.runCli(args)).isZero();
assertThat(out.toString()).startsWith("Usage: kestra server " + serverType);
}
@Test
@@ -54,10 +52,12 @@ class AppTest {
final String[] argsWithMissingParams = new String[]{"flow", "namespace", "update"};
assertThat(App.runCli(argsWithMissingParams)).isEqualTo(2);
try (ApplicationContext ctx = App.applicationContext(App.class, new String [] { Environment.CLI }, argsWithMissingParams)) {
new CommandLine(App.class, new MicronautFactory(ctx)).execute(argsWithMissingParams);
assertThat(out.toString()).startsWith("Missing required parameters: ");
assertThat(out.toString()).contains("Usage: kestra flow namespace update ");
assertThat(out.toString()).doesNotContain("MissingParameterException: ");
assertThat(out.toString()).startsWith("Missing required parameters: ");
assertThat(out.toString()).contains("Usage: kestra flow namespace update ");
assertThat(out.toString()).doesNotContain("MissingParameterException: ");
}
}
}

View File

@@ -1,145 +0,0 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.App;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.*;
import io.kestra.core.storages.kv.*;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.NonNull;
import org.junit.jupiter.api.Test;
import java.io.*;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class NsFilesMetadataMigrationCommandTest {
@Test
void run() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
/* Initial setup:
* - namespace 1: my/path, value
* - namespace 1: another/path
* - namespace 2: yet/another/path
* - Nothing in database */
String namespace = TestsUtils.randomNamespace();
String path = "/my/path";
StorageInterface storage = ctx.getBean(StorageInterface.class);
String value = "someValue";
putOldNsFile(storage, namespace, path, value);
String anotherPath = "/another/path";
String anotherValue = "anotherValue";
putOldNsFile(storage, namespace, anotherPath, anotherValue);
String anotherNamespace = TestsUtils.randomNamespace();
String yetAnotherPath = "/yet/another/path";
String yetAnotherValue = "yetAnotherValue";
putOldNsFile(storage, anotherNamespace, yetAnotherPath, yetAnotherValue);
NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository = ctx.getBean(NamespaceFileMetadataRepositoryInterface.class);
String tenantId = TenantService.MAIN_TENANT;
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, namespace, path).isPresent()).isFalse();
/* Expected outcome from the migration command:
* - no namespace files has been migrated because no flow exist in the namespace so they are not picked up because we don't know they exist */
String[] nsFilesMetadataMigrationCommand = {
"migrate", "metadata", "nsfiles"
};
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
// Still it's not in the metadata repository because no flow exist to find that namespace file
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, namespace, path).isPresent()).isFalse();
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, namespace, anotherPath).isPresent()).isFalse();
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, anotherNamespace, yetAnotherPath).isPresent()).isFalse();
// A flow is created from namespace 1, so the namespace files in this namespace should be migrated
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
flowRepository.create(GenericFlow.of(Flow.builder()
.tenantId(tenantId)
.id("a-flow")
.namespace(namespace)
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build()));
/* We run the migration again:
* - namespace 1 my/path file is seen and metadata is migrated to database
* - namespace 1 another/path file is seen and metadata is migrated to database
* - namespace 2 yet/another/path is not seen because no flow exist in this namespace */
out.reset();
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
Optional<NamespaceFileMetadata> foundNsFile = namespaceFileMetadataRepository.findByPath(tenantId, namespace, path);
assertThat(foundNsFile.isPresent()).isTrue();
assertThat(foundNsFile.get().getVersion()).isEqualTo(1);
assertThat(foundNsFile.get().getSize()).isEqualTo(value.length());
Optional<NamespaceFileMetadata> anotherFoundNsFile = namespaceFileMetadataRepository.findByPath(tenantId, namespace, anotherPath);
assertThat(anotherFoundNsFile.isPresent()).isTrue();
assertThat(anotherFoundNsFile.get().getVersion()).isEqualTo(1);
assertThat(anotherFoundNsFile.get().getSize()).isEqualTo(anotherValue.length());
NamespaceFactory namespaceFactory = ctx.getBean(NamespaceFactory.class);
Namespace namespaceStorage = namespaceFactory.of(tenantId, namespace, storage);
FileAttributes nsFileRawMetadata = namespaceStorage.getFileMetadata(Path.of(path));
assertThat(nsFileRawMetadata.getSize()).isEqualTo(value.length());
assertThat(new String(namespaceStorage.getFileContent(Path.of(path)).readAllBytes())).isEqualTo(value);
FileAttributes anotherNsFileRawMetadata = namespaceStorage.getFileMetadata(Path.of(anotherPath));
assertThat(anotherNsFileRawMetadata.getSize()).isEqualTo(anotherValue.length());
assertThat(new String(namespaceStorage.getFileContent(Path.of(anotherPath)).readAllBytes())).isEqualTo(anotherValue);
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, anotherNamespace, yetAnotherPath).isPresent()).isFalse();
assertThatThrownBy(() -> namespaceStorage.getFileMetadata(Path.of(yetAnotherPath))).isInstanceOf(FileNotFoundException.class);
/* We run one last time the migration without any change to verify that we don't resave an existing metadata.
* It covers the case where user didn't perform the migrate command yet but they played and added some KV from the UI (so those ones will already be in metadata database). */
out.reset();
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
foundNsFile = namespaceFileMetadataRepository.findByPath(tenantId, namespace, path);
assertThat(foundNsFile.get().getVersion()).isEqualTo(1);
}
}
private static void putOldNsFile(StorageInterface storage, String namespace, String path, String value) throws IOException {
URI nsFileStorageUri = getNsFileStorageUri(namespace, path);
storage.put(TenantService.MAIN_TENANT, namespace, nsFileStorageUri, new StorageObject(
null,
new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8))
));
}
private static @NonNull URI getNsFileStorageUri(String namespace, String path) {
return URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.namespaceFilePrefix(namespace) + path);
}
}

View File

@@ -19,6 +19,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junitpioneer.jupiter.RetryingTest;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;
@@ -58,7 +59,7 @@ class FileChangedEventListenerTest {
}
@FlakyTest
@Test
@RetryingTest(2)
void test() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getSimpleName(), "test");
// remove the flow if it already exists
@@ -97,7 +98,7 @@ class FileChangedEventListenerTest {
}
@FlakyTest
@Test
@RetryingTest(2)
void testWithPluginDefault() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getName(), "testWithPluginDefault");
// remove the flow if it already exists
@@ -137,4 +138,4 @@ class FileChangedEventListenerTest {
Duration.ofSeconds(10)
);
}
}
}

View File

@@ -21,7 +21,6 @@ kestra:
server:
liveness:
enabled: false
termination-grace-period: 5s
micronaut:
http:
services:

View File

@@ -1,23 +0,0 @@
package io.kestra.core.exceptions;
/**
* Exception that can be thrown when a Flow is not found.
*/
public class FlowNotFoundException extends NotFoundException {
/**
* Creates a new {@link FlowNotFoundException} instance.
*/
public FlowNotFoundException() {
super();
}
/**
* Creates a new {@link NotFoundException} instance.
*
* @param message the error message.
*/
public FlowNotFoundException(final String message) {
super(message);
}
}

View File

@@ -1,15 +0,0 @@
package io.kestra.core.exceptions;
import java.io.Serial;
public class ResourceAccessDeniedException extends KestraRuntimeException {
@Serial
private static final long serialVersionUID = 1L;
public ResourceAccessDeniedException() {
}
public ResourceAccessDeniedException(String message) {
super(message);
}
}

View File

@@ -180,24 +180,6 @@ public record QueryFilter(
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
}
},
PATH("path") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN);
}
},
PARENT_PATH("parentPath") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.STARTS_WITH);
}
},
VERSION("version") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
}
};
private static final Map<String, Field> BY_VALUE = Arrays.stream(values())
@@ -293,19 +275,6 @@ public record QueryFilter(
Field.UPDATED
);
}
},
NAMESPACE_FILE_METADATA {
@Override
public List<Field> supportedField() {
return List.of(
Field.QUERY,
Field.NAMESPACE,
Field.PATH,
Field.PARENT_PATH,
Field.VERSION,
Field.UPDATED
);
}
};
public abstract List<Field> supportedField();

View File

@@ -3,6 +3,7 @@ package io.kestra.core.models.conditions;
import io.kestra.core.models.flows.FlowInterface;
import lombok.*;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.runners.RunContext;

View File

@@ -658,20 +658,18 @@ public class Execution implements DeletedInterface, TenantInterface {
public boolean hasFailedNoRetry(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun) {
return this.findTaskRunByTasks(resolvedTasks, parentTaskRun)
.stream()
// NOTE: we check on isFailed first to avoid the costly shouldBeRetried() method
.anyMatch(taskRun -> taskRun.getState().isFailed() && shouldNotBeRetried(resolvedTasks, parentTaskRun, taskRun));
}
private static boolean shouldNotBeRetried(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun, TaskRun taskRun) {
ResolvedTask resolvedTask = resolvedTasks.stream()
.filter(t -> t.getTask().getId().equals(taskRun.getTaskId())).findFirst()
.orElse(null);
if (resolvedTask == null) {
log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'",
taskRun.getId(), parentTaskRun.getId());
return false;
}
return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry());
.anyMatch(taskRun -> {
ResolvedTask resolvedTask = resolvedTasks.stream()
.filter(t -> t.getTask().getId().equals(taskRun.getTaskId())).findFirst()
.orElse(null);
if (resolvedTask == null) {
log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'",
taskRun.getId(), parentTaskRun.getId());
return false;
}
return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry())
&& taskRun.getState().isFailed();
});
}
public boolean hasCreated() {

View File

@@ -1,16 +1,15 @@
package io.kestra.core.models.executions;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Value;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.triggers.AbstractTrigger;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import jakarta.validation.constraints.NotNull;
@Value
@Builder
@@ -22,7 +21,6 @@ public class ExecutionTrigger {
@NotNull
String type;
@Schema(type = "object", additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
Map<String, Object> variables;
URI logFile;

View File

@@ -3,7 +3,7 @@ package io.kestra.core.models.executions;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.swagger.v3.oas.annotations.Hidden;
@@ -97,7 +97,7 @@ public class LogEntry implements DeletedInterface, TenantInterface {
.build();
}
public static LogEntry of(FlowInterface flow, AbstractTrigger abstractTrigger) {
public static LogEntry of(Flow flow, AbstractTrigger abstractTrigger, ExecutionKind executionKind) {
return LogEntry.builder()
.tenantId(flow.getTenantId())
.namespace(flow.getNamespace())
@@ -107,7 +107,7 @@ public class LogEntry implements DeletedInterface, TenantInterface {
.build();
}
public static LogEntry of(TriggerContext triggerContext, AbstractTrigger abstractTrigger) {
public static LogEntry of(TriggerContext triggerContext, AbstractTrigger abstractTrigger, ExecutionKind executionKind) {
return LogEntry.builder()
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())

View File

@@ -314,11 +314,4 @@ public class TaskRun implements TenantInterface {
.build();
}
public TaskRun addAttempt(TaskRunAttempt attempt) {
if (this.attempts == null) {
this.attempts = new ArrayList<>();
}
this.attempts.add(attempt);
return this;
}
}

View File

@@ -24,8 +24,4 @@ public class Concurrency {
public enum Behavior {
QUEUE, CANCEL, FAIL;
}
public static boolean possibleTransitions(State.Type type) {
return type.equals(State.Type.CANCELLED) || type.equals(State.Type.FAILED);
}
}

View File

@@ -11,7 +11,6 @@ import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.check.Check;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
@@ -130,14 +129,6 @@ public class Flow extends AbstractFlow implements HasUID {
@Valid
@PluginProperty
List<SLA> sla;
@Schema(
title = "Conditions evaluated before the flow is executed.",
description = "A list of conditions that are evaluated before the flow is executed. If no checks are defined, the flow executes normally."
)
@Valid
@PluginProperty
List<Check> checks;
public Stream<String> allTypes() {
return Stream.of(

View File

@@ -43,7 +43,6 @@ public class FlowWithSource extends Flow {
.concurrency(this.concurrency)
.retry(this.retry)
.sla(this.sla)
.checks(this.checks)
.build();
}
@@ -86,7 +85,6 @@ public class FlowWithSource extends Flow {
.concurrency(flow.concurrency)
.retry(flow.retry)
.sla(flow.sla)
.checks(flow.checks)
.build();
}
}

View File

@@ -84,24 +84,12 @@ public class State {
);
}
/**
* non-terminated execution duration is hard to provide in SQL, so we set it to null when endDate is empty
*/
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public Optional<Duration> getDuration() {
if (this.getEndDate().isPresent()) {
return Optional.of(Duration.between(this.getStartDate(), this.getEndDate().get()));
} else {
return Optional.empty();
}
}
/**
* @return either the Duration persisted in database, or calculate it on the fly for non-terminated executions
*/
public Duration getDurationOrComputeIt() {
return this.getDuration().orElseGet(() -> Duration.between(this.getStartDate(), Instant.now()));
public Duration getDuration() {
return Duration.between(
this.histories.getFirst().getDate(),
this.histories.size() > 1 ? this.histories.get(this.histories.size() - 1).getDate() : Instant.now()
);
}
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
@@ -121,7 +109,7 @@ public class State {
public String humanDuration() {
try {
return DurationFormatUtils.formatDurationHMS(getDurationOrComputeIt().toMillis());
return DurationFormatUtils.formatDurationHMS(getDuration().toMillis());
} catch (Throwable e) {
return getDuration().toString();
}

View File

@@ -1,109 +0,0 @@
package io.kestra.core.models.flows.check;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
/**
* Represents a check within a Kestra flow.
* <p>
* A {@code Check} defines a boolean condition that is evaluated when validating flow's inputs
* and before triggering an execution.
* <p>
* If the condition evaluates to {@code false}, the configured {@link Behavior}
* determines how the execution proceeds, and the {@link Style} determines how
* the message is visually presented in the UI.
* </p>
*/
@SuperBuilder
@Getter
@NoArgsConstructor
public class Check {
/**
* The condition to evaluate.
*/
@NotNull
@NotEmpty
String condition;
/**
* The message associated with this check, will be displayed when the condition evaluates to {@code false}.
*/
@NotEmpty
String message;
/**
* Defines the style of the message displayed in the UI when the condition evaluates to {@code false}.
*/
Style style = Style.INFO;
/**
* The behavior to apply when the condition evaluates to {@code false}.
*/
Behavior behavior = Behavior.BLOCK_EXECUTION;
/**
* The visual style used to display the message when the check fails.
*/
public enum Style {
/**
* Display the message as an error.
*/
ERROR,
/**
* Display the message as a success indicator.
*/
SUCCESS,
/**
* Display the message as a warning.
*/
WARNING,
/**
* Display the message as informational content.
*/
INFO;
}
/**
* Defines how the flow should behave when the condition evaluates to {@code false}.
*/
public enum Behavior {
/**
* Block the creation of the execution.
*/
BLOCK_EXECUTION,
/**
* Create the execution as failed.
*/
FAIL_EXECUTION,
/**
* Create a new execution as a result of the check failing.
*/
CREATE_EXECUTION;
}
/**
* Resolves the effective behavior for a list of {@link Check}s based on priority.
*
* @param checks the list of checks whose behaviors are to be evaluated
* @return the highest-priority behavior, or {@code CREATE_EXECUTION} if the list is empty or only contains nulls
*/
public static Check.Behavior resolveBehavior(List<Check> checks) {
if (checks == null || checks.isEmpty()) {
return Behavior.CREATE_EXECUTION;
}
return checks.stream()
.map(Check::getBehavior)
.filter(Objects::nonNull).min(Comparator.comparingInt(Enum::ordinal))
.orElse(Behavior.CREATE_EXECUTION);
}
}

View File

@@ -48,7 +48,7 @@ public class SubflowGraphTask extends AbstractGraphTask {
public record SubflowTaskWrapper<T extends Output>(RunContext runContext, ExecutableTask<T> subflowTask) implements TaskInterface, ExecutableTask<T> {
@Override
public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext, FlowMetaStoreInterface flowExecutorInterface, FlowInterface currentFlow, Execution currentExecution, TaskRun currentTaskRun) throws InternalException {
public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext, FlowMetaStoreInterface flowExecutorInterface, Flow currentFlow, Execution currentExecution, TaskRun currentTaskRun) throws InternalException {
return subflowTask.createSubflowExecutions(runContext, flowExecutorInterface, currentFlow, currentExecution, currentTaskRun);
}

View File

@@ -20,6 +20,7 @@ import java.util.Optional;
@Slf4j
@Getter
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@AllArgsConstructor
@ToString
@EqualsAndHashCode
public class PersistedKvMetadata implements DeletedInterface, TenantInterface, HasUID {
@@ -53,19 +54,6 @@ public class PersistedKvMetadata implements DeletedInterface, TenantInterface, H
private boolean deleted;
public PersistedKvMetadata(String tenantId, String namespace, String name, String description, Integer version, boolean last, @Nullable Instant expirationDate, @Nullable Instant created, @Nullable Instant updated, boolean deleted) {
this.tenantId = tenantId;
this.namespace = namespace;
this.name = name;
this.description = description;
this.version = version;
this.last = last;
this.expirationDate = expirationDate;
this.created = Optional.ofNullable(created).orElse(Instant.now());
this.updated = updated;
this.deleted = deleted;
}
public static PersistedKvMetadata from(String tenantId, KVEntry kvEntry) {
return PersistedKvMetadata.builder()
.tenantId(tenantId)
@@ -80,15 +68,12 @@ public class PersistedKvMetadata implements DeletedInterface, TenantInterface, H
}
public PersistedKvMetadata asLast() {
return this.toBuilder().updated(Instant.now()).last(true).build();
}
public PersistedKvMetadata toDeleted() {
return this.toBuilder().updated(Instant.now()).deleted(true).build();
Instant saveDate = Instant.now();
return this.toBuilder().created(Optional.ofNullable(this.created).orElse(saveDate)).updated(saveDate).last(true).build();
}
@Override
public String uid() {
return IdUtils.fromParts(getTenantId(), getNamespace(), getName(), String.valueOf(getVersion()));
return IdUtils.fromParts(getTenantId(), getNamespace(), getName(), getVersion().toString());
}
}

View File

@@ -1,132 +0,0 @@
package io.kestra.core.models.namespaces.files;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.NamespaceFile;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
import java.time.Instant;
@Builder(toBuilder = true)
@Slf4j
@Getter
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@ToString
@EqualsAndHashCode
public class NamespaceFileMetadata implements DeletedInterface, TenantInterface, HasUID {
@With
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
private String tenantId;
@NotNull
private String namespace;
@NotNull
private String path;
private String parentPath;
@NotNull
private Integer version;
@Builder.Default
private boolean last = true;
@NotNull
private Long size;
@Builder.Default
private Instant created = Instant.now();
@Nullable
private Instant updated;
private boolean deleted;
@JsonCreator
public NamespaceFileMetadata(String tenantId, String namespace, String path, String parentPath, Integer version, boolean last, Long size, Instant created, @Nullable Instant updated, boolean deleted) {
this.tenantId = tenantId;
this.namespace = namespace;
this.path = path;
this.parentPath = parentPath(path);
this.version = version;
this.last = last;
this.size = size;
this.created = created;
this.updated = updated;
this.deleted = deleted;
}
public static String path(String path, boolean trailingSlash) {
if (trailingSlash && !path.endsWith("/")) {
return path + "/";
} else if (!trailingSlash && path.endsWith("/")) {
return path.substring(0, path.length() - 1);
}
return path;
}
public String path(boolean trailingSlash) {
return path(this.path, trailingSlash);
}
public static String parentPath(String path) {
String withoutTrailingSlash = path.endsWith("/") ? path.substring(0, path.length() - 1) : path;
// The parent path can't be set, it's always computed
return withoutTrailingSlash.contains("/") ?
withoutTrailingSlash.substring(0, withoutTrailingSlash.lastIndexOf("/") + 1) :
null;
}
public static NamespaceFileMetadata of(String tenantId, NamespaceFile namespaceFile) {
return NamespaceFileMetadata.builder()
.tenantId(tenantId)
.namespace(namespaceFile.namespace())
.path(namespaceFile.path(true).toString())
.version(namespaceFile.version())
.build();
}
public static NamespaceFileMetadata of(String tenantId, String namespace, String path, FileAttributes fileAttributes) {
return NamespaceFileMetadata.builder()
.tenantId(tenantId)
.namespace(namespace)
.path(path)
.created(Instant.ofEpochMilli(fileAttributes.getCreationTime()))
.updated(Instant.ofEpochMilli(fileAttributes.getLastModifiedTime()))
.size(fileAttributes.getSize())
.version(1)
.build();
}
public NamespaceFileMetadata asLast() {
Instant saveDate = Instant.now();
return this.toBuilder().updated(saveDate).last(true).build();
}
public NamespaceFileMetadata toDeleted() {
return this.toBuilder().deleted(true).updated(Instant.now()).build();
}
@Override
public String uid() {
return IdUtils.fromParts(getTenantId(), getNamespace(), getPath(), String.valueOf(getVersion()));
}
@JsonIgnore
public boolean isDirectory() {
return this.path.endsWith("/");
}
}

View File

@@ -35,6 +35,7 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
@JsonDeserialize(using = Property.PropertyDeserializer.class)
@JsonSerialize(using = Property.PropertySerializer.class)
@Builder
@NoArgsConstructor
@AllArgsConstructor(access = AccessLevel.PACKAGE)
@Schema(
oneOf = {
@@ -50,7 +51,6 @@ public class Property<T> {
.copy()
.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
private final boolean skipCache;
private String expression;
private T value;
@@ -60,23 +60,13 @@ public class Property<T> {
@Deprecated
// Note: when not used, this constructor would not be deleted but made private so it can only be used by ofExpression(String) and the deserializer
public Property(String expression) {
this(expression, false);
}
private Property(String expression, boolean skipCache) {
this.expression = expression;
this.skipCache = skipCache;
}
/**
* @deprecated use {@link #ofValue(Object)} instead.
*/
@VisibleForTesting
@Deprecated
public Property(Map<?, ?> map) {
try {
expression = MAPPER.writeValueAsString(map);
this.skipCache = false;
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
@@ -89,6 +79,9 @@ public class Property<T> {
/**
* Returns a new {@link Property} with no cached rendered value,
* so that the next render will evaluate its original Pebble expression.
* <p>
* The returned property will still cache its rendered result.
* To re-evaluate on a subsequent render, call {@code skipCache()} again.
*
* @return a new {@link Property} without a pre-rendered value
*/
@@ -140,7 +133,6 @@ public class Property<T> {
/**
* Build a new Property object with a Pebble expression.<br>
* This property object will not cache its rendered value.
* <p>
* Use {@link #ofValue(Object)} to build a property with a value instead.
*/
@@ -150,11 +142,11 @@ public class Property<T> {
throw new IllegalArgumentException("'expression' must be a valid Pebble expression");
}
return new Property<>(expression, true);
return new Property<>(expression);
}
/**
* Render a property, then convert it to its target type.<br>
* Render a property then convert it to its target type.<br>
* <p>
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
@@ -172,7 +164,7 @@ public class Property<T> {
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
*/
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.skipCache || property.value == null) {
if (property.value == null) {
String rendered = context.render(property.expression, variables);
property.value = MAPPER.convertValue(rendered, clazz);
}
@@ -200,7 +192,7 @@ public class Property<T> {
*/
@SuppressWarnings("unchecked")
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.skipCache || property.value == null) {
if (property.value == null) {
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
try {
String trimmedExpression = property.expression.trim();
@@ -252,7 +244,7 @@ public class Property<T> {
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.skipCache || property.value == null) {
if (property.value == null) {
JavaType targetMapType = MAPPER.getTypeFactory().constructMapType(Map.class, keyClass, valueClass);
try {

View File

@@ -24,7 +24,7 @@ public interface ExecutableTask<T extends Output>{
*/
List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
FlowMetaStoreInterface flowExecutorInterface,
FlowInterface currentFlow, Execution currentExecution,
Flow currentFlow, Execution currentExecution,
TaskRun currentTaskRun) throws InternalException;
/**

View File

@@ -7,6 +7,7 @@ import io.kestra.core.models.tasks.runners.TaskLogLineMatcher.TaskLogMatch;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowService;
import jakarta.validation.constraints.NotNull;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -214,7 +215,8 @@ abstract public class PluginUtilsService {
realNamespace = runContext.render(namespace);
realFlowId = runContext.render(flowId);
// validate that the flow exists: a.k.a access is authorized by this namespace
runContext.acl().allowNamespace(realNamespace).check();
FlowService flowService = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowService.class);
flowService.checkAllowedNamespace(flowInfo.tenantId(), realNamespace, flowInfo.tenantId(), flowInfo.namespace());
} else if (namespace != null || flowId != null) {
throw new IllegalArgumentException("Both `namespace` and `flowId` must be set when `executionId` is set.");
} else {

View File

@@ -74,7 +74,7 @@ public class Trigger extends TriggerContext implements HasUID {
);
}
public static String uid(FlowInterface flow, AbstractTrigger abstractTrigger) {
public static String uid(Flow flow, AbstractTrigger abstractTrigger) {
return IdUtils.fromParts(
flow.getTenantId(),
flow.getNamespace(),

View File

@@ -2,12 +2,14 @@ package io.kestra.core.models.triggers.multipleflows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.utils.IdUtils;
import lombok.Builder;
import lombok.Value;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

View File

@@ -23,12 +23,12 @@ import java.util.Objects;
@Singleton
public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.UsageEvent> {
private final FlowRepositoryInterface flowRepository;
private final ExecutionRepositoryInterface executionRepository;
private final DashboardRepositoryInterface dashboardRepository;
private final boolean enabled;
@Inject
public FeatureUsageReport(FlowRepositoryInterface flowRepository,
ExecutionRepositoryInterface executionRepository,
@@ -37,26 +37,26 @@ public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.Us
this.flowRepository = flowRepository;
this.executionRepository = executionRepository;
this.dashboardRepository = dashboardRepository;
ServerType serverType = KestraContext.getContext().getServerType();
this.enabled = ServerType.EXECUTOR.equals(serverType) || ServerType.STANDALONE.equals(serverType);
}
@Override
public UsageEvent report(final Instant now, TimeInterval interval) {
return UsageEvent
.builder()
.flows(FlowUsage.of(flowRepository))
.executions(ExecutionUsage.of(executionRepository, interval.from(), interval.to()))
.dashboards(new Count(dashboardRepository.countAllForAllTenants()))
.dashboards(new Count(dashboardRepository.count()))
.build();
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public UsageEvent report(Instant now, TimeInterval interval, String tenant) {
Objects.requireNonNull(tenant, "tenant is null");
@@ -67,7 +67,7 @@ public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.Us
.executions(ExecutionUsage.of(tenant, executionRepository, interval.from(), interval.to()))
.build();
}
@SuperBuilder(toBuilder = true)
@Getter
@Jacksonized

View File

@@ -16,14 +16,14 @@ import java.util.Map;
import java.util.Optional;
public interface DashboardRepositoryInterface {
/**
* Gets the total number of Dashboards.
*
* @return the total number.
*/
long countAllForAllTenants();
long count();
Boolean isEnabled();
Optional<Dashboard> get(String tenantId, String id);

View File

@@ -2,6 +2,7 @@ package io.kestra.core.repositories;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;
@@ -93,8 +94,6 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
Flux<Execution> findAllAsync(@Nullable String tenantId);
Flux<Execution> findAsync(String tenantId, List<QueryFilter> filters);
Execution delete(Execution execution);
Integer purge(Execution execution);

View File

@@ -8,7 +8,6 @@ import io.kestra.plugin.core.dashboard.data.Flows;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable;
import jakarta.validation.ConstraintViolationException;
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.Optional;
@@ -159,8 +158,6 @@ public interface FlowRepositoryInterface extends QueryBuilderInterface<Flows.Fie
.toList();
}
Flux<Flow> findAsync(String tenantId, List<QueryFilter> filters);
FlowWithSource create(GenericFlow flow);
FlowWithSource update(GenericFlow flow, FlowInterface previous) throws ConstraintViolationException;

View File

@@ -1,46 +0,0 @@
package io.kestra.core.repositories;
import io.kestra.core.models.FetchVersion;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.micronaut.data.model.Pageable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
public interface NamespaceFileMetadataRepositoryInterface extends SaveRepositoryInterface<NamespaceFileMetadata> {
Optional<NamespaceFileMetadata> findByPath(
String tenantId,
String namespace,
String path
) throws IOException;
default ArrayListTotal<NamespaceFileMetadata> find(
Pageable pageable,
String tenantId,
List<QueryFilter> filters,
boolean allowDeleted
) {
return this.find(pageable, tenantId, filters, allowDeleted, FetchVersion.LATEST);
}
ArrayListTotal<NamespaceFileMetadata> find(
Pageable pageable,
String tenantId,
List<QueryFilter> filters,
boolean allowDeleted,
FetchVersion fetchBehavior
);
default NamespaceFileMetadata delete(NamespaceFileMetadata namespaceFileMetadata) throws IOException {
return this.save(namespaceFileMetadata.toBuilder().deleted(true).build());
}
/**
* Purge (hard delete) a list of namespace files metadata. If no version is specified, all versions are purged.
* @param namespaceFilesMetadata the list of namespace files metadata to purge
* @return the number of purged namespace files metadata
*/
Integer purge(List<NamespaceFileMetadata> namespaceFilesMetadata);
}

View File

@@ -39,13 +39,13 @@ public interface TriggerRepositoryInterface extends QueryBuilderInterface<Trigge
* @param tenantId the tenant of the triggers
* @return The count.
*/
long countAll(@Nullable String tenantId);
int count(@Nullable String tenantId);
/**
* Find all triggers that match the query, return a flux of triggers
* as the search is not paginated
*/
Flux<Trigger> findAsync(String tenantId, List<QueryFilter> filters);
Flux<Trigger> find(String tenantId, List<QueryFilter> filters);
default Function<String, String> sortMapping() throws IllegalArgumentException {
return Function.identity();

View File

@@ -1,50 +0,0 @@
package io.kestra.core.runners;
import javax.annotation.CheckReturnValue;
import java.util.List;
/**
* Check if the current taskrun has access to the requested resources.
*
* <p>
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
*
* @see AllowedResources
*/
public interface AclChecker {
/**Tasks that need to access resources outside their namespace should use this interface to check ACL (Allowed namespaces in EE).
* Allow all namespaces.
* <p>
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
*/
@CheckReturnValue
AllowedResources allowAllNamespaces();
/**
* Allow only the given namespace.
* <p>
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
*/
@CheckReturnValue
AllowedResources allowNamespace(String namespace);
/**
* Allow only the given namespaces.
* <p>
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
*/
@CheckReturnValue
AllowedResources allowNamespaces(List<String> namespaces);
/**
* Represents a set of allowed resources.
* Tasks that need to access resources outside their namespace should call the <code>check()</code> method to check the ACL (Allowed namespaces in EE).
*/
interface AllowedResources {
/**
* Check if the current taskrun has access to the requested resources.
*/
void check();
}
}

View File

@@ -1,86 +0,0 @@
package io.kestra.core.runners;
import io.kestra.core.services.NamespaceService;
import io.micronaut.context.ApplicationContext;
import java.util.List;
import java.util.Objects;
class AclCheckerImpl implements AclChecker {
private final NamespaceService namespaceService;
private final RunContext.FlowInfo flowInfo;
AclCheckerImpl(ApplicationContext applicationContext, RunContext.FlowInfo flowInfo) {
this.namespaceService = applicationContext.getBean(NamespaceService.class);
this.flowInfo = flowInfo;
}
@Override
public AllowedResources allowAllNamespaces() {
return new AllowAllNamespaces(flowInfo, namespaceService);
}
@Override
public AllowedResources allowNamespace(String namespace) {
return new AllowNamespace(flowInfo, namespaceService, namespace);
}
@Override
public AllowedResources allowNamespaces(List<String> namespaces) {
return new AllowNamespaces(flowInfo, namespaceService, namespaces);
}
static class AllowAllNamespaces implements AllowedResources {
private final RunContext.FlowInfo flowInfo;
private final NamespaceService namespaceService;
AllowAllNamespaces(RunContext.FlowInfo flowInfo, NamespaceService namespaceService) {
this.flowInfo = Objects.requireNonNull(flowInfo);
this.namespaceService = Objects.requireNonNull(namespaceService);
}
@Override
public void check() {
this.namespaceService.checkAllowedAllNamespaces(flowInfo.tenantId(), flowInfo.tenantId(), flowInfo.namespace());
}
}
static class AllowNamespace implements AllowedResources {
private final RunContext.FlowInfo flowInfo;
private final NamespaceService namespaceService;
private final String namespace;
public AllowNamespace(RunContext.FlowInfo flowInfo, NamespaceService namespaceService, String namespace) {
this.flowInfo = Objects.requireNonNull(flowInfo);
this.namespaceService = Objects.requireNonNull(namespaceService);
this.namespace = Objects.requireNonNull(namespace);
}
@Override
public void check() {
namespaceService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace());
}
}
static class AllowNamespaces implements AllowedResources {
private final RunContext.FlowInfo flowInfo;
private final NamespaceService namespaceService;
private final List<String> namespaces;
AllowNamespaces(RunContext.FlowInfo flowInfo, NamespaceService namespaceService, List<String> namespaces) {
this.flowInfo = Objects.requireNonNull(flowInfo);
this.namespaceService = Objects.requireNonNull(namespaceService);
this.namespaces = Objects.requireNonNull(namespaces);
if (namespaces.isEmpty()) {
throw new IllegalArgumentException("At least one namespace must be provided");
}
}
@Override
public void check() {
namespaces.forEach(namespace -> namespaceService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace()));
}
}
}

View File

@@ -123,12 +123,7 @@ public class DefaultRunContext extends RunContext {
this.traceParent = traceParent;
}
/**
* @deprecated Plugin should not use the ApplicationContext anymore, and neither should they cast to this implementation.
* Plugin should instead rely on supported API only.
*/
@JsonIgnore
@Deprecated(since = "1.2.0", forRemoval = true)
public ApplicationContext getApplicationContext() {
return applicationContext;
}
@@ -579,11 +574,6 @@ public class DefaultRunContext extends RunContext {
return isInitialized.get();
}
@Override
public AclChecker acl() {
return new AclCheckerImpl(this.applicationContext, flowInfo());
}
@Override
public LocalPath localPath() {
return localPath;

View File

@@ -26,6 +26,7 @@ import org.apache.commons.lang3.stream.Streams;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.stream.Collectors;
import static io.kestra.core.trace.Tracer.throwCallable;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@@ -53,10 +54,12 @@ public final class ExecutableUtils {
}
public static SubflowExecutionResult subflowExecutionResult(TaskRun parentTaskrun, Execution execution) {
List<TaskRunAttempt> attempts = parentTaskrun.getAttempts() == null ? new ArrayList<>() : new ArrayList<>(parentTaskrun.getAttempts());
attempts.add(TaskRunAttempt.builder().state(parentTaskrun.getState()).build());
return SubflowExecutionResult.builder()
.executionId(execution.getId())
.state(parentTaskrun.getState().getCurrent())
.parentTaskRun(parentTaskrun.addAttempt(TaskRunAttempt.builder().state(parentTaskrun.getState()).build()))
.parentTaskRun(parentTaskrun.withAttempts(attempts))
.build();
}
@@ -64,7 +67,7 @@ public final class ExecutableUtils {
RunContext runContext,
FlowMetaStoreInterface flowExecutorInterface,
Execution currentExecution,
FlowInterface currentFlow,
Flow currentFlow,
T currentTask,
TaskRun currentTaskRun,
Map<String, Object> inputs,

View File

@@ -7,6 +7,7 @@ import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Data;
import io.kestra.core.models.flows.DependsOn;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.Output;
@@ -63,11 +64,11 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
public class FlowInputOutput {
private static final Pattern URI_PATTERN = Pattern.compile("^[a-z]+:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)$");
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml();
private final StorageInterface storageInterface;
private final Optional<String> secretKey;
private final RunContextFactory runContextFactory;
@Inject
public FlowInputOutput(
StorageInterface storageInterface,
@@ -78,7 +79,7 @@ public class FlowInputOutput {
this.runContextFactory = runContextFactory;
this.secretKey = Optional.ofNullable(secretKey);
}
/**
* Validate all the inputs of a given execution of a flow.
*
@@ -88,15 +89,15 @@ public class FlowInputOutput {
* @return The list of {@link InputAndValue}.
*/
public Mono<List<InputAndValue>> validateExecutionInputs(final List<Input<?>> inputs,
final FlowInterface flow,
final Flow flow,
final Execution execution,
final Publisher<CompletedPart> data) {
if (ListUtils.isEmpty(inputs)) return Mono.just(Collections.emptyList());
return readData(inputs, execution, data, false)
.map(inputData -> resolveInputs(inputs, flow, execution, inputData, false));
}
/**
* Reads all the inputs of a given execution of a flow.
*
@@ -110,7 +111,7 @@ public class FlowInputOutput {
final Publisher<CompletedPart> data) {
return this.readExecutionInputs(flow.getInputs(), flow, execution, data);
}
/**
* Reads all the inputs of a given execution of a flow.
*
@@ -125,7 +126,7 @@ public class FlowInputOutput {
final Publisher<CompletedPart> data) {
return readData(inputs, execution, data, true).map(inputData -> this.readExecutionInputs(inputs, flow, execution, inputData));
}
private Mono<Map<String, Object>> readData(List<Input<?>> inputs, Execution execution, Publisher<CompletedPart> data, boolean uploadFiles) {
return Flux.from(data)
.publishOn(Schedulers.boundedElastic())
@@ -234,7 +235,7 @@ public class FlowInputOutput {
}
return MapUtils.flattenToNestedMap(resolved);
}
/**
* Utility method for retrieving types inputs.
*
@@ -251,7 +252,7 @@ public class FlowInputOutput {
) {
return resolveInputs(inputs, flow, execution, data, true);
}
public List<InputAndValue> resolveInputs(
final List<Input<?>> inputs,
final FlowInterface flow,
@@ -324,7 +325,7 @@ public class FlowInputOutput {
}
});
resolvable.setInput(input);
Object value = resolvable.get().value();
// resolve default if needed

View File

@@ -11,7 +11,6 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.ListUtils;
import io.kestra.plugin.core.flow.Dag;
import java.util.*;
@@ -144,13 +143,6 @@ public class FlowableUtils {
return Collections.emptyList();
}
// have submitted, leave
Optional<TaskRun> lastSubmitted = execution.findLastSubmitted(taskRuns);
if (lastSubmitted.isPresent()) {
return Collections.emptyList();
}
// last success, find next
Optional<TaskRun> lastTerminated = execution.findLastTerminated(taskRuns);
if (lastTerminated.isPresent()) {
@@ -158,41 +150,14 @@ public class FlowableUtils {
if (currentTasks.size() > lastIndex + 1) {
return Collections.singletonList(currentTasks.get(lastIndex + 1).toNextTaskRunIncrementIteration(execution, parentTaskRun.getIteration()));
} else {
return Collections.singletonList(currentTasks.getFirst().toNextTaskRunIncrementIteration(execution, parentTaskRun.getIteration()));
}
}
return Collections.emptyList();
}
public static Optional<State.Type> resolveSequentialState(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> _finally,
TaskRun parentTaskRun,
RunContext runContext,
boolean allowFailure,
boolean allowWarning
) {
if (ListUtils.emptyOnNull(tasks).stream()
.filter(resolvedTask -> !resolvedTask.getTask().getDisabled())
.findAny()
.isEmpty()) {
return Optional.of(State.Type.SUCCESS);
}
return resolveState(
execution,
tasks,
errors,
_finally,
parentTaskRun,
runContext,
allowFailure,
allowWarning
);
}
public static Optional<State.Type> resolveState(
Execution execution,
List<ResolvedTask> tasks,
@@ -248,7 +213,7 @@ public class FlowableUtils {
}
} else {
// first call, the error flow is not ready, we need to notify the parent task that can be failed to init error flows
if (execution.hasFailedNoRetry(tasks, parentTaskRun) || terminalState == State.Type.FAILED) {
if (execution.hasFailed(tasks, parentTaskRun) || terminalState == State.Type.FAILED) {
return Optional.of(execution.guessFinalState(tasks, parentTaskRun, allowFailure, allowWarning, terminalState));
}
}

View File

@@ -192,16 +192,5 @@ public abstract class RunContext implements PropertyContext {
public record FlowInfo(String tenantId, String namespace, String id, Integer revision) {
}
/**
* @deprecated there is no legitimate use case of this method outside the run context internal self-usage, so it should not be part of the interface
*/
@Deprecated(since = "1.2.0", forRemoval = true)
public abstract boolean isInitialized();
/**
* Get access to the ACL checker.
* Plugins are responsible for using the ACL checker when they access restricted resources, for example,
* when Namespace ACLs are used (EE).
*/
public abstract AclChecker acl();
}

View File

@@ -6,16 +6,16 @@ import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.PluginConfigurations;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.services.NamespaceService;
import io.kestra.core.storages.InternalStorage;
import io.kestra.core.storages.NamespaceFactory;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.micronaut.context.ApplicationContext;
@@ -41,7 +41,7 @@ public class RunContextFactory {
@Inject
protected VariableRenderer variableRenderer;
@Inject
protected SecureVariableRendererFactory secureVariableRendererFactory;
@@ -49,7 +49,7 @@ public class RunContextFactory {
protected StorageInterface storageInterface;
@Inject
protected NamespaceService namespaceService;
protected FlowService flowService;
@Inject
protected MetricRegistry metricRegistry;
@@ -77,18 +77,15 @@ public class RunContextFactory {
@Inject
private KVStoreService kvStoreService;
@Inject
private NamespaceFactory namespaceFactory;
// hacky
public RunContextInitializer initializer() {
return applicationContext.getBean(RunContextInitializer.class);
}
public RunContext of(FlowInterface flow, Execution execution) {
return of(flow, execution, Function.identity());
}
public RunContext of(FlowInterface flow, Execution execution, boolean decryptVariable) {
return of(flow, execution, Function.identity(), decryptVariable);
}
@@ -96,18 +93,18 @@ public class RunContextFactory {
public RunContext of(FlowInterface flow, Execution execution, Function<RunVariables.Builder, RunVariables.Builder> runVariableModifier) {
return of(flow, execution, runVariableModifier, true);
}
public RunContext of(FlowInterface flow, Execution execution, Function<RunVariables.Builder, RunVariables.Builder> runVariableModifier, boolean decryptVariables) {
RunContextLogger runContextLogger = runContextLoggerFactory.create(execution);
VariableRenderer variableRenderer = decryptVariables ? this.variableRenderer : secureVariableRendererFactory.createOrGet();
return newBuilder()
// Logger
.withLogger(runContextLogger)
// Execution
.withPluginConfiguration(Map.of())
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface, namespaceService, namespaceFactory))
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface, flowService))
.withVariableRenderer(variableRenderer)
.withVariables(runVariableModifier.apply(
newRunVariablesBuilder()
@@ -137,7 +134,7 @@ public class RunContextFactory {
.withLogger(runContextLogger)
// Task
.withPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass()))
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, namespaceService, namespaceFactory))
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, flowService))
.withVariables(newRunVariablesBuilder()
.withFlow(flow)
.withTask(task)
@@ -153,8 +150,8 @@ public class RunContextFactory {
.build();
}
public RunContext of(FlowInterface flow, AbstractTrigger trigger) {
RunContextLogger runContextLogger = runContextLoggerFactory.create(flow, trigger);
public RunContext of(Flow flow, AbstractTrigger trigger) {
RunContextLogger runContextLogger = runContextLoggerFactory.create(flow, trigger, null);
return newBuilder()
// Logger
.withLogger(runContextLogger)
@@ -173,11 +170,11 @@ public class RunContextFactory {
@VisibleForTesting
public RunContext of(final FlowInterface flow, final Map<String, Object> variables) {
public RunContext of(final Flow flow, final Map<String, Object> variables) {
RunContextLogger runContextLogger = new RunContextLogger();
return newBuilder()
.withLogger(runContextLogger)
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forFlow(flow), storageInterface, namespaceService, namespaceFactory))
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forFlow(flow), storageInterface, flowService))
.withVariables(variables)
.withSecretInputs(secretInputsFromFlow(flow))
.build();
@@ -216,8 +213,7 @@ public class RunContextFactory {
}
},
storageInterface,
namespaceService,
namespaceFactory
flowService
))
.withVariables(variables)
.withTask(task)

View File

@@ -8,9 +8,8 @@ import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.plugins.PluginConfigurations;
import io.kestra.core.services.NamespaceService;
import io.kestra.core.services.FlowService;
import io.kestra.core.storages.InternalStorage;
import io.kestra.core.storages.NamespaceFactory;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
@@ -45,10 +44,7 @@ public class RunContextInitializer {
protected StorageInterface storageInterface;
@Inject
protected NamespaceFactory namespaceFactory;
@Inject
protected NamespaceService namespaceService;
protected FlowService flowService;
@Value("${kestra.encryption.secret-key}")
protected Optional<String> secretKey;
@@ -139,7 +135,7 @@ public class RunContextInitializer {
runContext.setVariables(enrichedVariables);
runContext.setPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass()));
runContext.setStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, namespaceService, namespaceFactory));
runContext.setStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, flowService));
runContext.setLogger(runContextLogger);
runContext.setTask(task);
@@ -217,7 +213,7 @@ public class RunContextInitializer {
runContext.init(applicationContext);
final String triggerExecutionId = IdUtils.create();
final RunContextLogger runContextLogger = contextLoggerFactory.create(triggerContext, trigger);
final RunContextLogger runContextLogger = contextLoggerFactory.create(triggerContext, trigger, null);
final Map<String, Object> variables = new HashMap<>(runContext.getVariables());
variables.put(RunVariables.SECRET_CONSUMER_VARIABLE_NAME, (Consumer<String>) runContextLogger::usedSecret);
@@ -234,8 +230,7 @@ public class RunContextInitializer {
runContextLogger.logger(),
context,
storageInterface,
namespaceService,
namespaceFactory
flowService
);
runContext.setLogger(runContextLogger);

View File

@@ -4,7 +4,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
@@ -46,19 +46,19 @@ public class RunContextLoggerFactory {
);
}
public RunContextLogger create(TriggerContext triggerContext, AbstractTrigger trigger) {
public RunContextLogger create(TriggerContext triggerContext, AbstractTrigger trigger, ExecutionKind executionKind) {
return new RunContextLogger(
logQueue,
LogEntry.of(triggerContext, trigger),
LogEntry.of(triggerContext, trigger, executionKind),
trigger.getLogLevel(),
trigger.isLogToFile()
);
}
public RunContextLogger create(FlowInterface flow, AbstractTrigger trigger) {
public RunContextLogger create(Flow flow, AbstractTrigger trigger, ExecutionKind executionKind) {
return new RunContextLogger(
logQueue,
LogEntry.of(flow, trigger),
LogEntry.of(flow, trigger, executionKind),
trigger.getLogLevel(),
trigger.isLogToFile()
);

View File

@@ -5,8 +5,8 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.input.SecretInput;
@@ -73,7 +73,7 @@ public final class RunVariables {
}
/**
* Creates an immutable map representation of the given {@link FlowInterface}.
* Creates an immutable map representation of the given {@link Flow}.
*
* @param flow The flow from which to create variables.
* @return a new immutable {@link Map}.
@@ -283,7 +283,7 @@ public final class RunVariables {
if (flow != null && flow.getInputs() != null) {
// Create a new PropertyContext with 'flow' variables which are required by some pebble expressions.
PropertyContextWithVariables context = new PropertyContextWithVariables(propertyContext, Map.of("flow", RunVariables.of(flow)));
// we add default inputs value from the flow if not already set, this will be useful for triggers
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
@@ -326,7 +326,7 @@ public final class RunVariables {
}
if (flow == null) {
FlowInterface flowFromExecution = GenericFlow.builder()
Flow flowFromExecution = Flow.builder()
.id(execution.getFlowId())
.tenantId(execution.getTenantId())
.revision(execution.getFlowRevision())
@@ -393,17 +393,17 @@ public final class RunVariables {
}
private RunVariables(){}
private record PropertyContextWithVariables(
PropertyContext delegate,
Map<String, Object> variables
) implements PropertyContext {
@Override
public String render(String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return delegate.render(inline, variables.isEmpty() ? this.variables : variables);
}
@Override
public Map<String, Object> render(Map<String, Object> inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return delegate.render(inline, variables.isEmpty() ? this.variables : variables);

View File

@@ -160,7 +160,7 @@ public class RunnerUtils {
executionEmitter.run();
if (duration == null) {
Await.untilWithSleepInterval(() -> receive.get() != null, Duration.ofMillis(10));
Await.until(() -> receive.get() != null, Duration.ofMillis(10));
} else {
Await.until(() -> receive.get() != null, Duration.ofMillis(10), duration);
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.runners;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
@@ -27,7 +28,7 @@ public interface SchedulerTriggerStateInterface {
Trigger update(Trigger trigger);
Trigger update(FlowWithSource flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;
Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;
/**
* QueueException required for Kafka implementation

View File

@@ -2,8 +2,11 @@ package io.kestra.core.runners.pebble.functions;
import io.kestra.core.runners.LocalPath;
import io.kestra.core.runners.LocalPathFactory;
import io.kestra.core.services.NamespaceService;
import io.kestra.core.storages.*;
import io.kestra.core.services.FlowService;
import io.kestra.core.storages.InternalNamespace;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Slugify;
import io.micronaut.context.annotation.Value;
import io.pebbletemplates.pebble.error.PebbleException;
@@ -33,7 +36,7 @@ abstract class AbstractFileFunction implements Function {
private static final Pattern EXECUTION_FILE = Pattern.compile(".*/.*/executions/.*/tasks/.*/.*");
@Inject
protected NamespaceService namespaceService;
protected FlowService flowService;
@Inject
protected StorageInterface storageInterface;
@@ -41,9 +44,6 @@ abstract class AbstractFileFunction implements Function {
@Inject
protected LocalPathFactory localPathFactory;
@Inject
protected NamespaceFactory namespaceFactory;
@Value("${" + LocalPath.ENABLE_FILE_FUNCTIONS_CONFIG + ":true}")
protected boolean enableFileProtocol;
@@ -81,21 +81,23 @@ abstract class AbstractFileFunction implements Function {
} else if (str.startsWith(LocalPath.FILE_PROTOCOL)) {
fileUri = URI.create(str);
namespace = checkEnabledLocalFileAndReturnNamespace(args, flow);
} else if (str.startsWith(Namespace.NAMESPACE_FILE_SCHEME)) {
fileUri = URI.create(str);
namespace = checkedAllowedNamespaceAndReturnNamespace(args, fileUri, tenantId, flow);
} else if(str.startsWith(Namespace.NAMESPACE_FILE_SCHEME)) {
URI nsFileUri = URI.create(str);
namespace = checkedAllowedNamespaceAndReturnNamespace(args, nsFileUri, tenantId, flow);
InternalNamespace internalNamespace = new InternalNamespace(flow.get(TENANT_ID), namespace, storageInterface);
fileUri = internalNamespace.get(Path.of(nsFileUri.getPath())).uri();
} else if (URI_PATTERN.matcher(str).matches()) {
// it is an unsupported URI
throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(str));
} else {
fileUri = URI.create(Namespace.NAMESPACE_FILE_SCHEME + ":///" + str);
namespace = (String) Optional.ofNullable(args.get(NAMESPACE)).orElse(flow.get(NAMESPACE));
namespaceService.checkAllowedNamespace(tenantId, namespace, tenantId, flow.get(NAMESPACE));
fileUri = URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.namespaceFilePrefix(namespace) + "/" + str);
flowService.checkAllowedNamespace(tenantId, namespace, tenantId, flow.get(NAMESPACE));
}
} else {
throw new PebbleException(null, "Unable to read the file " + path, lineNumber, self.getName());
}
return fileFunction(context, fileUri, namespace, tenantId, args);
return fileFunction(context, fileUri, namespace, tenantId);
} catch (IOException e) {
throw new PebbleException(e, e.getMessage(), lineNumber, self.getName());
}
@@ -108,7 +110,7 @@ abstract class AbstractFileFunction implements Function {
protected abstract String getErrorMessage();
protected abstract Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException;
protected abstract Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId) throws IOException;
boolean isFileUriValid(String namespace, String flowId, String executionId, URI path) {
// Internal storage URI should be: kestra:///$namespace/$flowId/executions/$executionId/tasks/$taskName/$taskRunId/$random.ion or kestra:///$namespace/$flowId/executions/$executionId/trigger/$triggerName/$random.ion
@@ -175,7 +177,7 @@ abstract class AbstractFileFunction implements Function {
// 5. replace '/' with '.'
namespace = namespace.replace("/", ".");
namespaceService.checkAllowedNamespace(tenantId, namespace, tenantId, fromNamespace);
flowService.checkAllowedNamespace(tenantId, namespace, tenantId, fromNamespace);
return namespace;
}
@@ -196,7 +198,7 @@ abstract class AbstractFileFunction implements Function {
// we will transform nsfile URI into a kestra URI so it is handled seamlessly by all functions
String customNs = Optional.ofNullable((String) args.get(NAMESPACE)).orElse(nsFileUri.getAuthority());
if (customNs != null) {
namespaceService.checkAllowedNamespace(tenantId, customNs, tenantId, flow.get(NAMESPACE));
flowService.checkAllowedNamespace(tenantId, customNs, tenantId, flow.get(NAMESPACE));
}
return Optional.ofNullable(customNs).orElse(flow.get(NAMESPACE));
}

View File

@@ -3,7 +3,7 @@ package io.kestra.core.runners.pebble.functions;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.tasks.retrys.Exponential;
import io.kestra.core.runners.pebble.PebbleUtils;
import io.kestra.core.services.ExecutionLogService;
import io.kestra.core.services.LogService;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.RetryUtils;
import io.micronaut.context.annotation.Requires;
@@ -23,7 +23,7 @@ import java.util.Map;
@Requires(property = "kestra.repository.type")
public class ErrorLogsFunction implements Function {
@Inject
private ExecutionLogService logService;
private LogService logService;
@Inject
private PebbleUtils pebbleUtils;

View File

@@ -1,30 +1,22 @@
package io.kestra.core.runners.pebble.functions;
import io.kestra.core.runners.LocalPath;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.NamespaceFile;
import io.kestra.core.storages.StorageContext;
import io.pebbletemplates.pebble.template.EvaluationContext;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.util.Map;
@Singleton
public class FileExistsFunction extends AbstractFileFunction {
private static final String ERROR_MESSAGE = "The 'fileExists' function expects an argument 'path' that is a path to the internal storage URI.";
@Override
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException {
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId) throws IOException {
return switch (path.getScheme()) {
case StorageContext.KESTRA_SCHEME -> storageInterface.exists(tenantId, namespace, path);
case LocalPath.FILE_SCHEME -> localPathFactory.createLocalPath().exists(path);
case Namespace.NAMESPACE_FILE_SCHEME -> {
Namespace namespaceStorage = namespaceFactory.of(tenantId, namespace, storageInterface);
yield namespaceStorage.exists(NamespaceFile.normalize(Path.of(path.getPath()), true));
}
default -> throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(path));
};
}

View File

@@ -2,23 +2,19 @@ package io.kestra.core.runners.pebble.functions;
import io.kestra.core.runners.LocalPath;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.NamespaceFile;
import io.kestra.core.storages.StorageContext;
import io.pebbletemplates.pebble.template.EvaluationContext;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Map;
@Singleton
public class FileSizeFunction extends AbstractFileFunction {
private static final String ERROR_MESSAGE = "The 'fileSize' function expects an argument 'path' that is a path to the internal storage URI.";
@Override
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException {
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId) throws IOException {
return switch (path.getScheme()) {
case StorageContext.KESTRA_SCHEME -> {
FileAttributes fileAttributes = storageInterface.getAttributes(tenantId, namespace, path);
@@ -28,12 +24,6 @@ public class FileSizeFunction extends AbstractFileFunction {
BasicFileAttributes fileAttributes = localPathFactory.createLocalPath().getAttributes(path);
yield fileAttributes.size();
}
case Namespace.NAMESPACE_FILE_SCHEME -> {
FileAttributes fileAttributes = namespaceFactory
.of(tenantId, namespace, storageInterface)
.getFileMetadata(NamespaceFile.normalize(Path.of(path.getPath()), true));
yield fileAttributes.getSize();
}
default -> throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(path));
};
}

View File

@@ -1,24 +1,19 @@
package io.kestra.core.runners.pebble.functions;
import io.kestra.core.runners.LocalPath;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.NamespaceFile;
import io.kestra.core.storages.StorageContext;
import io.pebbletemplates.pebble.template.EvaluationContext;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Path;
import java.util.Map;
@Singleton
public class IsFileEmptyFunction extends AbstractFileFunction {
private static final String ERROR_MESSAGE = "The 'isFileEmpty' function expects an argument 'path' that is a path to a namespace file or an internal storage URI.";
@Override
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException {
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId) throws IOException {
return switch (path.getScheme()) {
case StorageContext.KESTRA_SCHEME -> {
try (InputStream inputStream = storageInterface.get(tenantId, namespace, path)) {
@@ -32,12 +27,6 @@ public class IsFileEmptyFunction extends AbstractFileFunction {
yield inputStream.read(buffer, 0, 1) <= 0;
}
}
case Namespace.NAMESPACE_FILE_SCHEME -> {
FileAttributes fileAttributes = namespaceFactory
.of(tenantId, namespace, storageInterface)
.getFileMetadata(NamespaceFile.normalize(Path.of(path.getPath()), true));
yield fileAttributes.getSize() <= 0;
}
default -> throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(path));
};
}
@@ -46,4 +35,4 @@ public class IsFileEmptyFunction extends AbstractFileFunction {
protected String getErrorMessage() {
return ERROR_MESSAGE;
}
}
}

View File

@@ -1,37 +1,20 @@
package io.kestra.core.runners.pebble.functions;
import io.kestra.core.runners.LocalPath;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.NamespaceFile;
import io.kestra.core.storages.StorageContext;
import io.pebbletemplates.pebble.template.EvaluationContext;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@Singleton
public class ReadFileFunction extends AbstractFileFunction {
public static final String VERSION = "version";
private static final String ERROR_MESSAGE = "The 'read' function expects an argument 'path' that is a path to a namespace file or an internal storage URI.";
@Override
public List<String> getArgumentNames() {
return Stream.concat(
super.getArgumentNames().stream(),
Stream.of(VERSION)
).toList();
}
@Override
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException {
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId) throws IOException {
return switch (path.getScheme()) {
case StorageContext.KESTRA_SCHEME -> {
try (InputStream inputStream = storageInterface.get(tenantId, namespace, path)) {
@@ -43,30 +26,12 @@ public class ReadFileFunction extends AbstractFileFunction {
yield new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}
}
case Namespace.NAMESPACE_FILE_SCHEME -> {
try (InputStream inputStream = contentInputStream(path, namespace, tenantId, args)) {
yield new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}
}
default -> throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(path));
};
}
private InputStream contentInputStream(URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException {
Namespace namespaceStorage = namespaceFactory.of(tenantId, namespace, storageInterface);
if (args.containsKey(VERSION)) {
return namespaceStorage.getFileContent(
NamespaceFile.normalize(Path.of(path.getPath()), true),
Integer.parseInt(args.get(VERSION).toString())
);
}
return namespaceStorage.getFileContent(NamespaceFile.normalize(Path.of(path.getPath()), true));
}
@Override
protected String getErrorMessage() {
return ERROR_MESSAGE;
}
}
}

View File

@@ -9,7 +9,6 @@ import io.kestra.core.secret.SecretNotFoundException;
import io.kestra.core.secret.SecretService;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.NamespaceService;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.EvaluationContext;
@@ -37,7 +36,7 @@ public class SecretFunction implements Function {
private SecretService secretService;
@Inject
private NamespaceService namespaceService;
private FlowService flowService;
@Override
public List<String> getArgumentNames() {
@@ -57,7 +56,7 @@ public class SecretFunction implements Function {
if (namespace == null) {
namespace = flowNamespace;
} else {
namespaceService.checkAllowedNamespace(flowTenantId, namespace, flowTenantId, flowNamespace);
flowService.checkAllowedNamespace(flowTenantId, namespace, flowTenantId, flowNamespace);
}
try {

View File

@@ -2,15 +2,12 @@ package io.kestra.core.services;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.slf4j.event.Level;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -20,42 +17,9 @@ import java.util.stream.Stream;
*/
@Singleton
public class ExecutionLogService {
private final LogRepositoryInterface logRepository;
@Inject
public ExecutionLogService(LogRepositoryInterface logRepository) {
this.logRepository = logRepository;
}
private LogRepositoryInterface logRepository;
/**
* Purges log entries matching the given criteria.
*
* @param tenantId the tenant identifier
* @param namespace the namespace of the flow
* @param flowId the flow identifier
* @param executionId the execution identifier
* @param logLevels the list of log levels to delete
* @param startDate the start of the date range
* @param endDate the end of the date range.
* @return the number of log entries deleted
*/
public int purge(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate) {
return logRepository.deleteByQuery(tenantId, namespace, flowId, executionId, logLevels, startDate, endDate);
}
/**
* Fetches the error logs of an execution.
* <p>
* This method limits the results to the first 25 error logs, ordered by timestamp asc.
*
* @return the log entries
*/
public List<LogEntry> errorLogs(String tenantId, String executionId) {
return logRepository.findByExecutionId(tenantId, executionId, Level.ERROR, Pageable.from(1, 25, Sort.of(Sort.Order.asc("timestamp"))));
}
public InputStream getExecutionLogsAsStream(String tenantId,
String executionId,
Level minLevel,

View File

@@ -2,10 +2,8 @@ package io.kestra.core.services;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.flows.check.Check;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.AbstractTrigger;
@@ -14,13 +12,10 @@ import io.kestra.core.models.validations.ValidateConstraintViolation;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.ListUtils;
import io.kestra.plugin.core.flow.Pause;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j;
@@ -59,9 +54,6 @@ public class FlowService {
@Inject
Optional<FlowTopologyRepositoryInterface> flowTopologyRepository;
@Inject
Provider<RunContextFactory> runContextFactory; // Lazy init: avoid circular dependency error.
/**
* Validates and creates the given flow.
* <p>
@@ -93,50 +85,6 @@ public class FlowService {
.orElseThrow(() -> new IllegalStateException("Cannot perform operation on flow. Cause: No FlowRepository"));
}
/**
* Evaluates all checks defined in the given flow using the provided inputs.
* <p>
* Each check's {@link Check#getCondition()} is evaluated in the context of the flow.
* If a condition evaluates to {@code false} or fails to evaluate due to a
* variable error, the corresponding {@link Check} is added to the returned list.
* </p>
*
* @param flow the flow containing the checks to evaluate
* @param inputs the input values used when evaluating the conditions
* @return a list of checks whose conditions evaluated to {@code false} or failed to evaluate
*/
public List<Check> getFailedChecks(Flow flow, Map<String, Object> inputs) {
if (!ListUtils.isEmpty(flow.getChecks())) {
RunContext runContext = runContextFactory.get().of(flow, Map.of("inputs", inputs));
List<Check> falseConditions = new ArrayList<>();
for (Check check : flow.getChecks()) {
try {
boolean result = Boolean.TRUE.equals(runContext.renderTyped(check.getCondition()));
if (!result) {
falseConditions.add(check);
}
} catch (IllegalVariableEvaluationException e) {
log.debug("[tenant: {}] [namespace: {}] [flow: {}] Failed to evaluate check condition. Cause.: {}",
flow.getTenantId(),
flow.getNamespace(),
flow.getId(),
e.getMessage(),
e
);
falseConditions.add(Check
.builder()
.message("Failed to evaluate check condition. Cause: " + e.getMessage())
.behavior(Check.Behavior.BLOCK_EXECUTION)
.style(Check.Style.ERROR)
.build()
);
}
}
return falseConditions;
}
return List.of();
}
/**
* Validates the given flow source.
* <p>
@@ -508,6 +456,50 @@ public class FlowService {
return flowRepository.get().delete(flow);
}
/**
* Return true if the namespace is allowed from the namespace denoted by 'fromTenant' and 'fromNamespace'.
* As namespace restriction is an EE feature, this will always return true in OSS.
*/
public boolean isAllowedNamespace(String tenant, String namespace, String fromTenant, String fromNamespace) {
return true;
}
/**
* Check that the namespace is allowed from the namespace denoted by 'fromTenant' and 'fromNamespace'.
* If not, throw an IllegalArgumentException.
*/
public void checkAllowedNamespace(String tenant, String namespace, String fromTenant, String fromNamespace) {
if (!isAllowedNamespace(tenant, namespace, fromTenant, fromNamespace)) {
throw new IllegalArgumentException("Namespace " + namespace + " is not allowed.");
}
}
/**
* Return true if the namespace is allowed from all the namespace in the 'fromTenant' tenant.
* As namespace restriction is an EE feature, this will always return true in OSS.
*/
public boolean areAllowedAllNamespaces(String tenant, String fromTenant, String fromNamespace) {
return true;
}
/**
* Check that the namespace is allowed from all the namespace in the 'fromTenant' tenant.
* If not, throw an IllegalArgumentException.
*/
public void checkAllowedAllNamespaces(String tenant, String fromTenant, String fromNamespace) {
if (!areAllowedAllNamespaces(tenant, fromTenant, fromNamespace)) {
throw new IllegalArgumentException("All namespaces are not allowed, you should either filter on a namespace or configure all namespaces to allow your namespace.");
}
}
/**
* Return true if require existing namespace is enabled and the namespace didn't already exist.
* As namespace management is an EE feature, this will always return false in OSS.
*/
public boolean requireExistingNamespace(String tenant, String namespace) {
return false;
}
/**
* Gets the executable flow for the given namespace, id, and revision.
* Warning: this method bypasses ACL so someone with only execution right can create a flow execution

View File

@@ -20,6 +20,9 @@ public class KVStoreService {
@Inject
private StorageInterface storageInterface;
@Inject
private FlowService flowService;
@Inject
private NamespaceService namespaceService;
@@ -35,7 +38,7 @@ public class KVStoreService {
boolean isNotSameNamespace = fromNamespace != null && !namespace.equals(fromNamespace);
if (isNotSameNamespace && isNotParentNamespace(namespace, fromNamespace)) {
try {
namespaceService.checkAllowedNamespace(tenant, namespace, tenant, fromNamespace);
flowService.checkAllowedNamespace(tenant, namespace, tenant, fromNamespace);
} catch (IllegalArgumentException e) {
throw new KVStoreException(String.format(
"Cannot access the KV store. Access to '%s' namespace is not allowed from '%s'.", namespace, fromNamespace)

View File

@@ -1,27 +1,38 @@
package io.kestra.core.utils;
package io.kestra.core.services;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
/**
* Utility class for logging
*/
public final class Logs {
import java.time.ZonedDateTime;
import java.util.List;
@Singleton
public class LogService {
private static final String FLOW_PREFIX_WITH_TENANT = "[tenant: {}] [namespace: {}] [flow: {}] ";
private static final String EXECUTION_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[execution: {}] ";
private static final String TRIGGER_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[trigger: {}] ";
private static final String TASKRUN_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[task: {}] [execution: {}] [taskrun: {}] ";
private Logs() {}
public static void logExecution(FlowId flow, Logger logger, Level level, String message, Object... args) {
private final LogRepositoryInterface logRepository;
@Inject
public LogService(LogRepositoryInterface logRepository) {
this.logRepository = logRepository;
}
public void logExecution(FlowId flow, Logger logger, Level level, String message, Object... args) {
String finalMsg = FLOW_PREFIX_WITH_TENANT + message;
Object[] executionArgs = new Object[] { flow.getTenantId(), flow.getNamespace(), flow.getId() };
Object[] finalArgs = ArrayUtils.addAll(executionArgs, args);
@@ -29,37 +40,37 @@ public final class Logs {
}
/**
* Log an {@link Execution} via the execution logger named: 'execution.{flowId}'.
* Log an execution via the execution logger named: 'execution.{flowId}'.
*/
public static void logExecution(Execution execution, Level level, String message, Object... args) {
public void logExecution(Execution execution, Level level, String message, Object... args) {
Logger logger = logger(execution);
logExecution(execution, logger, level, message, args);
}
public static void logExecution(Execution execution, Logger logger, Level level, String message, Object... args) {
public void logExecution(Execution execution, Logger logger, Level level, String message, Object... args) {
Object[] executionArgs = new Object[] { execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId() };
Object[] finalArgs = ArrayUtils.addAll(executionArgs, args);
logger.atLevel(level).log(EXECUTION_PREFIX_WITH_TENANT + message, finalArgs);
}
/**
* Log a {@link TriggerContext} via the trigger logger named: 'trigger.{flowId}.{triggereId}'.
* Log a trigger via the trigger logger named: 'trigger.{flowId}.{triggereId}'.
*/
public static void logTrigger(TriggerContext triggerContext, Level level, String message, Object... args) {
public void logTrigger(TriggerContext triggerContext, Level level, String message, Object... args) {
Logger logger = logger(triggerContext);
logTrigger(triggerContext, logger, level, message, args);
}
public static void logTrigger(TriggerContext triggerContext, Logger logger, Level level, String message, Object... args) {
public void logTrigger(TriggerContext triggerContext, Logger logger, Level level, String message, Object... args) {
Object[] executionArgs = new Object[] { triggerContext.getTenantId(), triggerContext.getNamespace(), triggerContext.getFlowId(), triggerContext.getTriggerId() };
Object[] finalArgs = ArrayUtils.addAll(executionArgs, args);
logger.atLevel(level).log(TRIGGER_PREFIX_WITH_TENANT + message, finalArgs);
}
/**
* Log a {@link TaskRun} via the taskRun logger named: 'task.{flowId}.{taskId}'.
* Log a taskRun via the taskRun logger named: 'task.{flowId}.{taskId}'.
*/
public static void logTaskRun(TaskRun taskRun, Level level, String message, Object... args) {
public void logTaskRun(TaskRun taskRun, Level level, String message, Object... args) {
String prefix = TASKRUN_PREFIX_WITH_TENANT;
String finalMsg = taskRun.getValue() == null ? prefix + message : prefix + "[value: {}] " + message;
Object[] executionArgs = new Object[] { taskRun.getTenantId(), taskRun.getNamespace(), taskRun.getFlowId(), taskRun.getTaskId(), taskRun.getExecutionId(), taskRun.getId() };
@@ -71,19 +82,31 @@ public final class Logs {
logger.atLevel(level).log(finalMsg, finalArgs);
}
private static Logger logger(TaskRun taskRun) {
public int purge(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate) {
return logRepository.deleteByQuery(tenantId, namespace, flowId, executionId, logLevels, startDate, endDate);
}
/**
* Fetch the error logs of an execution.
* Will limit the results to the first 25 error logs, ordered by timestamp asc.
*/
public List<LogEntry> errorLogs(String tenantId, String executionId) {
return logRepository.findByExecutionId(tenantId, executionId, Level.ERROR, Pageable.from(1, 25, Sort.of(Sort.Order.asc("timestamp"))));
}
private Logger logger(TaskRun taskRun) {
return LoggerFactory.getLogger(
"task." + taskRun.getFlowId() + "." + taskRun.getTaskId()
);
}
private static Logger logger(TriggerContext triggerContext) {
private Logger logger(TriggerContext triggerContext) {
return LoggerFactory.getLogger(
"trigger." + triggerContext.getFlowId() + "." + triggerContext.getTriggerId()
);
}
private static Logger logger(Execution execution) {
private Logger logger(Execution execution) {
return LoggerFactory.getLogger(
"execution." + execution.getFlowId()
);

View File

@@ -1,6 +1,5 @@
package io.kestra.core.services;
import io.kestra.core.exceptions.ResourceAccessDeniedException;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.NamespaceUtils;
import jakarta.inject.Inject;
@@ -40,52 +39,4 @@ public class NamespaceService {
}
return false;
}
/**
* Return true if require existing namespace is enabled and the namespace didn't already exist.
* As namespace management is an EE feature, this will always return false in OSS.
*/
public boolean requireExistingNamespace(String tenant, String namespace) {
return false;
}
/**
* Return true if the namespace is allowed from the namespace denoted by 'fromTenant' and 'fromNamespace'.
* As namespace restriction is an EE feature, this will always return true in OSS.
*/
public boolean isAllowedNamespace(String tenant, String namespace, String fromTenant, String fromNamespace) {
return true;
}
/**
* Check that the namespace is allowed from the namespace denoted by 'fromTenant' and 'fromNamespace'.
* If not, throw a ResourceAccessDeniedException.
*
* @throws ResourceAccessDeniedException if the namespace is not allowed.
*/
public void checkAllowedNamespace(String tenant, String namespace, String fromTenant, String fromNamespace) {
if (!isAllowedNamespace(tenant, namespace, fromTenant, fromNamespace)) {
throw new ResourceAccessDeniedException("Namespace " + namespace + " is not allowed.");
}
}
/**
* Return true if the namespace is allowed from all the namespace in the 'fromTenant' tenant.
* As namespace restriction is an EE feature, this will always return true in OSS.
*/
public boolean areAllowedAllNamespaces(String tenant, String fromTenant, String fromNamespace) {
return true;
}
/**
* Check that the namespace is allowed from all the namespace in the 'fromTenant' tenant.
* If not, throw a ResourceAccessDeniedException.
*
* @throws ResourceAccessDeniedException if all namespaces all aren't allowed.
*/
public void checkAllowedAllNamespaces(String tenant, String fromTenant, String fromNamespace) {
if (!areAllowedAllNamespaces(tenant, fromTenant, fromNamespace)) {
throw new ResourceAccessDeniedException("All namespaces are not allowed, you should either filter on a namespace or configure all namespaces to allow your namespace.");
}
}
}

View File

@@ -23,7 +23,6 @@ import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunContextLogger;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.utils.Logs;
import io.kestra.core.utils.MapUtils;
import io.kestra.plugin.core.flow.Template;
import io.micronaut.context.annotation.Value;
@@ -31,6 +30,7 @@ import io.micronaut.core.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Provider;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j;
@@ -82,7 +82,10 @@ public class PluginDefaultService {
@Inject
protected PluginRegistry pluginRegistry;
@Inject
protected Provider<LogService> logService; // lazy-init
@Value("{kestra.templates.enabled:false}")
private boolean templatesEnabled;
@@ -252,7 +255,7 @@ public class PluginDefaultService {
if (source == null) {
// This should never happen
String error = "Cannot apply plugin defaults. Cause: flow has no defined source.";
Logs.logExecution(flow, log, Level.ERROR, error);
logService.get().logExecution(flow, log, Level.ERROR, error);
throw new IllegalArgumentException(error);
}
@@ -308,7 +311,7 @@ public class PluginDefaultService {
result = parseFlowWithAllDefaults(flow.getTenantId(), flow.getNamespace(), flow.getRevision(), flow.isDeleted(), source, true, false);
} catch (Exception e) {
if (safe) {
Logs.logExecution(flow, log, Level.ERROR, "Failed to read flow.", e);
logService.get().logExecution(flow, log, Level.ERROR, "Failed to read flow.", e);
result = FlowWithException.from(flow, e);
// deleted is not part of the original 'source'

View File

@@ -1,27 +1,18 @@
package io.kestra.core.storages;
import io.kestra.core.models.FetchVersion;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.*;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.throwFunction;
/**
* The default {@link Namespace} implementation.
@@ -37,7 +28,6 @@ public class InternalNamespace implements Namespace {
private final String namespace;
private final String tenant;
private final StorageInterface storage;
private final NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository;
private final Logger logger;
/**
@@ -46,8 +36,8 @@ public class InternalNamespace implements Namespace {
* @param namespace The namespace
* @param storage The storage.
*/
public InternalNamespace(@Nullable final String tenant, final String namespace, final StorageInterface storage, final NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository) {
this(LOG, tenant, namespace, storage, namespaceFileMetadataRepository);
public InternalNamespace(@Nullable final String tenant, final String namespace, final StorageInterface storage) {
this(LOG, tenant, namespace, storage);
}
/**
@@ -58,27 +48,13 @@ public class InternalNamespace implements Namespace {
* @param tenant The tenant.
* @param storage The storage.
*/
public InternalNamespace(final Logger logger, @Nullable final String tenant, final String namespace, final StorageInterface storage, final NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepositoryInterface) {
public InternalNamespace(final Logger logger, @Nullable final String tenant, final String namespace, final StorageInterface storage) {
this.logger = Objects.requireNonNull(logger, "logger cannot be null");
this.namespace = Objects.requireNonNull(namespace, "namespace cannot be null");
this.storage = Objects.requireNonNull(storage, "storage cannot be null");
this.namespaceFileMetadataRepository = Objects.requireNonNull(namespaceFileMetadataRepositoryInterface, "namespaceFileMetadataRepository cannot be null");
this.tenant = tenant;
}
@Override
public ArrayListTotal<NamespaceFile> find(Pageable pageable, List<QueryFilter> filters, boolean allowDeleted, FetchVersion fetchVersion) {
return namespaceFileMetadataRepository.find(
pageable,
tenant,
Stream.concat(filters.stream(), Stream.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value(namespace).build()
)).toList(),
allowDeleted,
fetchVersion
).map(throwFunction(NamespaceFile::fromMetadata));
}
/**
* {@inheritDoc}
**/
@@ -97,106 +73,35 @@ public class InternalNamespace implements Namespace {
**/
@Override
public List<NamespaceFile> all() throws IOException {
return all(null);
return all(false);
}
/**
* {@inheritDoc}
**/
@Override
public List<NamespaceFile> all(final String containing, boolean includeDirectories) throws IOException {
List<NamespaceFileMetadata> namespaceFilesMetadata = namespaceFileMetadataRepository.find(Pageable.UNPAGED, tenant, Stream.concat(
Stream.of(QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value(namespace).build()),
Optional.ofNullable(containing).flatMap(p -> {
if (p.equals("/")) {
return Optional.empty();
}
return Optional.of(QueryFilter.builder().field(QueryFilter.Field.QUERY).operation(QueryFilter.Op.EQUALS).value(p).build());
}).stream()
).toList(), false);
if (!includeDirectories) {
namespaceFilesMetadata = namespaceFilesMetadata.stream().filter(nsFileMetadata -> !nsFileMetadata.isDirectory()).toList();
}
return namespaceFilesMetadata.stream().filter(nsFileMetadata -> !nsFileMetadata.getPath().equals("/")).map(nsFileMetadata -> NamespaceFile.of(namespace, Path.of(nsFileMetadata.getPath()), nsFileMetadata.getVersion())).toList();
public List<NamespaceFile> all(final boolean includeDirectories) throws IOException {
return all(null, includeDirectories);
}
/**
* {@inheritDoc}
**/
@Override
public List<NamespaceFileMetadata> children(String parentPath, boolean recursive) throws IOException {
final String normalizedParentPath = NamespaceFile.normalize(Path.of(parentPath), true).toString();
return namespaceFileMetadataRepository.find(Pageable.UNPAGED, tenant, List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value(namespace).build(),
QueryFilter.builder()
.field(QueryFilter.Field.PARENT_PATH)
.operation(recursive ? QueryFilter.Op.STARTS_WITH : QueryFilter.Op.EQUALS)
.value(normalizedParentPath.endsWith("/") ? normalizedParentPath : normalizedParentPath + "/")
.build()
), false);
}
@Override
public List<Pair<NamespaceFile, NamespaceFile>> move(Path source, Path target) throws Exception {
final Path normalizedSource = NamespaceFile.normalize(source, true);
final Path normalizedTarget = NamespaceFile.normalize(target, true);
if (findByPath(normalizedTarget).isPresent()) {
throw new IOException(String.format(
"File '%s' already exists in namespace '%s'.",
normalizedTarget,
namespace
));
}
ArrayListTotal<NamespaceFileMetadata> beforeRename = namespaceFileMetadataRepository.find(Pageable.UNPAGED, tenant, List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value(namespace).build(),
QueryFilter.builder().field(QueryFilter.Field.PATH).operation(QueryFilter.Op.IN).value(List.of(normalizedSource.toString(), normalizedSource + "/")).build()
), true, FetchVersion.ALL);
beforeRename.sort(Comparator.comparing(NamespaceFileMetadata::getVersion));
ArrayListTotal<NamespaceFileMetadata> afterRename = beforeRename
.map(nsFileMetadata -> {
String newPath;
if (nsFileMetadata.isDirectory()) {
newPath = normalizedTarget.toString().endsWith("/") ? normalizedTarget.toString() : normalizedTarget + "/";
} else {
newPath = normalizedTarget.toString();
}
return nsFileMetadata.toBuilder().path(newPath).build();
});
return afterRename.map(throwFunction(nsFileMetadata -> {
NamespaceFile beforeNamespaceFile = NamespaceFile.of(namespace, normalizedSource, nsFileMetadata.getVersion());
Path namespaceFilePath = beforeNamespaceFile.storagePath();
NamespaceFile afterNamespaceFile;
if (nsFileMetadata.isDirectory()) {
afterNamespaceFile = this.createDirectory(Path.of(nsFileMetadata.getPath()));
} else {
try (InputStream oldContent = storage.get(tenant, namespace, namespaceFilePath.toUri())) {
afterNamespaceFile = this.putFile(Path.of(nsFileMetadata.getPath()), oldContent, Conflicts.OVERWRITE).getFirst();
}
}
this.purge(NamespaceFile.of(namespace, normalizedSource, nsFileMetadata.getVersion()));
return Pair.of(beforeNamespaceFile, afterNamespaceFile);
}));
public List<NamespaceFile> all(final String prefix, final boolean includeDirectories) throws IOException {
URI namespacePrefix = URI.create(NamespaceFile.of(namespace, Optional.ofNullable(prefix).map(Path::of).orElse(null)).storagePath().toString().replace("\\","/") + "/");
return storage.allByPrefix(tenant, namespace, namespacePrefix, includeDirectories)
.stream()
.map(uri -> new NamespaceFile(relativize(uri), uri, namespace))
.toList();
}
/**
* {@inheritDoc}
**/
@Override
public NamespaceFile get(Path path) throws IOException {
final Path normalizedPath = NamespaceFile.normalize(path, true);
int version = findByPath(normalizedPath).map(NamespaceFileMetadata::getVersion).orElse(1);
return NamespaceFile.of(namespace, normalizedPath, version);
public NamespaceFile get(final Path path) {
return NamespaceFile.of(namespace, path);
}
public Path relativize(final URI uri) {
@@ -217,225 +122,90 @@ public class InternalNamespace implements Namespace {
* {@inheritDoc}
**/
@Override
public InputStream getFileContent(Path path, @Nullable Integer version) throws IOException {
final Path normalizedPath = NamespaceFile.normalize(path, true);
// Throw if file not found OR if it's deleted
NamespaceFileMetadata namespaceFileMetadata = findByPath(normalizedPath, version).orElseThrow(() -> fileNotFound(normalizedPath, version));
Path namespaceFilePath = NamespaceFile.of(namespace, normalizedPath, namespaceFileMetadata.getVersion()).storagePath();
public InputStream getFileContent(final Path path) throws IOException {
Path namespaceFilePath = NamespaceFile.of(namespace, path).storagePath();
return storage.get(tenant, namespace, namespaceFilePath.toUri());
}
@Override
public FileAttributes getFileMetadata(Path path) throws IOException {
final Path normalizedPath = NamespaceFile.normalize(path, true);
return findByPath(normalizedPath).map(NamespaceFileAttributes::new).orElseThrow(() -> fileNotFound(normalizedPath, null));
}
private FileNotFoundException fileNotFound(Path path, @Nullable Integer version) {
return new FileNotFoundException(Optional.ofNullable(version).map(v -> "Version " + v + " of file").orElse("File") + " '" + path + "' was not found in namespace '" + namespace + "'.");
}
private Optional<NamespaceFileMetadata> findByPath(Path path, boolean allowDeleted, @Nullable Integer version) throws IOException {
final Path normalizedPath = NamespaceFile.normalize(path, true);
if (version != null) {
return namespaceFileMetadataRepository.find(Pageable.from(1, 1), tenant, List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value(namespace).build(),
QueryFilter.builder().field(QueryFilter.Field.PATH).operation(QueryFilter.Op.EQUALS).value(normalizedPath.toString()).build(),
QueryFilter.builder().field(QueryFilter.Field.VERSION).operation(QueryFilter.Op.EQUALS).value(version).build()
), allowDeleted, FetchVersion.ALL).stream().findFirst();
}
return namespaceFileMetadataRepository.findByPath(tenant, namespace, normalizedPath.toString())
.filter(namespaceFileMetadata -> allowDeleted || !namespaceFileMetadata.isDeleted());
}
private Optional<NamespaceFileMetadata> findByPath(Path path, boolean allowDeleted) throws IOException {
return findByPath(path, allowDeleted, null);
}
private Optional<NamespaceFileMetadata> findByPath(Path path, @Nullable Integer version) throws IOException {
return findByPath(path, false, version);
}
private Optional<NamespaceFileMetadata> findByPath(Path path) throws IOException {
return findByPath(path, null);
}
@Override
public boolean exists(Path path) throws IOException {
final Path normalizedPath = NamespaceFile.normalize(path, true);
return findByPath(normalizedPath).isPresent();
}
/**
* {@inheritDoc}
**/
@Override
public List<NamespaceFile> putFile(final Path path, final InputStream content, final Conflicts onAlreadyExist) throws IOException, URISyntaxException {
final Path normalizedPath = NamespaceFile.normalize(path, true);
Optional<NamespaceFileMetadata> inRepository = findByPath(normalizedPath, true);
int currentVersion = inRepository.map(NamespaceFileMetadata::getVersion).orElse(0);
NamespaceFile namespaceFile = NamespaceFile.of(namespace, normalizedPath, currentVersion + 1);
Path storagePath = namespaceFile.storagePath();
public NamespaceFile putFile(final Path path, final InputStream content, final Conflicts onAlreadyExist) throws IOException, URISyntaxException {
Path namespaceFilesPrefix = NamespaceFile.of(namespace, path).storagePath();
// Remove Windows letter
URI cleanUri = new URI(storagePath.toUri().toString().replaceFirst("^file:///[a-zA-Z]:", ""));
URI cleanUri = new URI(namespaceFilesPrefix.toUri().toString().replaceFirst("^file:///[a-zA-Z]:", ""));
final boolean exists = storage.exists(tenant, namespace, cleanUri);
List<NamespaceFile> createdFiles = new ArrayList<>();
if (inRepository.isEmpty()) {
storage.put(tenant, namespace, cleanUri, content);
createdFiles.addAll(mkDirs(normalizedPath.toString()));
namespaceFileMetadataRepository.save(
NamespaceFileMetadata.builder()
.tenantId(tenant)
.namespace(namespace)
.path(normalizedPath.toString())
.size(storage.getAttributes(tenant, namespace, cleanUri).getSize())
.build()
);
logger.debug(String.format(
"File '%s' added to namespace '%s'.",
normalizedPath,
namespace
));
createdFiles.add(namespaceFile);
} else if (onAlreadyExist == Conflicts.OVERWRITE || inRepository.get().isDeleted()) {
storage.put(tenant, namespace, cleanUri, content);
createdFiles.addAll(mkDirs(normalizedPath.toString()));
namespaceFileMetadataRepository.save(
inRepository.get().toBuilder().size(storage.getAttributes(tenant, namespace, cleanUri).getSize()).deleted(false).build()
);
if (inRepository.get().isDeleted()) {
logger.debug(String.format(
"File '%s' added to namespace '%s'.",
normalizedPath,
namespace
));
} else {
logger.debug(String.format(
"File '%s' overwritten into namespace '%s'.",
normalizedPath,
namespace
));
return switch (onAlreadyExist) {
case OVERWRITE -> {
URI uri = storage.put(tenant, namespace, cleanUri, content);
NamespaceFile namespaceFile = new NamespaceFile(relativize(uri), uri, namespace);
if (exists) {
logger.debug(String.format(
"File '%s' overwritten into namespace '%s'.",
path,
namespace
));
} else {
logger.debug(String.format(
"File '%s' added to namespace '%s'.",
path,
namespace
));
}
yield namespaceFile;
}
createdFiles.add(namespaceFile);
} else {
// At this point, the file exists and we have to decide what to do based on the conflict strategy
switch (onAlreadyExist) {
case ERROR -> throw new IOException(String.format(
"File '%s' already exists in namespace '%s' and conflict is set to %s",
normalizedPath,
namespace,
Conflicts.ERROR
));
case SKIP -> logger.debug(String.format(
"File '%s' already exists in namespace '%s' and conflict is set to %s. Skipping.",
normalizedPath,
namespace,
Conflicts.SKIP
));
case ERROR -> {
if (!exists) {
URI uri = storage.put(tenant, namespace, namespaceFilesPrefix.toUri(), content);
yield new NamespaceFile(relativize(uri), uri, namespace);
} else {
throw new IOException(String.format(
"File '%s' already exists in namespace '%s' and conflict is set to %s",
path,
namespace,
Conflicts.ERROR
));
}
}
}
return createdFiles;
}
/**
* Make all parent directories for a given path.
*/
private List<NamespaceFile> mkDirs(String path) throws IOException {
List<NamespaceFile> createdDirs = new ArrayList<>();
Optional<Path> maybeParentPath = Optional.empty();
while (
(maybeParentPath = Optional.ofNullable(NamespaceFileMetadata.parentPath(maybeParentPath.map(Path::toString).orElse(path))).map(Path::of)).isPresent()
&& !this.exists(maybeParentPath.get())
) {
this.createDirectory(maybeParentPath.get());
createdDirs.add(NamespaceFile.of(namespace, maybeParentPath.get().toString().endsWith("/") ? maybeParentPath.get().toString() : maybeParentPath.get() + "/", 1));
}
return createdDirs;
case SKIP -> {
if (!exists) {
URI uri = storage.put(tenant, namespace, namespaceFilesPrefix.toUri(), content);
NamespaceFile namespaceFile = new NamespaceFile(relativize(uri), uri, namespace);
logger.debug(String.format(
"File '%s' added to namespace '%s'.",
path,
namespace
));
yield namespaceFile;
} else {
logger.debug(String.format(
"File '%s' already exists in namespace '%s' and conflict is set to %s. Skipping.",
path,
namespace,
Conflicts.SKIP
));
URI uri = URI.create(StorageContext.KESTRA_PROTOCOL + namespaceFilesPrefix);
yield new NamespaceFile(relativize(uri), uri, namespace);
}
}
};
}
/**
* {@inheritDoc}
**/
@Override
public NamespaceFile createDirectory(Path path) throws IOException {
final Path normalizedPath = NamespaceFile.normalize(path, true);
NamespaceFileMetadata nsFileMetadata = namespaceFileMetadataRepository.save(
NamespaceFileMetadata.builder()
.tenantId(tenant)
.namespace(namespace)
.path(normalizedPath.toString().endsWith("/") ? normalizedPath.toString() : normalizedPath + "/")
.size(0L)
.build()
);
storage.createDirectory(tenant, namespace, NamespaceFile.of(namespace, normalizedPath, 1).storagePath().toUri());
return NamespaceFile.fromMetadata(nsFileMetadata);
public URI createDirectory(Path path) throws IOException {
return storage.createDirectory(tenant, namespace, NamespaceFile.of(namespace, path).storagePath().toUri());
}
/**
* {@inheritDoc}
**/
@Override
public List<NamespaceFile> delete(Path path) throws IOException {
final Path normalizedPath = NamespaceFile.normalize(path, true);
Optional<NamespaceFileMetadata> maybeNamespaceFileMetadata = namespaceFileMetadataRepository.find(Pageable.from(1, 1), tenant, List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value(namespace).build(),
QueryFilter.builder().field(QueryFilter.Field.PATH).operation(QueryFilter.Op.IN).value(List.of(normalizedPath.toString(), normalizedPath + "/")).build()
), false).stream().findFirst();
List<NamespaceFileMetadata> toDelete = Stream.concat(
this.children(normalizedPath.toString(), true).stream().map(NamespaceFileMetadata::toDeleted),
maybeNamespaceFileMetadata.map(NamespaceFileMetadata::toDeleted).stream()
).toList();
toDelete.forEach(namespaceFileMetadataRepository::save);
return toDelete.stream().map(NamespaceFile::fromMetadata).toList();
}
@Override
public boolean purge(NamespaceFile namespaceFile) throws IOException {
storage.delete(tenant, namespace, namespaceFile.storagePath().toUri());
namespaceFileMetadataRepository.purge(List.of(NamespaceFileMetadata.of(tenant, namespaceFile)));
return true;
}
/**
* {@inheritDoc}
*/
@Override
public Integer purge(List<NamespaceFile> namespaceFiles) throws IOException {
Integer purgedMetadataCount = this.namespaceFileMetadataRepository.purge(namespaceFiles.stream().map(namespaceFile -> NamespaceFileMetadata.of(tenant, namespaceFile)).toList());
long actualDeletedEntries = namespaceFiles.stream()
.map(NamespaceFile::storagePath)
.map(Path::toUri)
.map(throwFunction(uri -> this.storage.delete(tenant, namespace, uri)))
.filter(Boolean::booleanValue)
.count();
if (actualDeletedEntries != purgedMetadataCount) {
LOG.warn("Namespace Files Metadata purge reported {} deleted entries, but {} values were actually deleted from storage", purgedMetadataCount, actualDeletedEntries);
}
return purgedMetadataCount;
public boolean delete(Path path) throws IOException {
return storage.delete(tenant, namespace, URI.create(path.toString().replace("\\","/")));
}
}

View File

@@ -1,12 +1,15 @@
package io.kestra.core.storages;
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
import io.kestra.core.services.NamespaceService;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVStore;
import jakarta.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -30,8 +33,7 @@ public class InternalStorage implements Storage {
private final Logger logger;
private final StorageContext context;
private final StorageInterface storage;
private final NamespaceFactory namespaceFactory;
private final NamespaceService namespaceService;
private final FlowService flowService;
/**
* Creates a new {@link InternalStorage} instance.
@@ -39,8 +41,8 @@ public class InternalStorage implements Storage {
* @param context The storage context.
* @param storage The storage to delegate operations.
*/
public InternalStorage(StorageContext context, StorageInterface storage, NamespaceFactory namespaceFactory) {
this(LOG, context, storage, null, namespaceFactory);
public InternalStorage(StorageContext context, StorageInterface storage) {
this(LOG, context, storage, null);
}
/**
@@ -50,12 +52,11 @@ public class InternalStorage implements Storage {
* @param context The storage context.
* @param storage The storage to delegate operations.
*/
public InternalStorage(Logger logger, StorageContext context, StorageInterface storage, NamespaceService namespaceService, NamespaceFactory namespaceFactory) {
public InternalStorage(Logger logger, StorageContext context, StorageInterface storage, FlowService flowService) {
this.logger = logger;
this.context = context;
this.storage = storage;
this.namespaceService = namespaceService;
this.namespaceFactory = namespaceFactory;
this.flowService = flowService;
}
/**
@@ -63,7 +64,7 @@ public class InternalStorage implements Storage {
**/
@Override
public Namespace namespace() {
return namespaceFactory.of(logger, context.getTenantId(), context.getNamespace(), storage);
return new InternalNamespace(logger, context.getTenantId(), context.getNamespace(), storage);
}
/**
@@ -73,13 +74,13 @@ public class InternalStorage implements Storage {
public Namespace namespace(String namespace) {
boolean isExternalNamespace = !namespace.equals(context.getNamespace());
// Checks whether the contextual namespace is allowed to access the passed namespace.
if (isExternalNamespace && namespaceService != null) {
namespaceService.checkAllowedNamespace(
if (isExternalNamespace && flowService != null) {
flowService.checkAllowedNamespace(
context.getTenantId(), namespace, // requested Tenant/Namespace
context.getTenantId(), context.getNamespace() // from Tenant/Namespace
);
}
return namespaceFactory.of(logger, context.getTenantId(), namespace, storage);
return new InternalNamespace(logger, context.getTenantId(), namespace, storage);
}
/**

View File

@@ -1,22 +1,12 @@
package io.kestra.core.storages;
import io.kestra.core.models.FetchVersion;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.utils.PathMatcherPredicate;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
@@ -26,8 +16,6 @@ import java.util.function.Predicate;
public interface Namespace {
String NAMESPACE_FILE_SCHEME = "nsfile";
ArrayListTotal<NamespaceFile> find(Pageable pageable, List<QueryFilter> filters, boolean allowDeleted, FetchVersion fetchVersion);
/**
* Gets the current namespace.
*
@@ -49,25 +37,19 @@ public interface Namespace {
*/
List<NamespaceFile> all() throws IOException;
default List<NamespaceFile> all(String containing) throws IOException {
return this.all(containing, false);
}
/**
* Gets the URIs of all namespace files for the current namespace that contains the optional <code>containing</code> parameter.
* Gets the URIs of all namespace files for the contextual namespace.
*
* @return The list of {@link URI}.
*/
List<NamespaceFile> all(String containing, boolean includeDirectories) throws IOException;
List<NamespaceFile> all(boolean includeDirectories) throws IOException;
/**
* Gets the URIs of all namespace files for the current namespace under the <code>parentPath</code>.
* Gets the URIs of all namespace files for the current namespace.
*
* @return The list of {@link URI}.
*/
List<NamespaceFileMetadata> children(String parentPath, boolean recursive) throws IOException;
List<Pair<NamespaceFile, NamespaceFile>> move(Path source, Path target) throws Exception;
List<NamespaceFile> all(String prefix, boolean includeDirectories) throws IOException;
/**
* Gets a {@link NamespaceFile} for the given path and the current namespace.
@@ -75,7 +57,7 @@ public interface Namespace {
* @param path the file path.
* @return a new {@link NamespaceFile}
*/
NamespaceFile get(Path path) throws IOException;
NamespaceFile get(Path path);
/**
* Retrieves the URIs of all namespace files for the current namespace matching the given predicate.
@@ -100,45 +82,27 @@ public interface Namespace {
return findAllFilesMatching(predicate);
}
/**
* Retrieves the content of the namespace file at the given path for the latest version.
*/
default InputStream getFileContent(Path path) throws IOException {
return getFileContent(path, null);
}
/**
* Retrieves the content of the namespace file at the given path.
*
* @param path the file path.
* @param version optionally a file version, otherwise will retrieve the latest.
* @return the {@link InputStream}.
* @throws IllegalArgumentException if the given {@link Path} is {@code null} or invalid.
* @throws IOException if an error happens while accessing the file.
*/
InputStream getFileContent(Path path, @Nullable Integer version) throws IOException;
InputStream getFileContent(Path path) throws IOException;
/**
* Retrieves the metadata of the namespace file at the given path.
*
* @param path the file path.
* @return the {@link FileAttributes}.
*/
FileAttributes getFileMetadata(Path path) throws IOException;
boolean exists(Path path) throws IOException;
default List<NamespaceFile> putFile(Path path, InputStream content) throws IOException, URISyntaxException {
default NamespaceFile putFile(Path path, InputStream content) throws IOException, URISyntaxException {
return putFile(path, content, Conflicts.OVERWRITE);
}
List<NamespaceFile> putFile(Path path, InputStream content, Conflicts onAlreadyExist) throws IOException, URISyntaxException;
NamespaceFile putFile(Path path, InputStream content, Conflicts onAlreadyExist) throws IOException, URISyntaxException;
default List<NamespaceFile> putFile(NamespaceFile file, InputStream content) throws IOException, URISyntaxException {
default NamespaceFile putFile(NamespaceFile file, InputStream content) throws IOException, URISyntaxException {
return putFile(file, content, Conflicts.OVERWRITE);
}
default List<NamespaceFile> putFile(NamespaceFile file, InputStream content, Conflicts onAlreadyExist) throws IOException, URISyntaxException {
default NamespaceFile putFile(NamespaceFile file, InputStream content, Conflicts onAlreadyExist) throws IOException, URISyntaxException {
return putFile(Path.of(file.path()), content, onAlreadyExist);
}
@@ -146,47 +110,39 @@ public interface Namespace {
* Creates a new directory for the current namespace.
*
* @param path The {@link Path} of the directory.
* @return The created namespace file.
* @return The URI of the directory in the Kestra's internal storage.
* @throws IOException if an error happens while accessing the file.
*/
NamespaceFile createDirectory(Path path) throws IOException;
URI createDirectory(Path path) throws IOException;
/**
* Deletes any namespaces file at the given path.
* Deletes any namespaces files at the given path.
*
* @param file the {@link NamespaceFile} to be deleted.
* @throws IOException if an error happens while performing the delete operation.
*/
default List<NamespaceFile> delete(NamespaceFile file) throws IOException {
default boolean delete(NamespaceFile file) throws IOException {
return delete(Path.of(file.path()));
}
/**
* Soft-deletes any namespaces files at the given path.
* Deletes namespaces directories at the given path.
*
* @param file the {@link NamespaceFile} to be deleted.
* @throws IOException if an error happens while performing the delete operation.
*/
default boolean deleteDirectory(NamespaceFile file) throws IOException {
return delete(Path.of(file.path()));
}
/**
* Deletes any namespaces files at the given path.
*
* @param path the path to be deleted.
* @return the list of namespace files that got deleted. There can be multiple files if a directory is deleted as its whole content will be.
* @return {@code true} if the file was deleted by this method; {@code false} if the file could not be deleted because it did not exist
* @throws IOException if an error happens while performing the delete operation.
*/
List<NamespaceFile> delete(Path path) throws IOException;
/**
* Hard-deletes any namespaces files.
*
* @param namespaceFile the namespace file to be purged.
* @return {@code true} if the file was purged by this method; {@code false} if the file could not be deleted because it did not exist
* @throws IOException if an error happens while performing the delete operation.
*/
boolean purge(NamespaceFile namespaceFile) throws IOException;
/**
* Hard-deletes all provided namespaces files.
*
* @param namespaceFiles the namespace files to be purged.
* @return the amount of files that were purged.
* @throws IOException if an error happens while performing the delete operation.
*/
Integer purge(List<NamespaceFile> namespaceFiles) throws IOException;
boolean delete(Path path) throws IOException;
/**
* Checks if a directory is empty.

View File

@@ -1,20 +0,0 @@
package io.kestra.core.storages;
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
@Singleton
public class NamespaceFactory {
@Inject
private NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepositoryInterface;
public Namespace of(String tenantId, String namespace, StorageInterface storageInterface) {
return new InternalNamespace(tenantId, namespace, storageInterface, namespaceFileMetadataRepositoryInterface);
}
public Namespace of(Logger logger, String tenantId, String namespace, StorageInterface storageInterface) {
return new InternalNamespace(logger, tenantId, namespace, storageInterface, namespaceFileMetadataRepositoryInterface);
}
}

View File

@@ -1,14 +1,11 @@
package io.kestra.core.storages;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import io.kestra.core.utils.WindowsUtils;
import jakarta.annotation.Nullable;
import java.net.URI;
import java.nio.file.Path;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Represents a NamespaceFile object.
@@ -16,22 +13,15 @@ import java.util.regex.Pattern;
* @param path The path of file relative to the namespace.
* @param uri The URI of the namespace file in the Kestra's internal storage.
* @param namespace The namespace of the file.
* @param version The version of the file.
*/
public record NamespaceFile(
String path,
URI uri,
String namespace,
int version
String namespace
) {
private static final Pattern capturePathWithoutVersion = Pattern.compile("(.*)(?:\\.v\\d+)?$");
public NamespaceFile(Path path, URI uri, String namespace) {
this(path.toString(), uri, namespace, 1);
}
public NamespaceFile(String path, URI uri, String namespace) {
this(path, uri, namespace, 1);
this(path.toString(), uri, namespace);
}
/**
@@ -43,19 +33,7 @@ public record NamespaceFile(
* @return a new {@link NamespaceFile} object
*/
public static NamespaceFile of(final String namespace) {
return of(namespace, (Path) null, 1);
}
public static NamespaceFile of(final String namespace, final URI uri) {
return of(namespace, uri, 1);
}
public static NamespaceFile fromMetadata(final NamespaceFileMetadata metadata) {
return of(
metadata.getNamespace(),
Path.of(metadata.getPath()),
metadata.getVersion()
);
return of(namespace, (Path) null);
}
/**
@@ -65,9 +43,9 @@ public record NamespaceFile(
* @param namespace The namespace - cannot be {@code null}.
* @return a new {@link NamespaceFile} object
*/
public static NamespaceFile of(final String namespace, @Nullable final URI uri, int version) {
public static NamespaceFile of(final String namespace, @Nullable final URI uri) {
if (uri == null || uri.equals(URI.create("/"))) {
return of(namespace, (Path) null, version);
return of(namespace, (Path) null);
}
Path path = Path.of(WindowsUtils.windowsToUnixPath(uri.getPath()));
@@ -83,9 +61,9 @@ public record NamespaceFile(
"Invalid Kestra URI. Expected prefix for namespace '%s', but was %s.", namespace, uri)
);
}
namespaceFile = of(namespace, Path.of(StorageContext.namespaceFilePrefix(namespace)).relativize(path), version);
namespaceFile = of(namespace, Path.of(StorageContext.namespaceFilePrefix(namespace)).relativize(path));
} else {
namespaceFile = of(namespace, path, version);
namespaceFile = of(namespace, path);
}
boolean trailingSlash = uri.toString().endsWith("/");
@@ -97,15 +75,10 @@ public record NamespaceFile(
return new NamespaceFile(
namespaceFile.path,
URI.create(namespaceFile.uri.toString() + "/"),
namespaceFile.namespace,
version
namespaceFile.namespace
);
}
public static NamespaceFile of(final String namespace, final Path path) {
return of(namespace, path, 1);
}
/**
* Static factory method for constructing a new {@link NamespaceFile} object.
*
@@ -113,61 +86,31 @@ public record NamespaceFile(
* @param namespace The namespace - cannot be {@code null}.
* @return a new {@link NamespaceFile} object
*/
public static NamespaceFile of(final String namespace, @Nullable final Path path, int version) {
public static NamespaceFile of(final String namespace, @Nullable final Path path) {
Objects.requireNonNull(namespace, "namespace cannot be null");
if (path == null || path.equals(Path.of("/"))) {
return new NamespaceFile(
"",
URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.namespaceFilePrefix(namespace) + "/"),
namespace,
// Directory always has a single version
1
namespace
);
}
return of(namespace, path.toString(), version);
}
public static NamespaceFile of(String namespace, String path, int version) {
Path namespacePrefixPath = Path.of(StorageContext.namespaceFilePrefix(namespace));
// Need to remove starting trailing slash for Windows
String pathWithoutLeadingSlash = path.replaceFirst("^[.]*[\\\\|/]+", "");
version = NamespaceFile.isDirectory(pathWithoutLeadingSlash) ? 1 : version;
String storagePath = pathWithoutLeadingSlash;
if (!pathWithoutLeadingSlash.endsWith("/") && version > 1) {
storagePath += ".v" + version;
Path filePath = path.normalize();
if (filePath.isAbsolute()) {
filePath = filePath.getRoot().relativize(filePath);
}
// Need to remove starting trailing slash for Windows
String pathWithoutTrailingSlash = path.toString().replaceFirst("^[.]*[\\\\|/]+", "");
return new NamespaceFile(
pathWithoutLeadingSlash,
URI.create(StorageContext.KESTRA_PROTOCOL + namespacePrefixPath.resolve(storagePath).toString().replace("\\", "/")),
namespace,
version
pathWithoutTrailingSlash,
URI.create(StorageContext.KESTRA_PROTOCOL + namespacePrefixPath.resolve(pathWithoutTrailingSlash).toString().replace("\\","/")),
namespace
);
}
public static Path normalize(String pathStr, boolean withLeadingSlash) {
return normalize(Path.of(pathStr), withLeadingSlash);
}
public static Path normalize(Path path, boolean withLeadingSlash) {
if (path == null) {
return Path.of("/");
}
if (withLeadingSlash && !path.toString().startsWith("/")) {
return Path.of("/" + path);
}
if (!withLeadingSlash && path.toString().startsWith("/")) {
return Path.of(path.toString().substring(1));
}
return path;
}
/**
* Returns the path of file relative to the namespace.
*
@@ -175,13 +118,17 @@ public record NamespaceFile(
* @return The path.
*/
public Path path(boolean withLeadingSlash) {
String strPath = path;
Matcher matcher = capturePathWithoutVersion.matcher(strPath);
if (matcher.matches()) {
strPath = matcher.group(1);
final String strPath = path.toString();
if (!withLeadingSlash) {
if (strPath.startsWith("/")) {
return Path.of(strPath.substring(1));
}
} else {
if (!strPath.startsWith("/")) {
return Path.of("/").resolve(path);
}
}
return normalize(Path.of(strPath), withLeadingSlash);
return Path.of(path);
}
/**
@@ -200,12 +147,8 @@ public record NamespaceFile(
*
* @return {@code true} if this namespace file is a directory.
*/
public static boolean isDirectory(String path) {
return path.endsWith("/");
}
public boolean isDirectory() {
return isDirectory(uri.toString());
return uri.toString().endsWith("/");
}
/**

View File

@@ -1,54 +0,0 @@
package io.kestra.core.storages;
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
public class NamespaceFileAttributes implements FileAttributes {
private final NamespaceFileMetadata namespaceFileMetadata;
public NamespaceFileAttributes(NamespaceFileMetadata namespaceFileMetadata) {
this.namespaceFileMetadata = namespaceFileMetadata;
}
@Override
public String getFileName() {
String name = new File(namespaceFileMetadata.getPath()).getName();
if (name.isEmpty()) {
return "_files";
}
return name;
}
@Override
public long getLastModifiedTime() {
return Optional.ofNullable(namespaceFileMetadata.getUpdated()).map(Instant::toEpochMilli).orElse(0L);
}
@Override
public long getCreationTime() {
return Optional.ofNullable(namespaceFileMetadata.getCreated()).map(Instant::toEpochMilli).orElse(0L);
}
@Override
public FileType getType() {
return namespaceFileMetadata.getPath().endsWith("/") ? FileType.Directory : FileType.File;
}
@Override
public long getSize() {
return namespaceFileMetadata.getSize();
}
@Override
public Map<String, String> getMetadata() throws IOException {
return Collections.emptyMap();
}
}

View File

@@ -1,3 +0,0 @@
package io.kestra.core.storages;
public record NamespaceFileRevision(Integer revision) {}

View File

@@ -3,6 +3,7 @@ package io.kestra.core.storages;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.Slugify;
@@ -61,9 +62,9 @@ public class StorageContext {
taskRun.getValue()
);
}
/**
* Factory method for constructing a new {@link StorageContext} scoped to a given {@link FlowId}.
* Factory method for constructing a new {@link StorageContext} scoped to a given {@link Flow}.
*/
public static StorageContext forFlow(FlowId flow) {
return new StorageContext(flow.getTenantId(), flow.getNamespace(), flow.getId());
@@ -226,7 +227,7 @@ public class StorageContext {
}
/**
* Gets the base storage URI for the current {@link FlowId}.
* Gets the base storage URI for the current {@link io.kestra.core.models.flows.Flow}.
*
* @return the {@link URI}.
*/

View File

@@ -10,14 +10,10 @@ public class Await {
private static final Duration defaultSleep = Duration.ofMillis(100);
public static void until(BooleanSupplier condition) {
Await.untilWithSleepInterval(condition, null);
Await.until(condition, null);
}
public static void untilWithTimeout(BooleanSupplier condition, Duration timeout) throws TimeoutException {
until(condition, null, timeout);
}
public static void untilWithSleepInterval(BooleanSupplier condition, Duration sleep) {
public static void until(BooleanSupplier condition, Duration sleep) {
if (sleep == null) {
sleep = defaultSleep;
}
@@ -78,10 +74,10 @@ public class Await {
return result.get();
}
public static <T> T untilWithSleepInterval(Supplier<T> supplier, Duration sleep) {
public static <T> T until(Supplier<T> supplier, Duration sleep) {
AtomicReference<T> result = new AtomicReference<>();
Await.untilWithSleepInterval(untilSupplier(supplier, result), sleep);
Await.until(untilSupplier(supplier, result), sleep);
return result.get();
}

View File

@@ -1,9 +1,9 @@
package io.kestra.core.utils;
import io.kestra.core.models.flows.FlowInterface;
import io.micronaut.context.annotation.Value;
import org.apache.commons.lang3.StringUtils;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import java.net.URI;
import io.micronaut.core.annotation.Nullable;
@@ -44,7 +44,7 @@ public class UriProvider {
execution.getFlowId());
}
public URI flowUrl(FlowInterface flow) {
public URI flowUrl(Flow flow) {
return this.build("/ui/" +
(flow.getTenantId() != null ? flow.getTenantId() + "/" : "") +
"flows/" +

View File

@@ -1,16 +0,0 @@
package io.kestra.core.validations;
import io.kestra.core.validations.validator.FilesVersionBehaviorValidator;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import jakarta.validation.Constraint;
import jakarta.validation.Payload;
@Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = FilesVersionBehaviorValidator.class)
public @interface FilesVersionBehaviorValidation {
String message() default "invalid `version` behavior configuration";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
}

View File

@@ -1,35 +0,0 @@
package io.kestra.core.validations.validator;
import io.kestra.core.validations.FilesVersionBehaviorValidation;
import io.kestra.core.validations.KvVersionBehaviorValidation;
import io.kestra.plugin.core.namespace.Version;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.validation.validator.constraints.ConstraintValidator;
import io.micronaut.validation.validator.constraints.ConstraintValidatorContext;
import jakarta.inject.Singleton;
@Singleton
@Introspected
public class FilesVersionBehaviorValidator implements ConstraintValidator<FilesVersionBehaviorValidation, Version> {
@Override
public boolean isValid(
@Nullable Version value,
@NonNull AnnotationValue<FilesVersionBehaviorValidation> annotationMetadata,
@NonNull ConstraintValidatorContext context) {
if (value == null) {
return true;
}
if (value.getBefore() != null && value.getKeepAmount() != null) {
context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("Cannot set both 'before' and 'keepAmount' properties")
.addConstraintViolation();
return false;
}
return true;
}
}

View File

@@ -6,7 +6,6 @@ import io.kestra.core.models.flows.Input;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.NamespaceService;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.validations.FlowValidation;
import io.micronaut.core.annotation.AnnotationValue;
@@ -53,9 +52,6 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
@Inject
private FlowService flowService;
@Inject
private NamespaceService namespaceService;
@Override
public boolean isValid(
@Nullable Flow value,
@@ -71,7 +67,7 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
violations.add("Flow id is a reserved keyword: " + value.getId() + ". List of reserved keywords: " + String.join(", ", RESERVED_FLOW_IDS));
}
if (namespaceService.requireExistingNamespace(value.getTenantId(), value.getNamespace())) {
if (flowService.requireExistingNamespace(value.getTenantId(), value.getNamespace())) {
violations.add("Namespace '" + value.getNamespace() + "' does not exist but is required to exist before a flow can be created in it.");
}

View File

@@ -79,30 +79,20 @@ public class TimeBetween extends Condition implements ScheduleCondition {
RunContext runContext = conditionContext.getRunContext();
Map<String, Object> variables = conditionContext.getVariables();
// cache must be skipped for date rendering as the value can change for each test
String dateRendered = runContext.render(date).skipCache().as(String.class, variables).orElseThrow();
String dateRendered = runContext.render(date).as(String.class, variables).orElseThrow();
OffsetTime currentDate = DateUtils.parseZonedDateTime(dateRendered).toOffsetDateTime().toOffsetTime();
OffsetTime beforeRendered = runContext.render(before).as(OffsetTime.class, variables).orElse(null);
OffsetTime afterRendered = runContext.render(after).as(OffsetTime.class, variables).orElse(null);
if (beforeRendered != null && afterRendered != null) {
// Case 1: Normal range (e.g., 16:00 -> 20:00)
if (afterRendered.isBefore(beforeRendered)) {
return currentDate.isAfter(afterRendered) && currentDate.isBefore(beforeRendered);
// Case 2: Cross-midnight range (e.g., 22:00 -> 02:00)
} else {
return currentDate.isAfter(afterRendered) || currentDate.isBefore(beforeRendered);
}
return currentDate.isAfter(afterRendered) && currentDate.isBefore(beforeRendered);
} else if (beforeRendered != null) {
return currentDate.isBefore(beforeRendered);
} else if (afterRendered != null) {
return currentDate.isAfter(afterRendered);
} else {
throw new IllegalConditionEvaluation("Invalid condition: no 'before' or 'after' value defined");
throw new IllegalConditionEvaluation("Invalid condition with no before nor after");
}
}
}

Some files were not shown because too many files have changed in this diff Show More