Compare commits

...

4 Commits

Author SHA1 Message Date
Loïc Mathieu
f43947b649 chore(tests): stop using the RunnerUtils in tests 2025-11-04 16:16:02 +01:00
Roman Acevedo
18b6b4ce5d test(kv): plain text header is sent now 2025-11-04 15:16:18 +01:00
Krie
dd65b4697e fix: preserve KV description when editing entries (fixes #12598) (#12606) 2025-11-04 15:04:27 +01:00
brian.mulier
9294c9f885 chore(version): upgrade to v1.2.0-SNAPSHOT 2025-11-04 14:04:13 +01:00
6 changed files with 190 additions and 203 deletions

View File

@@ -4,27 +4,19 @@ import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.ConcurrencyLimit;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest(startRunner = true)
@@ -34,14 +26,7 @@ class ConcurrencyLimitServiceTest {
private static final String TENANT_ID = "main";
@Inject
private RunnerUtils runnerUtils;
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;
@Inject
private FlowRepositoryInterface flowRepositoryInterface;
private TestRunnerUtils runnerUtils;
@Inject
private ConcurrencyLimitService concurrencyLimitService;
@@ -57,7 +42,8 @@ class ConcurrencyLimitServiceTest {
void unqueueExecution() throws QueueException, TimeoutException {
// run a first flow so the second is queued
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", execution -> execution.getState().isQueued());
assertThat(result.getState().isQueued()).isTrue();
Execution unqueued = concurrencyLimitService.unqueue(result, State.Type.RUNNING);
@@ -101,21 +87,4 @@ class ConcurrencyLimitServiceTest {
assertThat(list.getFirst().getNamespace()).isEqualTo(execution.getNamespace());
assertThat(list.getFirst().getFlowId()).isEqualTo(execution.getFlowId());
}
private Execution runUntilQueued(String namespace, String flowId) throws TimeoutException, QueueException {
return runUntilState(namespace, flowId, State.Type.QUEUED);
}
private Execution runUntilState(String namespace, String flowId, State.Type state) throws TimeoutException, QueueException {
Execution execution = this.createExecution(namespace, flowId);
return runnerUtils.awaitExecution(
it -> execution.getId().equals(it.getId()) && it.getState().getCurrent() == state,
throwRunnable(() -> this.executionQueue.emit(execution)),
Duration.ofSeconds(1));
}
private Execution createExecution(String namespace, String flowId) {
Flow flow = flowRepositoryInterface.findById(TENANT_ID, namespace, flowId).orElseThrow();
return Execution.newExecution(flow, null);
}
}

View File

@@ -1,4 +1,4 @@
version=1.1.0-SNAPSHOT
version=1.2.0-SNAPSHOT
org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError
org.gradle.parallel=true

View File

@@ -1,6 +1,5 @@
package io.kestra.core.runners;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
@@ -162,6 +161,34 @@ public class TestRunnerUtils {
return this.emitAndAwaitExecution(isRunningExecution(execution), execution, duration);
}
public Execution runOneUntil(String tenantId, String namespace, String flowId, Predicate<Execution> predicate)
throws QueueException {
return this.runOneUntil(tenantId, namespace, flowId, null, null, null, predicate);
}
public Execution runOneUntil(String tenantId, String namespace, String flowId, Integer revision, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration, Predicate<Execution> predicate)
throws QueueException {
return this.runOneUntil(
flowRepository
.findById(tenantId, namespace, flowId, revision != null ? Optional.of(revision) : Optional.empty())
.orElseThrow(() -> new IllegalArgumentException("Unable to find flow '" + flowId + "'")),
inputs,
duration,
predicate
);
}
public Execution runOneUntil(Flow flow, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration, Predicate<Execution> predicate)
throws QueueException {
if (duration == null) {
duration = DEFAULT_MAX_WAIT_DURATION;
}
Execution execution = Execution.newExecution(flow, inputs, null, Optional.empty());
return this.emitAndAwaitExecution(predicate, execution, duration);
}
public Execution emitAndAwaitExecution(Predicate<Execution> predicate, Execution execution) throws QueueException {
return emitAndAwaitExecution(predicate, execution, Duration.ofSeconds(20));
}
@@ -300,7 +327,6 @@ public class TestRunnerUtils {
return receive.get();
}
@VisibleForTesting
public Execution awaitChildExecution(Flow flow, Execution parentExecution, Execution execution, Duration duration)
throws QueueException {
return this.emitAndAwaitExecution(isTerminatedChildExecution(parentExecution, flow), execution, duration);

View File

@@ -78,10 +78,17 @@ export const useBaseNamespacesStore = () => {
}
const data = response.data;
const contentLength = response.headers?.["content-length"];
let value = data;
if (contentLength === (data.length + 2).toString()) {
return `"${data}"`;
value = `"${data}"`;
}
return data;
return {
type: response.headers?.["content-type"] || "STRING",
value: value,
description: response.headers?.["description"] || "",
ttl: response.headers?.["ttl"] || undefined
};
}
async function loadInheritedKVs(this: any, id: string) {

View File

@@ -24,7 +24,7 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.InputsTest;
import io.kestra.core.runners.LocalPath;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
@@ -121,7 +121,7 @@ class ExecutionControllerRunnerTest {
private JdbcTestUtils jdbcTestUtils;
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
@Inject
private StorageInterface storageInterface;
@@ -412,30 +412,30 @@ class ExecutionControllerRunnerTest {
assertThat(flow.isPresent()).isTrue();
// Run child execution starting from a specific task and wait until it finishes
Thread.sleep(100);
Execution createdChidExec = client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), ImmutableMap.of()),
Execution.class
);
assertThat(createdChidExec).isNotNull();
assertThat(createdChidExec.getParentId()).isEqualTo(parentExecution.getId());
assertThat(createdChidExec.getTaskRunList().size()).isEqualTo(4);
assertThat(createdChidExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
IntStream
.range(0, 3)
.mapToObj(value -> createdChidExec.getTaskRunList().get(value))
.forEach(taskRun -> assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS));
assertThat(createdChidExec.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(createdChidExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
flow.get(),
parentExecution, throwRunnable(() -> {
Thread.sleep(100);
Execution createdChidExec = client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), ImmutableMap.of()),
Execution.class
);
assertThat(createdChidExec).isNotNull();
assertThat(createdChidExec.getParentId()).isEqualTo(parentExecution.getId());
assertThat(createdChidExec.getTaskRunList().size()).isEqualTo(4);
assertThat(createdChidExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
IntStream
.range(0, 3)
.mapToObj(value -> createdChidExec.getTaskRunList().get(value))
.forEach(taskRun -> assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS));
assertThat(createdChidExec.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(createdChidExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
}),
parentExecution,
createdChidExec,
Duration.ofSeconds(15));
assertThat(finishedChildExecution).isNotNull();
@@ -465,26 +465,26 @@ class ExecutionControllerRunnerTest {
assertThat(flow.isPresent()).isTrue();
// Run child execution starting from a specific task and wait until it finishes
Thread.sleep(100);
MultipartBody multipartBody = MultipartBody.builder()
.addPart("condition", "success")
.build();
Execution replay = client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay-with-inputs", multipartBody)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
Execution.class
);
assertThat(replay).isNotNull();
assertThat(replay.getParentId()).isEqualTo(parentExecution.getId());
assertThat(replay.getState().getCurrent()).isEqualTo(Type.CREATED);
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
flow.get(),
parentExecution, throwRunnable(() -> {
Thread.sleep(100);
MultipartBody multipartBody = MultipartBody.builder()
.addPart("condition", "success")
.build();
Execution replay = client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay-with-inputs", multipartBody)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
Execution.class
);
assertThat(replay).isNotNull();
assertThat(replay.getParentId()).isEqualTo(parentExecution.getId());
assertThat(replay.getState().getCurrent()).isEqualTo(Type.CREATED);
}),
parentExecution,
replay,
Duration.ofSeconds(15));
assertThat(finishedChildExecution).isNotNull();
@@ -515,27 +515,27 @@ class ExecutionControllerRunnerTest {
assertThat(flow.isPresent()).isTrue();
// Run child execution starting from a specific task and wait until it finishes
Thread.sleep(100);
MultipartBody multipartBody = MultipartBody.builder()
.addPart("condition", "success")
.build();
Execution replay = client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay-with-inputs?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), multipartBody)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
Execution.class
);
assertThat(replay).isNotNull();
assertThat(replay.getParentId()).isEqualTo(parentExecution.getId());
assertThat(replay.getTaskRunList().size()).isEqualTo(2);
assertThat(replay.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
Execution finishedChildExecution = runnerUtils.awaitChildExecution(
flow.get(),
parentExecution, throwRunnable(() -> {
Thread.sleep(100);
MultipartBody multipartBody = MultipartBody.builder()
.addPart("condition", "success")
.build();
Execution replay = client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay-with-inputs?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), multipartBody)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
Execution.class
);
assertThat(replay).isNotNull();
assertThat(replay.getParentId()).isEqualTo(parentExecution.getId());
assertThat(replay.getTaskRunList().size()).isEqualTo(2);
assertThat(replay.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
}),
parentExecution,
replay,
Duration.ofSeconds(15));
assertThat(finishedChildExecution).isNotNull();
@@ -563,23 +563,23 @@ class ExecutionControllerRunnerTest {
assertThat(flow.isPresent()).isTrue();
// Run child execution starting from a specific task and wait until it finishes
Thread.sleep(100);
Execution createdChidExec = client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), ImmutableMap.of()),
Execution.class
);
assertThat(createdChidExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(createdChidExec.getState().getHistories()).hasSize(4);
assertThat(createdChidExec.getTaskRunList()).hasSize(20);
assertThat(createdChidExec.getId()).isNotEqualTo(parentExecution.getId());
runnerUtils.awaitChildExecution(
flow.get(),
parentExecution, throwRunnable(() -> {
Thread.sleep(100);
Execution createdChidExec = client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), ImmutableMap.of()),
Execution.class
);
assertThat(createdChidExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(createdChidExec.getState().getHistories()).hasSize(4);
assertThat(createdChidExec.getTaskRunList()).hasSize(20);
assertThat(createdChidExec.getId()).isNotEqualTo(parentExecution.getId());
}),
parentExecution,
createdChidExec,
Duration.ofSeconds(30));
}
@@ -599,33 +599,31 @@ class ExecutionControllerRunnerTest {
assertThat(flow.isPresent()).isTrue();
// Restart execution and wait until it finishes
Execution restartedExec = client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + firstExecution.getId() + "/restart", ImmutableMap.of()),
Execution.class
);
assertThat(restartedExec).isNotNull();
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(3);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
IntStream
.range(0, 2)
.mapToObj(value -> restartedExec.getTaskRunList().get(value)).forEach(taskRun -> {
assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(taskRun.getAttempts().size()).isEqualTo(1);
assertThat(restartedExec.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restartedExec.getTaskRunList().get(2).getAttempts().size()).isEqualTo(1);
});
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
execution -> execution.getId().equals(firstExecution.getId()) &&
execution.getTaskRunList().size() == 4 &&
execution.getState().isTerminated(),
() -> {
Execution restartedExec = client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + firstExecution.getId() + "/restart", ImmutableMap.of()),
Execution.class
);
assertThat(restartedExec).isNotNull();
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(3);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
IntStream
.range(0, 2)
.mapToObj(value -> restartedExec.getTaskRunList().get(value)).forEach(taskRun -> {
assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(taskRun.getAttempts().size()).isEqualTo(1);
assertThat(restartedExec.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restartedExec.getTaskRunList().get(2).getAttempts().size()).isEqualTo(1);
});
},
execution -> execution.getTaskRunList().size() == 4 && execution.getState().isTerminated(),
restartedExec,
Duration.ofSeconds(15)
);
@@ -662,35 +660,33 @@ class ExecutionControllerRunnerTest {
assertThat(flow.isPresent()).isTrue();
// Restart execution and wait until it finishes
Execution restartedExec = client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + firstExecution.getId() + "/restart", ImmutableMap.of()),
Execution.class
);
assertThat(restartedExec).isNotNull();
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(4);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
IntStream
.range(0, 2)
.mapToObj(value -> restartedExec.getTaskRunList().get(value)).forEach(taskRun -> {
assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(taskRun.getAttempts().size()).isEqualTo(1);
assertThat(restartedExec.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
assertThat(restartedExec.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restartedExec.getTaskRunList().get(2).getAttempts()).isNotNull();
assertThat(restartedExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
});
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
execution -> execution.getId().equals(firstExecution.getId()) &&
execution.getTaskRunList().size() == 5 &&
execution.getState().isTerminated(),
() -> {
Execution restartedExec = client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + firstExecution.getId() + "/restart", ImmutableMap.of()),
Execution.class
);
assertThat(restartedExec).isNotNull();
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(4);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
IntStream
.range(0, 2)
.mapToObj(value -> restartedExec.getTaskRunList().get(value)).forEach(taskRun -> {
assertThat(taskRun.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(taskRun.getAttempts().size()).isEqualTo(1);
assertThat(restartedExec.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.RUNNING);
assertThat(restartedExec.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restartedExec.getTaskRunList().get(2).getAttempts()).isNotNull();
assertThat(restartedExec.getTaskRunList().get(3).getAttempts().size()).isEqualTo(1);
});
},
execution -> execution.getTaskRunList().size() == 5 && execution.getState().isTerminated(),
restartedExec,
Duration.ofSeconds(15)
);
@@ -1748,7 +1744,7 @@ class ExecutionControllerRunnerTest {
void shouldUnqueueExecutionAQueuedFlow() throws QueueException, TimeoutException {
// run a first flow so the second is queued
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/unqueue", null));
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
@@ -1756,7 +1752,7 @@ class ExecutionControllerRunnerTest {
// waiting for the flow to complete successfully
runnerUtils.awaitExecution(
execution -> execution.getId().equals(result.getId()) && execution.getState().isSuccess(),
() -> {},
result,
Duration.ofSeconds(10)
);
@@ -1777,7 +1773,7 @@ class ExecutionControllerRunnerTest {
void shouldUnqueueAQueuedFlowToCancelledState() throws QueueException, TimeoutException {
// run a first flow so the second is queued
runnerUtils.runOneUntilRunning(TENANT_ID, "io.kestra.tests", "flow-concurrency-queue");
Execution result1 = runUntilQueued("io.kestra.tests", "flow-concurrency-queue");
Execution result1 = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
var cancelResponse = client.toBlocking().exchange(
HttpRequest.POST("/api/v1/executions/" + result1.getId() + "/unqueue?state=CANCELLED", null)
@@ -1794,9 +1790,9 @@ class ExecutionControllerRunnerTest {
void shouldUnqueueExecutionByIdsQueuedFlows() throws TimeoutException, QueueException {
// run a first flow so the others are queued
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result1 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result2 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result3 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result1 = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
Execution result2 = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
Execution result3 = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
BulkResponse response = client.toBlocking().retrieve(
HttpRequest.POST("/api/v1/main/executions/unqueue/by-ids", List.of(result1.getId(), result2.getId(), result3.getId())),
@@ -1810,7 +1806,7 @@ class ExecutionControllerRunnerTest {
void shouldForceRunExecutionAQueuedFlow() throws QueueException, TimeoutException {
// run a first flow so the second is queued
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result = runnerUtils.runOneUntil(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue", exec -> exec.getState().isQueued());
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/force-run", null));
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
@@ -1821,7 +1817,7 @@ class ExecutionControllerRunnerTest {
// waiting for the flow to complete successfully
runnerUtils.awaitExecution(
execution -> execution.getId().equals(result.getId()) && execution.getState().isSuccess(),
() -> {},
result,
Duration.ofSeconds(10)
);
}
@@ -2021,22 +2017,11 @@ class ExecutionControllerRunnerTest {
.build();
}
private Execution runUntilQueued(String namespace, String flowId) throws TimeoutException, QueueException {
return runUntilState(namespace, flowId, State.Type.QUEUED);
}
private Execution createExecution(String namespace, String flowId) {
Flow flow = flowRepositoryInterface.findById(TENANT_ID, namespace, flowId).orElseThrow();
return Execution.newExecution(flow, null);
}
private Execution runUntilState(String namespace, String flowId, State.Type state) throws TimeoutException, QueueException {
Execution execution = this.createExecution(namespace, flowId);
return runnerUtils.awaitExecution(
it -> execution.getId().equals(it.getId()) && it.getState().getCurrent() == state,
throwRunnable(() -> this.executionQueue.emit(execution)),
Duration.ofSeconds(1));
}
@Test
@LoadFlows({"flows/valids/minimal.yaml"})
@@ -2113,7 +2098,7 @@ class ExecutionControllerRunnerTest {
// wait for the exec to be terminated
Execution terminated = runnerUtils.awaitExecution(
it -> execution.getId().equals(it.getId()) && it.getState().isTerminated(),
() -> {},
execution,
Duration.ofSeconds(10));
assertThat(terminated.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(terminated.getTaskRunList()).hasSize(1);

View File

@@ -202,24 +202,24 @@ class KVControllerTest {
static Stream<Arguments> kvSetKeyValueArgs() {
return Stream.of(
Arguments.of("{\"hello\":\"world\"}", Map.class),
Arguments.of("[\"hello\",\"world\"]", List.class),
Arguments.of("\"hello\"", String.class),
Arguments.of("1", Integer.class),
Arguments.of("1.0", BigDecimal.class),
Arguments.of("true", Boolean.class),
Arguments.of("false", Boolean.class),
Arguments.of("2021-09-01", LocalDate.class),
Arguments.of("2021-09-01T01:02:03Z", Instant.class),
Arguments.of("\"PT5S\"", Duration.class)
Arguments.of(MediaType.TEXT_PLAIN, "{\"hello\":\"world\"}", Map.class),
Arguments.of(MediaType.TEXT_PLAIN, "[\"hello\",\"world\"]", List.class),
Arguments.of(MediaType.TEXT_PLAIN, "\"hello\"", String.class),
Arguments.of(MediaType.TEXT_PLAIN, "1", Integer.class),
Arguments.of(MediaType.TEXT_PLAIN, "1.0", BigDecimal.class),
Arguments.of(MediaType.TEXT_PLAIN, "true", Boolean.class),
Arguments.of(MediaType.TEXT_PLAIN, "false", Boolean.class),
Arguments.of(MediaType.TEXT_PLAIN, "2021-09-01", LocalDate.class),
Arguments.of(MediaType.TEXT_PLAIN, "2021-09-01T01:02:03Z", Instant.class),
Arguments.of(MediaType.TEXT_PLAIN, "\"PT5S\"", Duration.class)
);
}
@ParameterizedTest
@MethodSource("kvSetKeyValueArgs")
void setKeyValue(String value, Class<?> expectedClass) throws IOException, ResourceExpiredException {
void setKeyValue(MediaType mediaType, String value, Class<?> expectedClass) throws IOException, ResourceExpiredException {
String myDescription = "myDescription";
client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + NAMESPACE + "/kv/my-key", value).header("ttl", "PT5M").header("description", myDescription));
client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + NAMESPACE + "/kv/my-key", value).contentType(mediaType).header("ttl", "PT5M").header("description", myDescription));
KVStore kvStore = kvStore();
Class<?> valueClazz = kvStore.getValue("my-key").get().value().getClass();
@@ -294,7 +294,7 @@ class KVControllerTest {
assertThat(httpClientResponseException.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
assertThat(httpClientResponseException.getMessage()).isEqualTo(expectedErrorMessage);
httpClientResponseException = Assertions.assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + NAMESPACE + "/kv/bad$key", "\"content\"")));
httpClientResponseException = Assertions.assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.PUT("/api/v1/main/namespaces/" + NAMESPACE + "/kv/bad$key", "\"content\"").contentType(MediaType.TEXT_PLAIN)));
assertThat(httpClientResponseException.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
assertThat(httpClientResponseException.getMessage()).isEqualTo(expectedErrorMessage);