feat(executions): add support to ACL check inside the run context

Part-of: https://github.com/kestra-io/kestra-ee/issues/4228
This commit is contained in:
Loïc Mathieu
2025-11-28 12:16:01 +01:00
parent f2c15185fb
commit 496e01eb3e
25 changed files with 333 additions and 151 deletions

View File

@@ -0,0 +1,15 @@
package io.kestra.core.exceptions;
import java.io.Serial;
public class ResourceAccessDeniedException extends KestraRuntimeException {
@Serial
private static final long serialVersionUID = 1L;
public ResourceAccessDeniedException() {
}
public ResourceAccessDeniedException(String message) {
super(message);
}
}

View File

@@ -7,7 +7,6 @@ import io.kestra.core.models.tasks.runners.TaskLogLineMatcher.TaskLogMatch;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowService;
import jakarta.validation.constraints.NotNull;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -215,8 +214,7 @@ abstract public class PluginUtilsService {
realNamespace = runContext.render(namespace);
realFlowId = runContext.render(flowId);
// validate that the flow exists: a.k.a access is authorized by this namespace
FlowService flowService = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowService.class);
flowService.checkAllowedNamespace(flowInfo.tenantId(), realNamespace, flowInfo.tenantId(), flowInfo.namespace());
runContext.acl().allowNamespace(realNamespace).check();
} else if (namespace != null || flowId != null) {
throw new IllegalArgumentException("Both `namespace` and `flowId` must be set when `executionId` is set.");
} else {

View File

@@ -0,0 +1,46 @@
package io.kestra.core.runners;
import java.util.List;
/**
* Check if the current taskrun has access to the requested resources.
*
* <p>
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
*
* @see AllowedResources
*/
public interface AclChecker {
/**Tasks that need to access resources outside their namespace should use this interface to check ACL (Allowed namespaces in EE).
* Allow all namespaces.
* <p>
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
*/
AllowedResources allowAllNamespaces();
/**
* Allow only the given namespace.
* <p>
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
*/
AllowedResources allowNamespace(String namespace);
/**
* Allow only the given namespaces.
* <p>
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
*/
AllowedResources allowNamespaces(List<String> namespaces);
/**
* Represents a set of allowed resources.
* Tasks that need to access resources outside their namespace should call the <code>check()</code> method to check the ACL (Allowed namespaces in EE).
*/
interface AllowedResources {
/**
* Check if the current taskrun has access to the requested resources.
*/
void check();
}
}

View File

@@ -0,0 +1,86 @@
package io.kestra.core.runners;
import io.kestra.core.services.NamespaceService;
import io.micronaut.context.ApplicationContext;
import java.util.List;
import java.util.Objects;
class AclCheckerImpl implements AclChecker {
private final NamespaceService namespaceService;
private final RunContext.FlowInfo flowInfo;
AclCheckerImpl(ApplicationContext applicationContext, RunContext.FlowInfo flowInfo) {
this.namespaceService = applicationContext.getBean(NamespaceService.class);
this.flowInfo = flowInfo;
}
@Override
public AllowedResources allowAllNamespaces() {
return new AllowAllNamespaces(flowInfo, namespaceService);
}
@Override
public AllowedResources allowNamespace(String namespace) {
return new AllowNamespace(flowInfo, namespaceService, namespace);
}
@Override
public AllowedResources allowNamespaces(List<String> namespaces) {
return new AllowNamespaces(flowInfo, namespaceService, namespaces);
}
static class AllowAllNamespaces implements AllowedResources {
private final RunContext.FlowInfo flowInfo;
private final NamespaceService namespaceService;
AllowAllNamespaces(RunContext.FlowInfo flowInfo, NamespaceService namespaceService) {
this.flowInfo = Objects.requireNonNull(flowInfo);
this.namespaceService = Objects.requireNonNull(namespaceService);
}
@Override
public void check() {
this.namespaceService.checkAllowedAllNamespaces(flowInfo.tenantId(), flowInfo.tenantId(), flowInfo.namespace());
}
}
static class AllowNamespace implements AllowedResources {
private final RunContext.FlowInfo flowInfo;
private final NamespaceService namespaceService;
private final String namespace;
public AllowNamespace(RunContext.FlowInfo flowInfo, NamespaceService namespaceService, String namespace) {
this.flowInfo = Objects.requireNonNull(flowInfo);
this.namespaceService = Objects.requireNonNull(namespaceService);
this.namespace = Objects.requireNonNull(namespace);
}
@Override
public void check() {
namespaceService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace());
}
}
static class AllowNamespaces implements AllowedResources {
private final RunContext.FlowInfo flowInfo;
private final NamespaceService namespaceService;
private final List<String> namespaces;
AllowNamespaces(RunContext.FlowInfo flowInfo, NamespaceService namespaceService, List<String> namespaces) {
this.flowInfo = Objects.requireNonNull(flowInfo);
this.namespaceService = Objects.requireNonNull(namespaceService);
this.namespaces = Objects.requireNonNull(namespaces);
if (namespaces.isEmpty()) {
throw new IllegalArgumentException("At least one namespace must be provided");
}
}
@Override
public void check() {
namespaces.forEach(namespace -> namespaceService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace()));
}
}
}

View File

@@ -579,6 +579,11 @@ public class DefaultRunContext extends RunContext {
return isInitialized.get();
}
@Override
public AclChecker acl() {
return new AclCheckerImpl(this.applicationContext, flowInfo());
}
@Override
public LocalPath localPath() {
return localPath;

View File

@@ -197,4 +197,11 @@ public abstract class RunContext implements PropertyContext {
*/
@Deprecated(since = "1.2.0", forRemoval = true)
public abstract boolean isInitialized();
/**
* Get access to the ACL checker.
* Plugins are responsible for using the ACL checker when they access restricted resources, for example,
* when Namespace ACLs are used (EE).
*/
public abstract AclChecker acl();
}

View File

@@ -12,8 +12,8 @@ import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.PluginConfigurations;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.services.NamespaceService;
import io.kestra.core.storages.InternalStorage;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
@@ -48,7 +48,7 @@ public class RunContextFactory {
protected StorageInterface storageInterface;
@Inject
protected FlowService flowService;
protected NamespaceService namespaceService;
@Inject
protected MetricRegistry metricRegistry;
@@ -103,7 +103,7 @@ public class RunContextFactory {
.withLogger(runContextLogger)
// Execution
.withPluginConfiguration(Map.of())
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface, flowService))
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface, namespaceService))
.withVariableRenderer(variableRenderer)
.withVariables(runVariableModifier.apply(
newRunVariablesBuilder()
@@ -133,7 +133,7 @@ public class RunContextFactory {
.withLogger(runContextLogger)
// Task
.withPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass()))
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, flowService))
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, namespaceService))
.withVariables(newRunVariablesBuilder()
.withFlow(flow)
.withTask(task)
@@ -173,7 +173,7 @@ public class RunContextFactory {
RunContextLogger runContextLogger = new RunContextLogger();
return newBuilder()
.withLogger(runContextLogger)
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forFlow(flow), storageInterface, flowService))
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forFlow(flow), storageInterface, namespaceService))
.withVariables(variables)
.withSecretInputs(secretInputsFromFlow(flow))
.build();
@@ -212,7 +212,7 @@ public class RunContextFactory {
}
},
storageInterface,
flowService
namespaceService
))
.withVariables(variables)
.withTask(task)

View File

@@ -8,7 +8,7 @@ import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.plugins.PluginConfigurations;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.NamespaceService;
import io.kestra.core.storages.InternalStorage;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
@@ -44,7 +44,7 @@ public class RunContextInitializer {
protected StorageInterface storageInterface;
@Inject
protected FlowService flowService;
protected NamespaceService namespaceService;
@Value("${kestra.encryption.secret-key}")
protected Optional<String> secretKey;
@@ -135,7 +135,7 @@ public class RunContextInitializer {
runContext.setVariables(enrichedVariables);
runContext.setPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass()));
runContext.setStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, flowService));
runContext.setStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, namespaceService));
runContext.setLogger(runContextLogger);
runContext.setTask(task);
@@ -230,7 +230,7 @@ public class RunContextInitializer {
runContextLogger.logger(),
context,
storageInterface,
flowService
namespaceService
);
runContext.setLogger(runContextLogger);

View File

@@ -2,7 +2,7 @@ package io.kestra.core.runners.pebble.functions;
import io.kestra.core.runners.LocalPath;
import io.kestra.core.runners.LocalPathFactory;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.NamespaceService;
import io.kestra.core.storages.InternalNamespace;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.StorageContext;
@@ -36,7 +36,7 @@ abstract class AbstractFileFunction implements Function {
private static final Pattern EXECUTION_FILE = Pattern.compile(".*/.*/executions/.*/tasks/.*/.*");
@Inject
protected FlowService flowService;
protected NamespaceService namespaceService;
@Inject
protected StorageInterface storageInterface;
@@ -92,7 +92,7 @@ abstract class AbstractFileFunction implements Function {
} else {
namespace = (String) Optional.ofNullable(args.get(NAMESPACE)).orElse(flow.get(NAMESPACE));
fileUri = URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.namespaceFilePrefix(namespace) + "/" + str);
flowService.checkAllowedNamespace(tenantId, namespace, tenantId, flow.get(NAMESPACE));
namespaceService.checkAllowedNamespace(tenantId, namespace, tenantId, flow.get(NAMESPACE));
}
} else {
throw new PebbleException(null, "Unable to read the file " + path, lineNumber, self.getName());
@@ -177,7 +177,7 @@ abstract class AbstractFileFunction implements Function {
// 5. replace '/' with '.'
namespace = namespace.replace("/", ".");
flowService.checkAllowedNamespace(tenantId, namespace, tenantId, fromNamespace);
namespaceService.checkAllowedNamespace(tenantId, namespace, tenantId, fromNamespace);
return namespace;
}
@@ -198,7 +198,7 @@ abstract class AbstractFileFunction implements Function {
// we will transform nsfile URI into a kestra URI so it is handled seamlessly by all functions
String customNs = Optional.ofNullable((String) args.get(NAMESPACE)).orElse(nsFileUri.getAuthority());
if (customNs != null) {
flowService.checkAllowedNamespace(tenantId, customNs, tenantId, flow.get(NAMESPACE));
namespaceService.checkAllowedNamespace(tenantId, customNs, tenantId, flow.get(NAMESPACE));
}
return Optional.ofNullable(customNs).orElse(flow.get(NAMESPACE));
}

View File

@@ -9,6 +9,7 @@ import io.kestra.core.secret.SecretNotFoundException;
import io.kestra.core.secret.SecretService;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.NamespaceService;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.EvaluationContext;
@@ -36,7 +37,7 @@ public class SecretFunction implements Function {
private SecretService secretService;
@Inject
private FlowService flowService;
private NamespaceService namespaceService;
@Override
public List<String> getArgumentNames() {
@@ -56,7 +57,7 @@ public class SecretFunction implements Function {
if (namespace == null) {
namespace = flowNamespace;
} else {
flowService.checkAllowedNamespace(flowTenantId, namespace, flowTenantId, flowNamespace);
namespaceService.checkAllowedNamespace(flowTenantId, namespace, flowTenantId, flowNamespace);
}
try {

View File

@@ -9,7 +9,6 @@ import io.kestra.core.models.flows.check.Check;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.models.validations.ValidateConstraintViolation;
import io.kestra.core.plugins.PluginRegistry;
@@ -33,7 +32,6 @@ import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -60,7 +58,7 @@ public class FlowService {
@Inject
Optional<FlowTopologyRepositoryInterface> flowTopologyRepository;
@Inject
Provider<RunContextFactory> runContextFactory; // Lazy init: avoid circular dependency error.
@@ -94,7 +92,7 @@ public class FlowService {
return flowRepository
.orElseThrow(() -> new IllegalStateException("Cannot perform operation on flow. Cause: No FlowRepository"));
}
/**
* Evaluates all checks defined in the given flow using the provided inputs.
* <p>
@@ -138,7 +136,7 @@ public class FlowService {
}
return List.of();
}
/**
* Validates the given flow source.
* <p>
@@ -510,50 +508,6 @@ public class FlowService {
return flowRepository.get().delete(flow);
}
/**
* Return true if the namespace is allowed from the namespace denoted by 'fromTenant' and 'fromNamespace'.
* As namespace restriction is an EE feature, this will always return true in OSS.
*/
public boolean isAllowedNamespace(String tenant, String namespace, String fromTenant, String fromNamespace) {
return true;
}
/**
* Check that the namespace is allowed from the namespace denoted by 'fromTenant' and 'fromNamespace'.
* If not, throw an IllegalArgumentException.
*/
public void checkAllowedNamespace(String tenant, String namespace, String fromTenant, String fromNamespace) {
if (!isAllowedNamespace(tenant, namespace, fromTenant, fromNamespace)) {
throw new IllegalArgumentException("Namespace " + namespace + " is not allowed.");
}
}
/**
* Return true if the namespace is allowed from all the namespace in the 'fromTenant' tenant.
* As namespace restriction is an EE feature, this will always return true in OSS.
*/
public boolean areAllowedAllNamespaces(String tenant, String fromTenant, String fromNamespace) {
return true;
}
/**
* Check that the namespace is allowed from all the namespace in the 'fromTenant' tenant.
* If not, throw an IllegalArgumentException.
*/
public void checkAllowedAllNamespaces(String tenant, String fromTenant, String fromNamespace) {
if (!areAllowedAllNamespaces(tenant, fromTenant, fromNamespace)) {
throw new IllegalArgumentException("All namespaces are not allowed, you should either filter on a namespace or configure all namespaces to allow your namespace.");
}
}
/**
* Return true if require existing namespace is enabled and the namespace didn't already exist.
* As namespace management is an EE feature, this will always return false in OSS.
*/
public boolean requireExistingNamespace(String tenant, String namespace) {
return false;
}
/**
* Gets the executable flow for the given namespace, id, and revision.
* Warning: this method bypasses ACL so someone with only execution right can create a flow execution

View File

@@ -20,9 +20,6 @@ public class KVStoreService {
@Inject
private StorageInterface storageInterface;
@Inject
private FlowService flowService;
@Inject
private NamespaceService namespaceService;
@@ -38,7 +35,7 @@ public class KVStoreService {
boolean isNotSameNamespace = fromNamespace != null && !namespace.equals(fromNamespace);
if (isNotSameNamespace && isNotParentNamespace(namespace, fromNamespace)) {
try {
flowService.checkAllowedNamespace(tenant, namespace, tenant, fromNamespace);
namespaceService.checkAllowedNamespace(tenant, namespace, tenant, fromNamespace);
} catch (IllegalArgumentException e) {
throw new KVStoreException(String.format(
"Cannot access the KV store. Access to '%s' namespace is not allowed from '%s'.", namespace, fromNamespace)

View File

@@ -1,5 +1,6 @@
package io.kestra.core.services;
import io.kestra.core.exceptions.ResourceAccessDeniedException;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.NamespaceUtils;
import jakarta.inject.Inject;
@@ -39,4 +40,52 @@ public class NamespaceService {
}
return false;
}
/**
* Return true if require existing namespace is enabled and the namespace didn't already exist.
* As namespace management is an EE feature, this will always return false in OSS.
*/
public boolean requireExistingNamespace(String tenant, String namespace) {
return false;
}
/**
* Return true if the namespace is allowed from the namespace denoted by 'fromTenant' and 'fromNamespace'.
* As namespace restriction is an EE feature, this will always return true in OSS.
*/
public boolean isAllowedNamespace(String tenant, String namespace, String fromTenant, String fromNamespace) {
return true;
}
/**
* Check that the namespace is allowed from the namespace denoted by 'fromTenant' and 'fromNamespace'.
* If not, throw a ResourceAccessDeniedException.
*
* @throws ResourceAccessDeniedException if the namespace is not allowed.
*/
public void checkAllowedNamespace(String tenant, String namespace, String fromTenant, String fromNamespace) {
if (!isAllowedNamespace(tenant, namespace, fromTenant, fromNamespace)) {
throw new ResourceAccessDeniedException("Namespace " + namespace + " is not allowed.");
}
}
/**
* Return true if the namespace is allowed from all the namespace in the 'fromTenant' tenant.
* As namespace restriction is an EE feature, this will always return true in OSS.
*/
public boolean areAllowedAllNamespaces(String tenant, String fromTenant, String fromNamespace) {
return true;
}
/**
* Check that the namespace is allowed from all the namespace in the 'fromTenant' tenant.
* If not, throw a ResourceAccessDeniedException.
*
* @throws ResourceAccessDeniedException if all namespaces all aren't allowed.
*/
public void checkAllowedAllNamespaces(String tenant, String fromTenant, String fromNamespace) {
if (!areAllowedAllNamespaces(tenant, fromTenant, fromNamespace)) {
throw new ResourceAccessDeniedException("All namespaces are not allowed, you should either filter on a namespace or configure all namespaces to allow your namespace.");
}
}
}

View File

@@ -1,15 +1,11 @@
package io.kestra.core.storages;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.services.NamespaceService;
import jakarta.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -33,7 +29,7 @@ public class InternalStorage implements Storage {
private final Logger logger;
private final StorageContext context;
private final StorageInterface storage;
private final FlowService flowService;
private final NamespaceService namespaceService;
/**
* Creates a new {@link InternalStorage} instance.
@@ -52,11 +48,11 @@ public class InternalStorage implements Storage {
* @param context The storage context.
* @param storage The storage to delegate operations.
*/
public InternalStorage(Logger logger, StorageContext context, StorageInterface storage, FlowService flowService) {
public InternalStorage(Logger logger, StorageContext context, StorageInterface storage, NamespaceService namespaceService) {
this.logger = logger;
this.context = context;
this.storage = storage;
this.flowService = flowService;
this.namespaceService = namespaceService;
}
/**
@@ -74,8 +70,8 @@ public class InternalStorage implements Storage {
public Namespace namespace(String namespace) {
boolean isExternalNamespace = !namespace.equals(context.getNamespace());
// Checks whether the contextual namespace is allowed to access the passed namespace.
if (isExternalNamespace && flowService != null) {
flowService.checkAllowedNamespace(
if (isExternalNamespace && namespaceService != null) {
namespaceService.checkAllowedNamespace(
context.getTenantId(), namespace, // requested Tenant/Namespace
context.getTenantId(), context.getNamespace() // from Tenant/Namespace
);

View File

@@ -6,6 +6,7 @@ import io.kestra.core.models.flows.Input;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.NamespaceService;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.validations.FlowValidation;
import io.micronaut.core.annotation.AnnotationValue;
@@ -52,6 +53,9 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
@Inject
private FlowService flowService;
@Inject
private NamespaceService namespaceService;
@Override
public boolean isValid(
@Nullable Flow value,
@@ -67,7 +71,7 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
violations.add("Flow id is a reserved keyword: " + value.getId() + ". List of reserved keywords: " + String.join(", ", RESERVED_FLOW_IDS));
}
if (flowService.requireExistingNamespace(value.getTenantId(), value.getNamespace())) {
if (namespaceService.requireExistingNamespace(value.getTenantId(), value.getNamespace())) {
violations.add("Namespace '" + value.getNamespace() + "' does not exist but is required to exist before a flow can be created in it.");
}

View File

@@ -13,7 +13,6 @@ import io.kestra.core.models.tasks.Task;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
@@ -127,14 +126,13 @@ public class Count extends Task implements RunnableTask<Count.Output> {
var flowInfo = runContext.flowInfo();
// check that all flows are allowed
FlowService flowService = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowService.class);
if (flows != null) {
flows.forEach(flow -> flowService.checkAllowedNamespace(flowInfo.tenantId(), flow.getNamespace(), flowInfo.tenantId(), flowInfo.namespace()));
flows.forEach(flow -> runContext.acl().allowNamespace(flow.getNamespace()));
}
if (namespaces != null) {
var renderedNamespaces = runContext.render(this.namespaces).asList(String.class);
renderedNamespaces.forEach(namespace -> flowService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace()));
renderedNamespaces.forEach(namespace -> runContext.acl().allowNamespace(namespace));
}
List<ExecutionCount> executionCounts = executionRepository.executionCounts(

View File

@@ -9,7 +9,6 @@ import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.FlowService;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
@@ -113,15 +112,14 @@ public class PurgeExecutions extends Task implements RunnableTask<PurgeExecution
@Override
public PurgeExecutions.Output run(RunContext runContext) throws Exception {
ExecutionService executionService = ((DefaultRunContext)runContext).getApplicationContext().getBean(ExecutionService.class);
FlowService flowService = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowService.class);
// validate that this namespace is authorized on the target namespace / all namespaces
var flowInfo = runContext.flowInfo();
String renderedNamespace = runContext.render(this.namespace).as(String.class).orElse(null);
if (renderedNamespace == null){
flowService.checkAllowedAllNamespaces(flowInfo.tenantId(), flowInfo.tenantId(), flowInfo.namespace());
runContext.acl().allowAllNamespaces();
} else if (!renderedNamespace.equals(flowInfo.namespace())) {
flowService.checkAllowedNamespace(flowInfo.tenantId(), renderedNamespace, flowInfo.tenantId(), flowInfo.namespace());
runContext.acl().allowNamespace(renderedNamespace);
}
ExecutionService.PurgeResult purgeResult = executionService.purge(

View File

@@ -5,9 +5,7 @@ import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
@@ -65,9 +63,7 @@ public class Delete extends Task implements RunnableTask<Delete.Output> {
@Override
public Output run(RunContext runContext) throws Exception {
String renderedNamespace = runContext.render(this.namespace).as(String.class).orElseThrow();
FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class);
flowService.checkAllowedNamespace(runContext.flowInfo().tenantId(), renderedNamespace, runContext.flowInfo().tenantId(), runContext.flowInfo().namespace());
runContext.acl().allowNamespace(renderedNamespace).check();
String renderedKey = runContext.render(this.key).as(String.class).orElseThrow();

View File

@@ -8,7 +8,6 @@ import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.KVStoreService;
import io.kestra.core.storages.kv.KVValue;
import io.swagger.v3.oas.annotations.media.Schema;
@@ -82,8 +81,7 @@ public class Get extends Task implements RunnableTask<Get.Output> {
if (Objects.equals(renderedNamespace, flowNamespace)) {
value = getValueWithInheritance(runContext, flowNamespace, renderedKey);
} else {
FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class);
flowService.checkAllowedNamespace(runContext.flowInfo().tenantId(), renderedNamespace, runContext.flowInfo().tenantId(), runContext.flowInfo().namespace());
runContext.acl().allowNamespace(renderedNamespace).check();
value = runContext.namespaceKv(renderedNamespace).getValue(renderedKey);
}

View File

@@ -5,9 +5,7 @@ import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.kestra.core.storages.kv.KVEntry;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
@@ -18,7 +16,6 @@ import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.function.Function;
import java.util.function.Predicate;
@Slf4j
@@ -63,9 +60,7 @@ public class GetKeys extends Task implements RunnableTask<GetKeys.Output> {
@Override
public Output run(RunContext runContext) throws Exception {
String renderedNamespace = runContext.render(this.namespace).as(String.class).orElse(null);
FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class);
flowService.checkAllowedNamespace(runContext.flowInfo().tenantId(), renderedNamespace, runContext.flowInfo().tenantId(), runContext.flowInfo().namespace());
runContext.acl().allowNamespace(renderedNamespace).check();
String renderedPrefix = runContext.render(this.prefix).as(String.class).orElse(null);
Predicate<String> filter = renderedPrefix == null ? key -> true : key -> key.startsWith(renderedPrefix);

View File

@@ -11,7 +11,6 @@ import io.kestra.core.models.tasks.Task;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.kestra.core.storages.kv.KVEntry;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.utils.ListUtils;
@@ -134,7 +133,6 @@ public class PurgeKV extends Task implements RunnableTask<PurgeKV.Output> {
@VisibleForTesting
protected List<String> findNamespaces(RunContext runContext) throws IllegalVariableEvaluationException {
String tenantId = runContext.flowInfo().tenantId();
String currentNamespace = runContext.flowInfo().namespace();
FlowRepositoryInterface flowRepositoryInterface = ((DefaultRunContext) runContext)
.getApplicationContext().getBean(FlowRepositoryInterface.class);
List<String> distinctNamespaces = flowRepositoryInterface.findDistinctNamespace(tenantId);
@@ -176,9 +174,8 @@ public class PurgeKV extends Task implements RunnableTask<PurgeKV.Output> {
kvNamespaces.addAll(distinctNamespaces);
}
FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class);
for (String ns : kvNamespaces) {
flowService.checkAllowedNamespace(tenantId, ns, tenantId, currentNamespace);
runContext.acl().allowNamespace(ns);
}
return kvNamespaces;
}

View File

@@ -8,7 +8,6 @@ import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.ExecutionLogService;
import io.kestra.core.services.FlowService;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.EqualsAndHashCode;
@@ -91,14 +90,13 @@ public class PurgeLogs extends Task implements RunnableTask<PurgeLogs.Output> {
@Override
public Output run(RunContext runContext) throws Exception {
ExecutionLogService logService = ((DefaultRunContext)runContext).getApplicationContext().getBean(ExecutionLogService.class);
FlowService flowService = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowService.class);
// validate that this namespace is authorized on the target namespace / all namespaces
var flowInfo = runContext.flowInfo();
if (namespace == null){
flowService.checkAllowedAllNamespaces(flowInfo.tenantId(), flowInfo.tenantId(), flowInfo.namespace());
runContext.acl().allowAllNamespaces();
} else if (!flowInfo.namespace().equals(runContext.render(namespace).as(String.class).orElse(null))) {
flowService.checkAllowedNamespace(flowInfo.tenantId(), runContext.render(namespace).as(String.class).orElse(null), flowInfo.tenantId(), flowInfo.namespace());
runContext.acl().allowNamespace(runContext.render(namespace).as(String.class).orElse(null));
}
var logLevelsRendered = runContext.render(this.logLevels).asList(Level.class);

View File

@@ -296,26 +296,6 @@ class FlowServiceTest {
assertThat(flowService.deprecationPaths(flow)).containsExactlyInAnyOrder("inputs[1].name", "tasks[0]");
}
@Test
void isAllowedNamespace() {
assertTrue(flowService.isAllowedNamespace("tenant", "namespace", "fromTenant", "fromNamespace"));
}
@Test
void checkAllowedNamespace() {
flowService.checkAllowedNamespace("tenant", "namespace", "fromTenant", "fromNamespace");
}
@Test
void areAllowedAllNamespaces() {
assertTrue(flowService.areAllowedAllNamespaces("tenant", "fromTenant", "fromNamespace"));
}
@Test
void checkAllowedAllNamespaces() {
flowService.checkAllowedAllNamespaces("tenant", "fromTenant", "fromNamespace");
}
@Test
void delete() {
FlowWithSource flow = create("deleteTest", "test", 1);
@@ -437,20 +417,20 @@ class FlowServiceTest {
assertThat(results).hasSize(1);
assertThat(results.getFirst().getConstraints()).contains("Flow id is a reserved keyword: pause");
}
@Test
void shouldReturnEmptyListGivenFlowWithNoChecks() {
// Given
Flow flow = mock(Flow.class);
when(flow.getChecks()).thenReturn(List.of());
// When
List<Check> result = flowService.getFailedChecks(flow, Map.of());
// Then
assertThat(result).isEmpty();
}
@Test
void shouldReturnCheckWhenConditionEvaluatesFalse() {
// Given
@@ -463,15 +443,15 @@ class FlowServiceTest {
when(flow.getChecks()).thenReturn(List.of(failingCheck));
when(flow.getNamespace()).thenReturn("io.kestra.unittest");
when(flow.getId()).thenReturn("test");
// When
List<Check> result = flowService.getFailedChecks(flow, Map.of());
// Then
assertThat(result).hasSize(1);
assertThat(result.getFirst()).isEqualTo(failingCheck);
}
@Test
void shouldReturnEmptyListWhenConditionEvaluatesTrue() {
// Given
@@ -484,14 +464,14 @@ class FlowServiceTest {
when(flow.getChecks()).thenReturn(List.of(passingCheck));
when(flow.getNamespace()).thenReturn("io.kestra.unittest");
when(flow.getId()).thenReturn("test");
// When
List<Check> result = flowService.getFailedChecks(flow, Map.of());
// Then
assertThat(result).isEmpty();
}
@Test
void shouldReturnCheckWithErrorMessageWhenExceptionThrown() {
// Given
@@ -504,10 +484,10 @@ class FlowServiceTest {
when(flow.getChecks()).thenReturn(List.of(check));
when(flow.getNamespace()).thenReturn("io.kestra.unittest");
when(flow.getId()).thenReturn("test");
// When
List<Check> result = flowService.getFailedChecks(flow, Map.of());
// Then
assertThat(result).hasSize(1);
Check errorCheck = result.getFirst();
@@ -515,22 +495,22 @@ class FlowServiceTest {
assertThat(errorCheck.getStyle()).isEqualTo(Check.Style.ERROR);
assertThat(errorCheck.getMessage()).contains("Failed to evaluate check condition. Cause:");
}
@Test
void shouldHandleMultipleChecksWithMixedResults() {
// Given
Check passCheck = Check.builder().condition("{{ true }}").message("pass").build();
Check failCheck = Check.builder().condition("{{ false }}").message("fail").build();
Check exceptionCheck = Check.builder().condition("{{ invalidFunction }}").message("exception").build();
Flow flow = mock(Flow.class);
when(flow.getChecks()).thenReturn(List.of(passCheck, failCheck, exceptionCheck));
when(flow.getNamespace()).thenReturn("io.kestra.unittest");
when(flow.getId()).thenReturn("test");
// When
List<Check> result = flowService.getFailedChecks(flow, Map.of());
// Then
assertThat(result).hasSize(2);
assertThat(result).contains(failCheck);

View File

@@ -0,0 +1,61 @@
package io.kestra.core.services;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.tenant.TenantService;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertTrue;
@KestraTest
class NamespaceServiceTest {
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private NamespaceService namespaceService;
@Test
void isNamespaceExists() {
Flow flow = Flow.builder().id("test").namespace("io.kestra").tenantId(TenantService.MAIN_TENANT).build();
flowRepository.create(GenericFlow.of(flow));
assertThat(namespaceService.isNamespaceExists(TenantService.MAIN_TENANT, "io.kestra")).isTrue();
}
@Test
void isNamespaceExistsShouldReturnFalseWhenNotFond() {
assertThat(namespaceService.isNamespaceExists(TenantService.MAIN_TENANT, "notFound")).isFalse();
}
@Test
void isNamespaceAllowed() {
assertThat(namespaceService.requireExistingNamespace(TenantService.MAIN_TENANT, "io.kestra")).isFalse();
}
@Test
void isAllowedNamespace() {
assertTrue(namespaceService.isAllowedNamespace("tenant", "namespace", "fromTenant", "fromNamespace"));
}
@Test
void checkAllowedNamespace() {
assertDoesNotThrow(() -> namespaceService.checkAllowedNamespace("tenant", "namespace", "fromTenant", "fromNamespace"));
}
@Test
void areAllowedAllNamespaces() {
assertTrue(namespaceService.areAllowedAllNamespaces("tenant", "fromTenant", "fromNamespace"));
}
@Test
void checkAllowedAllNamespaces() {
assertDoesNotThrow(() -> namespaceService.checkAllowedAllNamespaces("tenant", "fromTenant", "fromNamespace"));
}
}

View File

@@ -137,6 +137,9 @@ public class ExecutionController {
@Inject
private FlowService flowService;
@Inject
private NamespaceService namespaceService;
@Inject
protected ExecutionRepositoryInterface executionRepository;
@@ -871,7 +874,7 @@ public class ExecutionController {
if (Namespace.NAMESPACE_FILE_SCHEME.equals(path.getScheme())) {
// if there is an authority, it means the namespace file is for another namespace, so we check it
if (path.getAuthority() != null) {
flowService.checkAllowedNamespace(execution.getTenantId(), path.getAuthority(), execution.getTenantId(), execution.getNamespace());
namespaceService.checkAllowedNamespace(execution.getTenantId(), path.getAuthority(), execution.getTenantId(), execution.getNamespace());
}
return null;
}