Compare commits

...

6 Commits

Author SHA1 Message Date
Roman Acevedo
0f8fadf818 tests(system): add warn log to debug 2025-05-16 15:00:50 +02:00
YannC.
4782b2cb97 test: make sure that the flow listener is loaded 2025-05-15 16:56:15 +02:00
Roman Acevedo
80f357968a Revert "tests(system): add log to debug"
This reverts commit 8aceb0c2b4ef48036ea8793f9d2d5f9adade95ec.
2025-05-15 09:40:54 +02:00
Roman Acevedo
f0cfe27d76 tests(system): add log to debug 2025-05-15 09:40:54 +02:00
Roman Acevedo
af1dcedbc8 Update MultipleConditionTriggerCaseTest.java 2025-05-15 09:40:54 +02:00
Roman Acevedo
791f2f52dc tests(system): try to unflaky AbstractRunnerTest.flowTriggerOnPaused 2025-05-15 09:40:54 +02:00

View File

@@ -1,14 +1,20 @@
package io.kestra.core.runners; package io.kestra.core.runners;
import io.kestra.core.queues.QueueException;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.context.ApplicationContext;
import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State; import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface; import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.time.Duration; import java.time.Duration;
import java.util.List; import java.util.List;
@@ -19,15 +25,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import reactor.core.publisher.Flux;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@Singleton @Singleton
@Slf4j
public class MultipleConditionTriggerCaseTest { public class MultipleConditionTriggerCaseTest {
@Inject @Inject
@@ -217,8 +219,15 @@ public class MultipleConditionTriggerCaseTest {
CountDownLatch countDownLatch = new CountDownLatch(1); CountDownLatch countDownLatch = new CountDownLatch(1);
AtomicReference<Execution> flowTrigger = new AtomicReference<>(); AtomicReference<Execution> flowTrigger = new AtomicReference<>();
Await.until(() -> this.flowRepository.findById(null, "io.kestra.tests.trigger.paused", "flow-trigger-paused-listen").isPresent(),
Duration.ofMillis(100), Duration.ofSeconds(5));
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> { Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft(); Execution execution = either.getLeft();
if(execution.getFlowId().equals("flow-trigger-paused-listen")) {
log.warn("debug exec: {}", execution);// TODO only to debug
}
if (execution.getState().getCurrent() == State.Type.SUCCESS && execution.getFlowId() if (execution.getState().getCurrent() == State.Type.SUCCESS && execution.getFlowId()
.equals("flow-trigger-paused-listen")) { .equals("flow-trigger-paused-listen")) {
flowTrigger.set(execution); flowTrigger.set(execution);
@@ -227,12 +236,12 @@ public class MultipleConditionTriggerCaseTest {
}); });
Execution execution = runnerUtils.runOne(null, "io.kestra.tests.trigger.paused", Execution execution = runnerUtils.runOne(null, "io.kestra.tests.trigger.paused",
"flow-trigger-paused-flow", Duration.ofSeconds(60)); "flow-trigger-paused-flow");
assertThat(execution.getTaskRunList().size()).isEqualTo(2); assertThat(execution.getTaskRunList().size()).isEqualTo(2);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS); assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
// trigger is done // trigger is done
assertTrue(countDownLatch.await(1, TimeUnit.SECONDS)); assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
receive.blockLast(); receive.blockLast();
assertThat(flowTrigger.get()).isNotNull(); assertThat(flowTrigger.get()).isNotNull();