|
|
|
|
@@ -24,7 +24,7 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
|
|
|
|
|
import io.kestra.core.runners.FlowInputOutput;
|
|
|
|
|
import io.kestra.core.runners.InputsTest;
|
|
|
|
|
import io.kestra.core.runners.LocalPath;
|
|
|
|
|
import io.kestra.core.runners.RunnerUtils;
|
|
|
|
|
import io.kestra.core.runners.TestRunnerUtils;
|
|
|
|
|
import io.kestra.core.serializers.JacksonMapper;
|
|
|
|
|
import io.kestra.core.storages.StorageContext;
|
|
|
|
|
import io.kestra.core.storages.StorageInterface;
|
|
|
|
|
@@ -121,7 +121,7 @@ class ExecutionControllerRunnerTest {
|
|
|
|
|
private JdbcTestUtils jdbcTestUtils;
|
|
|
|
|
|
|
|
|
|
@Inject
|
|
|
|
|
protected RunnerUtils runnerUtils;
|
|
|
|
|
protected TestRunnerUtils runnerUtils;
|
|
|
|
|
|
|
|
|
|
@Inject
|
|
|
|
|
private StorageInterface storageInterface;
|
|
|
|
|
@@ -412,30 +412,30 @@ class ExecutionControllerRunnerTest {
|
|
|
|
|
assertThat(flow.isPresent()).isTrue();
|
|
|
|
|
|
|
|
|
|
// Run child execution starting from a specific task and wait until it finishes
|
|
|
|
|
Thread.sleep(100);
|
|
|
|
|
|
|
|
|
|
Execution createdChidExec = client.toBlocking().retrieve(
|
|
|
|
|
HttpRequest
|
|
|
|
|
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), ImmutableMap.of()),
|
|
|
|
|
Execution.class
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertThat(createdChidExec).isNotNull();
|
|
|
|
|
assertThat(createdChidExec.getParentId()).isEqualTo(parentExecution.getId());
|
|
|
|
|
assertThat(createdChidExec.getTaskRunList().size()).isEqualTo(4);
|
|
|
|
|
assertThat(createdChidExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
|
|
|
|
|
|
|
|
|
IntStream
|
|
|
|
|
.range(0, 3)
|
|
|
|
|
.mapToObj(value -> createdChidExec.getTaskRunList().get(value))
|
|
|
|
|
.forEach(taskRun -> assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS));
|
|
|
|
|
|
|
|
|
|
assertThat(createdChidExec.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
|
|
|
|
assertThat(createdChidExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
|
|
|
|
|
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
|
|
|
|
|
flow.get(),
|
|
|
|
|
parentExecution, throwRunnable(() -> {
|
|
|
|
|
Thread.sleep(100);
|
|
|
|
|
|
|
|
|
|
Execution createdChidExec = client.toBlocking().retrieve(
|
|
|
|
|
HttpRequest
|
|
|
|
|
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), ImmutableMap.of()),
|
|
|
|
|
Execution.class
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertThat(createdChidExec).isNotNull();
|
|
|
|
|
assertThat(createdChidExec.getParentId()).isEqualTo(parentExecution.getId());
|
|
|
|
|
assertThat(createdChidExec.getTaskRunList().size()).isEqualTo(4);
|
|
|
|
|
assertThat(createdChidExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
|
|
|
|
|
|
|
|
|
IntStream
|
|
|
|
|
.range(0, 3)
|
|
|
|
|
.mapToObj(value -> createdChidExec.getTaskRunList().get(value))
|
|
|
|
|
.forEach(taskRun -> assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS));
|
|
|
|
|
|
|
|
|
|
assertThat(createdChidExec.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
|
|
|
|
assertThat(createdChidExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
|
|
|
|
|
}),
|
|
|
|
|
parentExecution,
|
|
|
|
|
createdChidExec,
|
|
|
|
|
Duration.ofSeconds(15));
|
|
|
|
|
|
|
|
|
|
assertThat(finishedChildExecution).isNotNull();
|
|
|
|
|
@@ -465,26 +465,26 @@ class ExecutionControllerRunnerTest {
|
|
|
|
|
assertThat(flow.isPresent()).isTrue();
|
|
|
|
|
|
|
|
|
|
// Run child execution starting from a specific task and wait until it finishes
|
|
|
|
|
Thread.sleep(100);
|
|
|
|
|
|
|
|
|
|
MultipartBody multipartBody = MultipartBody.builder()
|
|
|
|
|
.addPart("condition", "success")
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
Execution replay = client.toBlocking().retrieve(
|
|
|
|
|
HttpRequest
|
|
|
|
|
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay-with-inputs", multipartBody)
|
|
|
|
|
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
|
|
|
|
|
Execution.class
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertThat(replay).isNotNull();
|
|
|
|
|
assertThat(replay.getParentId()).isEqualTo(parentExecution.getId());
|
|
|
|
|
assertThat(replay.getState().getCurrent()).isEqualTo(Type.CREATED);
|
|
|
|
|
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
|
|
|
|
|
flow.get(),
|
|
|
|
|
parentExecution, throwRunnable(() -> {
|
|
|
|
|
Thread.sleep(100);
|
|
|
|
|
|
|
|
|
|
MultipartBody multipartBody = MultipartBody.builder()
|
|
|
|
|
.addPart("condition", "success")
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
Execution replay = client.toBlocking().retrieve(
|
|
|
|
|
HttpRequest
|
|
|
|
|
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay-with-inputs", multipartBody)
|
|
|
|
|
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
|
|
|
|
|
Execution.class
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertThat(replay).isNotNull();
|
|
|
|
|
assertThat(replay.getParentId()).isEqualTo(parentExecution.getId());
|
|
|
|
|
assertThat(replay.getState().getCurrent()).isEqualTo(Type.CREATED);
|
|
|
|
|
}),
|
|
|
|
|
parentExecution,
|
|
|
|
|
replay,
|
|
|
|
|
Duration.ofSeconds(15));
|
|
|
|
|
|
|
|
|
|
assertThat(finishedChildExecution).isNotNull();
|
|
|
|
|
@@ -515,27 +515,27 @@ class ExecutionControllerRunnerTest {
|
|
|
|
|
assertThat(flow.isPresent()).isTrue();
|
|
|
|
|
|
|
|
|
|
// Run child execution starting from a specific task and wait until it finishes
|
|
|
|
|
Thread.sleep(100);
|
|
|
|
|
|
|
|
|
|
MultipartBody multipartBody = MultipartBody.builder()
|
|
|
|
|
.addPart("condition", "success")
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
Execution replay = client.toBlocking().retrieve(
|
|
|
|
|
HttpRequest
|
|
|
|
|
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay-with-inputs?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), multipartBody)
|
|
|
|
|
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
|
|
|
|
|
Execution.class
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertThat(replay).isNotNull();
|
|
|
|
|
assertThat(replay.getParentId()).isEqualTo(parentExecution.getId());
|
|
|
|
|
assertThat(replay.getTaskRunList().size()).isEqualTo(2);
|
|
|
|
|
assertThat(replay.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
|
|
|
|
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
|
|
|
|
|
flow.get(),
|
|
|
|
|
parentExecution, throwRunnable(() -> {
|
|
|
|
|
Thread.sleep(100);
|
|
|
|
|
|
|
|
|
|
MultipartBody multipartBody = MultipartBody.builder()
|
|
|
|
|
.addPart("condition", "success")
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
Execution replay = client.toBlocking().retrieve(
|
|
|
|
|
HttpRequest
|
|
|
|
|
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay-with-inputs?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), multipartBody)
|
|
|
|
|
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
|
|
|
|
|
Execution.class
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertThat(replay).isNotNull();
|
|
|
|
|
assertThat(replay.getParentId()).isEqualTo(parentExecution.getId());
|
|
|
|
|
assertThat(replay.getTaskRunList().size()).isEqualTo(2);
|
|
|
|
|
assertThat(replay.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
|
|
|
|
}),
|
|
|
|
|
parentExecution,
|
|
|
|
|
replay,
|
|
|
|
|
Duration.ofSeconds(15));
|
|
|
|
|
|
|
|
|
|
assertThat(finishedChildExecution).isNotNull();
|
|
|
|
|
@@ -563,23 +563,23 @@ class ExecutionControllerRunnerTest {
|
|
|
|
|
assertThat(flow.isPresent()).isTrue();
|
|
|
|
|
|
|
|
|
|
// Run child execution starting from a specific task and wait until it finishes
|
|
|
|
|
Thread.sleep(100);
|
|
|
|
|
|
|
|
|
|
Execution createdChidExec = client.toBlocking().retrieve(
|
|
|
|
|
HttpRequest
|
|
|
|
|
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), ImmutableMap.of()),
|
|
|
|
|
Execution.class
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertThat(createdChidExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
|
|
|
|
assertThat(createdChidExec.getState().getHistories()).hasSize(4);
|
|
|
|
|
assertThat(createdChidExec.getTaskRunList()).hasSize(20);
|
|
|
|
|
|
|
|
|
|
assertThat(createdChidExec.getId()).isNotEqualTo(parentExecution.getId());
|
|
|
|
|
runnerUtils.awaitChildExecution(
|
|
|
|
|
flow.get(),
|
|
|
|
|
parentExecution, throwRunnable(() -> {
|
|
|
|
|
Thread.sleep(100);
|
|
|
|
|
|
|
|
|
|
Execution createdChidExec = client.toBlocking().retrieve(
|
|
|
|
|
HttpRequest
|
|
|
|
|
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), ImmutableMap.of()),
|
|
|
|
|
Execution.class
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertThat(createdChidExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
|
|
|
|
assertThat(createdChidExec.getState().getHistories()).hasSize(4);
|
|
|
|
|
assertThat(createdChidExec.getTaskRunList()).hasSize(20);
|
|
|
|
|
|
|
|
|
|
assertThat(createdChidExec.getId()).isNotEqualTo(parentExecution.getId());
|
|
|
|
|
}),
|
|
|
|
|
parentExecution,
|
|
|
|
|
createdChidExec,
|
|
|
|
|
Duration.ofSeconds(30));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -599,33 +599,31 @@ class ExecutionControllerRunnerTest {
|
|
|
|
|
assertThat(flow.isPresent()).isTrue();
|
|
|
|
|
|
|
|
|
|
// Restart execution and wait until it finishes
|
|
|
|
|
Execution restartedExec = client.toBlocking().retrieve(
|
|
|
|
|
HttpRequest
|
|
|
|
|
.POST("/api/v1/main/executions/" + firstExecution.getId() + "/restart", ImmutableMap.of()),
|
|
|
|
|
Execution.class
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertThat(restartedExec).isNotNull();
|
|
|
|
|
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
|
|
|
|
|
assertThat(restartedExec.getParentId()).isNull();
|
|
|
|
|
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(3);
|
|
|
|
|
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
|
|
|
|
|
|
|
|
|
IntStream
|
|
|
|
|
.range(0, 2)
|
|
|
|
|
.mapToObj(value -> restartedExec.getTaskRunList().get(value)).forEach(taskRun -> {
|
|
|
|
|
assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
|
|
|
|
assertThat(taskRun.getAttempts().size()).isEqualTo(1);
|
|
|
|
|
|
|
|
|
|
assertThat(restartedExec.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
|
|
|
|
assertThat(restartedExec.getTaskRunList().get(2).getAttempts().size()).isEqualTo(1);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
|
|
|
|
|
execution -> execution.getId().equals(firstExecution.getId()) &&
|
|
|
|
|
execution.getTaskRunList().size() == 4 &&
|
|
|
|
|
execution.getState().isTerminated(),
|
|
|
|
|
() -> {
|
|
|
|
|
Execution restartedExec = client.toBlocking().retrieve(
|
|
|
|
|
HttpRequest
|
|
|
|
|
.POST("/api/v1/main/executions/" + firstExecution.getId() + "/restart", ImmutableMap.of()),
|
|
|
|
|
Execution.class
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertThat(restartedExec).isNotNull();
|
|
|
|
|
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
|
|
|
|
|
assertThat(restartedExec.getParentId()).isNull();
|
|
|
|
|
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(3);
|
|
|
|
|
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
|
|
|
|
|
|
|
|
|
IntStream
|
|
|
|
|
.range(0, 2)
|
|
|
|
|
.mapToObj(value -> restartedExec.getTaskRunList().get(value)).forEach(taskRun -> {
|
|
|
|
|
assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
|
|
|
|
assertThat(taskRun.getAttempts().size()).isEqualTo(1);
|
|
|
|
|
|
|
|
|
|
assertThat(restartedExec.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
|
|
|
|
assertThat(restartedExec.getTaskRunList().get(2).getAttempts().size()).isEqualTo(1);
|
|
|
|
|
});
|
|
|
|
|
},
|
|
|
|
|
execution -> execution.getTaskRunList().size() == 4 && execution.getState().isTerminated(),
|
|
|
|
|
restartedExec,
|
|
|
|
|
Duration.ofSeconds(15)
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
@@ -662,35 +660,33 @@ class ExecutionControllerRunnerTest {
|
|
|
|
|
assertThat(flow.isPresent()).isTrue();
|
|
|
|
|
|
|
|
|
|
// Restart execution and wait until it finishes
|
|
|
|
|
Execution restartedExec = client.toBlocking().retrieve(
|
|
|
|
|
HttpRequest
|
|
|
|
|
.POST("/api/v1/main/executions/" + firstExecution.getId() + "/restart", ImmutableMap.of()),
|
|
|
|
|
Execution.class
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertThat(restartedExec).isNotNull();
|
|
|
|
|
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
|
|
|
|
|
assertThat(restartedExec.getParentId()).isNull();
|
|
|
|
|
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(4);
|
|
|
|
|
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
|
|
|
|
|
|
|
|
|
IntStream
|
|
|
|
|
.range(0, 2)
|
|
|
|
|
.mapToObj(value -> restartedExec.getTaskRunList().get(value)).forEach(taskRun -> {
|
|
|
|
|
assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
|
|
|
|
assertThat(taskRun.getAttempts().size()).isEqualTo(1);
|
|
|
|
|
|
|
|
|
|
assertThat(restartedExec.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
|
|
|
|
|
assertThat(restartedExec.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
|
|
|
|
assertThat(restartedExec.getTaskRunList().get(2).getAttempts()).isNotNull();
|
|
|
|
|
assertThat(restartedExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
|
|
|
|
|
execution -> execution.getId().equals(firstExecution.getId()) &&
|
|
|
|
|
execution.getTaskRunList().size() == 5 &&
|
|
|
|
|
execution.getState().isTerminated(),
|
|
|
|
|
() -> {
|
|
|
|
|
Execution restartedExec = client.toBlocking().retrieve(
|
|
|
|
|
HttpRequest
|
|
|
|
|
.POST("/api/v1/main/executions/" + firstExecution.getId() + "/restart", ImmutableMap.of()),
|
|
|
|
|
Execution.class
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertThat(restartedExec).isNotNull();
|
|
|
|
|
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
|
|
|
|
|
assertThat(restartedExec.getParentId()).isNull();
|
|
|
|
|
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(4);
|
|
|
|
|
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
|
|
|
|
|
|
|
|
|
IntStream
|
|
|
|
|
.range(0, 2)
|
|
|
|
|
.mapToObj(value -> restartedExec.getTaskRunList().get(value)).forEach(taskRun -> {
|
|
|
|
|
assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
|
|
|
|
assertThat(taskRun.getAttempts().size()).isEqualTo(1);
|
|
|
|
|
|
|
|
|
|
assertThat(restartedExec.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
|
|
|
|
|
assertThat(restartedExec.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
|
|
|
|
assertThat(restartedExec.getTaskRunList().get(2).getAttempts()).isNotNull();
|
|
|
|
|
assertThat(restartedExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
|
|
|
|
|
});
|
|
|
|
|
},
|
|
|
|
|
execution -> execution.getTaskRunList().size() == 5 && execution.getState().isTerminated(),
|
|
|
|
|
restartedExec,
|
|
|
|
|
Duration.ofSeconds(15)
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
@@ -1748,7 +1744,7 @@ class ExecutionControllerRunnerTest {
|
|
|
|
|
void shouldUnqueueExecutionAQueuedFlow() throws QueueException, TimeoutException {
|
|
|
|
|
// run a first flow so the second is queued
|
|
|
|
|
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
|
|
|
|
Execution result = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
|
|
|
|
Execution result = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
|
|
|
|
|
|
|
|
|
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/unqueue", null));
|
|
|
|
|
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
|
|
|
|
|
@@ -1756,7 +1752,7 @@ class ExecutionControllerRunnerTest {
|
|
|
|
|
// waiting for the flow to complete successfully
|
|
|
|
|
runnerUtils.awaitExecution(
|
|
|
|
|
execution -> execution.getId().equals(result.getId()) && execution.getState().isSuccess(),
|
|
|
|
|
() -> {},
|
|
|
|
|
result,
|
|
|
|
|
Duration.ofSeconds(10)
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
@@ -1777,7 +1773,7 @@ class ExecutionControllerRunnerTest {
|
|
|
|
|
void shouldUnqueueAQueuedFlowToCancelledState() throws QueueException, TimeoutException {
|
|
|
|
|
// run a first flow so the second is queued
|
|
|
|
|
runnerUtils.runOneUntilRunning(TENANT_ID, "io.kestra.tests", "flow-concurrency-queue");
|
|
|
|
|
Execution result1 = runUntilQueued("io.kestra.tests", "flow-concurrency-queue");
|
|
|
|
|
Execution result1 = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
|
|
|
|
|
|
|
|
|
var cancelResponse = client.toBlocking().exchange(
|
|
|
|
|
HttpRequest.POST("/api/v1/executions/" + result1.getId() + "/unqueue?state=CANCELLED", null)
|
|
|
|
|
@@ -1794,9 +1790,9 @@ class ExecutionControllerRunnerTest {
|
|
|
|
|
void shouldUnqueueExecutionByIdsQueuedFlows() throws TimeoutException, QueueException {
|
|
|
|
|
// run a first flow so the others are queued
|
|
|
|
|
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
|
|
|
|
Execution result1 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
|
|
|
|
Execution result2 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
|
|
|
|
Execution result3 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
|
|
|
|
Execution result1 = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
|
|
|
|
Execution result2 = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
|
|
|
|
Execution result3 = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
|
|
|
|
|
|
|
|
|
BulkResponse response = client.toBlocking().retrieve(
|
|
|
|
|
HttpRequest.POST("/api/v1/main/executions/unqueue/by-ids", List.of(result1.getId(), result2.getId(), result3.getId())),
|
|
|
|
|
@@ -1810,7 +1806,7 @@ class ExecutionControllerRunnerTest {
|
|
|
|
|
void shouldForceRunExecutionAQueuedFlow() throws QueueException, TimeoutException {
|
|
|
|
|
// run a first flow so the second is queued
|
|
|
|
|
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
|
|
|
|
Execution result = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
|
|
|
|
Execution result = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
|
|
|
|
|
|
|
|
|
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/force-run", null));
|
|
|
|
|
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
|
|
|
|
|
@@ -1821,7 +1817,7 @@ class ExecutionControllerRunnerTest {
|
|
|
|
|
// waiting for the flow to complete successfully
|
|
|
|
|
runnerUtils.awaitExecution(
|
|
|
|
|
execution -> execution.getId().equals(result.getId()) && execution.getState().isSuccess(),
|
|
|
|
|
() -> {},
|
|
|
|
|
result,
|
|
|
|
|
Duration.ofSeconds(10)
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
@@ -2021,22 +2017,11 @@ class ExecutionControllerRunnerTest {
|
|
|
|
|
.build();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Execution runUntilQueued(String namespace, String flowId) throws TimeoutException, QueueException {
|
|
|
|
|
return runUntilState(namespace, flowId, State.Type.QUEUED);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Execution createExecution(String namespace, String flowId) {
|
|
|
|
|
Flow flow = flowRepositoryInterface.findById(TENANT_ID, namespace, flowId).orElseThrow();
|
|
|
|
|
return Execution.newExecution(flow, null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Execution runUntilState(String namespace, String flowId, State.Type state) throws TimeoutException, QueueException {
|
|
|
|
|
Execution execution = this.createExecution(namespace, flowId);
|
|
|
|
|
return runnerUtils.awaitExecution(
|
|
|
|
|
it -> execution.getId().equals(it.getId()) && it.getState().getCurrent() == state,
|
|
|
|
|
throwRunnable(() -> this.executionQueue.emit(execution)),
|
|
|
|
|
Duration.ofSeconds(1));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
@LoadFlows({"flows/valids/minimal.yaml"})
|
|
|
|
|
@@ -2113,7 +2098,7 @@ class ExecutionControllerRunnerTest {
|
|
|
|
|
// wait for the exec to be terminated
|
|
|
|
|
Execution terminated = runnerUtils.awaitExecution(
|
|
|
|
|
it -> execution.getId().equals(it.getId()) && it.getState().isTerminated(),
|
|
|
|
|
() -> {},
|
|
|
|
|
execution,
|
|
|
|
|
Duration.ofSeconds(10));
|
|
|
|
|
assertThat(terminated.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
|
|
|
|
assertThat(terminated.getTaskRunList()).hasSize(1);
|
|
|
|
|
|