feat(system): remove FlowTopologyHandler and TriggerEventPublisher

And make all flow modification pass throught the FlowService so that downstream consumers are always updated
This commit is contained in:
Loïc Mathieu
2025-12-04 14:15:13 +01:00
parent 638d9979fd
commit c5188074a9
17 changed files with 540 additions and 489 deletions

View File

@@ -204,6 +204,9 @@ subprojects {subProj ->
//assertj
testImplementation 'org.assertj:assertj-core'
// awaitility
testImplementation 'org.awaitility:awaitility'
}
def commonTestConfig = { Test t ->

View File

@@ -4,6 +4,7 @@ import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
@@ -12,6 +13,8 @@ import picocli.CommandLine;
import java.util.List;
import java.util.Objects;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@CommandLine.Command(
name = "reindex",
description = "Reindex all records of a type: read them from the database then update them",
@@ -31,12 +34,13 @@ public class ReindexCommand extends AbstractCommand {
if ("flow".equals(type)) {
FlowRepositoryInterface flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
FlowService flowService = applicationContext.getBean(FlowService.class);
List<Flow> allFlow = flowRepository.findAllForAllTenants();
allFlow.stream()
.map(flow -> flowRepository.findByIdWithSource(flow.getTenantId(), flow.getNamespace(), flow.getId()).orElse(null))
.filter(Objects::nonNull)
.forEach(flow -> flowRepository.update(GenericFlow.of(flow), flow));
.forEach(throwConsumer(flow -> flowService.update(GenericFlow.of(flow), flow)));
stdOut("Successfully reindex " + allFlow.size() + " flow(s).");
}

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.PluginDefaultService;
import io.micronaut.context.annotation.Requires;
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
@@ -28,6 +29,8 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.List;
import java.util.Optional;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@Singleton
@Slf4j
@Requires(property = "micronaut.io.watch.enabled", value = "true")
@@ -40,6 +43,9 @@ public class FileChangedEventListener {
@Inject
private FlowRepositoryInterface flowRepositoryInterface;
@Inject
private FlowService flowService;
@Inject
private PluginDefaultService pluginDefaultService;
@@ -62,7 +68,7 @@ public class FileChangedEventListener {
public void startListeningFromConfig() throws IOException, InterruptedException {
if (fileWatchConfiguration != null && fileWatchConfiguration.isEnabled()) {
this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface);
this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface, flowService);
List<Path> paths = fileWatchConfiguration.getPaths();
this.setup(paths);
@@ -162,10 +168,10 @@ public class FileChangedEventListener {
flows.stream()
.filter(flow -> flow.getPath().equals(filePath.toString()))
.findFirst()
.ifPresent(flowWithPath -> {
.ifPresent(throwConsumer(flowWithPath -> {
flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
});
}));
} catch (IOException e) {
log.error("Error reading file: {}", entry, e);
}
@@ -175,10 +181,10 @@ public class FileChangedEventListener {
flows.stream()
.filter(flow -> flow.getPath().equals(filePath.toString()))
.findFirst()
.ifPresent(flowWithPath -> {
.ifPresent(throwConsumer(flowWithPath -> {
flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
});
}));
}
}
} catch (Exception e) {
@@ -215,7 +221,11 @@ public class FileChangedEventListener {
if (flow.isPresent() && flows.stream().noneMatch(flowWithPath -> flowWithPath.uidWithoutRevision().equals(flow.get().uidWithoutRevision()))) {
flows.add(FlowWithPath.of(flow.get(), file.toString()));
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(file), content));
try {
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(file), content));
} catch (Exception e) {
log.error("Unexpected error while watching flows", e);
}
}
}
return FileVisitResult.CONTINUE;

View File

@@ -2,12 +2,13 @@ package io.kestra.cli.services;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.queues.QueueException;
public interface FlowFilesManager {
FlowWithSource createOrUpdateFlow(GenericFlow flow);
FlowWithSource createOrUpdateFlow(GenericFlow flow) throws Exception;
void deleteFlow(FlowWithSource toDelete);
void deleteFlow(FlowWithSource toDelete) throws QueueException;
void deleteFlow(String tenantId, String namespace, String id);
void deleteFlow(String tenantId, String namespace, String id) throws QueueException;
}

View File

@@ -2,33 +2,41 @@ package io.kestra.cli.services;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.queues.QueueException;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowService;
import lombok.extern.slf4j.Slf4j;
import static io.kestra.core.utils.Rethrow.*;
@Slf4j
public class LocalFlowFileWatcher implements FlowFilesManager {
private final FlowRepositoryInterface flowRepository;
private final FlowService flowService;
public LocalFlowFileWatcher(FlowRepositoryInterface flowRepository) {
public LocalFlowFileWatcher(FlowRepositoryInterface flowRepository, FlowService flowService) {
this.flowRepository = flowRepository;
this.flowService = flowService;
}
@Override
public FlowWithSource createOrUpdateFlow(final GenericFlow flow) {
public FlowWithSource createOrUpdateFlow(final GenericFlow flow) throws Exception {
return flowRepository.findById(flow.getTenantId(), flow.getNamespace(), flow.getId())
.map(previous -> flowRepository.update(flow, previous))
.orElseGet(() -> flowRepository.create(flow));
.map(throwFunction(previous -> flowService.update(flow, previous)))
.orElseGet(throwSupplier(() -> flowService.create(flow)));
}
@Override
public void deleteFlow(FlowWithSource toDelete) {
flowRepository.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId()).ifPresent(flowRepository::delete);
public void deleteFlow(FlowWithSource toDelete) throws QueueException {
flowRepository.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId())
.ifPresent(throwConsumer(flow -> flowService.delete(flow)));
log.info("Flow {} has been deleted", toDelete.getId());
}
@Override
public void deleteFlow(String tenantId, String namespace, String id) {
flowRepository.findByIdWithSource(tenantId, namespace, id).ifPresent(flowRepository::delete);
public void deleteFlow(String tenantId, String namespace, String id) throws QueueException {
flowRepository.findByIdWithSource(tenantId, namespace, id)
.ifPresent(throwConsumer(flow -> flowService.delete(flow)));
log.info("Flow {} has been deleted", id);
}
}

View File

@@ -161,10 +161,22 @@ public interface FlowRepositoryInterface extends QueryBuilderInterface<Flows.Fie
Flux<Flow> findAsync(String tenantId, List<QueryFilter> filters);
/**
* Create a flow.
* It should not be called directly but instead <code>FlowService.create(GenericFlow flow)</code> should be used as it re-computes topology and triggers.
*/
FlowWithSource create(GenericFlow flow);
/**
* Update a flow.
* It should not be called directly but instead <code>FlowService.update(GenericFlow flow)</code> should be used as it re-computes topology and triggers.
*/
FlowWithSource update(GenericFlow flow, FlowInterface previous) throws ConstraintViolationException;
/**
* Delete a flow.
* It should not be called directly but instead <code>FlowService.delete(GenericFlow flow)</code> should be used as it re-computes topology and triggers.
*/
FlowWithSource delete(FlowInterface flow);
Boolean existAnyNoAcl(String tenantId);

View File

@@ -3,11 +3,10 @@ package io.kestra.core.repositories;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.queues.QueueException;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.core.services.FlowService;
import io.kestra.core.utils.Rethrow;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@@ -43,10 +42,7 @@ public class LocalFlowRepositoryLoader {
private FlowRepositoryInterface flowRepository;
@Inject
private ModelValidator modelValidator;
@Inject
private PluginDefaultService pluginDefaultService;
private FlowService flowService;
public void load(URL basePath) throws IOException, URISyntaxException {
load(MAIN_TENANT, basePath);
@@ -95,19 +91,16 @@ public class LocalFlowRepositoryLoader {
String source = Files.readString(Path.of(file.toFile().getPath()), Charset.defaultCharset());
GenericFlow parsed = GenericFlow.fromYaml(tenantId, source);
FlowWithSource flowWithSource = pluginDefaultService.injectAllDefaults(parsed, false);
modelValidator.validate(flowWithSource);
FlowInterface existing = flowByUidInRepository.get(flowWithSource.uidWithoutRevision());
FlowInterface existing = flowByUidInRepository.get(parsed.uidWithoutRevision());
if (existing == null) {
flowRepository.create(parsed);
flowService.create(parsed);
log.trace("Created flow {}.{}", parsed.getNamespace(), parsed.getId());
} else {
flowRepository.update(parsed, existing);
flowService.update(parsed, existing);
log.trace("Updated flow {}.{}", parsed.getNamespace(), parsed.getId());
}
} catch (FlowProcessingException | ConstraintViolationException e) {
} catch (FlowProcessingException | ConstraintViolationException | QueueException e) {
log.warn("Unable to create flow {}", file, e);
}
}));

View File

@@ -9,17 +9,31 @@ import io.kestra.core.models.flows.check.Check;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.models.triggers.WorkerTriggerInterface;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.models.validations.ValidateConstraintViolation;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.topologies.FlowTopologyService;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.plugin.core.flow.Pause;
import io.kestra.scheduler.TriggerEventQueue;
import io.kestra.scheduler.events.TriggerCreated;
import io.kestra.scheduler.events.TriggerDeleted;
import io.kestra.scheduler.events.TriggerEvent;
import io.kestra.scheduler.events.TriggerUpdated;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Provider;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
@@ -30,7 +44,10 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -45,22 +62,48 @@ import java.util.stream.StreamSupport;
@Slf4j
public class FlowService {
@Inject
Optional<FlowRepositoryInterface> flowRepository;
private Optional<FlowRepositoryInterface> flowRepository;
@Inject
PluginDefaultService pluginDefaultService;
private PluginDefaultService pluginDefaultService;
@Inject
PluginRegistry pluginRegistry;
private PluginRegistry pluginRegistry;
@Inject
ModelValidator modelValidator;
private ModelValidator modelValidator;
@Inject
Optional<FlowTopologyRepositoryInterface> flowTopologyRepository;
private Optional<FlowTopologyRepositoryInterface> flowTopologyRepository;
@Inject
Provider<RunContextFactory> runContextFactory; // Lazy init: avoid circular dependency error.
private Provider<RunContextFactory> runContextFactory; // Lazy init: avoid circular dependency error.
@Inject
private FlowTopologyService flowTopologyService;
@Inject
@Named(QueueFactoryInterface.FLOW_NAMED)
private QueueInterface<FlowInterface> flowQueue;
@Inject
private TriggerEventQueue triggerEventQueue;
private final ExecutorService executorService;
@Inject
public FlowService(ExecutorsUtils executorsUtils) {
this.executorService = executorsUtils.maxCachedThreadPool(Runtime.getRuntime().availableProcessors(), "flow-service");
}
@PreDestroy
void close() throws InterruptedException {
executorService.shutdown();
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
}
/**
* Validates and creates the given flow.
@@ -68,24 +111,147 @@ public class FlowService {
* The validation of the flow is done from the source after injecting all plugin default values.
*
* @param flow The flow.
* @param strictValidation Specifies whether to perform a strict validation of the flow.
* @return The created {@link FlowWithSource}.
*/
public FlowWithSource create(GenericFlow flow, final boolean strictValidation) throws FlowProcessingException {
public FlowWithSource create(GenericFlow flow) throws FlowProcessingException, QueueException {
// FIXME validation is done both here and in the repo
Objects.requireNonNull(flow, "Cannot create null flow");
if (flow.getSource() == null || flow.getSource().isBlank()) {
throw new IllegalArgumentException("Cannot create flow with null or blank source");
}
// Inject plugin default versions, and perform parsing validation when strictValidation = true (i.e., checking unknown and duplicated properties).
FlowWithSource parsed = pluginDefaultService.parseFlowWithVersionDefaults(flow.getTenantId(), flow.getSource(), strictValidation);
// Inject plugin default versions, and perform strict parsing validation (i.e., checking unknown and duplicated properties).
FlowWithSource parsed = pluginDefaultService.parseFlowWithVersionDefaults(flow.getTenantId(), flow.getSource(), true);
// Validate Flow with defaults values
// Do not perform a strict parsing validation to ignore unknown
// properties that might be injecting through default values.
modelValidator.validate(pluginDefaultService.injectAllDefaults(parsed, false));
return repository().create(flow);
FlowWithSource created = repository().create(flow);
// impact downstream consumers: topology, scheduler and flow metastore
impactDownstreamConsumers(created);
return created;
}
/**
* Validates and creates the given flow.
* <p>
* The validation of the flow is done from the source after injecting all plugin default values.
*
* @param flow The flow.
* @return The created {@link FlowWithSource}.
*/
public FlowWithSource update(GenericFlow flow, FlowInterface previous) throws FlowProcessingException, QueueException {
// FIXME validation is done both here and in the repo
Objects.requireNonNull(flow, "Cannot create null flow");
if (flow.getSource() == null || flow.getSource().isBlank()) {
throw new IllegalArgumentException("Cannot create flow with null or blank source");
}
Objects.requireNonNull(previous, "Cannot update a flow with null previous");
// Inject plugin default versions, and perform strict parsing validation (i.e., checking unknown and duplicated properties).
FlowWithSource parsed = pluginDefaultService.parseFlowWithVersionDefaults(flow.getTenantId(), flow.getSource(), true);
// Validate Flow with defaults values
// Do not perform a strict parsing validation to ignore unknown
// properties that might be injecting through default values.
modelValidator.validate(pluginDefaultService.injectAllDefaults(parsed, false));
FlowWithSource updated = repository().update(flow, previous);
// impact downstream consumers: topology, scheduler and flow metastore
impactDownstreamConsumers(updated);
return updated;
}
/**
* Delete a flow.
*/
public FlowWithSource delete(FlowWithSource flow) throws QueueException {
if (flowRepository.isEmpty()) {
throw noRepositoryException();
}
FlowWithSource deleted = flowRepository.get().delete(flow);
// impact downstream consumers: topology, scheduler and flow metastore
impactDownstreamConsumers(deleted);
return deleted;
}
private void impactDownstreamConsumers(FlowWithSource flow) throws QueueException {
// update the topology asynchronously
executorService.submit(() -> updateTopology(flow));
// compute triggers events for the Scheduler
recomputeTriggers(flow);
// send it to the flow queue for the flow metastore
flowQueue.emit(flow);
}
private void updateTopology(FlowWithSource flow) {
flowTopologyRepository.get().save(
flow,
(flow.isDeleted() ?
Stream.<FlowTopology>empty() :
flowTopologyService
.topology(
flow,
flowRepository.get().findAllWithSource(flow.getTenantId())
)
)
.distinct()
.toList()
);
}
private void recomputeTriggers(FlowWithSource flow) {
var previous = flow.getRevision() <= 1 ? null : flowRepository.get().findById(flow.getTenantId(), flow.getNamespace(), flow.getId(), Optional.of(flow.getRevision() - 1)).orElse(null);
if (flow.isDeleted() || previous != null) {
List<AbstractTrigger> triggersDeleted = flow.isDeleted() ?
ListUtils.emptyOnNull(flow.getTriggers()) :
FlowService.findRemovedTrigger(flow, previous);
triggersDeleted.forEach(trigger ->
sendTriggerEvent(new TriggerDeleted(TriggerId.of(flow, trigger), Instant.now()))
);
}
if (previous != null && !Objects.equals(previous.getRevision(), flow.getRevision())) {
FlowService.findUpdatedTrigger(flow, previous)
.stream()
.filter(trigger -> trigger instanceof WorkerTriggerInterface)
.forEach(trigger ->
sendTriggerEvent(new TriggerUpdated(TriggerId.of(flow, trigger), flow.getRevision(), Instant.now()))
);
FlowService.findNewTrigger(flow, previous)
.stream()
.filter(trigger -> trigger instanceof WorkerTriggerInterface)
.forEach(trigger ->
sendTriggerEvent(new TriggerUpdated(TriggerId.of(flow, trigger), flow.getRevision(), Instant.now()))
);
return;
}
if (flow.getTriggers() != null) {
flow.getTriggers()
.stream()
.filter(trigger -> trigger instanceof WorkerTriggerInterface)
.forEach(trigger ->
sendTriggerEvent(new TriggerCreated(TriggerId.of(flow, trigger), Instant.now(), flow.getRevision()))
);
}
}
private void sendTriggerEvent(TriggerEvent event) {
this.triggerEventQueue.send(event);
}
private FlowRepositoryInterface repository() {
@@ -260,10 +426,6 @@ public class FlowService {
return flowRepository.get().findById(tenantId, namespace, flowId);
}
public Stream<FlowInterface> keepLastVersion(Stream<FlowInterface> stream) {
return keepLastVersionCollector(stream);
}
public List<String> deprecationPaths(Flow flow) {
return deprecationTraversal("", flow).toList();
}
@@ -425,31 +587,6 @@ public class FlowService {
.filter(method -> !Modifier.isStatic(method.getModifiers()));
}
public Collection<FlowInterface> keepLastVersion(List<FlowInterface> flows) {
return keepLastVersionCollector(flows.stream()).toList();
}
public Stream<FlowInterface> keepLastVersionCollector(Stream<FlowInterface> stream) {
// Use a Map to track the latest version of each flow
Map<String, FlowInterface> latestFlows = new HashMap<>();
stream.forEach(flow -> {
String uid = flow.uidWithoutRevision();
FlowInterface existing = latestFlows.get(uid);
// Update only if the current flow has a higher revision
if (existing == null || flow.getRevision() > existing.getRevision()) {
latestFlows.put(uid, flow);
} else if (flow.getRevision().equals(existing.getRevision()) && flow.isDeleted()) {
// Edge case: prefer deleted flow with the same revision
latestFlows.put(uid, flow);
}
});
// Return the non-deleted flows
return latestFlows.values().stream().filter(flow -> !flow.isDeleted());
}
public boolean removeUnwanted(Flow f, Execution execution) {
// we don't allow recursive
return !f.uidWithoutRevision().equals(FlowId.uidWithoutRevision(execution));
@@ -475,6 +612,16 @@ public class FlowService {
.toList();
}
public static List<AbstractTrigger> findNewTrigger(Flow flow, Flow previous) {
return ListUtils.emptyOnNull(flow.getTriggers())
.stream()
.filter(oldTrigger -> ListUtils.emptyOnNull(previous.getTriggers())
.stream()
.noneMatch(trigger -> trigger.getId().equals(oldTrigger.getId()))
)
.toList();
}
public static String cleanupSource(String source) {
return source.replaceFirst("(?m)^revision: \\d+\n?", "");
}
@@ -499,15 +646,6 @@ public class FlowService {
return flowRepository.get().findByNamespacePrefix(tenantId, namespacePrefix);
}
// Used in Git plugin
public FlowWithSource delete(FlowWithSource flow) {
if (flowRepository.isEmpty()) {
throw noRepositoryException();
}
return flowRepository.get().delete(flow);
}
/**
* Gets the executable flow for the given namespace, id, and revision.
* Warning: this method bypasses ACL so someone with only execution right can create a flow execution

View File

@@ -1,5 +1,6 @@
package io.kestra.core.runners;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
@@ -7,7 +8,8 @@ import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.property.Property;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.services.FlowService;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.core.debug.Return;
@@ -18,7 +20,6 @@ import org.junit.jupiter.api.Test;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import static org.assertj.core.api.Assertions.assertThat;
@@ -29,7 +30,7 @@ class DefaultFlowMetaStoreTest {
private DefaultFlowMetaStore flowMetaStore;
@Inject
private FlowRepositoryInterface flowRepository;
private FlowService flowService;
@AfterEach
void clean() {
@@ -37,16 +38,15 @@ class DefaultFlowMetaStoreTest {
}
@Test
void findById() {
Flow test = createFlow();
flowRepository.create(GenericFlow.of(test));
void findById() throws FlowProcessingException, QueueException {
FlowWithSource test = flowService.create(GenericFlow.of(createFlow()));
Optional<FlowInterface> maybeFlow = flowMetaStore.findById(test.getTenantId(), test.getNamespace(), test.getId(), Optional.empty());
assertThat(maybeFlow).isPresent();
assertThat(maybeFlow.get().getId()).isEqualTo(test.getId());
flowRepository.delete(test);
flowService.delete(test);
}
@Test
@@ -58,11 +58,10 @@ class DefaultFlowMetaStoreTest {
}
@Test
void findByIdShouldReturnLastRevision() {
Flow test = createFlow();
flowRepository.create(GenericFlow.of(test));
void findByIdShouldReturnLastRevision() throws FlowProcessingException, QueueException {
FlowWithSource test = flowService.create(GenericFlow.of(createFlow()));
Flow updated = test.toBuilder().tasks(List.of(Return.builder().id("return").format(Property.ofValue("new format")).type(Return.class.getName()).build())).build();
flowRepository.update(GenericFlow.of(updated), test);
flowService.update(GenericFlow.of(updated), test);
Optional<FlowInterface> maybeFlow = flowMetaStore.findById(test.getTenantId(), test.getNamespace(), test.getId(), Optional.of(2));
@@ -70,14 +69,13 @@ class DefaultFlowMetaStoreTest {
assertThat(maybeFlow.get().getId()).isEqualTo(test.getId());
assertThat(maybeFlow.get().getRevision()).isEqualTo(2);
flowRepository.delete(test);
flowService.delete(test);
}
@Test
void findByIdShouldReturnPreviousRevision() {
Flow test = createFlow();
flowRepository.create(GenericFlow.of(test));
flowRepository.update(GenericFlow.of(test.toBuilder().revision(2).build()), test);
void findByIdShouldReturnPreviousRevision() throws FlowProcessingException, QueueException {
FlowWithSource test = flowService.create(GenericFlow.of(createFlow()));
flowService.update(GenericFlow.of(test.toBuilder().revision(2).build()), test);
Optional<FlowInterface> maybeFlow = flowMetaStore.findById(test.getTenantId(), test.getNamespace(), test.getId(), Optional.of(1));
@@ -85,14 +83,13 @@ class DefaultFlowMetaStoreTest {
assertThat(maybeFlow.get().getId()).isEqualTo(test.getId());
assertThat(maybeFlow.get().getRevision()).isEqualTo(1);
flowRepository.delete(test);
flowService.delete(test);
}
@Test
void findByIdShouldReturnEmptyForDeletedFlow() throws InterruptedException {
Flow test = createFlow();
flowRepository.create(GenericFlow.of(test));
flowRepository.delete(test);
void findByIdShouldReturnEmptyForDeletedFlow() throws InterruptedException, FlowProcessingException, QueueException {
FlowWithSource test = flowService.create(GenericFlow.of(createFlow()));
flowService.delete(test);
Thread.sleep(100); // make sure the metastore receive the deletion
Optional<FlowInterface> maybeFlow = flowMetaStore.findById(test.getTenantId(), test.getNamespace(), test.getId(), Optional.empty());
@@ -101,9 +98,9 @@ class DefaultFlowMetaStoreTest {
}
@Test
void findByExecution() {
void findByExecution() throws FlowProcessingException, QueueException {
Flow test = createFlow();
FlowWithSource created = flowRepository.create(GenericFlow.of(test));
FlowWithSource created = flowService.create(GenericFlow.of(test));
Execution execution = Execution.newExecution(created, null, null, Optional.empty());
Optional<FlowInterface> maybeFlow = flowMetaStore.findByExecution(execution);
@@ -111,7 +108,7 @@ class DefaultFlowMetaStoreTest {
assertThat(maybeFlow).isPresent();
assertThat(maybeFlow.get().getId()).isEqualTo(test.getId());
flowRepository.delete(test);
flowService.delete(created);
}
@Test
@@ -125,9 +122,9 @@ class DefaultFlowMetaStoreTest {
}
@Test
void findByExecutionThenInjectDefaults() {
void findByExecutionThenInjectDefaults() throws FlowProcessingException, QueueException {
Flow test = createFlow();
FlowWithSource created = flowRepository.create(GenericFlow.of(test));
FlowWithSource created = flowService.create(GenericFlow.of(test));
Execution execution = Execution.newExecution(created, null, null, Optional.empty());
Optional<FlowWithSource> maybeFlow = flowMetaStore.findByExecutionThenInjectDefaults(execution);
@@ -135,7 +132,7 @@ class DefaultFlowMetaStoreTest {
assertThat(maybeFlow).isPresent();
assertThat(maybeFlow.get().getId()).isEqualTo(test.getId());
flowRepository.delete(test);
flowService.delete(created);
}
@Test
@@ -149,11 +146,11 @@ class DefaultFlowMetaStoreTest {
}
@Test
void allLastVersion() throws TimeoutException, InterruptedException {
void allLastVersion() throws InterruptedException, FlowProcessingException, QueueException {
FlowWithSource test1 = createFlow();
flowRepository.create(GenericFlow.of(test1));
flowService.create(GenericFlow.of(test1));
FlowWithSource test2 = createFlow();
flowRepository.create(GenericFlow.of(test2));
flowService.create(GenericFlow.of(test2));
Thread.sleep(100); // make sure the metastore receive the items
Collection<FlowWithSource> flows = flowMetaStore.allLastVersion();
@@ -163,16 +160,15 @@ class DefaultFlowMetaStoreTest {
}
@Test
void findByIdFromTask() {
Flow test = createFlow();
flowRepository.create(GenericFlow.of(test));
void findByIdFromTask() throws FlowProcessingException, QueueException {
FlowWithSource test = flowService.create(GenericFlow.of(createFlow()));
Optional<FlowInterface> maybeFlow = flowMetaStore.findByIdFromTask(test.getTenantId(), test.getNamespace(), test.getId(), Optional.empty(), test.getTenantId(), test.getNamespace(), test.getId());
assertThat(maybeFlow).isPresent();
assertThat(maybeFlow.get().getId()).isEqualTo(test.getId());
flowRepository.delete(test);
flowService.delete(test);
}
@Test

View File

@@ -7,18 +7,29 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.check.Check;
import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.validations.ValidateConstraintViolation;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.core.debug.Return;
import io.kestra.plugin.core.flow.Subflow;
import io.kestra.plugin.core.trigger.Schedule;
import io.kestra.scheduler.TriggerEventQueue;
import io.micronaut.context.annotation.Replaces;
import io.micronaut.test.annotation.MockBean;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.validation.constraints.NotBlank;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@@ -26,18 +37,25 @@ import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.*;
@KestraTest
@Execution(ExecutionMode.SAME_THREAD)
class FlowServiceTest {
private static final String TEST_NAMESPACE = "io.kestra.unittest";
@@ -45,6 +63,13 @@ class FlowServiceTest {
private FlowService flowService;
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private FlowTopologyRepositoryInterface flowTopologyRepository;
@Inject
@Named(QueueFactoryInterface.FLOW_NAMED)
private QueueInterface<FlowInterface> flowQueue;
@Inject
private TriggerEventQueue triggerEventQueue;
private static FlowWithSource create(String flowId, String taskId, Integer revision) {
return create(null, TEST_NAMESPACE, flowId, taskId, revision);
@@ -160,77 +185,6 @@ class FlowServiceTest {
assertThat(fromDb.get().getSource()).isEqualTo(oldSource);
}
@Test
void sameRevisionWithDeletedOrdered() {
Stream<FlowInterface> stream = Stream.of(
create("test", "test", 1),
create("test", "test2", 2),
create("test", "test2", 2).toDeleted(),
create("test", "test2", 4)
);
List<FlowInterface> collect = flowService.keepLastVersion(stream).toList();
assertThat(collect.size()).isEqualTo(1);
assertThat(collect.getFirst().isDeleted()).isFalse();
assertThat(collect.getFirst().getRevision()).isEqualTo(4);
}
@Test
void sameRevisionWithDeletedSameRevision() {
Stream<FlowInterface> stream = Stream.of(
create("test2", "test2", 1),
create("test", "test", 1),
create("test", "test2", 2),
create("test", "test3", 3),
create("test", "test2", 2).toDeleted()
);
List<FlowInterface> collect = flowService.keepLastVersion(stream).toList();
assertThat(collect.size()).isEqualTo(1);
assertThat(collect.getFirst().isDeleted()).isFalse();
assertThat(collect.getFirst().getId()).isEqualTo("test2");
}
@Test
void sameRevisionWithDeletedUnordered() {
Stream<FlowInterface> stream = Stream.of(
create("test", "test", 1),
create("test", "test2", 2),
create("test", "test2", 4),
create("test", "test2", 2).toDeleted()
);
List<FlowInterface> collect = flowService.keepLastVersion(stream).toList();
assertThat(collect.size()).isEqualTo(1);
assertThat(collect.getFirst().isDeleted()).isFalse();
assertThat(collect.getFirst().getRevision()).isEqualTo(4);
}
@Test
void multipleFlow() {
Stream<FlowInterface> stream = Stream.of(
create("test", "test", 2),
create("test", "test2", 1),
create("test2", "test2", 1),
create("test2", "test3", 3),
create("test3", "test1", 2),
create("test3", "test2", 3)
);
List<FlowInterface> collect = flowService.keepLastVersion(stream).toList();
assertThat(collect.size()).isEqualTo(3);
assertThat(collect.stream().filter(flow -> flow.getId().equals("test")).findFirst().orElseThrow().getRevision()).isEqualTo(2);
assertThat(collect.stream().filter(flow -> flow.getId().equals("test2")).findFirst().orElseThrow().getRevision()).isEqualTo(3);
assertThat(collect.stream().filter(flow -> flow.getId().equals("test3")).findFirst().orElseThrow().getRevision()).isEqualTo(3);
}
@Test
void warnings() {
FlowWithSource flow = create("test", "test", 1).toBuilder()
@@ -292,15 +246,6 @@ class FlowServiceTest {
assertThat(flowService.deprecationPaths(flow)).containsExactlyInAnyOrder("tasks[0]");
}
@Test
void delete() {
FlowWithSource flow = create("deleteTest", "test", 1);
FlowWithSource saved = flowRepository.create(GenericFlow.of(flow));
assertThat(flowRepository.findById(flow.getTenantId(), flow.getNamespace(), flow.getId()).isPresent()).isTrue();
flowService.delete(saved);
assertThat(flowRepository.findById(flow.getTenantId(), flow.getNamespace(), flow.getId()).isPresent()).isFalse();
}
@Test
void findByNamespacePrefix() {
FlowWithSource flow = create(null, "some.namespace","findByTest", "test", 1);
@@ -533,6 +478,174 @@ class FlowServiceTest {
assertThat(result).isEmpty();
}
@Test
void create() throws FlowProcessingException, QueueException, InterruptedException {
Flow subflow = Flow.builder()
.id(IdUtils.create())
.tenantId(TenantService.MAIN_TENANT)
.namespace("io.kestra.unittest")
.tasks(List.of(Return.builder().id("test").type(Return.class.getName()).format(Property.ofValue("test")).build()))
.build();
Flow flow = Flow.builder()
.id(IdUtils.create())
.tenantId(TenantService.MAIN_TENANT)
.namespace("io.kestra.unittest")
.tasks(List.of(Subflow.builder().id("test").type(Subflow.class.getName()).namespace("io.kestra.unittest").flowId(subflow.getId()).build()))
.triggers(List.of(Schedule.builder().id("test").type(Schedule.class.getName()).cron("0 0 * * *").build()))
.build();
CountDownLatch countDownLatch = new CountDownLatch(1);
Runnable cancellation = flowQueue.receive(either -> {
if (either.isLeft()) {
if (either.getLeft().getId().equals(flow.getId())) {
countDownLatch.countDown();
}
}
});
flowService.create(GenericFlow.of(subflow));
flowService.create(GenericFlow.of(flow));
// check that it has been created
Optional<FlowWithSource> fromDb = flowRepository.findByIdWithSource(flow.getTenantId(), flow.getNamespace(), flow.getId(), Optional.empty());
assertThat(fromDb).isPresent();
assertThat(fromDb.get().getRevision()).isEqualTo(1);
// check that topology has been inserted
List<FlowTopology> topo = await()
.atMost(Duration.ofSeconds(10))
.until(
() -> flowTopologyRepository.findByFlow(flow.getTenantId(), flow.getNamespace(), flow.getId(), false),
it -> !it.isEmpty()
);
assertThat(topo).hasSize(1);
assertThat(topo.getFirst().getSource().getId()).isEqualTo(flow.getId());
assertThat(topo.getFirst().getDestination().getId()).isEqualTo(subflow.getId());
// check that triggers have been sent
verify(triggerEventQueue).send(any());
// check that the flow has been sent to the queue
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
cancellation.run();
}
@Test
void update() throws FlowProcessingException, QueueException, InterruptedException {
Flow subflow = Flow.builder()
.id(IdUtils.create())
.tenantId(TenantService.MAIN_TENANT)
.namespace("io.kestra.unittest")
.tasks(List.of(Return.builder().id("test").type(Return.class.getName()).format(Property.ofValue("test")).build()))
.build();
Flow flow = Flow.builder()
.id(IdUtils.create())
.tenantId(TenantService.MAIN_TENANT)
.namespace("io.kestra.unittest")
.tasks(List.of(Return.builder().id("test").type(Return.class.getName()).format(Property.ofValue("test")).build()))
.build();
CountDownLatch countDownLatch = new CountDownLatch(2);
Runnable cancellation = flowQueue.receive(either -> {
if (either.isLeft()) {
if (either.getLeft().getId().equals(flow.getId())) {
countDownLatch.countDown();
}
}
});
flowService.create(GenericFlow.of(subflow));
flowService.create(GenericFlow.of(flow));
Flow updated = flow.toBuilder()
.tasks(List.of(Subflow.builder().id("test").type(Subflow.class.getName()).namespace("io.kestra.unittest").flowId(subflow.getId()).build()))
.triggers(List.of(Schedule.builder().id("test").type(Schedule.class.getName()).cron("0 0 * * *").build()))
.build();
flowService.update(GenericFlow.of(updated), GenericFlow.of(flow));
// check that it has been created then updated
Optional<FlowWithSource> fromDb = flowRepository.findByIdWithSource(flow.getTenantId(), flow.getNamespace(), flow.getId(), Optional.empty());
assertThat(fromDb).isPresent();
assertThat(fromDb.get().getRevision()).isEqualTo(2);
// check that topology has been inserted
List<FlowTopology> topo = await()
.atMost(Duration.ofSeconds(10))
.until(
() -> flowTopologyRepository.findByFlow(flow.getTenantId(), flow.getNamespace(), flow.getId(), false),
it -> !it.isEmpty()
);
assertThat(topo).hasSize(1);
assertThat(topo.getFirst().getSource().getId()).isEqualTo(flow.getId());
assertThat(topo.getFirst().getDestination().getId()).isEqualTo(subflow.getId());
// check that triggers have been sent
verify(triggerEventQueue).send(any());
// check that the flow has been sent to the queue 2x
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
cancellation.run();
}
@Test
void delete() throws FlowProcessingException, QueueException, InterruptedException {
Flow subflow = Flow.builder()
.id(IdUtils.create())
.tenantId(TenantService.MAIN_TENANT)
.namespace("io.kestra.unittest")
.tasks(List.of(Return.builder().id("test").type(Return.class.getName()).format(Property.ofValue("test")).build()))
.build();
Flow flow = Flow.builder()
.id(IdUtils.create())
.tenantId(TenantService.MAIN_TENANT)
.namespace("io.kestra.unittest")
.tasks(List.of(Subflow.builder().id("test").type(Subflow.class.getName()).namespace("io.kestra.unittest").flowId(subflow.getId()).build()))
.triggers(List.of(Schedule.builder().id("test").type(Schedule.class.getName()).cron("0 0 * * *").build()))
.build();
CountDownLatch countDownLatch = new CountDownLatch(2);
Runnable cancellation = flowQueue.receive(either -> {
if (either.isLeft()) {
if (either.getLeft().getId().equals(flow.getId())) {
countDownLatch.countDown();
}
}
});
flowService.create(GenericFlow.of(subflow));
FlowWithSource created = flowService.create(GenericFlow.of(flow));
// be sure that topology and triggers have been computed
await()
.atMost(Duration.ofSeconds(10))
.until(() -> !flowTopologyRepository.findByFlow(flow.getTenantId(), flow.getNamespace(), flow.getId(), false).isEmpty());
verify(triggerEventQueue).send(any());
reset(triggerEventQueue);
flowService.delete(created);
// check that it has been deleted
Optional<FlowWithSource> fromDb = flowRepository.findByIdWithSource(flow.getTenantId(), flow.getNamespace(), flow.getId(), Optional.empty());
assertThat(fromDb).isEmpty();
// check that topology has been removed
await()
.atMost(Duration.ofSeconds(10))
.until(() -> flowTopologyRepository.findByFlow(flow.getTenantId(), flow.getNamespace(), flow.getId(), false).isEmpty());
// check that triggers have been removed
verify(triggerEventQueue).send(any());
// check that the flow has been sent to the queue 2x
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
cancellation.run();
}
@MockBean
@Replaces(TriggerEventQueue.class)
TriggerEventQueue triggerEventQueue() {
return mock(TriggerEventQueue.class);
}
@SuperBuilder
@ToString
@EqualsAndHashCode

View File

@@ -1,107 +0,0 @@
package io.kestra.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.core.runners.FlowMetaStoreInterface;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.core.topologies.FlowTopologyService;
import io.kestra.core.utils.Either;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.runtime.event.annotation.EventListener;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Objects;
import java.util.stream.Stream;
/**
* This component is responsible to compute the flow topology on each flow message received from the Flow queue.
*/
@Singleton
@Slf4j
@Requires(property = "kestra.server-type", pattern = "(EXECUTOR|STANDALONE)")
public class FlowTopologyHandler implements AutoCloseable {
private static final ObjectMapper MAPPER = ExecutorMapper.of();
private final QueueInterface<FlowInterface> flowQueue;
private final FlowTopologyRepositoryInterface flowTopologyRepository;
private final FlowTopologyService flowTopologyService;
private final PluginDefaultService pluginDefaultService;
private final FlowMetaStoreInterface flowMetaStore;
private Runnable cancellation;
@Inject
public FlowTopologyHandler(@Named(QueueFactoryInterface.FLOW_NAMED) QueueInterface<FlowInterface> flowQueue,
FlowTopologyRepositoryInterface flowTopologyRepository,
FlowTopologyService flowTopologyService,
PluginDefaultService pluginDefaultService,
FlowMetaStoreInterface flowMetaStore
) {
this.flowQueue = flowQueue;
this.flowTopologyRepository = flowTopologyRepository;
this.flowTopologyService = flowTopologyService;
this.pluginDefaultService = pluginDefaultService;
this.flowMetaStore = flowMetaStore;
}
// Make it a StartupEvent listener so it starts when Kestra start
@EventListener
public void run(StartupEvent event) {
cancellation = this.flowQueue.receive(FlowTopology.class, this::flowQueue); // TODO it should be FlowTopologyHandler but if we do this we may loose pending messages
}
private void flowQueue(Either<FlowInterface, DeserializationException> either) {
FlowInterface flow;
if (either.isRight()) {
log.error("Unable to deserialize a flow: {}", either.getRight().getMessage());
try {
var jsonNode = MAPPER.readTree(either.getRight().getRecord());
flow = FlowWithException.from(jsonNode, either.getRight()).orElseThrow(IOException::new);
} catch (IOException e) {
// if we cannot create a FlowWithException, ignore the message
log.error("Unexpected exception when trying to handle a deserialization error", e);
return;
}
} else {
flow = either.getLeft();
}
try {
flowTopologyRepository.save(
flow,
(flow.isDeleted() ?
Stream.<FlowTopology>empty() :
flowTopologyService
.topology(
pluginDefaultService.injectVersionDefaults(flow, true),
flowMetaStore.allLastVersion().stream().filter(f -> Objects.equals(f.getTenantId(), flow.getTenantId())).toList()
)
)
.distinct()
.toList()
);
} catch (Exception e) {
log.error("Unable to save flow topology for flow {}", flow.uid(), e);
}
}
@Override
@PreDestroy
public void close() {
if (cancellation != null) {
cancellation.run();
}
}
}

View File

@@ -62,7 +62,6 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
public static final Field<String> TENANT_FIELD = field("tenant_id", String.class);
public static final Field<String> SOURCE_FIELD = field("source_code", String.class);
private final QueueInterface<FlowInterface> flowQueue;
private final ApplicationEventPublisher<CrudEvent<FlowInterface>> eventPublisher;
private final ModelValidator modelValidator;
private final PluginDefaultService pluginDefaultService;
@@ -81,7 +80,6 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
this.modelValidator = applicationContext.getBean(ModelValidator.class);
this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.pluginDefaultService = applicationContext.getBean(PluginDefaultService.class);
this.flowQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.FLOW_NAMED));
this.jdbcRepository.setDeserializer(record -> {
String source = record.get("value", String.class);
String namespace = record.get("namespace", String.class);
@@ -703,12 +701,12 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
if (checkUpdate.isPresent()) {
throw checkUpdate.get();
}
// Persist
return this.save(flow, CrudEventType.UPDATE);
}
@SneakyThrows({QueueException.class, FlowProcessingException.class})
@SneakyThrows(FlowProcessingException.class)
@VisibleForTesting
public FlowWithSource save(GenericFlow flow, CrudEventType crudEventType) throws ConstraintViolationException {
@@ -733,7 +731,6 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
this.jdbcRepository.persist(flow, fields);
flowQueue.emit(flow);
eventPublisher.publishEvent(new CrudEvent<>(flow, nullOrExisting, crudEventType));
return flowWithSource.toBuilder().revision(revision).build();
@@ -763,7 +760,6 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
this.jdbcRepository.persist(deleted, fields);
flowQueue.emit(deleted);
eventPublisher.publishEvent(CrudEvent.delete(flow));
return deleted;

View File

@@ -1,119 +0,0 @@
package io.kestra.scheduler.pubsub;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerId;
import io.kestra.core.models.triggers.WorkerTriggerInterface;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.Scheduler;
import io.kestra.core.services.FlowService;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.core.utils.Disposable;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.scheduler.TriggerEventQueue;
import io.kestra.core.scheduler.events.TriggerCreated;
import io.kestra.core.scheduler.events.TriggerDeleted;
import io.kestra.core.scheduler.events.TriggerEvent;
import io.kestra.core.scheduler.events.TriggerUpdated;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.runtime.event.annotation.EventListener;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@Singleton
@Requires(property = "kestra.server-type", pattern = "(SCHEDULER|STANDALONE)")
@Slf4j
public class TriggerEventPublisher implements AutoCloseable {
private final QueueInterface<FlowInterface> flowQueue;
private final TriggerEventQueue triggerEventQueue;
private final FlowRepositoryInterface flowRepository;
private final PluginDefaultService pluginDefaultService;
private Disposable cancellation;
@Inject
public TriggerEventPublisher(@Named(QueueFactoryInterface.FLOW_NAMED) QueueInterface<FlowInterface> flowQueue,
TriggerEventQueue triggerEventQueue,
FlowRepositoryInterface flowRepository,
PluginDefaultService pluginDefaultService) {
this.flowQueue = flowQueue;
this.triggerEventQueue = triggerEventQueue;
this.flowRepository = flowRepository;
this.pluginDefaultService = pluginDefaultService;
}
// Make it a StartupEvent listener so it starts when Kestra start
@EventListener
public void run(StartupEvent event) {
this.cancellation = Disposable.of(this.flowQueue.receive(Scheduler.class, either -> {
if (either.isRight()) {
log.error("Unable to deserialize a flow: {}", either.getRight().getMessage());
return;
}
FlowWithSource flow;
try {
flow = pluginDefaultService.injectVersionDefaults(either.getLeft(), true);
} catch (FlowProcessingException e) {
log.error("Unable to inject version defaults for flow {}", either.getLeft().getId(), e);
return;
}
var previous = flow.getRevision() <= 1 ? null : flowRepository.findById(flow.getTenantId(), flow.getNamespace(), flow.getId(), Optional.of(flow.getRevision() - 1)).orElse(null);
if (flow.isDeleted() || previous != null) {
List<AbstractTrigger> triggersDeleted = flow.isDeleted() ?
ListUtils.emptyOnNull(flow.getTriggers()) :
FlowService.findRemovedTrigger(flow, previous);
triggersDeleted.forEach(trigger ->
sendEvent(new TriggerDeleted(TriggerId.of(flow, trigger)))
);
}
if (previous != null && !Objects.equals(previous.getRevision(), flow.getRevision())) {
FlowService.findUpdatedTrigger(flow, previous)
.stream()
.filter(trigger -> trigger instanceof WorkerTriggerInterface)
.forEach(trigger ->
sendEvent(new TriggerUpdated(TriggerId.of(flow, trigger), flow.getRevision()))
);
return;
}
if (flow.getTriggers() != null) {
flow.getTriggers()
.stream()
.filter(trigger -> trigger instanceof WorkerTriggerInterface)
.forEach(trigger ->
sendEvent(new TriggerCreated(TriggerId.of(flow, trigger), flow.getRevision()))
);
}
}));
}
@PreDestroy
@Override
public void close() {
this.cancellation.dispose();
}
private void sendEvent(TriggerEvent event) {
this.triggerEventQueue.send(event);
}
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.junit.extensions;
import static io.kestra.core.junit.extensions.ExtensionUtils.loadFile;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.models.executions.Execution;
@@ -10,9 +11,9 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.services.FlowService;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.context.ApplicationContext;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.time.Duration;
@@ -62,9 +63,10 @@ public class FlowExecutorExtension implements AfterEachCallback, ParameterResolv
}
@Override
public void afterEach(ExtensionContext extensionContext) throws URISyntaxException {
public void afterEach(ExtensionContext extensionContext) throws Exception {
ExecuteFlow executeFlow = getExecuteFlow(extensionContext);
FlowRepositoryInterface flowRepository = context.getBean(FlowRepositoryInterface.class);
FlowService flowService = context.getBean(FlowService.class);
String path = executeFlow.value();
URL resource = loadFile(path);
@@ -72,13 +74,12 @@ public class FlowExecutorExtension implements AfterEachCallback, ParameterResolv
flowRepository.findAllForAllTenants().stream()
.filter(flow -> Objects.equals(flow.getId(), loadedFlow.getId()))
.filter(flow -> Objects.equals(flow.getTenantId(), executeFlow.tenantId()))
.forEach(flow -> flowRepository.delete(FlowWithSource.of(flow, "unused")));
.forEach(throwConsumer(flow -> flowService.delete(FlowWithSource.of(flow, "unused"))));
}
private static ExecuteFlow getExecuteFlow(ExtensionContext extensionContext) {
ExecuteFlow executeFlow = extensionContext.getTestMethod()
return extensionContext.getTestMethod()
.orElseThrow()
.getAnnotation(ExecuteFlow.class);
return executeFlow;
}
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.junit.extensions;
import static io.kestra.core.junit.extensions.ExtensionUtils.loadFile;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.flows.Flow;
@@ -9,6 +10,7 @@ import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.services.FlowService;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.data.model.Pageable;
@@ -51,9 +53,10 @@ public class FlowLoaderExtension implements BeforeEachCallback, AfterEachCallbac
}
@Override
public void afterEach(ExtensionContext extensionContext) throws URISyntaxException {
public void afterEach(ExtensionContext extensionContext) throws Exception {
LoadFlows loadFlows = getLoadFlows(extensionContext);
FlowRepositoryInterface flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
FlowService flowService = applicationContext.getBean(FlowService.class);
ExecutionRepositoryInterface executionRepository = applicationContext.getBean(ExecutionRepositoryInterface.class);
Set<String> flowIds = new HashSet<>();
@@ -65,11 +68,11 @@ public class FlowLoaderExtension implements BeforeEachCallback, AfterEachCallbac
flowRepository.findAllForAllTenants().stream()
.filter(flow -> flowIds.contains(flow.getId()))
.filter(flow -> loadFlows.tenantId().equals(flow.getTenantId()))
.forEach(flow -> {
flowRepository.delete(FlowWithSource.of(flow, "unused"));
.forEach(throwConsumer(flow -> {
flowService.delete(FlowWithSource.of(flow, "unused"));
executionRepository.findByFlowId(loadFlows.tenantId(), flow.getNamespace(), flow.getId(), Pageable.UNPAGED)
.forEach(executionRepository::delete);
});
}));
}
private static LoadFlows getLoadFlows(ExtensionContext extensionContext) {

View File

@@ -17,6 +17,7 @@ import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.models.validations.ValidateConstraintViolation;
import io.kestra.core.queues.QueueException;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.YamlParser;
@@ -61,6 +62,9 @@ import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@Validated
@Controller("/api/v1/{tenant}/flows")
@Slf4j
@@ -273,7 +277,7 @@ public class FlowController {
@SneakyThrows
protected FlowWithSource doCreate(final GenericFlow flow) {
try {
return flowService.create(flow, true);
return flowService.create(flow);
} catch (FlowProcessingException e) {
if (e.getCause() instanceof ConstraintViolationException cve) {
throw cve;
@@ -291,11 +295,11 @@ public class FlowController {
description = "All flow will be created / updated for this namespace.\n" +
"Flow that already created but not in `flows` will be deleted if the query delete is `true`"
)
public List<FlowInterface> updateFlowsInNamespace(
public List<FlowWithSource> updateFlowsInNamespace(
@Parameter(description = "The flow namespace") @PathVariable String namespace,
@RequestBody(description = "A list of flows source code") @Body @Nullable String flows,
@Parameter(description = "If missing flow should be deleted") @QueryValue(defaultValue = "true") Boolean delete
) throws ConstraintViolationException {
) throws Exception {
List<String> sources = flows != null ? List.of(flows.split("---")) : new ArrayList<>();
List<GenericFlow> genericFlows = sources
@@ -306,7 +310,7 @@ public class FlowController {
return this.bulkUpdateOrCreate(namespace, genericFlows, delete, false);
}
protected List<FlowInterface> bulkUpdateOrCreate(@Nullable String namespace, List<GenericFlow> flows, Boolean delete, Boolean allowNamespaceChild) {
protected List<FlowWithSource> bulkUpdateOrCreate(@Nullable String namespace, List<GenericFlow> flows, Boolean delete, Boolean allowNamespaceChild) throws Exception {
if (namespace != null) {
// control namespace to update
@@ -352,7 +356,7 @@ public class FlowController {
.toList();
// delete all not in updated ids
List<? extends FlowInterface> deleted = new ArrayList<>();
List<FlowWithSource> deleted = new ArrayList<>();
if (delete) {
if (namespace != null) {
deleted = flowRepository
@@ -363,17 +367,17 @@ public class FlowController {
}
deleted = deleted.stream()
.filter(flow -> !ids.contains(flow.getId()))
.peek(flow -> flowRepository.delete(flow))
.peek(throwConsumer(flow -> flowService.delete(flow)))
.toList();
}
// update or create flows
List<? extends FlowInterface> updatedOrCreated = flows.stream()
.map(flow ->
List<FlowWithSource> updatedOrCreated = flows.stream()
.map(throwFunction(flow ->
flowRepository.findById(tenantService.resolveTenant(), flow.getNamespace(), flow.getId())
.map(existing -> flowRepository.update(flow, existing))
.map(throwFunction(existing -> flowService.update(flow, existing)))
.orElseGet(() -> this.doCreate(flow))
)
))
.toList();
return Stream.concat(deleted.stream(), updatedOrCreated.stream()).toList();
}
@@ -386,7 +390,7 @@ public class FlowController {
@Parameter(description = "The flow namespace") @PathVariable String namespace,
@Parameter(description = "The flow id") @PathVariable String id,
@RequestBody(description = "The flow source code") @Body String source
) throws ConstraintViolationException, FlowProcessingException {
) throws ConstraintViolationException, FlowProcessingException, QueueException {
final String tenantId = tenantService.resolveTenant();
Optional<Flow> existingFlow = flowRepository.findById(tenantId, namespace, id);
@@ -397,16 +401,8 @@ public class FlowController {
// Parse source as RawFlow.
GenericFlow genericFlow = GenericFlow.fromYaml(tenantId, source);
// Validate Subflows.
// Inject default plugin 'version' props before converting
// to flow to correctly resolve to plugin type.
try {
FlowWithSource flow = pluginDefaultService.injectVersionDefaults(genericFlow, false);
flowService.checkValidSubflows(flow, tenantId);
// Persist
return HttpResponse.ok(updateFlow(genericFlow, existingFlow.get()));
return HttpResponse.ok(doUpdateFlow(genericFlow, existingFlow.get()));
} catch (FlowProcessingException e) {
if (e.getCause() instanceof ConstraintViolationException cve) {
throw cve;
@@ -416,8 +412,8 @@ public class FlowController {
}
}
protected FlowWithSource updateFlow(GenericFlow current, FlowInterface previous) {
return flowRepository.update(current, previous);
protected FlowWithSource doUpdateFlow(GenericFlow current, FlowInterface previous) throws FlowProcessingException, QueueException {
return flowService.update(current, previous);
}
@ExecuteOn(TaskExecutors.IO)
@@ -428,12 +424,12 @@ public class FlowController {
description = "All flow will be created / updated for this namespace.\n" +
"Flow that already created but not in `flows` will be deleted if the query delete is `true`"
)
public List<FlowInterface> bulkUpdateFlows(
public List<FlowWithSource> bulkUpdateFlows(
@RequestBody(description = "A list of flows source code splitted with \"---\"") @Body @Nullable String flows,
@Parameter(description = "If missing flow should be deleted") @QueryValue(defaultValue = "true") Boolean delete,
@Parameter(description = "The namespace where to update flows") @QueryValue @Nullable String namespace,
@Parameter(description = "If namespace child should are allowed to be updated") @QueryValue(defaultValue = "false") Boolean allowNamespaceChild
) throws ConstraintViolationException {
) throws Exception {
List<String> sources = flows != null ? List.of(flows.split("---")) : new ArrayList<>();
List<GenericFlow> genericFlows = sources.stream()
.map(source -> GenericFlow.fromYaml(tenantService.resolveTenant(), source))
@@ -448,10 +444,10 @@ public class FlowController {
public HttpResponse<Void> deleteFlow(
@Parameter(description = "The flow namespace") @PathVariable String namespace,
@Parameter(description = "The flow id") @PathVariable String id
) {
) throws QueueException {
Optional<FlowWithSource> flow = flowRepository.findByIdWithSource(tenantService.resolveTenant(), namespace, id);
if (flow.isPresent()) {
flowRepository.delete(flow.get());
flowService.delete(flow.get());
return HttpResponse.status(HttpStatus.NO_CONTENT);
} else {
return HttpResponse.status(HttpStatus.NOT_FOUND);
@@ -628,13 +624,13 @@ public class FlowController {
@Deprecated @Parameter(description = "The scope of the flows to include", deprecated = true) @Nullable @QueryValue List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels
) {
) throws QueueException {
filters = mapLegacyQueryParamsToNewFilters(filters, query, scope, namespace, labels);
List<Flow> list = flowRepository
.findWithSource(Pageable.UNPAGED, tenantService.resolveTenant(), filters)
.stream()
.peek(flowRepository::delete)
.peek(throwConsumer(flow -> flowService.delete(flow)))
.collect(Collectors.toList());
return HttpResponse.ok(BulkResponse.builder().count(list.size()).build());
@@ -648,11 +644,11 @@ public class FlowController {
)
public HttpResponse<BulkResponse> deleteFlowsByIds(
@RequestBody(description = "A list of tuple flow ID and namespace as flow identifiers") @Body List<IdWithNamespace> ids
) {
) throws QueueException {
List<Flow> list = ids
.stream()
.map(id -> flowRepository.findByIdWithSource(tenantService.resolveTenant(), id.getNamespace(), id.getId()).orElseThrow())
.peek(flowRepository::delete)
.peek(throwConsumer(flow -> flowService.delete(flow)))
.collect(Collectors.toList());
return HttpResponse.ok(BulkResponse.builder().count(list.size()).build());
@@ -671,7 +667,7 @@ public class FlowController {
@Deprecated @Parameter(description = "The scope of the flows to include", deprecated = true) @Nullable @QueryValue List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels
) {
) throws Exception {
filters = mapLegacyQueryParamsToNewFilters(filters, query, scope, namespace, labels);
return HttpResponse.ok(BulkResponse.builder().count(setFlowsDisableByQuery(filters, true).size()).build());
@@ -685,7 +681,7 @@ public class FlowController {
)
public HttpResponse<BulkResponse> disableFlowsByIds(
@RequestBody(description = "A list of tuple flow ID and namespace as flow identifiers") @Body List<IdWithNamespace> ids
) {
) throws Exception {
return HttpResponse.ok(BulkResponse.builder().count(setFlowsDisableByIds(ids, true).size()).build());
}
@@ -703,7 +699,7 @@ public class FlowController {
@Deprecated @Parameter(description = "The scope of the flows to include", deprecated = true) @Nullable @QueryValue List<FlowScope> scope,
@Deprecated @Parameter(description = "A namespace filter prefix", deprecated = true) @Nullable @QueryValue String namespace,
@Deprecated @Parameter(description = "A labels filter as a list of 'key:value'", deprecated = true) @Nullable @QueryValue @Format("MULTI") List<String> labels
) {
) throws Exception {
filters = mapLegacyQueryParamsToNewFilters(filters, query, scope, namespace, labels);
return HttpResponse.ok(BulkResponse.builder().count(setFlowsDisableByQuery(filters, false).size()).build());
@@ -735,7 +731,7 @@ public class FlowController {
)
public HttpResponse<BulkResponse> enableFlowsByIds(
@RequestBody(description = "A list of tuple flow ID and namespace as flow identifiers") @Body List<IdWithNamespace> ids
) {
) throws Exception {
return HttpResponse.ok(BulkResponse.builder().count(setFlowsDisableByIds(ids, false).size()).build());
}
@@ -802,27 +798,27 @@ public class FlowController {
flowService.importFlow(tenantId, source);
}
protected List<FlowWithSource> setFlowsDisableByIds(List<IdWithNamespace> ids, boolean disable) {
protected List<FlowWithSource> setFlowsDisableByIds(List<IdWithNamespace> ids, boolean disable) throws Exception {
return ids
.stream()
.map(id -> flowRepository.findByIdWithSource(tenantService.resolveTenant(), id.getNamespace(), id.getId()).orElseThrow())
.filter(flowWithSource -> disable != flowWithSource.isDisabled())
.peek(flow -> {
.peek(throwConsumer(flow -> {
GenericFlow genericFlowUpdated = parseFlowSource(FlowService.injectDisabled(flow.getSource(), disable));
flowRepository.update(genericFlowUpdated, flow);
})
flowService.update(genericFlowUpdated, flow);
}))
.toList();
}
protected List<FlowWithSource> setFlowsDisableByQuery(List<QueryFilter> filters, boolean disable) {
protected List<FlowWithSource> setFlowsDisableByQuery(List<QueryFilter> filters, boolean disable) throws Exception {
return flowRepository
.findWithSource(Pageable.UNPAGED, tenantService.resolveTenant(), filters)
.stream()
.filter(flowWithSource -> disable != flowWithSource.isDisabled())
.peek(flow -> {
.peek(throwConsumer(flow -> {
GenericFlow genericFlowUpdated = parseFlowSource(FlowService.injectDisabled(flow.getSource(), disable));
flowRepository.update(genericFlowUpdated, flow);
})
flowService.update(genericFlowUpdated, flow);
}))
.toList();
}

View File

@@ -1,7 +1,7 @@
package io.kestra.webserver.services;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowService;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.NamespaceUtils;
import io.kestra.core.utils.VersionProvider;
@@ -23,6 +23,8 @@ import reactor.core.publisher.Mono;
import java.util.Objects;
import java.util.function.Function;
import static io.kestra.core.utils.Rethrow.throwFunction;
/**
* Service for automatically loading initial flows from the community blueprints at server startup.
*/
@@ -35,17 +37,18 @@ public class FlowAutoLoaderService {
public static final String PURGE_SYSTEM_FLOW_BLUEPRINT_ID = "234";
@Inject
protected FlowRepositoryInterface repository;
private FlowService flowService;
@Inject
@Client("api")
protected HttpClient httpClient;
private HttpClient httpClient;
@Inject
private NamespaceUtils namespaceUtils;
@Inject
private VersionProvider versionProvider;
@Inject
private TenantService tenantService;
@@ -72,12 +75,12 @@ public class FlowAutoLoaderService {
return body;
})
)
.map(source -> {
.map(throwFunction(source -> {
GenericFlow flow = GenericFlow.fromYaml(tenantService.resolveTenant(), source);
repository.create(flow);
flowService.create(flow);
log.debug("Loaded flow '{}/{}'.", flow.getNamespace(), flow.getId());
return 1;
})
}))
.onErrorReturn(0)
.onErrorContinue((throwable, o) -> {
// log error in debug to not spam user with stacktrace, e.g., flow maybe already registered.