Compare commits

...

2 Commits

Author SHA1 Message Date
Nicolas K.
35aef41622 Merge branch 'develop' into feat/use_tenant_on_execution_controller_test 2025-12-19 17:28:44 +01:00
nKwiatkowski
15443d80df feat: use tenants to remove flaky in execution controllers tests 2025-12-19 17:27:43 +01:00
2 changed files with 226 additions and 159 deletions

View File

@@ -28,6 +28,7 @@ import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.NamespaceFactory;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
@@ -36,6 +37,7 @@ import io.kestra.plugin.core.trigger.Webhook;
import io.kestra.webserver.responses.BulkErrorResponse;
import io.kestra.webserver.responses.BulkResponse;
import io.kestra.webserver.responses.PagedResults;
import io.kestra.webserver.tenants.TenantValidationFilter;
import io.micronaut.context.annotation.Property;
import io.micronaut.core.type.Argument;
import io.micronaut.data.model.Pageable;
@@ -46,12 +48,13 @@ import io.micronaut.http.client.multipart.MultipartBody;
import io.micronaut.http.sse.Event;
import io.micronaut.reactor.http.client.ReactorHttpClient;
import io.micronaut.reactor.http.client.ReactorSseClient;
import io.micronaut.test.annotation.MockBean;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
@@ -85,6 +88,8 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@Slf4j
@KestraTest(startRunner = true)
@@ -130,6 +135,18 @@ class ExecutionControllerRunnerTest {
@Inject
private NamespaceFactory namespaceFactory;
@MockBean(TenantService.class)
public TenantService getTenantService(){
return mock(TenantService.class);
}
@Inject
private TenantService tenantService;
@MockBean(TenantValidationFilter.class)
public TenantValidationFilter getTenantValidationFilter(){
return mock(TenantValidationFilter.class);
}
public static final String TESTS_FLOW_NS = "io.kestra.tests";
public static final String TENANT_ID = "main";
@@ -151,16 +168,17 @@ class ExecutionControllerRunnerTest {
- values""")
.build();
@AfterEach
protected void setup() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
@BeforeEach
public void initMock(){
when(tenantService.resolveTenant()).thenReturn(MAIN_TENANT);
}
@Test
@LoadFlows({"flows/valids/inputs.yaml"})
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "triggerexecution")
void triggerExecution() {
Execution result = triggerExecutionInputsFlowExecution(false);
String tenantId = "triggerexecution";
when(tenantService.resolveTenant()).thenReturn(tenantId);
Execution result = triggerExecutionInputsFlowExecution(tenantId, false);
assertThat(result.getState().getCurrent()).isEqualTo(State.Type.CREATED);
assertThat(result.getFlowId()).isEqualTo("inputs");
@@ -181,7 +199,7 @@ class ExecutionControllerRunnerTest {
var notFound = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(
HttpRequest
.POST("/api/v1/main/executions/foo/bar", createExecutionInputsFlowBody())
.POST("/api/v1/%s/executions/foo/bar".formatted(tenantId), createExecutionInputsFlowBody())
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
HttpResponse.class
));
@@ -211,8 +229,10 @@ class ExecutionControllerRunnerTest {
}
@Test
@LoadFlows({"flows/valids/inputs-small-files.yaml"})
@LoadFlows(value = {"flows/valids/inputs-small-files.yaml"}, tenantId = "triggerexecutioninputsmall")
void triggerExecutionInputSmall() {
String tenantId = "triggerexecutioninputsmall";
when(tenantService.resolveTenant()).thenReturn(tenantId);
File applicationFile = new File(Objects.requireNonNull(
ExecutionControllerTest.class.getClassLoader().getResource("application-test.yml")
).getPath());
@@ -221,22 +241,24 @@ class ExecutionControllerRunnerTest {
.addPart("files", "f", MediaType.TEXT_PLAIN_TYPE, applicationFile)
.build();
Execution execution = triggerExecutionExecution(TESTS_FLOW_NS, "inputs-small-files", requestBody, true);
Execution execution = triggerExecutionExecution(tenantId, TESTS_FLOW_NS, "inputs-small-files", requestBody, true);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat((String) execution.getOutputs().get("o")).startsWith("kestra://");
}
@Test
@LoadFlows({"flows/valids/inputs.yaml"})
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "invalidinputs")
void invalidInputs() {
String tenantId = "invalidinputs";
when(tenantService.resolveTenant()).thenReturn(tenantId);
MultipartBody.Builder builder = MultipartBody.builder()
.addPart("validatedString", "B-failed");
inputs.forEach((s, o) -> builder.addPart(s, o instanceof String str ? str : null));
HttpClientResponseException e = assertThrows(
HttpClientResponseException.class,
() -> triggerExecutionExecution(TESTS_FLOW_NS, "inputs", builder.build(), false)
() -> triggerExecutionExecution(tenantId, TESTS_FLOW_NS, "inputs", builder.build(), false)
);
String response = e.getResponse().getBody(String.class).orElseThrow();
@@ -246,22 +268,26 @@ class ExecutionControllerRunnerTest {
}
@Test
@LoadFlows({"flows/valids/inputs.yaml"})
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "triggerexecutionandwait")
void triggerExecutionAndWait() {
Execution result = triggerExecutionInputsFlowExecution(true);
String tenantId = "triggerexecutionandwait";
when(tenantService.resolveTenant()).thenReturn(tenantId);
Execution result = triggerExecutionInputsFlowExecution(tenantId, true);
assertThat(result.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(result.getTaskRunList().size()).isEqualTo(16);
}
@Test
@LoadFlows({"flows/valids/inputs.yaml"})
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "getexecution")
void getExecution() {
Execution result = triggerExecutionInputsFlowExecution(false);
String tenantId = "getexecution";
when(tenantService.resolveTenant()).thenReturn(tenantId);
Execution result = triggerExecutionInputsFlowExecution(tenantId, false);
// Get the triggered execution by execution id
Execution foundExecution = client.retrieve(
GET("/api/v1/main/executions/" + result.getId()),
GET("/api/v1/%s/executions/".formatted(tenantId) + result.getId()),
Execution.class
).block();
@@ -272,24 +298,26 @@ class ExecutionControllerRunnerTest {
@SuppressWarnings("unchecked")
@Test
@LoadFlows({"flows/valids/minimal-bis.yaml"})
@LoadFlows(value = {"flows/valids/minimal-bis.yaml"}, tenantId = "searchexecutionsbyflowid")
void searchExecutionsByFlowId() throws TimeoutException {
String tenantId = "searchexecutionsbyflowid";
when(tenantService.resolveTenant()).thenReturn(tenantId);
String namespace = "io.kestra.tests.minimal.bis";
String flowId = "minimal-bis";
PagedResults<Execution> executionsBefore = client.toBlocking().retrieve(
GET("/api/v1/main/executions?namespace=" + namespace + "&flowId=" + flowId),
GET("/api/v1/" + tenantId + "/executions?namespace=" + namespace + "&flowId=" + flowId),
Argument.of(PagedResults.class, Execution.class)
);
assertThat(executionsBefore.getTotal()).isEqualTo(0L);
triggerExecutionExecution(namespace, flowId, MultipartBody.builder().addPart("string", "myString").build(), false);
triggerExecutionExecution(tenantId, namespace, flowId, MultipartBody.builder().addPart("string", "myString").build(), false);
// Wait for execution indexation
Await.until(() -> executionRepositoryInterface.findByFlowId(TENANT_ID, namespace, flowId, Pageable.from(1)).size() == 1, Duration.ofMillis(100), Duration.ofMillis(10));
Await.until(() -> executionRepositoryInterface.findByFlowId(tenantId, namespace, flowId, Pageable.from(1)).size() == 1, Duration.ofMillis(100), Duration.ofMillis(10));
PagedResults<Execution> executionsAfter = client.toBlocking().retrieve(
GET("/api/v1/main/executions?namespace=" + namespace + "&flowId=" + flowId),
GET("/api/v1/" + tenantId + "/executions?namespace=" + namespace + "&flowId=" + flowId),
Argument.of(PagedResults.class, Execution.class)
);
@@ -297,12 +325,14 @@ class ExecutionControllerRunnerTest {
}
@Test
@LoadFlows({"flows/valids/inputs.yaml"})
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "triggerexecutionandfollowexecution")
void triggerExecutionAndFollowExecution() {
Execution result = triggerExecutionInputsFlowExecution(false);
String tenantId = "triggerexecutionandfollowexecution";
when(tenantService.resolveTenant()).thenReturn(tenantId);
Execution result = triggerExecutionInputsFlowExecution(tenantId, false);
List<Event<Execution>> results = sseClient
.eventStream("/api/v1/main/executions/" + result.getId() + "/follow", Execution.class)
.eventStream("/api/v1/%s/executions/".formatted(tenantId) + result.getId() + "/follow", Execution.class)
.collectList()
.block();
@@ -314,7 +344,7 @@ class ExecutionControllerRunnerTest {
// check that a second call work: calling follow on an already terminated execution.
results = sseClient
.eventStream("/api/v1/main/executions/" + result.getId() + "/follow", Execution.class)
.eventStream("/api/v1/%s/executions/".formatted(tenantId) + result.getId() + "/follow", Execution.class)
.collectList()
.block();
@@ -346,10 +376,12 @@ class ExecutionControllerRunnerTest {
}
@Test
@LoadFlows({"flows/valids/inputs.yaml",
"flows/valids/encrypted-string.yaml"})
@LoadFlows(value = {"flows/valids/inputs.yaml",
"flows/valids/encrypted-string.yaml"}, tenantId = "evaltaskrunexpressionkeepencryptedvalues")
void evalTaskRunExpressionKeepEncryptedValues() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "encrypted-string");
String tenantId = "evaltaskrunexpressionkeepencryptedvalues";
when(tenantService.resolveTenant()).thenReturn(tenantId);
Execution execution = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "encrypted-string");
ExecutionController.EvalResult result = this.evalTaskRunExpression(execution, "{{outputs.hello.value}}", 0);
Map<String, Object> resultMap = null;
@@ -361,24 +393,26 @@ class ExecutionControllerRunnerTest {
assertThat(resultMap.get("type")).isEqualTo("io.kestra.datatype:aes_encrypted");
assertThat(resultMap.get("value")).isNotNull();
execution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
execution = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
result = this.evalTaskRunExpression(execution, "{{inputs.secret}}", 0);
assertThat(result.getResult()).isNotEqualTo(inputs.get("secret"));
}
@Test
@LoadFlows({"flows/valids/restart_with_inputs.yaml"})
@LoadFlows(value = {"flows/valids/restart_with_inputs.yaml"}, tenantId = "restartexecutionfromunknowntaskid")
void restartExecutionFromUnknownTaskId() throws TimeoutException, QueueException {
String tenantId = "restartexecutionfromunknowntaskid";
when(tenantService.resolveTenant()).thenReturn(tenantId);
final String flowId = "restart_with_inputs";
final String referenceTaskId = "unknownTaskId";
// Run execution until it ends
Execution parentExecution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, flowId, null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
Execution parentExecution = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, flowId, null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay?taskRunId=" + referenceTaskId, ImmutableMap.of()),
.POST("/api/v1/" + tenantId + "/executions/" + parentExecution.getId() + "/replay?taskRunId=" + referenceTaskId, ImmutableMap.of()),
Execution.class
));
@@ -388,16 +422,18 @@ class ExecutionControllerRunnerTest {
}
@Test
@LoadFlows({"flows/valids/restart_with_inputs.yaml"})
@LoadFlows(value = {"flows/valids/restart_with_inputs.yaml"}, tenantId = "restartexecutionwithnofailure")
void restartExecutionWithNoFailure() throws TimeoutException, QueueException{
String tenantId = "restartexecutionwithnofailure";
when(tenantService.resolveTenant()).thenReturn(tenantId);
final String flowId = "restart_with_inputs";
// Run execution until it ends
Execution parentExecution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, flowId, null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
Execution parentExecution = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, flowId, null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/restart", ImmutableMap.of()),
.POST("/api/v1/" + tenantId + "/executions/" + parentExecution.getId() + "/restart", ImmutableMap.of()),
Execution.class
));
@@ -407,15 +443,17 @@ class ExecutionControllerRunnerTest {
}
@Test
@LoadFlows({"flows/valids/restart_with_inputs.yaml"})
@LoadFlows(value = {"flows/valids/restart_with_inputs.yaml"}, tenantId = "restartexecutionfromtaskid")
void restartExecutionFromTaskId() throws Exception {
String tenantId = "restartexecutionfromtaskid";
when(tenantService.resolveTenant()).thenReturn(tenantId);
final String flowId = "restart_with_inputs";
final String referenceTaskId = "instant";
// Run execution until it ends
Execution parentExecution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, flowId, null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
Execution parentExecution = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, flowId, null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
Optional<Flow> flow = flowRepositoryInterface.findById(TENANT_ID, TESTS_FLOW_NS, flowId);
Optional<Flow> flow = flowRepositoryInterface.findById(tenantId, TESTS_FLOW_NS, flowId);
assertThat(flow.isPresent()).isTrue();
@@ -427,7 +465,7 @@ class ExecutionControllerRunnerTest {
Execution createdChidExec = client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), ImmutableMap.of()),
.POST("/api/v1/" + tenantId + "/executions/" + parentExecution.getId() + "/replay?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), ImmutableMap.of()),
Execution.class
);
@@ -722,22 +760,24 @@ class ExecutionControllerRunnerTest {
}
@Test
@LoadFlows({"flows/valids/inputs.yaml"})
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "downloadinternalstoragefilefromexecution")
void downloadInternalStorageFileFromExecution() throws TimeoutException, QueueException{
Execution execution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
String tenantId = "downloadinternalstoragefilefromexecution";
when(tenantService.resolveTenant()).thenReturn(tenantId);
Execution execution = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
assertThat(execution.getTaskRunList()).hasSize(16);
String path = (String) execution.getInputs().get("file");
String file = client.toBlocking().retrieve(
GET("/api/v1/main/executions/" + execution.getId() + "/file?path=" + path),
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file?path=" + path),
String.class
);
assertThat(file).isEqualTo("hello");
FileMetas metas = client.retrieve(
GET("/api/v1/main/executions/" + execution.getId() + "/file/metas?path=" + path),
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file/metas?path=" + path),
FileMetas.class
).block();
@@ -748,7 +788,7 @@ class ExecutionControllerRunnerTest {
String newExecutionId = IdUtils.create();
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(
GET("/api/v1/main/executions/" + execution.getId() + "/file?path=" + path.replace(execution.getId(),
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file?path=" + path.replace(execution.getId(),
newExecutionId
)),
String.class
@@ -760,15 +800,17 @@ class ExecutionControllerRunnerTest {
}
@Test
@LoadFlows({"flows/valids/inputs.yaml"})
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "previewinternalstoragefilefromexecution")
void previewInternalStorageFileFromExecution() throws TimeoutException, QueueException{
Execution defaultExecution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
String tenantId = "previewinternalstoragefilefromexecution";
when(tenantService.resolveTenant()).thenReturn(tenantId);
Execution defaultExecution = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
assertThat(defaultExecution.getTaskRunList()).hasSize(16);
String defaultPath = (String) defaultExecution.getInputs().get("file");
String defaultFile = client.toBlocking().retrieve(
GET("/api/v1/main/executions/" + defaultExecution.getId() + "/file/preview?path=" + defaultPath),
GET("/api/v1/" + tenantId + "/executions/" + defaultExecution.getId() + "/file/preview?path=" + defaultPath),
String.class
);
@@ -788,20 +830,20 @@ class ExecutionControllerRunnerTest {
.put("yaml1", "{}")
.build();
Execution latin1Execution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, latin1FileInputs));
Execution latin1Execution = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, latin1FileInputs));
assertThat(latin1Execution.getTaskRunList()).hasSize(16);
String latin1Path = (String) latin1Execution.getInputs().get("file");
String latin1File = client.toBlocking().retrieve(
GET("/api/v1/main/executions/" + latin1Execution.getId() + "/file/preview?path=" + latin1Path + "&encoding=ISO-8859-1"),
GET("/api/v1/" + tenantId + "/executions/" + latin1Execution.getId() + "/file/preview?path=" + latin1Path + "&encoding=ISO-8859-1"),
String.class
);
assertThat(latin1File).contains("Düsseldorf");
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(
GET("/api/v1/main/executions/" + latin1Execution.getId() + "/file/preview?path=" + latin1Path + "&encoding=foo"),
GET("/api/v1/" + tenantId + "/executions/" + latin1Execution.getId() + "/file/preview?path=" + latin1Path + "&encoding=foo"),
String.class
));
@@ -810,14 +852,16 @@ class ExecutionControllerRunnerTest {
}
@Test
@LoadFlows({"flows/valids/inputs.yaml"})
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "previewlocalfilefromexecution")
void previewLocalFileFromExecution() throws TimeoutException, QueueException, IOException {
String tenantId = "previewlocalfilefromexecution";
when(tenantService.resolveTenant()).thenReturn(tenantId);
HashMap<String, Object> newInputs = new HashMap<>(InputsTest.inputs);
URI file = createFile();
newInputs.put("file", file);
Execution execution = runnerUtils.runOne(
MAIN_TENANT,
tenantId,
"io.kestra.tests",
"inputs",
null,
@@ -827,7 +871,7 @@ class ExecutionControllerRunnerTest {
// get the metadata of the file
FileMetas metas = client.retrieve(
GET("/api/v1/main/executions/" + execution.getId() + "/file/metas?path=" + file),
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file/metas?path=" + file),
FileMetas.class
).block();
assertThat(metas).isNotNull();
@@ -835,7 +879,7 @@ class ExecutionControllerRunnerTest {
// preview the file
Map<String, Object> preview = client.toBlocking().retrieve(
GET("/api/v1/main/executions/" + execution.getId() + "/file/preview?path=" + file),
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file/preview?path=" + file),
Map.class
);
assertThat(preview).isNotNull();
@@ -844,21 +888,23 @@ class ExecutionControllerRunnerTest {
// download the file
String content = client.toBlocking().retrieve(
GET("/api/v1/main/executions/" + execution.getId() + "/file?path=" + file),
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file?path=" + file),
String.class
);
assertThat(content).isEqualTo("Hello World");
}
@Test
@LoadFlows({"flows/valids/inputs.yaml"})
@LoadFlows(value = {"flows/valids/inputs.yaml"})
void previewNsFileFromExecution() throws TimeoutException, QueueException, IOException, URISyntaxException {
String tenantId = MAIN_TENANT;
when(tenantService.resolveTenant()).thenReturn(tenantId);
HashMap<String, Object> newInputs = new HashMap<>(InputsTest.inputs);
URI file = createNsFile(false);
newInputs.put("file", file);
Execution execution = runnerUtils.runOne(
MAIN_TENANT,
tenantId,
"io.kestra.tests",
"inputs",
null,
@@ -868,7 +914,7 @@ class ExecutionControllerRunnerTest {
// get the metadata of the file
FileMetas metas = client.retrieve(
GET("/api/v1/main/executions/" + execution.getId() + "/file/metas?path=" + file),
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file/metas?path=" + file),
FileMetas.class
).block();
assertThat(metas).isNotNull();
@@ -876,7 +922,7 @@ class ExecutionControllerRunnerTest {
// preview the file
Map<String, Object> preview = client.toBlocking().retrieve(
GET("/api/v1/main/executions/" + execution.getId() + "/file/preview?path=" + file),
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file/preview?path=" + file),
Map.class
);
assertThat(preview).isNotNull();
@@ -885,7 +931,7 @@ class ExecutionControllerRunnerTest {
// download the file
String content = client.toBlocking().retrieve(
GET("/api/v1/main/executions/" + execution.getId() + "/file?path=" + file),
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file?path=" + file),
String.class
);
assertThat(content).isEqualTo("Hello World");
@@ -1109,17 +1155,19 @@ class ExecutionControllerRunnerTest {
}
@Test
@LoadFlows({"flows/valids/pause-test.yaml"})
void resumeExecutionByQuery() throws TimeoutException, InterruptedException, QueueException {
Execution pausedExecution1 = runnerUtils.runOneUntilPaused(TENANT_ID, TESTS_FLOW_NS, "pause-test");
Execution pausedExecution2 = runnerUtils.runOneUntilPaused(TENANT_ID, TESTS_FLOW_NS, "pause-test");
@LoadFlows(value = {"flows/valids/pause-test.yaml"}, tenantId = "resumeexecutionbyquery")
void resumeExecutionByQuery() throws TimeoutException, QueueException {
String tenantId = "resumeexecutionbyquery";
when(tenantService.resolveTenant()).thenReturn(tenantId);
Execution pausedExecution1 = runnerUtils.runOneUntilPaused(tenantId, TESTS_FLOW_NS, "pause-test");
Execution pausedExecution2 = runnerUtils.runOneUntilPaused(tenantId, TESTS_FLOW_NS, "pause-test");
assertThat(pausedExecution1.getState().isPaused()).isTrue();
assertThat(pausedExecution2.getState().isPaused()).isTrue();
// resume executions
BulkResponse resumeResponse = client.toBlocking().retrieve(
HttpRequest.POST("/api/v1/main/executions/resume/by-query?namespace=" + TESTS_FLOW_NS, null),
HttpRequest.POST("/api/v1/" + tenantId + "/executions/resume/by-query?namespace=" + TESTS_FLOW_NS, null),
BulkResponse.class
);
assertThat(resumeResponse.getCount()).isEqualTo(2);
@@ -1132,7 +1180,7 @@ class ExecutionControllerRunnerTest {
HttpClientResponseException e = assertThrows(
HttpClientResponseException.class,
() -> client.toBlocking().retrieve(HttpRequest.POST(
"/api/v1/main/executions/resume/by-query?namespace=" + TESTS_FLOW_NS, null
"/api/v1/" + tenantId + "/executions/resume/by-query?namespace=" + TESTS_FLOW_NS, null
))
);
assertThat(e.getStatus().getCode()).isEqualTo(HttpStatus.BAD_REQUEST.getCode());
@@ -1371,32 +1419,34 @@ class ExecutionControllerRunnerTest {
}
@Test
@LoadFlows({"flows/valids/inputs.yaml"})
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "searchexecutions")
void searchExecutions() {
String tenantId = "searchexecutions";
when(tenantService.resolveTenant()).thenReturn(tenantId);
PagedResults<?> executions = client.toBlocking().retrieve(
GET("/api/v1/main/executions/search"), PagedResults.class
GET("/api/v1/" + tenantId + "/executions/search"), PagedResults.class
);
assertThat(executions.getTotal()).isEqualTo(0L);
triggerExecutionInputsFlowExecution(false);
triggerExecutionInputsFlowExecution(tenantId, false);
// + is there to simulate that a space was added (this can be the case from UI autocompletion for eg.)
executions = client.toBlocking().retrieve(
GET("/api/v1/main/executions/search?page=1&size=25&filters[labels][EQUALS][url]="+ENCODED_URL_LABEL_VALUE), PagedResults.class
GET("/api/v1/" + tenantId + "/executions/search?page=1&size=25&filters[labels][EQUALS][url]="+ENCODED_URL_LABEL_VALUE), PagedResults.class
);
assertThat(executions.getTotal()).isEqualTo(1L);
executions = client.toBlocking().retrieve(
GET("/api/v1/main/executions/search?page=1&size=25&labels=url:"+ENCODED_URL_LABEL_VALUE), PagedResults.class
GET("/api/v1/" + tenantId + "/executions/search?page=1&size=25&labels=url:"+ENCODED_URL_LABEL_VALUE), PagedResults.class
);
assertThat(executions.getTotal()).isEqualTo(1L);
HttpClientResponseException e = assertThrows(
HttpClientResponseException.class,
() -> client.toBlocking().retrieve(GET("/api/v1/main/executions/search?filters[startDate][EQUALS]=2024-01-07T18:43:11.248%2B01:00&filters[timeRange][EQUALS]=PT12H"))
() -> client.toBlocking().retrieve(GET("/api/v1/" + tenantId + "/executions/search?filters[startDate][EQUALS]=2024-01-07T18:43:11.248%2B01:00&filters[timeRange][EQUALS]=PT12H"))
);
assertThat(e.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
@@ -1404,39 +1454,33 @@ class ExecutionControllerRunnerTest {
assertThat(e.getResponse().getBody(String.class).get()).contains("are mutually exclusive");
executions = client.toBlocking().retrieve(
GET("/api/v1/main/executions/search?filters[timeRange][EQUALS]=PT12H"), PagedResults.class
GET("/api/v1/" + tenantId + "/executions/search?filters[timeRange][EQUALS]=PT12H"), PagedResults.class
);
assertThat(executions.getTotal()).isEqualTo(1L);
executions = client.toBlocking().retrieve(
GET("/api/v1/main/executions/search?timeRange=PT12H"), PagedResults.class
GET("/api/v1/" + tenantId + "/executions/search?timeRange=PT12H"), PagedResults.class
);
assertThat(executions.getTotal()).isEqualTo(1L);
e = assertThrows(
HttpClientResponseException.class,
() -> client.toBlocking().retrieve(GET("/api/v1/main/executions/search?filters[timeRange][EQUALS]=P1Y"))
() -> client.toBlocking().retrieve(GET("/api/v1/" + tenantId + "/executions/search?filters[timeRange][EQUALS]=P1Y"))
);
assertThat(e.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
e = assertThrows(
HttpClientResponseException.class,
() -> client.toBlocking().retrieve(GET("/api/v1/main/executions/search?timeRange=P1Y"))
);
assertThat(e.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
e = assertThrows(
HttpClientResponseException.class,
() -> client.toBlocking().retrieve(GET("/api/v1/main/executions/search?page=1&size=-1"))
() -> client.toBlocking().retrieve(GET("/api/v1/" + tenantId + "/executions/search?page=1&size=-1"))
);
assertThat(e.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
e = assertThrows(
HttpClientResponseException.class,
() -> client.toBlocking().retrieve(GET("/api/v1/main/executions/search?page=0"))
() -> client.toBlocking().retrieve(GET("/api/v1/" + tenantId + "/executions/search?page=0"))
);
assertThat(e.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
@@ -1537,14 +1581,16 @@ class ExecutionControllerRunnerTest {
}
@Test
@LoadFlows({"flows/valids/minimal.yaml"})
@LoadFlows(value = {"flows/valids/minimal.yaml"}, tenantId = "setlabelsonterminatedexecutionsbyquery")
void setLabelsOnTerminatedExecutionsByQuery() throws TimeoutException, QueueException {
Execution result1 = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "minimal");
Execution result2 = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "minimal");
Execution result3 = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "minimal");
String tenantId = "setlabelsonterminatedexecutionsbyquery";
when(tenantService.resolveTenant()).thenReturn(tenantId);
Execution result1 = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "minimal");
Execution result2 = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "minimal");
Execution result3 = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "minimal");
BulkResponse response = client.toBlocking().retrieve(
HttpRequest.POST("/api/v1/main/executions/labels/by-query?namespace=" + result1.getNamespace(),
HttpRequest.POST("/api/v1/" + tenantId + "/executions/labels/by-query?namespace=" + result1.getNamespace(),
List.of(new Label("key", "value"))
),
BulkResponse.class
@@ -1555,7 +1601,7 @@ class ExecutionControllerRunnerTest {
var exception = assertThrows(
HttpClientResponseException.class,
() -> client.toBlocking().exchange(HttpRequest.POST(
"/api/v1/main/executions/labels/by-query?namespace=" + result1.getNamespace(),
"/api/v1/" + tenantId + "/executions/labels/by-query?namespace=" + result1.getNamespace(),
List.of(new Label(null, null)))
)
);
@@ -1753,14 +1799,16 @@ class ExecutionControllerRunnerTest {
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue.yml",
"flows/valids/minimal.yaml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml",
"flows/valids/minimal.yaml"}, tenantId = "shouldunqueueexecutionaqueuedflow")
void shouldUnqueueExecutionAQueuedFlow() throws QueueException, TimeoutException {
String tenantId = "shouldunqueueexecutionaqueuedflow";
when(tenantService.resolveTenant()).thenReturn(tenantId);
// run a first flow so the second is queued
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
runnerUtils.runOneUntilRunning(tenantId, TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result = runUntilQueued(tenantId, TESTS_FLOW_NS, "flow-concurrency-queue");
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/unqueue", null));
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/" + tenantId + "/executions/" + result.getId() + "/unqueue", null));
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
// waiting for the flow to complete successfully
@@ -1771,60 +1819,66 @@ class ExecutionControllerRunnerTest {
);
var notFound = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/notfound/unqueue", null)));
var notFound = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.POST("/api/v1/" + tenantId + "/executions/notfound/unqueue", null)));
assertThat(notFound.getStatus().getCode()).isEqualTo(HttpStatus.NOT_FOUND.getCode());
// pausing an already completed flow will result in errors
Execution completed = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "minimal");
Execution completed = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "minimal");
var notRunning = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + completed.getId() + "/unqueue", null)));
var notRunning = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.POST("/api/v1/" + tenantId + "/executions/" + completed.getId() + "/unqueue", null)));
assertThat(notRunning.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue.yml",
"flows/valids/minimal.yaml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml",
"flows/valids/minimal.yaml"}, tenantId = "shouldunqueueaqueuedflowtocancelledstate")
void shouldUnqueueAQueuedFlowToCancelledState() throws QueueException, TimeoutException {
String tenantId = "shouldunqueueaqueuedflowtocancelledstate";
when(tenantService.resolveTenant()).thenReturn(tenantId);
// run a first flow so the second is queued
runnerUtils.runOneUntilRunning(TENANT_ID, "io.kestra.tests", "flow-concurrency-queue");
Execution result1 = runUntilQueued("io.kestra.tests", "flow-concurrency-queue");
runnerUtils.runOneUntilRunning(tenantId, "io.kestra.tests", "flow-concurrency-queue");
Execution result1 = runUntilQueued(tenantId, "io.kestra.tests", "flow-concurrency-queue");
var cancelResponse = client.toBlocking().exchange(
HttpRequest.POST("/api/v1/executions/" + result1.getId() + "/unqueue?state=CANCELLED", null)
HttpRequest.POST("/api/v1/" + tenantId + "/executions/" + result1.getId() + "/unqueue?state=CANCELLED", null)
);
assertThat(cancelResponse.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
Optional<Execution> cancelledExecution = executionRepositoryInterface.findById(TENANT_ID, result1.getId());
Optional<Execution> cancelledExecution = executionRepositoryInterface.findById(tenantId, result1.getId());
assertThat(cancelledExecution.isPresent()).isTrue();
assertThat(cancelledExecution.get().getState().getCurrent()).isEqualTo(State.Type.CANCELLED);
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = "shouldunqueueexecutionbyidsqueuedflows")
void shouldUnqueueExecutionByIdsQueuedFlows() throws TimeoutException, QueueException {
String tenantId = "shouldunqueueexecutionbyidsqueuedflows";
when(tenantService.resolveTenant()).thenReturn(tenantId);
// run a first flow so the others are queued
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result1 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result2 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result3 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
runnerUtils.runOneUntilRunning(tenantId, TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result1 = runUntilQueued(tenantId, TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result2 = runUntilQueued(tenantId, TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result3 = runUntilQueued(tenantId, TESTS_FLOW_NS, "flow-concurrency-queue");
BulkResponse response = client.toBlocking().retrieve(
HttpRequest.POST("/api/v1/main/executions/unqueue/by-ids", List.of(result1.getId(), result2.getId(), result3.getId())),
HttpRequest.POST("/api/v1/" + tenantId + "/executions/unqueue/by-ids", List.of(result1.getId(), result2.getId(), result3.getId())),
BulkResponse.class
);
assertThat(response.getCount()).isEqualTo(3);
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = "shouldforcerunexecutionaqueuedflow")
void shouldForceRunExecutionAQueuedFlow() throws QueueException, TimeoutException {
String tenantId = "shouldforcerunexecutionaqueuedflow";
when(tenantService.resolveTenant()).thenReturn(tenantId);
// run a first flow so the second is queued
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
runnerUtils.runOneUntilRunning(tenantId, TESTS_FLOW_NS, "flow-concurrency-queue");
Execution result = runUntilQueued(tenantId, TESTS_FLOW_NS, "flow-concurrency-queue");
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/force-run", null));
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/" + tenantId + "/executions/" + result.getId() + "/force-run", null));
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
Optional<Execution> forcedRun = executionRepositoryInterface.findById(TENANT_ID, result.getId());
Optional<Execution> forcedRun = executionRepositoryInterface.findById(tenantId, result.getId());
assertThat(forcedRun.isPresent()).isTrue();
assertThat(forcedRun.get().getState().getCurrent()).isNotEqualTo(State.Type.QUEUED);
@@ -1850,14 +1904,16 @@ class ExecutionControllerRunnerTest {
}
@Test
@LoadFlows({"flows/valids/minimal.yaml"})
void shouldForceRunExecutionACreatedFlow() throws QueueException, TimeoutException {
Execution result = this.createExecution(TESTS_FLOW_NS, "minimal");
@LoadFlows(value = {"flows/valids/minimal.yaml"}, tenantId = "shouldforcerunexecutionacreatedflow")
void shouldForceRunExecutionACreatedFlow() throws QueueException {
String tenantId = "shouldforcerunexecutionacreatedflow";
when(tenantService.resolveTenant()).thenReturn(tenantId);
Execution result = this.createExecution(tenantId, TESTS_FLOW_NS, "minimal");
this.executionQueue.emit(result);
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/force-run", null));
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/" + tenantId + "/executions/" + result.getId() + "/force-run", null));
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
Optional<Execution> forcedRun = executionRepositoryInterface.findById(TENANT_ID, result.getId());
Optional<Execution> forcedRun = executionRepositoryInterface.findById(tenantId, result.getId());
assertThat(forcedRun.isPresent()).isTrue();
assertThat(forcedRun.get().getState().getCurrent()).isNotEqualTo(State.Type.CREATED);
}
@@ -1978,7 +2034,7 @@ class ExecutionControllerRunnerTest {
return client.toBlocking().retrieve(
HttpRequest
.POST(
"/api/v1/main/executions/" + execution.getId() + "/eval/" + execution.getTaskRunList().get(index).getId(),
"/api/v1/" + execution.getTenantId() + "/executions/" + execution.getId() + "/eval/" + execution.getTaskRunList().get(index).getId(),
expression
)
.contentType(MediaType.TEXT_PLAIN_TYPE),
@@ -1987,23 +2043,23 @@ class ExecutionControllerRunnerTest {
}
private Execution triggerExecutionExecution(String namespace, String flowId, MultipartBody requestBody, Boolean wait) {
return triggerExecutionExecution(namespace, flowId, requestBody, wait, null);
private Execution triggerExecutionExecution(String tenantId, String namespace, String flowId, MultipartBody requestBody, Boolean wait) {
return triggerExecutionExecution(tenantId, namespace, flowId, requestBody, wait, null);
}
private Execution triggerExecutionExecution(String namespace, String flowId, MultipartBody requestBody, Boolean wait, String breakpoint) {
private Execution triggerExecutionExecution(String tenantId, String namespace, String flowId, MultipartBody requestBody, Boolean wait, String breakpoint) {
return client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/main/executions/" + namespace + "/" + flowId + "?labels=a:label-1&labels=b:label-2&labels=url:" + ENCODED_URL_LABEL_VALUE + (wait ? "&wait=true" : "") + (breakpoint != null ? "&breakpoints=" + breakpoint : ""), requestBody)
.POST("/api/v1/" + tenantId + "/executions/" + namespace + "/" + flowId + "?labels=a:label-1&labels=b:label-2&labels=url:" + ENCODED_URL_LABEL_VALUE + (wait ? "&wait=true" : "") + (breakpoint != null ? "&breakpoints=" + breakpoint : ""), requestBody)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
Execution.class
);
}
private Execution triggerExecutionInputsFlowExecution(Boolean wait) {
private Execution triggerExecutionInputsFlowExecution(String tenantId, Boolean wait) {
MultipartBody requestBody = createExecutionInputsFlowBody();
return triggerExecutionExecution(TESTS_FLOW_NS, "inputs", requestBody, wait);
return triggerExecutionExecution(tenantId, TESTS_FLOW_NS, "inputs", requestBody, wait);
}
private MultipartBody createExecutionInputsFlowBody() {
@@ -2031,17 +2087,17 @@ class ExecutionControllerRunnerTest {
.build();
}
private Execution runUntilQueued(String namespace, String flowId) throws TimeoutException, QueueException {
return runUntilState(namespace, flowId, State.Type.QUEUED);
private Execution runUntilQueued(String tenantId, String namespace, String flowId) throws TimeoutException, QueueException {
return runUntilState(tenantId, namespace, flowId, State.Type.QUEUED);
}
private Execution createExecution(String namespace, String flowId) {
Flow flow = flowRepositoryInterface.findById(TENANT_ID, namespace, flowId).orElseThrow();
private Execution createExecution(String tenantId, String namespace, String flowId) {
Flow flow = flowRepositoryInterface.findById(tenantId, namespace, flowId).orElseThrow();
return Execution.newExecution(flow, null);
}
private Execution runUntilState(String namespace, String flowId, State.Type state) throws TimeoutException, QueueException {
Execution execution = this.createExecution(namespace, flowId);
private Execution runUntilState(String tenantId, String namespace, String flowId, State.Type state) throws TimeoutException, QueueException {
Execution execution = this.createExecution(tenantId, namespace, flowId);
return runnerUtils.awaitExecution(
it -> execution.getId().equals(it.getId()) && it.getState().getCurrent() == state,
throwRunnable(() -> this.executionQueue.emit(execution)),
@@ -2086,14 +2142,16 @@ class ExecutionControllerRunnerTest {
}
@Test
@LoadFlows({"flows/valids/minimal.yaml"})
@LoadFlows(value = {"flows/valids/minimal.yaml"}, tenantId = "shouldnotallowaddingsystemlabels")
void shouldNotAllowAddingSystemLabels() throws QueueException, TimeoutException {
Execution result = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "minimal");
String tenantId = "shouldnotallowaddingsystemlabels";
when(tenantService.resolveTenant()).thenReturn(tenantId);
Execution result = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "minimal");
assertThat(result.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
List<Label> systemLabels = List.of(new Label("system.key", "system-value"));
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(
HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/labels", systemLabels),
HttpRequest.POST("/api/v1/" + tenantId + "/executions/" + result.getId() + "/labels", systemLabels),
Execution.class
));
@@ -2102,9 +2160,11 @@ class ExecutionControllerRunnerTest {
}
@Test
@LoadFlows({"flows/valids/minimal.yaml"})
void shouldSuspendAtBreakpointThenResume() throws QueueException, TimeoutException, InterruptedException {
Execution execution = triggerExecutionExecution(TESTS_FLOW_NS, "minimal", null, false, "date");
@LoadFlows(value = {"flows/valids/minimal.yaml"}, tenantId = "shouldsuspendatbreakpointthenresume")
void shouldSuspendAtBreakpointThenResume() throws TimeoutException {
String tenantId = "shouldsuspendatbreakpointthenresume";
when(tenantService.resolveTenant()).thenReturn(tenantId);
Execution execution = triggerExecutionExecution(tenantId, TESTS_FLOW_NS, "minimal", null, false, "date");
assertThat(execution).isNotNull();
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.CREATED);
@@ -2115,7 +2175,7 @@ class ExecutionControllerRunnerTest {
// resume the suspended execution
HttpResponse<Void> resume = client.toBlocking().exchange(
HttpRequest.POST("/api/v1/main/executions/" + suspended.getId() + "/resume-from-breakpoint", null),
HttpRequest.POST("/api/v1/" + tenantId + "/executions/" + suspended.getId() + "/resume-from-breakpoint", null),
Void.class
);
assertThat(resume.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
@@ -2132,15 +2192,20 @@ class ExecutionControllerRunnerTest {
@FlakyTest
@Test
@LoadFlows({"flows/valids/subflow-parent.yaml", "flows/valids/subflow-child.yaml", "flows/valids/subflow-grand-child.yaml"})
@LoadFlows(value = {"flows/valids/subflow-parent.yaml",
"flows/valids/subflow-child.yaml",
"flows/valids/subflow-grand-child.yaml"},
tenantId = "triggerexecutionandfollowdependencies")
void triggerExecutionAndFollowDependencies() throws InterruptedException {
Execution result = triggerExecutionExecution(TESTS_FLOW_NS, "subflow-parent", null, true);
String tenantId = "triggerexecutionandfollowdependencies";
when(tenantService.resolveTenant()).thenReturn(tenantId);
Execution result = triggerExecutionExecution(tenantId, TESTS_FLOW_NS, "subflow-parent", null, true);
// without this slight delay, the event stream may miss some 'end' events
Thread.sleep(500);
List<Event<ExecutionStatusEvent>> results = sseClient
.eventStream("/api/v1/main/executions/" + result.getId() + "/follow-dependencies?expandAll=true", ExecutionStatusEvent.class)
.eventStream("/api/v1/" + tenantId + "/executions/" + result.getId() + "/follow-dependencies?expandAll=true", ExecutionStatusEvent.class)
.collectList()
.block();
@@ -2154,7 +2219,7 @@ class ExecutionControllerRunnerTest {
// check that a second call work: calling follow on an already terminated execution.
results = sseClient
.eventStream("/api/v1/main/executions/" + result.getId() + "/follow-dependencies?expandAll=true", ExecutionStatusEvent.class)
.eventStream("/api/v1/" + tenantId + "/executions/" + result.getId() + "/follow-dependencies?expandAll=true", ExecutionStatusEvent.class)
.collectList()
.block();
@@ -2168,7 +2233,7 @@ class ExecutionControllerRunnerTest {
// check that a without expandAll it would return only the immediate dependencies.
results = sseClient
.eventStream("/api/v1/main/executions/" + result.getId() + "/follow-dependencies", ExecutionStatusEvent.class)
.eventStream("/api/v1/" + tenantId + "/executions/" + result.getId() + "/follow-dependencies", ExecutionStatusEvent.class)
.collectList()
.block();

View File

@@ -2,6 +2,7 @@ package io.kestra.webserver.controllers.api;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
@@ -10,7 +11,6 @@ import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.TaskForExecution;
import io.kestra.core.models.triggers.AbstractTriggerForExecution;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.utils.TestsUtils;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.plugin.core.debug.Return;
import io.kestra.webserver.responses.BulkResponse;
@@ -22,10 +22,8 @@ import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.multipart.MultipartBody;
import io.micronaut.reactor.http.client.ReactorHttpClient;
import jakarta.inject.Inject;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
@@ -68,15 +66,6 @@ class ExecutionControllerTest {
public static final String TESTS_FLOW_NS = "io.kestra.tests";
public static final String TESTS_WEBHOOK_KEY = "a-secret-key";
@SneakyThrows
@BeforeEach
protected void setup() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
TestsUtils.loads(MAIN_TENANT, repositoryLoader);
}
@Test
void getExecutionNotFound() {
HttpClientResponseException e = assertThrows(
@@ -177,6 +166,7 @@ class ExecutionControllerTest {
}
@Test
@LoadFlows(value = {"flows/valids/webhook-dynamic-key.yaml"})
void webhookDynamicKey() {
Execution execution = client.toBlocking().retrieve(
GET(
@@ -190,6 +180,7 @@ class ExecutionControllerTest {
}
@Test
@LoadFlows(value = {"flows/valids/webhook-secret-key.yaml"})
@EnabledIfEnvironmentVariable(named = "SECRET_WEBHOOK_KEY", matches = ".*")
void webhookDynamicKeyFromASecret() {
Execution execution = client.toBlocking().retrieve(
@@ -204,6 +195,7 @@ class ExecutionControllerTest {
}
@Test
@LoadFlows(value = {"flows/valids/webhook-with-condition.yaml"})
void webhookWithCondition() {
record Hello(String hello) {}
@@ -232,6 +224,7 @@ class ExecutionControllerTest {
}
@Test
@LoadFlows(value = {"flows/valids/webhook-inputs.yaml"})
void webhookWithInputs() {
record Hello(String hello) {}
@@ -289,6 +282,7 @@ class ExecutionControllerTest {
@SuppressWarnings("DataFlowIssue")
@Test
@LoadFlows(value = {"flows/valids/full.yaml"})
void getExecutionFlowForExecution() {
FlowForExecution result = client.toBlocking().retrieve(
GET("/api/v1/main/executions/flows/io.kestra.tests/full"),
@@ -301,6 +295,7 @@ class ExecutionControllerTest {
}
@Test
@LoadFlows(value = {"flows/valids/full.yaml"})
void getExecutionFlowForExecutionWithOldUrl() {
FlowForExecution result = client.toBlocking().retrieve(
GET("/api/v1/main/executions/flows/io.kestra.tests/full"),
@@ -314,6 +309,7 @@ class ExecutionControllerTest {
@SuppressWarnings("DataFlowIssue")
@Test
@LoadFlows(value = {"flows/valids/webhook.yaml"})
void getExecutionFlowForExecutionById() {
Execution execution = client.toBlocking().retrieve(
HttpRequest
@@ -336,24 +332,26 @@ class ExecutionControllerTest {
@SuppressWarnings("unchecked")
@Test
@LoadFlows(value = {"flows/valids/minimal.yaml"})
void getExecutionDistinctNamespaceExecutables() {
List<String> result = client.toBlocking().retrieve(
GET("/api/v1/main/executions/namespaces"),
Argument.of(List.class, String.class)
);
assertThat(result.size()).isGreaterThanOrEqualTo(5);
assertThat(result.size()).isGreaterThanOrEqualTo(1);
}
@SuppressWarnings("unchecked")
@Test
@LoadFlows(value = {"flows/valids/webhook.yaml", "flows/valids/minimal.yaml"})
void getExecutionFlowFromNamespace() {
List<FlowForExecution> result = client.toBlocking().retrieve(
GET("/api/v1/main/executions/namespaces/io.kestra.tests/flows"),
Argument.of(List.class, FlowForExecution.class)
);
assertThat(result.size()).isGreaterThan(100);
assertThat(result.size()).isGreaterThanOrEqualTo(2);
}
@Test
@@ -378,6 +376,7 @@ class ExecutionControllerTest {
}
@Test
@LoadFlows(value = {"flows/valids/inputs.yaml"})
void commaInSingleLabelsValue() {
String encodedCommaWithinLabel = URLEncoder.encode("project:foo,bar", StandardCharsets.UTF_8);
@@ -448,6 +447,7 @@ class ExecutionControllerTest {
}
@Test
@LoadFlows(value = {"flows/valids/minimal.yaml"})
void scheduleDate() {
// given
ZonedDateTime now = ZonedDateTime.now().truncatedTo(ChronoUnit.SECONDS).plusSeconds(1);
@@ -489,6 +489,7 @@ class ExecutionControllerTest {
}
@Test
@LoadFlows(value = {"flows/valids/minimal.yaml"})
void shouldHaveAnUrlWhenCreated() {
// ExecutionController.ExecutionResponse cannot be deserialized because it didn't have any default constructor.
// adding it would mean updating the Execution itself, which is too annoying, so for the test we just deserialize to a Map.
@@ -504,6 +505,7 @@ class ExecutionControllerTest {
}
@Test
@LoadFlows(value = {"flows/valids/minimal.yaml"})
void shouldRefuseSystemLabelsWhenCreatingAnExecution() {
var error = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(
HttpRequest