mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 14:00:23 -05:00
Compare commits
6 Commits
docs/retur
...
flaky/flow
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0f8fadf818 | ||
|
|
4782b2cb97 | ||
|
|
80f357968a | ||
|
|
f0cfe27d76 | ||
|
|
af1dcedbc8 | ||
|
|
791f2f52dc |
@@ -1,14 +1,20 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
@@ -19,15 +25,11 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class MultipleConditionTriggerCaseTest {
|
||||
|
||||
@Inject
|
||||
@@ -217,8 +219,15 @@ public class MultipleConditionTriggerCaseTest {
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
AtomicReference<Execution> flowTrigger = new AtomicReference<>();
|
||||
|
||||
|
||||
Await.until(() -> this.flowRepository.findById(null, "io.kestra.tests.trigger.paused", "flow-trigger-paused-listen").isPresent(),
|
||||
Duration.ofMillis(100), Duration.ofSeconds(5));
|
||||
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
if(execution.getFlowId().equals("flow-trigger-paused-listen")) {
|
||||
log.warn("debug exec: {}", execution);// TODO only to debug
|
||||
}
|
||||
if (execution.getState().getCurrent() == State.Type.SUCCESS && execution.getFlowId()
|
||||
.equals("flow-trigger-paused-listen")) {
|
||||
flowTrigger.set(execution);
|
||||
@@ -227,12 +236,12 @@ public class MultipleConditionTriggerCaseTest {
|
||||
});
|
||||
|
||||
Execution execution = runnerUtils.runOne(null, "io.kestra.tests.trigger.paused",
|
||||
"flow-trigger-paused-flow", Duration.ofSeconds(60));
|
||||
"flow-trigger-paused-flow");
|
||||
assertThat(execution.getTaskRunList().size()).isEqualTo(2);
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
|
||||
// trigger is done
|
||||
assertTrue(countDownLatch.await(1, TimeUnit.SECONDS));
|
||||
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
|
||||
receive.blockLast();
|
||||
assertThat(flowTrigger.get()).isNotNull();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user