mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
feat(tests): add more test coverage for trigger evaluation failure
- This covers failures propagated to evaluateScheduleTrigger() in AbstractScheduler class related to ScheduleTrigger such as (Invalid Expressions, Inputs resolving Issues...)
This commit is contained in:
committed by
Loïc Mathieu
parent
e263224d7b
commit
1be16d5e9d
@@ -491,7 +491,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void failedEvaluationTest() {
|
||||
void failedEvaluation1() {
|
||||
// mock flow listeners
|
||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||
Schedule schedule = createScheduleTrigger("Europe/Paris", "* * * * *", "failedEvaluation", false)
|
||||
@@ -527,6 +527,61 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
// wait for execution
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
assertThat(execution).isNotNull();
|
||||
assertThat(execution.getFlowId()).isEqualTo(flow.getId());
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
|
||||
|
||||
queueCount.countDown();
|
||||
});
|
||||
|
||||
scheduler.run();
|
||||
|
||||
queueCount.await(1, TimeUnit.MINUTES);
|
||||
// needed for RetryingTest to work since there is no context cleaning between method => we have to clear assertion receiver manually
|
||||
receive.blockLast();
|
||||
|
||||
assertThat(queueCount.getCount()).isEqualTo(0L);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
@Test
|
||||
void failedEvaluation2() {
|
||||
// mock flow listeners
|
||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||
Schedule schedule = createScheduleTrigger("Europe/Paris", "* * * * *", "failedEvaluation", false)
|
||||
.inputs(
|
||||
Map.of("invalidExpressionInput", Expression.builder()
|
||||
.type(Expression.class.getName())
|
||||
.expression(Property.ofExpression("{{ now().hour == 0 ? 3 : 2 }}"))
|
||||
.build()
|
||||
)
|
||||
)
|
||||
.build();
|
||||
FlowWithSource flow = createFlow(this.tenantId,Collections.singletonList(schedule));
|
||||
doReturn(List.of(flow))
|
||||
.when(flowListenersServiceSpy)
|
||||
.flows();
|
||||
|
||||
// to avoid waiting too much before a trigger execution, we add a last trigger with a date now - 1m.
|
||||
Trigger lastTrigger = Trigger
|
||||
.builder()
|
||||
.triggerId("failedEvaluation")
|
||||
.tenantId(this.tenantId)
|
||||
.flowId(flow.getId())
|
||||
.namespace(flow.getNamespace())
|
||||
.date(ZonedDateTime.now().minusMinutes(1L))
|
||||
.build();
|
||||
triggerState.create(lastTrigger);
|
||||
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
// wait for execution
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
assertThat(execution).isNotNull();
|
||||
assertThat(execution.getFlowId()).isEqualTo(flow.getId());
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user