feat(core): validate in editor if subflow with namespace present (#6717)

* feat(core): validate in editor if subflow with namespace present

* fix(): added test + return all violations instead of only one

---------

Signed-off-by: Aabhas Sao <aabhassao0@gmail.com>
Co-authored-by: YannC <ycoornaert@kestra.io>
This commit is contained in:
Aabhas Sao
2025-01-20 16:09:34 +05:30
committed by GitHub
parent 05e5af73ab
commit 4b2c10d6e2
3 changed files with 70 additions and 3 deletions

View File

@@ -8,15 +8,16 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.utils.ListUtils;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.SneakyThrows;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -111,6 +112,14 @@ public class FlowService {
return flowRepository.get().findByNamespace(tenantId, namespace);
}
public Optional<Flow> findById(String tenantId, String namespace, String flowId) {
if (flowRepository.isEmpty()) {
throw noRepositoryException();
}
return flowRepository.get().findById(tenantId, namespace, flowId);
}
public Stream<FlowWithSource> keepLastVersion(Stream<FlowWithSource> stream) {
return keepLastVersionCollector(stream);
}
@@ -150,6 +159,33 @@ public class FlowService {
return Collections.emptyList();
}
}
// check if subflow is present in given namespace
public void checkValidSubflows(Flow flow) {
List<io.kestra.plugin.core.flow.Subflow> subFlows = ListUtils.emptyOnNull(flow.getTasks()).stream()
.filter(io.kestra.plugin.core.flow.Subflow.class::isInstance)
.map(io.kestra.plugin.core.flow.Subflow.class::cast)
.toList();
Set<ConstraintViolation<?>> violations = new HashSet<>();
subFlows.forEach(subflow -> {
Optional<Flow> optional = findById(flow.getTenantId(), subflow.getNamespace(), subflow.getFlowId());
violations.add(ManualConstraintViolation.of(
"The subflow '" + subflow.getFlowId() + "' not found in namespace '" + subflow.getNamespace() + "'.",
flow,
Flow.class,
"flow.tasks",
flow.getNamespace()
));
});
if (!violations.isEmpty()) {
throw new ConstraintViolationException(violations);
}
}
public record Relocation(String from, String to) {}
@SuppressWarnings("unchecked")

View File

@@ -10,6 +10,7 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.plugin.core.debug.Echo;
import io.kestra.plugin.core.debug.Return;
import jakarta.inject.Inject;
import jakarta.validation.ConstraintViolationException;
import org.junit.jupiter.api.Test;
import java.util.Collections;
@@ -20,6 +21,7 @@ import java.util.stream.Stream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@KestraTest
@@ -312,4 +314,32 @@ class FlowServiceTest {
flowRepository.create(flow, flow.generateSource(), flow);
assertThat(flowService.findByNamespacePrefix(null, "some.namespace").size(), is(1));
}
@Test
void findById() {
Flow flow = create("findByIdTest", "test", 1);
FlowWithSource saved = flowRepository.create(flow, flow.generateSource(), flow);
assertThat(flowService.findById(null, saved.getNamespace(), saved.getId()).isPresent(), is(true));
}
@Test
void checkValidSubflowsNotFound() {
Flow flow = create("mainFlow", "task", 1).toBuilder()
.tasks(List.of(
io.kestra.plugin.core.flow.Subflow.builder()
.id("subflowTask")
.type(io.kestra.plugin.core.flow.Subflow.class.getName())
.namespace("io.kestra.unittest")
.flowId("nonExistentSubflow")
.build()
))
.build();
ConstraintViolationException exception = assertThrows(ConstraintViolationException.class, () -> {
flowService.checkValidSubflows(flow);
});
assertThat(exception.getConstraintViolations().size(), is(1));
assertThat(exception.getConstraintViolations().iterator().next().getMessage(), is("The subflow 'nonExistentSubflow' not found in namespace 'io.kestra.unittest'."));
}
}

View File

@@ -426,7 +426,7 @@ public class FlowController {
return HttpResponse.status(HttpStatus.NOT_FOUND);
}
Flow flowParsed = yamlParser.parse(flow, Flow.class);
flowService.checkValidSubflows(flowParsed);
return HttpResponse.ok(update(flowParsed, existingFlow.get(), flow));
}
@@ -589,6 +589,7 @@ public class FlowController {
validateConstraintViolationBuilder.namespace(flowParse.getNamespace());
modelValidator.validate(pluginDefaultService.injectDefaults(flowParse.withSource(flow)));
flowService.checkValidSubflows(flowParse);
} catch (ConstraintViolationException e) {
validateConstraintViolationBuilder.constraints(e.getMessage());
} catch (RuntimeException re) {