mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 14:00:23 -05:00
Compare commits
1 Commits
dependabot
...
chore/test
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f43947b649 |
@@ -4,27 +4,19 @@ import io.kestra.core.junit.annotations.ExecuteFlow;
|
|||||||
import io.kestra.core.junit.annotations.KestraTest;
|
import io.kestra.core.junit.annotations.KestraTest;
|
||||||
import io.kestra.core.junit.annotations.LoadFlows;
|
import io.kestra.core.junit.annotations.LoadFlows;
|
||||||
import io.kestra.core.models.executions.Execution;
|
import io.kestra.core.models.executions.Execution;
|
||||||
import io.kestra.core.models.flows.Flow;
|
|
||||||
import io.kestra.core.models.flows.State;
|
import io.kestra.core.models.flows.State;
|
||||||
import io.kestra.core.queues.QueueException;
|
import io.kestra.core.queues.QueueException;
|
||||||
import io.kestra.core.queues.QueueFactoryInterface;
|
|
||||||
import io.kestra.core.queues.QueueInterface;
|
|
||||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
|
||||||
import io.kestra.core.runners.ConcurrencyLimit;
|
import io.kestra.core.runners.ConcurrencyLimit;
|
||||||
import io.kestra.core.runners.RunnerUtils;
|
import io.kestra.core.runners.TestRunnerUtils;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
import jakarta.inject.Named;
|
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.TestInstance;
|
import org.junit.jupiter.api.TestInstance;
|
||||||
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import static io.kestra.core.utils.Rethrow.throwRunnable;
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
@KestraTest(startRunner = true)
|
@KestraTest(startRunner = true)
|
||||||
@@ -34,14 +26,7 @@ class ConcurrencyLimitServiceTest {
|
|||||||
private static final String TENANT_ID = "main";
|
private static final String TENANT_ID = "main";
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
private RunnerUtils runnerUtils;
|
private TestRunnerUtils runnerUtils;
|
||||||
|
|
||||||
@Inject
|
|
||||||
@Named(QueueFactoryInterface.EXECUTION_NAMED)
|
|
||||||
private QueueInterface<Execution> executionQueue;
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
private FlowRepositoryInterface flowRepositoryInterface;
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
private ConcurrencyLimitService concurrencyLimitService;
|
private ConcurrencyLimitService concurrencyLimitService;
|
||||||
@@ -57,7 +42,8 @@ class ConcurrencyLimitServiceTest {
|
|||||||
void unqueueExecution() throws QueueException, TimeoutException {
|
void unqueueExecution() throws QueueException, TimeoutException {
|
||||||
// run a first flow so the second is queued
|
// run a first flow so the second is queued
|
||||||
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||||
Execution result = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
|
||||||
|
Execution result = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", execution -> execution.getState().isQueued());
|
||||||
assertThat(result.getState().isQueued()).isTrue();
|
assertThat(result.getState().isQueued()).isTrue();
|
||||||
|
|
||||||
Execution unqueued = concurrencyLimitService.unqueue(result, State.Type.RUNNING);
|
Execution unqueued = concurrencyLimitService.unqueue(result, State.Type.RUNNING);
|
||||||
@@ -101,21 +87,4 @@ class ConcurrencyLimitServiceTest {
|
|||||||
assertThat(list.getFirst().getNamespace()).isEqualTo(execution.getNamespace());
|
assertThat(list.getFirst().getNamespace()).isEqualTo(execution.getNamespace());
|
||||||
assertThat(list.getFirst().getFlowId()).isEqualTo(execution.getFlowId());
|
assertThat(list.getFirst().getFlowId()).isEqualTo(execution.getFlowId());
|
||||||
}
|
}
|
||||||
|
|
||||||
private Execution runUntilQueued(String namespace, String flowId) throws TimeoutException, QueueException {
|
|
||||||
return runUntilState(namespace, flowId, State.Type.QUEUED);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Execution runUntilState(String namespace, String flowId, State.Type state) throws TimeoutException, QueueException {
|
|
||||||
Execution execution = this.createExecution(namespace, flowId);
|
|
||||||
return runnerUtils.awaitExecution(
|
|
||||||
it -> execution.getId().equals(it.getId()) && it.getState().getCurrent() == state,
|
|
||||||
throwRunnable(() -> this.executionQueue.emit(execution)),
|
|
||||||
Duration.ofSeconds(1));
|
|
||||||
}
|
|
||||||
|
|
||||||
private Execution createExecution(String namespace, String flowId) {
|
|
||||||
Flow flow = flowRepositoryInterface.findById(TENANT_ID, namespace, flowId).orElseThrow();
|
|
||||||
return Execution.newExecution(flow, null);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
@@ -1,6 +1,5 @@
|
|||||||
package io.kestra.core.runners;
|
package io.kestra.core.runners;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import io.kestra.core.models.Label;
|
import io.kestra.core.models.Label;
|
||||||
import io.kestra.core.models.executions.Execution;
|
import io.kestra.core.models.executions.Execution;
|
||||||
import io.kestra.core.models.flows.Flow;
|
import io.kestra.core.models.flows.Flow;
|
||||||
@@ -162,6 +161,34 @@ public class TestRunnerUtils {
|
|||||||
return this.emitAndAwaitExecution(isRunningExecution(execution), execution, duration);
|
return this.emitAndAwaitExecution(isRunningExecution(execution), execution, duration);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Execution runOneUntil(String tenantId, String namespace, String flowId, Predicate<Execution> predicate)
|
||||||
|
throws QueueException {
|
||||||
|
return this.runOneUntil(tenantId, namespace, flowId, null, null, null, predicate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Execution runOneUntil(String tenantId, String namespace, String flowId, Integer revision, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration, Predicate<Execution> predicate)
|
||||||
|
throws QueueException {
|
||||||
|
return this.runOneUntil(
|
||||||
|
flowRepository
|
||||||
|
.findById(tenantId, namespace, flowId, revision != null ? Optional.of(revision) : Optional.empty())
|
||||||
|
.orElseThrow(() -> new IllegalArgumentException("Unable to find flow '" + flowId + "'")),
|
||||||
|
inputs,
|
||||||
|
duration,
|
||||||
|
predicate
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Execution runOneUntil(Flow flow, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration, Predicate<Execution> predicate)
|
||||||
|
throws QueueException {
|
||||||
|
if (duration == null) {
|
||||||
|
duration = DEFAULT_MAX_WAIT_DURATION;
|
||||||
|
}
|
||||||
|
|
||||||
|
Execution execution = Execution.newExecution(flow, inputs, null, Optional.empty());
|
||||||
|
|
||||||
|
return this.emitAndAwaitExecution(predicate, execution, duration);
|
||||||
|
}
|
||||||
|
|
||||||
public Execution emitAndAwaitExecution(Predicate<Execution> predicate, Execution execution) throws QueueException {
|
public Execution emitAndAwaitExecution(Predicate<Execution> predicate, Execution execution) throws QueueException {
|
||||||
return emitAndAwaitExecution(predicate, execution, Duration.ofSeconds(20));
|
return emitAndAwaitExecution(predicate, execution, Duration.ofSeconds(20));
|
||||||
}
|
}
|
||||||
@@ -300,7 +327,6 @@ public class TestRunnerUtils {
|
|||||||
return receive.get();
|
return receive.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public Execution awaitChildExecution(Flow flow, Execution parentExecution, Execution execution, Duration duration)
|
public Execution awaitChildExecution(Flow flow, Execution parentExecution, Execution execution, Duration duration)
|
||||||
throws QueueException {
|
throws QueueException {
|
||||||
return this.emitAndAwaitExecution(isTerminatedChildExecution(parentExecution, flow), execution, duration);
|
return this.emitAndAwaitExecution(isTerminatedChildExecution(parentExecution, flow), execution, duration);
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
|
|||||||
import io.kestra.core.runners.FlowInputOutput;
|
import io.kestra.core.runners.FlowInputOutput;
|
||||||
import io.kestra.core.runners.InputsTest;
|
import io.kestra.core.runners.InputsTest;
|
||||||
import io.kestra.core.runners.LocalPath;
|
import io.kestra.core.runners.LocalPath;
|
||||||
import io.kestra.core.runners.RunnerUtils;
|
import io.kestra.core.runners.TestRunnerUtils;
|
||||||
import io.kestra.core.serializers.JacksonMapper;
|
import io.kestra.core.serializers.JacksonMapper;
|
||||||
import io.kestra.core.storages.StorageContext;
|
import io.kestra.core.storages.StorageContext;
|
||||||
import io.kestra.core.storages.StorageInterface;
|
import io.kestra.core.storages.StorageInterface;
|
||||||
@@ -121,7 +121,7 @@ class ExecutionControllerRunnerTest {
|
|||||||
private JdbcTestUtils jdbcTestUtils;
|
private JdbcTestUtils jdbcTestUtils;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
protected RunnerUtils runnerUtils;
|
protected TestRunnerUtils runnerUtils;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
private StorageInterface storageInterface;
|
private StorageInterface storageInterface;
|
||||||
@@ -412,9 +412,6 @@ class ExecutionControllerRunnerTest {
|
|||||||
assertThat(flow.isPresent()).isTrue();
|
assertThat(flow.isPresent()).isTrue();
|
||||||
|
|
||||||
// Run child execution starting from a specific task and wait until it finishes
|
// Run child execution starting from a specific task and wait until it finishes
|
||||||
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
|
|
||||||
flow.get(),
|
|
||||||
parentExecution, throwRunnable(() -> {
|
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
Execution createdChidExec = client.toBlocking().retrieve(
|
Execution createdChidExec = client.toBlocking().retrieve(
|
||||||
@@ -435,7 +432,10 @@ class ExecutionControllerRunnerTest {
|
|||||||
|
|
||||||
assertThat(createdChidExec.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
assertThat(createdChidExec.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||||
assertThat(createdChidExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
|
assertThat(createdChidExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
|
||||||
}),
|
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
|
||||||
|
flow.get(),
|
||||||
|
parentExecution,
|
||||||
|
createdChidExec,
|
||||||
Duration.ofSeconds(15));
|
Duration.ofSeconds(15));
|
||||||
|
|
||||||
assertThat(finishedChildExecution).isNotNull();
|
assertThat(finishedChildExecution).isNotNull();
|
||||||
@@ -465,9 +465,6 @@ class ExecutionControllerRunnerTest {
|
|||||||
assertThat(flow.isPresent()).isTrue();
|
assertThat(flow.isPresent()).isTrue();
|
||||||
|
|
||||||
// Run child execution starting from a specific task and wait until it finishes
|
// Run child execution starting from a specific task and wait until it finishes
|
||||||
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
|
|
||||||
flow.get(),
|
|
||||||
parentExecution, throwRunnable(() -> {
|
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
MultipartBody multipartBody = MultipartBody.builder()
|
MultipartBody multipartBody = MultipartBody.builder()
|
||||||
@@ -484,7 +481,10 @@ class ExecutionControllerRunnerTest {
|
|||||||
assertThat(replay).isNotNull();
|
assertThat(replay).isNotNull();
|
||||||
assertThat(replay.getParentId()).isEqualTo(parentExecution.getId());
|
assertThat(replay.getParentId()).isEqualTo(parentExecution.getId());
|
||||||
assertThat(replay.getState().getCurrent()).isEqualTo(Type.CREATED);
|
assertThat(replay.getState().getCurrent()).isEqualTo(Type.CREATED);
|
||||||
}),
|
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
|
||||||
|
flow.get(),
|
||||||
|
parentExecution,
|
||||||
|
replay,
|
||||||
Duration.ofSeconds(15));
|
Duration.ofSeconds(15));
|
||||||
|
|
||||||
assertThat(finishedChildExecution).isNotNull();
|
assertThat(finishedChildExecution).isNotNull();
|
||||||
@@ -515,9 +515,6 @@ class ExecutionControllerRunnerTest {
|
|||||||
assertThat(flow.isPresent()).isTrue();
|
assertThat(flow.isPresent()).isTrue();
|
||||||
|
|
||||||
// Run child execution starting from a specific task and wait until it finishes
|
// Run child execution starting from a specific task and wait until it finishes
|
||||||
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
|
|
||||||
flow.get(),
|
|
||||||
parentExecution, throwRunnable(() -> {
|
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
MultipartBody multipartBody = MultipartBody.builder()
|
MultipartBody multipartBody = MultipartBody.builder()
|
||||||
@@ -535,7 +532,10 @@ class ExecutionControllerRunnerTest {
|
|||||||
assertThat(replay.getParentId()).isEqualTo(parentExecution.getId());
|
assertThat(replay.getParentId()).isEqualTo(parentExecution.getId());
|
||||||
assertThat(replay.getTaskRunList().size()).isEqualTo(2);
|
assertThat(replay.getTaskRunList().size()).isEqualTo(2);
|
||||||
assertThat(replay.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
assertThat(replay.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||||
}),
|
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
|
||||||
|
flow.get(),
|
||||||
|
parentExecution,
|
||||||
|
replay,
|
||||||
Duration.ofSeconds(15));
|
Duration.ofSeconds(15));
|
||||||
|
|
||||||
assertThat(finishedChildExecution).isNotNull();
|
assertThat(finishedChildExecution).isNotNull();
|
||||||
@@ -563,9 +563,6 @@ class ExecutionControllerRunnerTest {
|
|||||||
assertThat(flow.isPresent()).isTrue();
|
assertThat(flow.isPresent()).isTrue();
|
||||||
|
|
||||||
// Run child execution starting from a specific task and wait until it finishes
|
// Run child execution starting from a specific task and wait until it finishes
|
||||||
runnerUtils.awaitChildExecution(
|
|
||||||
flow.get(),
|
|
||||||
parentExecution, throwRunnable(() -> {
|
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
Execution createdChidExec = client.toBlocking().retrieve(
|
Execution createdChidExec = client.toBlocking().retrieve(
|
||||||
@@ -579,7 +576,10 @@ class ExecutionControllerRunnerTest {
|
|||||||
assertThat(createdChidExec.getTaskRunList()).hasSize(20);
|
assertThat(createdChidExec.getTaskRunList()).hasSize(20);
|
||||||
|
|
||||||
assertThat(createdChidExec.getId()).isNotEqualTo(parentExecution.getId());
|
assertThat(createdChidExec.getId()).isNotEqualTo(parentExecution.getId());
|
||||||
}),
|
runnerUtils.awaitChildExecution(
|
||||||
|
flow.get(),
|
||||||
|
parentExecution,
|
||||||
|
createdChidExec,
|
||||||
Duration.ofSeconds(30));
|
Duration.ofSeconds(30));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -599,11 +599,6 @@ class ExecutionControllerRunnerTest {
|
|||||||
assertThat(flow.isPresent()).isTrue();
|
assertThat(flow.isPresent()).isTrue();
|
||||||
|
|
||||||
// Restart execution and wait until it finishes
|
// Restart execution and wait until it finishes
|
||||||
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
|
|
||||||
execution -> execution.getId().equals(firstExecution.getId()) &&
|
|
||||||
execution.getTaskRunList().size() == 4 &&
|
|
||||||
execution.getState().isTerminated(),
|
|
||||||
() -> {
|
|
||||||
Execution restartedExec = client.toBlocking().retrieve(
|
Execution restartedExec = client.toBlocking().retrieve(
|
||||||
HttpRequest
|
HttpRequest
|
||||||
.POST("/api/v1/main/executions/" + firstExecution.getId() + "/restart", ImmutableMap.of()),
|
.POST("/api/v1/main/executions/" + firstExecution.getId() + "/restart", ImmutableMap.of()),
|
||||||
@@ -625,7 +620,10 @@ class ExecutionControllerRunnerTest {
|
|||||||
assertThat(restartedExec.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
assertThat(restartedExec.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||||
assertThat(restartedExec.getTaskRunList().get(2).getAttempts().size()).isEqualTo(1);
|
assertThat(restartedExec.getTaskRunList().get(2).getAttempts().size()).isEqualTo(1);
|
||||||
});
|
});
|
||||||
},
|
|
||||||
|
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
|
||||||
|
execution -> execution.getTaskRunList().size() == 4 && execution.getState().isTerminated(),
|
||||||
|
restartedExec,
|
||||||
Duration.ofSeconds(15)
|
Duration.ofSeconds(15)
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -662,11 +660,6 @@ class ExecutionControllerRunnerTest {
|
|||||||
assertThat(flow.isPresent()).isTrue();
|
assertThat(flow.isPresent()).isTrue();
|
||||||
|
|
||||||
// Restart execution and wait until it finishes
|
// Restart execution and wait until it finishes
|
||||||
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
|
|
||||||
execution -> execution.getId().equals(firstExecution.getId()) &&
|
|
||||||
execution.getTaskRunList().size() == 5 &&
|
|
||||||
execution.getState().isTerminated(),
|
|
||||||
() -> {
|
|
||||||
Execution restartedExec = client.toBlocking().retrieve(
|
Execution restartedExec = client.toBlocking().retrieve(
|
||||||
HttpRequest
|
HttpRequest
|
||||||
.POST("/api/v1/main/executions/" + firstExecution.getId() + "/restart", ImmutableMap.of()),
|
.POST("/api/v1/main/executions/" + firstExecution.getId() + "/restart", ImmutableMap.of()),
|
||||||
@@ -690,7 +683,10 @@ class ExecutionControllerRunnerTest {
|
|||||||
assertThat(restartedExec.getTaskRunList().get(2).getAttempts()).isNotNull();
|
assertThat(restartedExec.getTaskRunList().get(2).getAttempts()).isNotNull();
|
||||||
assertThat(restartedExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
|
assertThat(restartedExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
|
||||||
});
|
});
|
||||||
},
|
|
||||||
|
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
|
||||||
|
execution -> execution.getTaskRunList().size() == 5 && execution.getState().isTerminated(),
|
||||||
|
restartedExec,
|
||||||
Duration.ofSeconds(15)
|
Duration.ofSeconds(15)
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -1748,7 +1744,7 @@ class ExecutionControllerRunnerTest {
|
|||||||
void shouldUnqueueExecutionAQueuedFlow() throws QueueException, TimeoutException {
|
void shouldUnqueueExecutionAQueuedFlow() throws QueueException, TimeoutException {
|
||||||
// run a first flow so the second is queued
|
// run a first flow so the second is queued
|
||||||
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||||
Execution result = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
Execution result = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
||||||
|
|
||||||
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/unqueue", null));
|
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/unqueue", null));
|
||||||
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
|
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
|
||||||
@@ -1756,7 +1752,7 @@ class ExecutionControllerRunnerTest {
|
|||||||
// waiting for the flow to complete successfully
|
// waiting for the flow to complete successfully
|
||||||
runnerUtils.awaitExecution(
|
runnerUtils.awaitExecution(
|
||||||
execution -> execution.getId().equals(result.getId()) && execution.getState().isSuccess(),
|
execution -> execution.getId().equals(result.getId()) && execution.getState().isSuccess(),
|
||||||
() -> {},
|
result,
|
||||||
Duration.ofSeconds(10)
|
Duration.ofSeconds(10)
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -1777,7 +1773,7 @@ class ExecutionControllerRunnerTest {
|
|||||||
void shouldUnqueueAQueuedFlowToCancelledState() throws QueueException, TimeoutException {
|
void shouldUnqueueAQueuedFlowToCancelledState() throws QueueException, TimeoutException {
|
||||||
// run a first flow so the second is queued
|
// run a first flow so the second is queued
|
||||||
runnerUtils.runOneUntilRunning(TENANT_ID, "io.kestra.tests", "flow-concurrency-queue");
|
runnerUtils.runOneUntilRunning(TENANT_ID, "io.kestra.tests", "flow-concurrency-queue");
|
||||||
Execution result1 = runUntilQueued("io.kestra.tests", "flow-concurrency-queue");
|
Execution result1 = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
||||||
|
|
||||||
var cancelResponse = client.toBlocking().exchange(
|
var cancelResponse = client.toBlocking().exchange(
|
||||||
HttpRequest.POST("/api/v1/executions/" + result1.getId() + "/unqueue?state=CANCELLED", null)
|
HttpRequest.POST("/api/v1/executions/" + result1.getId() + "/unqueue?state=CANCELLED", null)
|
||||||
@@ -1794,9 +1790,9 @@ class ExecutionControllerRunnerTest {
|
|||||||
void shouldUnqueueExecutionByIdsQueuedFlows() throws TimeoutException, QueueException {
|
void shouldUnqueueExecutionByIdsQueuedFlows() throws TimeoutException, QueueException {
|
||||||
// run a first flow so the others are queued
|
// run a first flow so the others are queued
|
||||||
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||||
Execution result1 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
Execution result1 = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
||||||
Execution result2 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
Execution result2 = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
||||||
Execution result3 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
Execution result3 = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
||||||
|
|
||||||
BulkResponse response = client.toBlocking().retrieve(
|
BulkResponse response = client.toBlocking().retrieve(
|
||||||
HttpRequest.POST("/api/v1/main/executions/unqueue/by-ids", List.of(result1.getId(), result2.getId(), result3.getId())),
|
HttpRequest.POST("/api/v1/main/executions/unqueue/by-ids", List.of(result1.getId(), result2.getId(), result3.getId())),
|
||||||
@@ -1810,7 +1806,7 @@ class ExecutionControllerRunnerTest {
|
|||||||
void shouldForceRunExecutionAQueuedFlow() throws QueueException, TimeoutException {
|
void shouldForceRunExecutionAQueuedFlow() throws QueueException, TimeoutException {
|
||||||
// run a first flow so the second is queued
|
// run a first flow so the second is queued
|
||||||
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||||
Execution result = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
Execution result = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
||||||
|
|
||||||
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/force-run", null));
|
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/force-run", null));
|
||||||
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
|
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
|
||||||
@@ -1821,7 +1817,7 @@ class ExecutionControllerRunnerTest {
|
|||||||
// waiting for the flow to complete successfully
|
// waiting for the flow to complete successfully
|
||||||
runnerUtils.awaitExecution(
|
runnerUtils.awaitExecution(
|
||||||
execution -> execution.getId().equals(result.getId()) && execution.getState().isSuccess(),
|
execution -> execution.getId().equals(result.getId()) && execution.getState().isSuccess(),
|
||||||
() -> {},
|
result,
|
||||||
Duration.ofSeconds(10)
|
Duration.ofSeconds(10)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -2021,22 +2017,11 @@ class ExecutionControllerRunnerTest {
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Execution runUntilQueued(String namespace, String flowId) throws TimeoutException, QueueException {
|
|
||||||
return runUntilState(namespace, flowId, State.Type.QUEUED);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Execution createExecution(String namespace, String flowId) {
|
private Execution createExecution(String namespace, String flowId) {
|
||||||
Flow flow = flowRepositoryInterface.findById(TENANT_ID, namespace, flowId).orElseThrow();
|
Flow flow = flowRepositoryInterface.findById(TENANT_ID, namespace, flowId).orElseThrow();
|
||||||
return Execution.newExecution(flow, null);
|
return Execution.newExecution(flow, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Execution runUntilState(String namespace, String flowId, State.Type state) throws TimeoutException, QueueException {
|
|
||||||
Execution execution = this.createExecution(namespace, flowId);
|
|
||||||
return runnerUtils.awaitExecution(
|
|
||||||
it -> execution.getId().equals(it.getId()) && it.getState().getCurrent() == state,
|
|
||||||
throwRunnable(() -> this.executionQueue.emit(execution)),
|
|
||||||
Duration.ofSeconds(1));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@LoadFlows({"flows/valids/minimal.yaml"})
|
@LoadFlows({"flows/valids/minimal.yaml"})
|
||||||
@@ -2113,7 +2098,7 @@ class ExecutionControllerRunnerTest {
|
|||||||
// wait for the exec to be terminated
|
// wait for the exec to be terminated
|
||||||
Execution terminated = runnerUtils.awaitExecution(
|
Execution terminated = runnerUtils.awaitExecution(
|
||||||
it -> execution.getId().equals(it.getId()) && it.getState().isTerminated(),
|
it -> execution.getId().equals(it.getId()) && it.getState().isTerminated(),
|
||||||
() -> {},
|
execution,
|
||||||
Duration.ofSeconds(10));
|
Duration.ofSeconds(10));
|
||||||
assertThat(terminated.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
assertThat(terminated.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||||
assertThat(terminated.getTaskRunList()).hasSize(1);
|
assertThat(terminated.getTaskRunList()).hasSize(1);
|
||||||
|
|||||||
Reference in New Issue
Block a user