mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
4 Commits
executions
...
chore/test
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f43947b649 | ||
|
|
18b6b4ce5d | ||
|
|
dd65b4697e | ||
|
|
9294c9f885 |
@@ -4,27 +4,19 @@ import io.kestra.core.junit.annotations.ExecuteFlow;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.runners.ConcurrencyLimit;
|
||||
import io.kestra.core.runners.RunnerUtils;
|
||||
import io.kestra.core.runners.TestRunnerUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwRunnable;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@KestraTest(startRunner = true)
|
||||
@@ -34,14 +26,7 @@ class ConcurrencyLimitServiceTest {
|
||||
private static final String TENANT_ID = "main";
|
||||
|
||||
@Inject
|
||||
private RunnerUtils runnerUtils;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.EXECUTION_NAMED)
|
||||
private QueueInterface<Execution> executionQueue;
|
||||
|
||||
@Inject
|
||||
private FlowRepositoryInterface flowRepositoryInterface;
|
||||
private TestRunnerUtils runnerUtils;
|
||||
|
||||
@Inject
|
||||
private ConcurrencyLimitService concurrencyLimitService;
|
||||
@@ -57,7 +42,8 @@ class ConcurrencyLimitServiceTest {
|
||||
void unqueueExecution() throws QueueException, TimeoutException {
|
||||
// run a first flow so the second is queued
|
||||
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
|
||||
Execution result = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", execution -> execution.getState().isQueued());
|
||||
assertThat(result.getState().isQueued()).isTrue();
|
||||
|
||||
Execution unqueued = concurrencyLimitService.unqueue(result, State.Type.RUNNING);
|
||||
@@ -101,21 +87,4 @@ class ConcurrencyLimitServiceTest {
|
||||
assertThat(list.getFirst().getNamespace()).isEqualTo(execution.getNamespace());
|
||||
assertThat(list.getFirst().getFlowId()).isEqualTo(execution.getFlowId());
|
||||
}
|
||||
|
||||
private Execution runUntilQueued(String namespace, String flowId) throws TimeoutException, QueueException {
|
||||
return runUntilState(namespace, flowId, State.Type.QUEUED);
|
||||
}
|
||||
|
||||
private Execution runUntilState(String namespace, String flowId, State.Type state) throws TimeoutException, QueueException {
|
||||
Execution execution = this.createExecution(namespace, flowId);
|
||||
return runnerUtils.awaitExecution(
|
||||
it -> execution.getId().equals(it.getId()) && it.getState().getCurrent() == state,
|
||||
throwRunnable(() -> this.executionQueue.emit(execution)),
|
||||
Duration.ofSeconds(1));
|
||||
}
|
||||
|
||||
private Execution createExecution(String namespace, String flowId) {
|
||||
Flow flow = flowRepositoryInterface.findById(TENANT_ID, namespace, flowId).orElseThrow();
|
||||
return Execution.newExecution(flow, null);
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
version=1.1.0-SNAPSHOT
|
||||
version=1.2.0-SNAPSHOT
|
||||
|
||||
org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError
|
||||
org.gradle.parallel=true
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
@@ -162,6 +161,34 @@ public class TestRunnerUtils {
|
||||
return this.emitAndAwaitExecution(isRunningExecution(execution), execution, duration);
|
||||
}
|
||||
|
||||
public Execution runOneUntil(String tenantId, String namespace, String flowId, Predicate<Execution> predicate)
|
||||
throws QueueException {
|
||||
return this.runOneUntil(tenantId, namespace, flowId, null, null, null, predicate);
|
||||
}
|
||||
|
||||
public Execution runOneUntil(String tenantId, String namespace, String flowId, Integer revision, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration, Predicate<Execution> predicate)
|
||||
throws QueueException {
|
||||
return this.runOneUntil(
|
||||
flowRepository
|
||||
.findById(tenantId, namespace, flowId, revision != null ? Optional.of(revision) : Optional.empty())
|
||||
.orElseThrow(() -> new IllegalArgumentException("Unable to find flow '" + flowId + "'")),
|
||||
inputs,
|
||||
duration,
|
||||
predicate
|
||||
);
|
||||
}
|
||||
|
||||
public Execution runOneUntil(Flow flow, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration, Predicate<Execution> predicate)
|
||||
throws QueueException {
|
||||
if (duration == null) {
|
||||
duration = DEFAULT_MAX_WAIT_DURATION;
|
||||
}
|
||||
|
||||
Execution execution = Execution.newExecution(flow, inputs, null, Optional.empty());
|
||||
|
||||
return this.emitAndAwaitExecution(predicate, execution, duration);
|
||||
}
|
||||
|
||||
public Execution emitAndAwaitExecution(Predicate<Execution> predicate, Execution execution) throws QueueException {
|
||||
return emitAndAwaitExecution(predicate, execution, Duration.ofSeconds(20));
|
||||
}
|
||||
@@ -300,7 +327,6 @@ public class TestRunnerUtils {
|
||||
return receive.get();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Execution awaitChildExecution(Flow flow, Execution parentExecution, Execution execution, Duration duration)
|
||||
throws QueueException {
|
||||
return this.emitAndAwaitExecution(isTerminatedChildExecution(parentExecution, flow), execution, duration);
|
||||
|
||||
@@ -78,10 +78,17 @@ export const useBaseNamespacesStore = () => {
|
||||
}
|
||||
const data = response.data;
|
||||
const contentLength = response.headers?.["content-length"];
|
||||
|
||||
let value = data;
|
||||
if (contentLength === (data.length + 2).toString()) {
|
||||
return `"${data}"`;
|
||||
value = `"${data}"`;
|
||||
}
|
||||
return data;
|
||||
return {
|
||||
type: response.headers?.["content-type"] || "STRING",
|
||||
value: value,
|
||||
description: response.headers?.["description"] || "",
|
||||
ttl: response.headers?.["ttl"] || undefined
|
||||
};
|
||||
}
|
||||
|
||||
async function loadInheritedKVs(this: any, id: string) {
|
||||
|
||||
@@ -24,7 +24,7 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.runners.FlowInputOutput;
|
||||
import io.kestra.core.runners.InputsTest;
|
||||
import io.kestra.core.runners.LocalPath;
|
||||
import io.kestra.core.runners.RunnerUtils;
|
||||
import io.kestra.core.runners.TestRunnerUtils;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
@@ -121,7 +121,7 @@ class ExecutionControllerRunnerTest {
|
||||
private JdbcTestUtils jdbcTestUtils;
|
||||
|
||||
@Inject
|
||||
protected RunnerUtils runnerUtils;
|
||||
protected TestRunnerUtils runnerUtils;
|
||||
|
||||
@Inject
|
||||
private StorageInterface storageInterface;
|
||||
@@ -412,30 +412,30 @@ class ExecutionControllerRunnerTest {
|
||||
assertThat(flow.isPresent()).isTrue();
|
||||
|
||||
// Run child execution starting from a specific task and wait until it finishes
|
||||
Thread.sleep(100);
|
||||
|
||||
Execution createdChidExec = client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), ImmutableMap.of()),
|
||||
Execution.class
|
||||
);
|
||||
|
||||
assertThat(createdChidExec).isNotNull();
|
||||
assertThat(createdChidExec.getParentId()).isEqualTo(parentExecution.getId());
|
||||
assertThat(createdChidExec.getTaskRunList().size()).isEqualTo(4);
|
||||
assertThat(createdChidExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
|
||||
IntStream
|
||||
.range(0, 3)
|
||||
.mapToObj(value -> createdChidExec.getTaskRunList().get(value))
|
||||
.forEach(taskRun -> assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS));
|
||||
|
||||
assertThat(createdChidExec.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
assertThat(createdChidExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
|
||||
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
|
||||
flow.get(),
|
||||
parentExecution, throwRunnable(() -> {
|
||||
Thread.sleep(100);
|
||||
|
||||
Execution createdChidExec = client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), ImmutableMap.of()),
|
||||
Execution.class
|
||||
);
|
||||
|
||||
assertThat(createdChidExec).isNotNull();
|
||||
assertThat(createdChidExec.getParentId()).isEqualTo(parentExecution.getId());
|
||||
assertThat(createdChidExec.getTaskRunList().size()).isEqualTo(4);
|
||||
assertThat(createdChidExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
|
||||
IntStream
|
||||
.range(0, 3)
|
||||
.mapToObj(value -> createdChidExec.getTaskRunList().get(value))
|
||||
.forEach(taskRun -> assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS));
|
||||
|
||||
assertThat(createdChidExec.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
assertThat(createdChidExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
|
||||
}),
|
||||
parentExecution,
|
||||
createdChidExec,
|
||||
Duration.ofSeconds(15));
|
||||
|
||||
assertThat(finishedChildExecution).isNotNull();
|
||||
@@ -465,26 +465,26 @@ class ExecutionControllerRunnerTest {
|
||||
assertThat(flow.isPresent()).isTrue();
|
||||
|
||||
// Run child execution starting from a specific task and wait until it finishes
|
||||
Thread.sleep(100);
|
||||
|
||||
MultipartBody multipartBody = MultipartBody.builder()
|
||||
.addPart("condition", "success")
|
||||
.build();
|
||||
|
||||
Execution replay = client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay-with-inputs", multipartBody)
|
||||
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
|
||||
Execution.class
|
||||
);
|
||||
|
||||
assertThat(replay).isNotNull();
|
||||
assertThat(replay.getParentId()).isEqualTo(parentExecution.getId());
|
||||
assertThat(replay.getState().getCurrent()).isEqualTo(Type.CREATED);
|
||||
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
|
||||
flow.get(),
|
||||
parentExecution, throwRunnable(() -> {
|
||||
Thread.sleep(100);
|
||||
|
||||
MultipartBody multipartBody = MultipartBody.builder()
|
||||
.addPart("condition", "success")
|
||||
.build();
|
||||
|
||||
Execution replay = client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay-with-inputs", multipartBody)
|
||||
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
|
||||
Execution.class
|
||||
);
|
||||
|
||||
assertThat(replay).isNotNull();
|
||||
assertThat(replay.getParentId()).isEqualTo(parentExecution.getId());
|
||||
assertThat(replay.getState().getCurrent()).isEqualTo(Type.CREATED);
|
||||
}),
|
||||
parentExecution,
|
||||
replay,
|
||||
Duration.ofSeconds(15));
|
||||
|
||||
assertThat(finishedChildExecution).isNotNull();
|
||||
@@ -515,27 +515,27 @@ class ExecutionControllerRunnerTest {
|
||||
assertThat(flow.isPresent()).isTrue();
|
||||
|
||||
// Run child execution starting from a specific task and wait until it finishes
|
||||
Thread.sleep(100);
|
||||
|
||||
MultipartBody multipartBody = MultipartBody.builder()
|
||||
.addPart("condition", "success")
|
||||
.build();
|
||||
|
||||
Execution replay = client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay-with-inputs?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), multipartBody)
|
||||
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
|
||||
Execution.class
|
||||
);
|
||||
|
||||
assertThat(replay).isNotNull();
|
||||
assertThat(replay.getParentId()).isEqualTo(parentExecution.getId());
|
||||
assertThat(replay.getTaskRunList().size()).isEqualTo(2);
|
||||
assertThat(replay.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
|
||||
flow.get(),
|
||||
parentExecution, throwRunnable(() -> {
|
||||
Thread.sleep(100);
|
||||
|
||||
MultipartBody multipartBody = MultipartBody.builder()
|
||||
.addPart("condition", "success")
|
||||
.build();
|
||||
|
||||
Execution replay = client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay-with-inputs?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), multipartBody)
|
||||
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
|
||||
Execution.class
|
||||
);
|
||||
|
||||
assertThat(replay).isNotNull();
|
||||
assertThat(replay.getParentId()).isEqualTo(parentExecution.getId());
|
||||
assertThat(replay.getTaskRunList().size()).isEqualTo(2);
|
||||
assertThat(replay.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
}),
|
||||
parentExecution,
|
||||
replay,
|
||||
Duration.ofSeconds(15));
|
||||
|
||||
assertThat(finishedChildExecution).isNotNull();
|
||||
@@ -563,23 +563,23 @@ class ExecutionControllerRunnerTest {
|
||||
assertThat(flow.isPresent()).isTrue();
|
||||
|
||||
// Run child execution starting from a specific task and wait until it finishes
|
||||
Thread.sleep(100);
|
||||
|
||||
Execution createdChidExec = client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), ImmutableMap.of()),
|
||||
Execution.class
|
||||
);
|
||||
|
||||
assertThat(createdChidExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
assertThat(createdChidExec.getState().getHistories()).hasSize(4);
|
||||
assertThat(createdChidExec.getTaskRunList()).hasSize(20);
|
||||
|
||||
assertThat(createdChidExec.getId()).isNotEqualTo(parentExecution.getId());
|
||||
runnerUtils.awaitChildExecution(
|
||||
flow.get(),
|
||||
parentExecution, throwRunnable(() -> {
|
||||
Thread.sleep(100);
|
||||
|
||||
Execution createdChidExec = client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), ImmutableMap.of()),
|
||||
Execution.class
|
||||
);
|
||||
|
||||
assertThat(createdChidExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
assertThat(createdChidExec.getState().getHistories()).hasSize(4);
|
||||
assertThat(createdChidExec.getTaskRunList()).hasSize(20);
|
||||
|
||||
assertThat(createdChidExec.getId()).isNotEqualTo(parentExecution.getId());
|
||||
}),
|
||||
parentExecution,
|
||||
createdChidExec,
|
||||
Duration.ofSeconds(30));
|
||||
}
|
||||
|
||||
@@ -599,33 +599,31 @@ class ExecutionControllerRunnerTest {
|
||||
assertThat(flow.isPresent()).isTrue();
|
||||
|
||||
// Restart execution and wait until it finishes
|
||||
Execution restartedExec = client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/" + firstExecution.getId() + "/restart", ImmutableMap.of()),
|
||||
Execution.class
|
||||
);
|
||||
|
||||
assertThat(restartedExec).isNotNull();
|
||||
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
|
||||
assertThat(restartedExec.getParentId()).isNull();
|
||||
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(3);
|
||||
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
|
||||
IntStream
|
||||
.range(0, 2)
|
||||
.mapToObj(value -> restartedExec.getTaskRunList().get(value)).forEach(taskRun -> {
|
||||
assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat(taskRun.getAttempts().size()).isEqualTo(1);
|
||||
|
||||
assertThat(restartedExec.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
assertThat(restartedExec.getTaskRunList().get(2).getAttempts().size()).isEqualTo(1);
|
||||
});
|
||||
|
||||
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
|
||||
execution -> execution.getId().equals(firstExecution.getId()) &&
|
||||
execution.getTaskRunList().size() == 4 &&
|
||||
execution.getState().isTerminated(),
|
||||
() -> {
|
||||
Execution restartedExec = client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/" + firstExecution.getId() + "/restart", ImmutableMap.of()),
|
||||
Execution.class
|
||||
);
|
||||
|
||||
assertThat(restartedExec).isNotNull();
|
||||
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
|
||||
assertThat(restartedExec.getParentId()).isNull();
|
||||
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(3);
|
||||
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
|
||||
IntStream
|
||||
.range(0, 2)
|
||||
.mapToObj(value -> restartedExec.getTaskRunList().get(value)).forEach(taskRun -> {
|
||||
assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat(taskRun.getAttempts().size()).isEqualTo(1);
|
||||
|
||||
assertThat(restartedExec.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
assertThat(restartedExec.getTaskRunList().get(2).getAttempts().size()).isEqualTo(1);
|
||||
});
|
||||
},
|
||||
execution -> execution.getTaskRunList().size() == 4 && execution.getState().isTerminated(),
|
||||
restartedExec,
|
||||
Duration.ofSeconds(15)
|
||||
);
|
||||
|
||||
@@ -662,35 +660,33 @@ class ExecutionControllerRunnerTest {
|
||||
assertThat(flow.isPresent()).isTrue();
|
||||
|
||||
// Restart execution and wait until it finishes
|
||||
Execution restartedExec = client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/" + firstExecution.getId() + "/restart", ImmutableMap.of()),
|
||||
Execution.class
|
||||
);
|
||||
|
||||
assertThat(restartedExec).isNotNull();
|
||||
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
|
||||
assertThat(restartedExec.getParentId()).isNull();
|
||||
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(4);
|
||||
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
|
||||
IntStream
|
||||
.range(0, 2)
|
||||
.mapToObj(value -> restartedExec.getTaskRunList().get(value)).forEach(taskRun -> {
|
||||
assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat(taskRun.getAttempts().size()).isEqualTo(1);
|
||||
|
||||
assertThat(restartedExec.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
|
||||
assertThat(restartedExec.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
assertThat(restartedExec.getTaskRunList().get(2).getAttempts()).isNotNull();
|
||||
assertThat(restartedExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
|
||||
});
|
||||
|
||||
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
|
||||
execution -> execution.getId().equals(firstExecution.getId()) &&
|
||||
execution.getTaskRunList().size() == 5 &&
|
||||
execution.getState().isTerminated(),
|
||||
() -> {
|
||||
Execution restartedExec = client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/" + firstExecution.getId() + "/restart", ImmutableMap.of()),
|
||||
Execution.class
|
||||
);
|
||||
|
||||
assertThat(restartedExec).isNotNull();
|
||||
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
|
||||
assertThat(restartedExec.getParentId()).isNull();
|
||||
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(4);
|
||||
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
|
||||
IntStream
|
||||
.range(0, 2)
|
||||
.mapToObj(value -> restartedExec.getTaskRunList().get(value)).forEach(taskRun -> {
|
||||
assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat(taskRun.getAttempts().size()).isEqualTo(1);
|
||||
|
||||
assertThat(restartedExec.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
|
||||
assertThat(restartedExec.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
|
||||
assertThat(restartedExec.getTaskRunList().get(2).getAttempts()).isNotNull();
|
||||
assertThat(restartedExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
|
||||
});
|
||||
},
|
||||
execution -> execution.getTaskRunList().size() == 5 && execution.getState().isTerminated(),
|
||||
restartedExec,
|
||||
Duration.ofSeconds(15)
|
||||
);
|
||||
|
||||
@@ -1748,7 +1744,7 @@ class ExecutionControllerRunnerTest {
|
||||
void shouldUnqueueExecutionAQueuedFlow() throws QueueException, TimeoutException {
|
||||
// run a first flow so the second is queued
|
||||
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
||||
|
||||
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/unqueue", null));
|
||||
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
|
||||
@@ -1756,7 +1752,7 @@ class ExecutionControllerRunnerTest {
|
||||
// waiting for the flow to complete successfully
|
||||
runnerUtils.awaitExecution(
|
||||
execution -> execution.getId().equals(result.getId()) && execution.getState().isSuccess(),
|
||||
() -> {},
|
||||
result,
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
|
||||
@@ -1777,7 +1773,7 @@ class ExecutionControllerRunnerTest {
|
||||
void shouldUnqueueAQueuedFlowToCancelledState() throws QueueException, TimeoutException {
|
||||
// run a first flow so the second is queued
|
||||
runnerUtils.runOneUntilRunning(TENANT_ID, "io.kestra.tests", "flow-concurrency-queue");
|
||||
Execution result1 = runUntilQueued("io.kestra.tests", "flow-concurrency-queue");
|
||||
Execution result1 = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
||||
|
||||
var cancelResponse = client.toBlocking().exchange(
|
||||
HttpRequest.POST("/api/v1/executions/" + result1.getId() + "/unqueue?state=CANCELLED", null)
|
||||
@@ -1794,9 +1790,9 @@ class ExecutionControllerRunnerTest {
|
||||
void shouldUnqueueExecutionByIdsQueuedFlows() throws TimeoutException, QueueException {
|
||||
// run a first flow so the others are queued
|
||||
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result1 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result2 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result3 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result1 = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
||||
Execution result2 = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
||||
Execution result3 = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
||||
|
||||
BulkResponse response = client.toBlocking().retrieve(
|
||||
HttpRequest.POST("/api/v1/main/executions/unqueue/by-ids", List.of(result1.getId(), result2.getId(), result3.getId())),
|
||||
@@ -1810,7 +1806,7 @@ class ExecutionControllerRunnerTest {
|
||||
void shouldForceRunExecutionAQueuedFlow() throws QueueException, TimeoutException {
|
||||
// run a first flow so the second is queued
|
||||
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
|
||||
|
||||
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/force-run", null));
|
||||
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
|
||||
@@ -1821,7 +1817,7 @@ class ExecutionControllerRunnerTest {
|
||||
// waiting for the flow to complete successfully
|
||||
runnerUtils.awaitExecution(
|
||||
execution -> execution.getId().equals(result.getId()) && execution.getState().isSuccess(),
|
||||
() -> {},
|
||||
result,
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
}
|
||||
@@ -2021,22 +2017,11 @@ class ExecutionControllerRunnerTest {
|
||||
.build();
|
||||
}
|
||||
|
||||
private Execution runUntilQueued(String namespace, String flowId) throws TimeoutException, QueueException {
|
||||
return runUntilState(namespace, flowId, State.Type.QUEUED);
|
||||
}
|
||||
|
||||
private Execution createExecution(String namespace, String flowId) {
|
||||
Flow flow = flowRepositoryInterface.findById(TENANT_ID, namespace, flowId).orElseThrow();
|
||||
return Execution.newExecution(flow, null);
|
||||
}
|
||||
|
||||
private Execution runUntilState(String namespace, String flowId, State.Type state) throws TimeoutException, QueueException {
|
||||
Execution execution = this.createExecution(namespace, flowId);
|
||||
return runnerUtils.awaitExecution(
|
||||
it -> execution.getId().equals(it.getId()) && it.getState().getCurrent() == state,
|
||||
throwRunnable(() -> this.executionQueue.emit(execution)),
|
||||
Duration.ofSeconds(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/minimal.yaml"})
|
||||
@@ -2113,7 +2098,7 @@ class ExecutionControllerRunnerTest {
|
||||
// wait for the exec to be terminated
|
||||
Execution terminated = runnerUtils.awaitExecution(
|
||||
it -> execution.getId().equals(it.getId()) && it.getState().isTerminated(),
|
||||
() -> {},
|
||||
execution,
|
||||
Duration.ofSeconds(10));
|
||||
assertThat(terminated.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat(terminated.getTaskRunList()).hasSize(1);
|
||||
|
||||
@@ -202,24 +202,24 @@ class KVControllerTest {
|
||||
|
||||
static Stream<Arguments> kvSetKeyValueArgs() {
|
||||
return Stream.of(
|
||||
Arguments.of("{\"hello\":\"world\"}", Map.class),
|
||||
Arguments.of("[\"hello\",\"world\"]", List.class),
|
||||
Arguments.of("\"hello\"", String.class),
|
||||
Arguments.of("1", Integer.class),
|
||||
Arguments.of("1.0", BigDecimal.class),
|
||||
Arguments.of("true", Boolean.class),
|
||||
Arguments.of("false", Boolean.class),
|
||||
Arguments.of("2021-09-01", LocalDate.class),
|
||||
Arguments.of("2021-09-01T01:02:03Z", Instant.class),
|
||||
Arguments.of("\"PT5S\"", Duration.class)
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "{\"hello\":\"world\"}", Map.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "[\"hello\",\"world\"]", List.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "\"hello\"", String.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "1", Integer.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "1.0", BigDecimal.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "true", Boolean.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "false", Boolean.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "2021-09-01", LocalDate.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "2021-09-01T01:02:03Z", Instant.class),
|
||||
Arguments.of(MediaType.TEXT_PLAIN, "\"PT5S\"", Duration.class)
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("kvSetKeyValueArgs")
|
||||
void setKeyValue(String value, Class<?> expectedClass) throws IOException, ResourceExpiredException {
|
||||
void setKeyValue(MediaType mediaType, String value, Class<?> expectedClass) throws IOException, ResourceExpiredException {
|
||||
String myDescription = "myDescription";
|
||||
client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + NAMESPACE + "/kv/my-key", value).header("ttl", "PT5M").header("description", myDescription));
|
||||
client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + NAMESPACE + "/kv/my-key", value).contentType(mediaType).header("ttl", "PT5M").header("description", myDescription));
|
||||
|
||||
KVStore kvStore = kvStore();
|
||||
Class<?> valueClazz = kvStore.getValue("my-key").get().value().getClass();
|
||||
@@ -294,7 +294,7 @@ class KVControllerTest {
|
||||
assertThat(httpClientResponseException.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
|
||||
assertThat(httpClientResponseException.getMessage()).isEqualTo(expectedErrorMessage);
|
||||
|
||||
httpClientResponseException = Assertions.assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + NAMESPACE + "/kv/bad$key", "\"content\"")));
|
||||
httpClientResponseException = Assertions.assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + NAMESPACE + "/kv/bad$key", "\"content\"").contentType(MediaType.TEXT_PLAIN)));
|
||||
assertThat(httpClientResponseException.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
|
||||
assertThat(httpClientResponseException.getMessage()).isEqualTo(expectedErrorMessage);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user