fix(system): compilation issue and test failures

This commit is contained in:
Loïc Mathieu
2025-12-08 15:42:58 +01:00
parent 938e17d59c
commit 6bf234b16f
8 changed files with 102 additions and 87 deletions

View File

@@ -1,12 +1,12 @@
package io.kestra.cli.services;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.*;
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;
@MicronautTest(environments = {"test", "file-watch"}, transactional = false)
@KestraTest(environments = {"test", "file-watch"})
class FileChangedEventListenerTest {
public static final String FILE_WATCH = "build/file-watch";
@Inject

View File

@@ -2,6 +2,7 @@ package io.kestra.core.repositories;
import java.util.List;
// FIXME rename it to something like IndexedRepository and only implement it for indexed entities
public interface SaveRepositoryInterface<T> {
T save(T item);

View File

@@ -1,5 +1,6 @@
package io.kestra.core.services;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
@@ -8,7 +9,9 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ConcurrencyLimitRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.ConcurrencyLimit;
import io.kestra.core.runners.ExecutionEvent;
import io.kestra.core.runners.ExecutionEventType;
import io.kestra.core.runners.RunnerUtils;
@@ -20,6 +23,8 @@ import org.junit.jupiter.api.TestInstance;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -32,7 +37,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ConcurrencyLimitServiceTest {
private static final String TESTS_FLOW_NS = "io.kestra.tests";
private static final String TENANT_ID = "main";
private static final String CONCURRENCY_LIMIT_SERVICE_TEST_UNQUEUE_EXECUTION_TENANT = "concurrency_limit_service_test_unqueue_execution_tenant";
@Inject
@@ -52,6 +56,9 @@ class ConcurrencyLimitServiceTest {
@Inject
private ConcurrencyLimitService concurrencyLimitService;
@Inject
private ConcurrencyLimitRepositoryInterface concurrencyLimitRepository;
@Test
@LoadFlows(value = "flows/valids/flow-concurrency-queue.yml", tenantId = CONCURRENCY_LIMIT_SERVICE_TEST_UNQUEUE_EXECUTION_TENANT)
void unqueueExecution() throws QueueException, TimeoutException, InterruptedException {
@@ -79,7 +86,7 @@ class ConcurrencyLimitServiceTest {
@Test
@ExecuteFlow(value = "flows/valids/flow-concurrency-queue.yml", tenantId = "concurrency_limit_service_test_find_by_id_tenant")
void findById(Execution execution) {
Optional<ConcurrencyLimit> limit = concurrencyLimitService.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId());
Optional<ConcurrencyLimit> limit = concurrencyLimitRepository.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId());
assertThat(limit).isNotEmpty();
assertThat(limit.get().getTenantId()).isEqualTo(execution.getTenantId());
@@ -91,14 +98,14 @@ class ConcurrencyLimitServiceTest {
@Test
@ExecuteFlow(value = "flows/valids/flow-concurrency-queue.yml", tenantId = "concurrency_limit_service_test_update_tenant")
void update(Execution execution) {
Optional<ConcurrencyLimit> limit = concurrencyLimitService.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId());
Optional<ConcurrencyLimit> limit = concurrencyLimitRepository.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId());
assertThat(limit).isNotEmpty();
ConcurrencyLimit updated = limit.get().withRunning(99);
concurrencyLimitService.update(updated);
concurrencyLimitRepository.update(updated);
limit = concurrencyLimitService.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId());
limit = concurrencyLimitRepository.findById(execution.getTenantId(), execution.getNamespace(), execution.getFlowId());
assertThat(limit).isNotEmpty();
assertThat(limit.get().getRunning()).isEqualTo(99);
}
@@ -106,7 +113,7 @@ class ConcurrencyLimitServiceTest {
@Test
@ExecuteFlow(value = "flows/valids/flow-concurrency-queue.yml", tenantId = "concurrency_limit_service_test_list_tenant")
void list(Execution execution) {
List<ConcurrencyLimit> list = concurrencyLimitService.find(execution.getTenantId());
List<ConcurrencyLimit> list = concurrencyLimitRepository.find(execution.getTenantId());
assertThat(list).isNotEmpty();
assertThat(list.getFirst().getTenantId()).isEqualTo(execution.getTenantId());
@@ -114,10 +121,6 @@ class ConcurrencyLimitServiceTest {
assertThat(list.getFirst().getFlowId()).isEqualTo(execution.getFlowId());
}
private Execution runUntilQueued(String namespace, String flowId) throws TimeoutException, QueueException {
return runUntilQueued(TENANT_ID, namespace, flowId);
}
private Execution runUntilQueued(String tenantId, String namespace, String flowId) throws TimeoutException, QueueException {
return runUntilState(tenantId, namespace, flowId, State.Type.QUEUED);
}

View File

@@ -179,4 +179,10 @@ public abstract class AbstractJdbcNamespaceFileMetadataRepository extends Abstra
return nsFileMetadataToPersist;
});
}
@Override
public int saveBatch(List<NamespaceFileMetadata> items) {
// FIXME should not be needed as it is not indexed
return items.stream().map(it -> this.save(it)).toList().size();
}
}

View File

@@ -26,39 +26,39 @@ import java.util.concurrent.locks.ReentrantLock;
* (once every second) and for processing any queued trigger events.
*/
public class TriggerSchedulingLoop implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(TriggerSchedulingLoop.class);
private static final long SCHEDULE_INTERVAL_MILLIS = Duration.ofSeconds(1).toMillis();
private final int schedulingLoopId;
private final TriggerScheduler triggerScheduler;
private final Clock clock;
// Queue
private final BlockingQueue<CompletableTriggerEvent> triggerEventQueue = new LinkedBlockingQueue<>();
private final ReentrantLock triggerEventQueueLock = new ReentrantLock();
private final Condition notEmptyTriggerEventQueue = triggerEventQueueLock.newCondition();
// Services
private final TriggerEventHandler triggerEventHandler;
// Threading
private volatile Thread thread;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final AtomicBoolean running = new AtomicBoolean(false);
private final CountDownLatch stopped = new CountDownLatch(1);
// Pause & Resume
private final AtomicBoolean paused = new AtomicBoolean(false);
private final ReentrantLock pauseLock = new ReentrantLock();
private final Condition unpaused = pauseLock.newCondition();
private final BlockingQueue<Runnable> internalLoopCallables = new LinkedBlockingQueue<>();
private final Set<Integer> assignments = new HashSet<>();
/**
* Creates a new {@link TriggerSchedulingLoop} instance.
*
@@ -76,7 +76,7 @@ public class TriggerSchedulingLoop implements Runnable {
this.triggerEventHandler = triggerEventHandler;
this.clock = clock;
}
/**
* Gets the identifier of this event-loop.
*
@@ -85,7 +85,7 @@ public class TriggerSchedulingLoop implements Runnable {
public int id() {
return this.schedulingLoopId;
}
/**
* {@inheritDoc}
**/
@@ -94,7 +94,7 @@ public class TriggerSchedulingLoop implements Runnable {
if (!this.running.compareAndSet(false, true)) {
throw new IllegalStateException("Already running");
}
this.thread = Thread.currentThread();
Instant nextScheduleTime = clock.instant();
Instant tick = clock.instant();
@@ -108,14 +108,14 @@ public class TriggerSchedulingLoop implements Runnable {
LOG.warn("Thread starvation, clock leap detected, or too many triggers to evaluate (elapsed since previous loop {}ms)", elapsed);
}
tick = clock.instant();
waitIfPaused();
// Check if the loop was stopped while being paused
if (!running.get()) {
continue;
}
// Check whether vNodes are available for this event-loop
// The list of vNodes assignments can be empty if:
// 1. This scheduler is starting
@@ -128,26 +128,26 @@ public class TriggerSchedulingLoop implements Runnable {
}
continue;
}
final Instant now = clock.instant();
if (!initialized.get()) {
triggerScheduler.onStart(clock, now, assignments);
initialized.set(true);
}
// Process all received triggers events for current assignments.
processTriggerEvents();
// Check whether triggers should be scheduled
if (now.isAfter(nextScheduleTime) || now.equals(nextScheduleTime)) {
triggerScheduler.onSchedule(clock, now, assignments);
nextScheduleTime = nextScheduleTime.plusMillis(SCHEDULE_INTERVAL_MILLIS);
}
// Execute end-loop actions
doOnEndLoop();
// May wait before next iteration
long waitMillis = Math.max(0, nextScheduleTime.toEpochMilli() - clock.instant().toEpochMilli());
if (waitMillis > 0) {
@@ -167,7 +167,7 @@ public class TriggerSchedulingLoop implements Runnable {
LOG.info("[{}-{}] stopped", getClass().getSimpleName(), schedulingLoopId);
}
}
private void waitForNextIterationOrNewEvent(final Duration duration) throws InterruptedException {
if (triggerEventQueue.isEmpty()) {
triggerEventQueueLock.lock();
@@ -180,7 +180,7 @@ public class TriggerSchedulingLoop implements Runnable {
}
}
}
/**
* Stops this loop.
* <p>
@@ -191,9 +191,9 @@ public class TriggerSchedulingLoop implements Runnable {
LOG.debug("[{}] stop() called but not running", getClass().getSimpleName());
return;
}
resume(); // In case it's paused and blocked
if (this.thread != null) {
this.thread.interrupt();
try {
@@ -205,7 +205,7 @@ public class TriggerSchedulingLoop implements Runnable {
}
}
}
private void waitIfPaused() throws InterruptedException {
if (!paused.get()) {
return; // return immediately
@@ -221,16 +221,16 @@ public class TriggerSchedulingLoop implements Runnable {
pauseLock.unlock();
}
}
/**
* Gets the current assignment for this event loop.
*
*
* @return the assignments.
*/
public Set<Integer> assignments() {
return assignments;
}
public void setAssignments(final Set<Integer> assignments) {
this.assignments.clear();
if (assignments != null) {
@@ -238,7 +238,7 @@ public class TriggerSchedulingLoop implements Runnable {
}
this.initialized.set(false);
}
/**
* Pauses this event-loop instance.
*/
@@ -250,7 +250,7 @@ public class TriggerSchedulingLoop implements Runnable {
pauseLock.unlock();
}
}
/**
* Registers an {@link Runnable action} that will be executed on next end loop.
*
@@ -269,7 +269,7 @@ public class TriggerSchedulingLoop implements Runnable {
});
return future;
}
/**
* Resumes this event-loop instance if currently paused.
*/
@@ -283,7 +283,7 @@ public class TriggerSchedulingLoop implements Runnable {
pauseLock.unlock();
}
}
/**
*
* @param events The trigger events.
@@ -292,7 +292,7 @@ public class TriggerSchedulingLoop implements Runnable {
List<CompletableFuture<Void>> futures = events.stream().map(event -> addTriggerEvent(vNode, event)).toList();
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
/**
*
* @param event The trigger event.
@@ -308,7 +308,7 @@ public class TriggerSchedulingLoop implements Runnable {
}
return completable;
}
/**
* Processes all trigger events currently queued by this scheduling loop.
*
@@ -329,31 +329,32 @@ public class TriggerSchedulingLoop implements Runnable {
});
return drained.size();
}
private void doOnEndLoop() {
List<Runnable> drained = new ArrayList<>();
internalLoopCallables.drainTo(drained);
for (Runnable runnable : drained) {
runnable.run();
}
}
/**
* Wraps a {@link TriggerEvent} with the associated Virtual Node (vNodes).
*/
public static class CompletableTriggerEvent extends CompletableFuture<Void> {
public static class
CompletableTriggerEvent extends CompletableFuture<Void> {
private final TriggerEvent event;
private final Integer vnode;
public CompletableTriggerEvent(TriggerEvent event, Integer vnode) {
this.event = Objects.requireNonNull(event, "event must not be null");
this.vnode = Objects.requireNonNull(vnode, "vnode must not be null");
}
public TriggerEvent event() { return event; }
public Integer vnode() { return vnode; }
}
}

View File

@@ -49,7 +49,7 @@ public class TestRunner implements Runnable, AutoCloseable {
running.set(true);
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
poolExecutor.execute(applicationContext.getBean(Executor.class));
Executor executor = applicationContext.getBean(Executor.class);
servers.add(executor);
poolExecutor.execute(executor);

View File

@@ -20,10 +20,7 @@ import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.InputsTest;
import io.kestra.core.runners.LocalPath;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.*;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.NamespaceFactory;
@@ -98,6 +95,10 @@ class ExecutionControllerRunnerTest {
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Inject
@Named(QueueFactoryInterface.EXECUTION_EVENT_NAMED)
protected QueueInterface<ExecutionEvent> executionEventQueue;
@Inject
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;
@@ -1314,9 +1315,9 @@ class ExecutionControllerRunnerTest {
// listen to the execution queue
AtomicReference<Execution> killedExecution = new AtomicReference<>();
CountDownLatch killedLatch = new CountDownLatch(1);
Flux<Execution> receiveExecutions = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getState().getCurrent() == State.Type.KILLED) {
killedExecution.set(e.getLeft());
Flux<ExecutionEvent> receiveExecutions = TestsUtils.receive(executionEventQueue, e -> {
if (e.getLeft().eventType() == ExecutionEventType.TERMINATED) {
killedExecution.set(executionRepositoryInterface.findById(e.getLeft().tenantId(), e.getLeft().executionId()).orElseThrow());
killedLatch.countDown();
}
});

View File

@@ -1,13 +1,16 @@
package io.kestra.webserver.controllers.api;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.property.Property;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.Scheduler;
import io.kestra.core.scheduler.SchedulerConfiguration;
import io.kestra.core.scheduler.model.TriggerState;
import io.kestra.core.services.FlowService;
import io.kestra.core.tasks.test.PollingTrigger;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.IdUtils;
@@ -55,7 +58,7 @@ class TriggerControllerTest {
ReactorHttpClient client;
@Inject
AbstractJdbcFlowRepository jdbcFlowRepository;
FlowService flowService;
@Inject
AbstractJdbcTriggerRepository jdbcTriggerRepository;
@@ -79,10 +82,10 @@ class TriggerControllerTest {
@SuppressWarnings("unchecked")
@Test
void shouldFindTriggersGivenQueryOnIdPrefix() {
void shouldFindTriggersGivenQueryOnIdPrefix() throws FlowProcessingException, QueueException {
// GIVEN
Flow flow = generateFlow();
jdbcFlowRepository.create(GenericFlow.of(flow));
flowService.create(GenericFlow.of(flow));
createTriggersFromFlow(flow).forEach(jdbcTriggerRepository::save);
// WHEN
@@ -105,10 +108,10 @@ class TriggerControllerTest {
@SuppressWarnings("unchecked")
@Test
void shouldFindTriggersGivenQueryOnNamespace() {
void shouldFindTriggersGivenQueryOnNamespace() throws FlowProcessingException, QueueException {
// GIVEN
Flow flow = generateFlow();
jdbcFlowRepository.create(GenericFlow.of(flow));
flowService.create(GenericFlow.of(flow));
createTriggersFromFlow(flow).forEach(jdbcTriggerRepository::save);
// WHEN
@@ -131,10 +134,10 @@ class TriggerControllerTest {
@SuppressWarnings("unchecked")
@Test
void shouldFindTriggersGivenFilterOnNamespace() {
void shouldFindTriggersGivenFilterOnNamespace() throws FlowProcessingException, QueueException {
// GIVEN
Flow flow = generateFlow();
jdbcFlowRepository.create(GenericFlow.of(flow));
flowService.create(GenericFlow.of(flow));
List<TriggerState> states = createTriggersFromFlow(flow);
states.forEach(jdbcTriggerRepository::save);
@@ -211,10 +214,10 @@ class TriggerControllerTest {
}
@Test
void shouldGetNoContentWhenRestartingTriggerGivenExist() {
void shouldGetNoContentWhenRestartingTriggerGivenExist() throws FlowProcessingException, QueueException {
// GIVEN
Flow flow = generateFlow();
jdbcFlowRepository.create(GenericFlow.of(flow));
flowService.create(GenericFlow.of(flow));
TriggerState trigger = TriggerState.builder()
.flowId(flow.getId())
@@ -340,14 +343,14 @@ class TriggerControllerTest {
}
@Test
void shouldSetDisabledByTriggerIdsGivenFalse() {
void shouldSetDisabledByTriggerIdsGivenFalse() throws FlowProcessingException, QueueException {
// GIVEN
String namespace = IdUtils.create();
String namespace = "ns-" + IdUtils.create().toLowerCase();
Flow flow1 = generateFlowWithTrigger(namespace);
Flow flow2 = generateFlowWithTrigger(namespace);
jdbcFlowRepository.create(GenericFlow.of(flow1));
jdbcFlowRepository.create(GenericFlow.of(flow2));
flowService.create(GenericFlow.of(flow1));
flowService.create(GenericFlow.of(flow2));
TriggerState triggerDisabled = jdbcTriggerRepository.save(createTriggerFromFlow(flow1, true));
TriggerState triggerNotDisabled = jdbcTriggerRepository.save(createTriggerFromFlow(flow2, false));
@@ -370,14 +373,14 @@ class TriggerControllerTest {
}
@Test
void shouldSetDisabledByTriggerIdsGivenTrue() {
void shouldSetDisabledByTriggerIdsGivenTrue() throws FlowProcessingException, QueueException {
// GIVEN
String namespace = IdUtils.create();
String namespace = "ns-" + IdUtils.create().toLowerCase();
Flow flow1 = generateFlowWithTrigger(namespace);
Flow flow2 = generateFlowWithTrigger(namespace);
jdbcFlowRepository.create(GenericFlow.of(flow1));
jdbcFlowRepository.create(GenericFlow.of(flow2));
flowService.create(GenericFlow.of(flow1));
flowService.create(GenericFlow.of(flow2));
TriggerState triggerDisabled = jdbcTriggerRepository.save(createTriggerFromFlow(flow1, true));
TriggerState triggerToDisable = jdbcTriggerRepository.save(createTriggerFromFlow(flow2, false));
@@ -401,14 +404,14 @@ class TriggerControllerTest {
}
@Test
void shouldSetDisabledByQueryGivenTrue() {
void shouldSetDisabledByQueryGivenTrue() throws FlowProcessingException, QueueException {
// GIVEN
String namespace = IdUtils.create();
String namespace = "ns-" + IdUtils.create().toLowerCase();
Flow flow1 = generateFlowWithTrigger(namespace);
Flow flow2 = generateFlowWithTrigger(namespace);
jdbcFlowRepository.create(GenericFlow.of(flow1));
jdbcFlowRepository.create(GenericFlow.of(flow2));
flowService.create(GenericFlow.of(flow1));
flowService.create(GenericFlow.of(flow2));
jdbcTriggerRepository.save(createTriggerFromFlow(flow1, true));
TriggerState toDisable = jdbcTriggerRepository.save(createTriggerFromFlow(flow2, false));
@@ -439,7 +442,7 @@ class TriggerControllerTest {
private Flow generateFlow() {
return Flow.builder()
.tenantId(TENANT_ID)
.namespace(IdUtils.create())
.namespace("ns-" + IdUtils.create().toLowerCase())
.id(IdUtils.create())
.tasks(Collections.singletonList(Return.builder()
.id("task")