mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 20:00:14 -05:00
Compare commits
2 Commits
develop
...
feat/use_t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
35aef41622 | ||
|
|
15443d80df |
@@ -28,6 +28,7 @@ import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.NamespaceFactory;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
@@ -36,6 +37,7 @@ import io.kestra.plugin.core.trigger.Webhook;
|
||||
import io.kestra.webserver.responses.BulkErrorResponse;
|
||||
import io.kestra.webserver.responses.BulkResponse;
|
||||
import io.kestra.webserver.responses.PagedResults;
|
||||
import io.kestra.webserver.tenants.TenantValidationFilter;
|
||||
import io.micronaut.context.annotation.Property;
|
||||
import io.micronaut.core.type.Argument;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
@@ -46,12 +48,13 @@ import io.micronaut.http.client.multipart.MultipartBody;
|
||||
import io.micronaut.http.sse.Event;
|
||||
import io.micronaut.reactor.http.client.ReactorHttpClient;
|
||||
import io.micronaut.reactor.http.client.ReactorSseClient;
|
||||
import io.micronaut.test.annotation.MockBean;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.awaitility.Awaitility;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
@@ -85,6 +88,8 @@ import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@Slf4j
|
||||
@KestraTest(startRunner = true)
|
||||
@@ -130,6 +135,18 @@ class ExecutionControllerRunnerTest {
|
||||
@Inject
|
||||
private NamespaceFactory namespaceFactory;
|
||||
|
||||
@MockBean(TenantService.class)
|
||||
public TenantService getTenantService(){
|
||||
return mock(TenantService.class);
|
||||
}
|
||||
@Inject
|
||||
private TenantService tenantService;
|
||||
|
||||
@MockBean(TenantValidationFilter.class)
|
||||
public TenantValidationFilter getTenantValidationFilter(){
|
||||
return mock(TenantValidationFilter.class);
|
||||
}
|
||||
|
||||
public static final String TESTS_FLOW_NS = "io.kestra.tests";
|
||||
public static final String TENANT_ID = "main";
|
||||
|
||||
@@ -151,16 +168,17 @@ class ExecutionControllerRunnerTest {
|
||||
- values""")
|
||||
.build();
|
||||
|
||||
@AfterEach
|
||||
protected void setup() {
|
||||
jdbcTestUtils.drop();
|
||||
jdbcTestUtils.migrate();
|
||||
@BeforeEach
|
||||
public void initMock(){
|
||||
when(tenantService.resolveTenant()).thenReturn(MAIN_TENANT);
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/inputs.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "triggerexecution")
|
||||
void triggerExecution() {
|
||||
Execution result = triggerExecutionInputsFlowExecution(false);
|
||||
String tenantId = "triggerexecution";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
Execution result = triggerExecutionInputsFlowExecution(tenantId, false);
|
||||
|
||||
assertThat(result.getState().getCurrent()).isEqualTo(State.Type.CREATED);
|
||||
assertThat(result.getFlowId()).isEqualTo("inputs");
|
||||
@@ -181,7 +199,7 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
var notFound = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/foo/bar", createExecutionInputsFlowBody())
|
||||
.POST("/api/v1/%s/executions/foo/bar".formatted(tenantId), createExecutionInputsFlowBody())
|
||||
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
|
||||
HttpResponse.class
|
||||
));
|
||||
@@ -211,8 +229,10 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/inputs-small-files.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/inputs-small-files.yaml"}, tenantId = "triggerexecutioninputsmall")
|
||||
void triggerExecutionInputSmall() {
|
||||
String tenantId = "triggerexecutioninputsmall";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
File applicationFile = new File(Objects.requireNonNull(
|
||||
ExecutionControllerTest.class.getClassLoader().getResource("application-test.yml")
|
||||
).getPath());
|
||||
@@ -221,22 +241,24 @@ class ExecutionControllerRunnerTest {
|
||||
.addPart("files", "f", MediaType.TEXT_PLAIN_TYPE, applicationFile)
|
||||
.build();
|
||||
|
||||
Execution execution = triggerExecutionExecution(TESTS_FLOW_NS, "inputs-small-files", requestBody, true);
|
||||
Execution execution = triggerExecutionExecution(tenantId, TESTS_FLOW_NS, "inputs-small-files", requestBody, true);
|
||||
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat((String) execution.getOutputs().get("o")).startsWith("kestra://");
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/inputs.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "invalidinputs")
|
||||
void invalidInputs() {
|
||||
String tenantId = "invalidinputs";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
MultipartBody.Builder builder = MultipartBody.builder()
|
||||
.addPart("validatedString", "B-failed");
|
||||
inputs.forEach((s, o) -> builder.addPart(s, o instanceof String str ? str : null));
|
||||
|
||||
HttpClientResponseException e = assertThrows(
|
||||
HttpClientResponseException.class,
|
||||
() -> triggerExecutionExecution(TESTS_FLOW_NS, "inputs", builder.build(), false)
|
||||
() -> triggerExecutionExecution(tenantId, TESTS_FLOW_NS, "inputs", builder.build(), false)
|
||||
);
|
||||
|
||||
String response = e.getResponse().getBody(String.class).orElseThrow();
|
||||
@@ -246,22 +268,26 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/inputs.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "triggerexecutionandwait")
|
||||
void triggerExecutionAndWait() {
|
||||
Execution result = triggerExecutionInputsFlowExecution(true);
|
||||
String tenantId = "triggerexecutionandwait";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
Execution result = triggerExecutionInputsFlowExecution(tenantId, true);
|
||||
|
||||
assertThat(result.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat(result.getTaskRunList().size()).isEqualTo(16);
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/inputs.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "getexecution")
|
||||
void getExecution() {
|
||||
Execution result = triggerExecutionInputsFlowExecution(false);
|
||||
String tenantId = "getexecution";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
Execution result = triggerExecutionInputsFlowExecution(tenantId, false);
|
||||
|
||||
// Get the triggered execution by execution id
|
||||
Execution foundExecution = client.retrieve(
|
||||
GET("/api/v1/main/executions/" + result.getId()),
|
||||
GET("/api/v1/%s/executions/".formatted(tenantId) + result.getId()),
|
||||
Execution.class
|
||||
).block();
|
||||
|
||||
@@ -272,24 +298,26 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/minimal-bis.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/minimal-bis.yaml"}, tenantId = "searchexecutionsbyflowid")
|
||||
void searchExecutionsByFlowId() throws TimeoutException {
|
||||
String tenantId = "searchexecutionsbyflowid";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
String namespace = "io.kestra.tests.minimal.bis";
|
||||
String flowId = "minimal-bis";
|
||||
|
||||
PagedResults<Execution> executionsBefore = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions?namespace=" + namespace + "&flowId=" + flowId),
|
||||
GET("/api/v1/" + tenantId + "/executions?namespace=" + namespace + "&flowId=" + flowId),
|
||||
Argument.of(PagedResults.class, Execution.class)
|
||||
);
|
||||
|
||||
assertThat(executionsBefore.getTotal()).isEqualTo(0L);
|
||||
|
||||
triggerExecutionExecution(namespace, flowId, MultipartBody.builder().addPart("string", "myString").build(), false);
|
||||
triggerExecutionExecution(tenantId, namespace, flowId, MultipartBody.builder().addPart("string", "myString").build(), false);
|
||||
|
||||
// Wait for execution indexation
|
||||
Await.until(() -> executionRepositoryInterface.findByFlowId(TENANT_ID, namespace, flowId, Pageable.from(1)).size() == 1, Duration.ofMillis(100), Duration.ofMillis(10));
|
||||
Await.until(() -> executionRepositoryInterface.findByFlowId(tenantId, namespace, flowId, Pageable.from(1)).size() == 1, Duration.ofMillis(100), Duration.ofMillis(10));
|
||||
PagedResults<Execution> executionsAfter = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions?namespace=" + namespace + "&flowId=" + flowId),
|
||||
GET("/api/v1/" + tenantId + "/executions?namespace=" + namespace + "&flowId=" + flowId),
|
||||
Argument.of(PagedResults.class, Execution.class)
|
||||
);
|
||||
|
||||
@@ -297,12 +325,14 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/inputs.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "triggerexecutionandfollowexecution")
|
||||
void triggerExecutionAndFollowExecution() {
|
||||
Execution result = triggerExecutionInputsFlowExecution(false);
|
||||
String tenantId = "triggerexecutionandfollowexecution";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
Execution result = triggerExecutionInputsFlowExecution(tenantId, false);
|
||||
|
||||
List<Event<Execution>> results = sseClient
|
||||
.eventStream("/api/v1/main/executions/" + result.getId() + "/follow", Execution.class)
|
||||
.eventStream("/api/v1/%s/executions/".formatted(tenantId) + result.getId() + "/follow", Execution.class)
|
||||
.collectList()
|
||||
.block();
|
||||
|
||||
@@ -314,7 +344,7 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
// check that a second call work: calling follow on an already terminated execution.
|
||||
results = sseClient
|
||||
.eventStream("/api/v1/main/executions/" + result.getId() + "/follow", Execution.class)
|
||||
.eventStream("/api/v1/%s/executions/".formatted(tenantId) + result.getId() + "/follow", Execution.class)
|
||||
.collectList()
|
||||
.block();
|
||||
|
||||
@@ -346,10 +376,12 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/inputs.yaml",
|
||||
"flows/valids/encrypted-string.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/inputs.yaml",
|
||||
"flows/valids/encrypted-string.yaml"}, tenantId = "evaltaskrunexpressionkeepencryptedvalues")
|
||||
void evalTaskRunExpressionKeepEncryptedValues() throws TimeoutException, QueueException {
|
||||
Execution execution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "encrypted-string");
|
||||
String tenantId = "evaltaskrunexpressionkeepencryptedvalues";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
Execution execution = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "encrypted-string");
|
||||
|
||||
ExecutionController.EvalResult result = this.evalTaskRunExpression(execution, "{{outputs.hello.value}}", 0);
|
||||
Map<String, Object> resultMap = null;
|
||||
@@ -361,24 +393,26 @@ class ExecutionControllerRunnerTest {
|
||||
assertThat(resultMap.get("type")).isEqualTo("io.kestra.datatype:aes_encrypted");
|
||||
assertThat(resultMap.get("value")).isNotNull();
|
||||
|
||||
execution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
|
||||
execution = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
|
||||
|
||||
result = this.evalTaskRunExpression(execution, "{{inputs.secret}}", 0);
|
||||
assertThat(result.getResult()).isNotEqualTo(inputs.get("secret"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/restart_with_inputs.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/restart_with_inputs.yaml"}, tenantId = "restartexecutionfromunknowntaskid")
|
||||
void restartExecutionFromUnknownTaskId() throws TimeoutException, QueueException {
|
||||
String tenantId = "restartexecutionfromunknowntaskid";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
final String flowId = "restart_with_inputs";
|
||||
final String referenceTaskId = "unknownTaskId";
|
||||
|
||||
// Run execution until it ends
|
||||
Execution parentExecution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, flowId, null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
|
||||
Execution parentExecution = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, flowId, null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
|
||||
|
||||
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay?taskRunId=" + referenceTaskId, ImmutableMap.of()),
|
||||
.POST("/api/v1/" + tenantId + "/executions/" + parentExecution.getId() + "/replay?taskRunId=" + referenceTaskId, ImmutableMap.of()),
|
||||
Execution.class
|
||||
));
|
||||
|
||||
@@ -388,16 +422,18 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/restart_with_inputs.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/restart_with_inputs.yaml"}, tenantId = "restartexecutionwithnofailure")
|
||||
void restartExecutionWithNoFailure() throws TimeoutException, QueueException{
|
||||
String tenantId = "restartexecutionwithnofailure";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
final String flowId = "restart_with_inputs";
|
||||
|
||||
// Run execution until it ends
|
||||
Execution parentExecution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, flowId, null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
|
||||
Execution parentExecution = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, flowId, null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
|
||||
|
||||
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/restart", ImmutableMap.of()),
|
||||
.POST("/api/v1/" + tenantId + "/executions/" + parentExecution.getId() + "/restart", ImmutableMap.of()),
|
||||
Execution.class
|
||||
));
|
||||
|
||||
@@ -407,15 +443,17 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/restart_with_inputs.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/restart_with_inputs.yaml"}, tenantId = "restartexecutionfromtaskid")
|
||||
void restartExecutionFromTaskId() throws Exception {
|
||||
String tenantId = "restartexecutionfromtaskid";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
final String flowId = "restart_with_inputs";
|
||||
final String referenceTaskId = "instant";
|
||||
|
||||
// Run execution until it ends
|
||||
Execution parentExecution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, flowId, null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
|
||||
Execution parentExecution = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, flowId, null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
|
||||
|
||||
Optional<Flow> flow = flowRepositoryInterface.findById(TENANT_ID, TESTS_FLOW_NS, flowId);
|
||||
Optional<Flow> flow = flowRepositoryInterface.findById(tenantId, TESTS_FLOW_NS, flowId);
|
||||
|
||||
assertThat(flow.isPresent()).isTrue();
|
||||
|
||||
@@ -427,7 +465,7 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
Execution createdChidExec = client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/" + parentExecution.getId() + "/replay?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), ImmutableMap.of()),
|
||||
.POST("/api/v1/" + tenantId + "/executions/" + parentExecution.getId() + "/replay?taskRunId=" + parentExecution.findTaskRunByTaskIdAndValue(referenceTaskId, List.of()).getId(), ImmutableMap.of()),
|
||||
Execution.class
|
||||
);
|
||||
|
||||
@@ -722,22 +760,24 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/inputs.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "downloadinternalstoragefilefromexecution")
|
||||
void downloadInternalStorageFileFromExecution() throws TimeoutException, QueueException{
|
||||
Execution execution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
|
||||
String tenantId = "downloadinternalstoragefilefromexecution";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
Execution execution = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
|
||||
assertThat(execution.getTaskRunList()).hasSize(16);
|
||||
|
||||
String path = (String) execution.getInputs().get("file");
|
||||
|
||||
String file = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/" + execution.getId() + "/file?path=" + path),
|
||||
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file?path=" + path),
|
||||
String.class
|
||||
);
|
||||
|
||||
assertThat(file).isEqualTo("hello");
|
||||
|
||||
FileMetas metas = client.retrieve(
|
||||
GET("/api/v1/main/executions/" + execution.getId() + "/file/metas?path=" + path),
|
||||
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file/metas?path=" + path),
|
||||
FileMetas.class
|
||||
).block();
|
||||
|
||||
@@ -748,7 +788,7 @@ class ExecutionControllerRunnerTest {
|
||||
String newExecutionId = IdUtils.create();
|
||||
|
||||
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/" + execution.getId() + "/file?path=" + path.replace(execution.getId(),
|
||||
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file?path=" + path.replace(execution.getId(),
|
||||
newExecutionId
|
||||
)),
|
||||
String.class
|
||||
@@ -760,15 +800,17 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/inputs.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "previewinternalstoragefilefromexecution")
|
||||
void previewInternalStorageFileFromExecution() throws TimeoutException, QueueException{
|
||||
Execution defaultExecution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
|
||||
String tenantId = "previewinternalstoragefilefromexecution";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
Execution defaultExecution = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs));
|
||||
assertThat(defaultExecution.getTaskRunList()).hasSize(16);
|
||||
|
||||
String defaultPath = (String) defaultExecution.getInputs().get("file");
|
||||
|
||||
String defaultFile = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/" + defaultExecution.getId() + "/file/preview?path=" + defaultPath),
|
||||
GET("/api/v1/" + tenantId + "/executions/" + defaultExecution.getId() + "/file/preview?path=" + defaultPath),
|
||||
String.class
|
||||
);
|
||||
|
||||
@@ -788,20 +830,20 @@ class ExecutionControllerRunnerTest {
|
||||
.put("yaml1", "{}")
|
||||
.build();
|
||||
|
||||
Execution latin1Execution = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, latin1FileInputs));
|
||||
Execution latin1Execution = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "inputs", null, (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, latin1FileInputs));
|
||||
assertThat(latin1Execution.getTaskRunList()).hasSize(16);
|
||||
|
||||
String latin1Path = (String) latin1Execution.getInputs().get("file");
|
||||
|
||||
String latin1File = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/" + latin1Execution.getId() + "/file/preview?path=" + latin1Path + "&encoding=ISO-8859-1"),
|
||||
GET("/api/v1/" + tenantId + "/executions/" + latin1Execution.getId() + "/file/preview?path=" + latin1Path + "&encoding=ISO-8859-1"),
|
||||
String.class
|
||||
);
|
||||
|
||||
assertThat(latin1File).contains("Düsseldorf");
|
||||
|
||||
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/" + latin1Execution.getId() + "/file/preview?path=" + latin1Path + "&encoding=foo"),
|
||||
GET("/api/v1/" + tenantId + "/executions/" + latin1Execution.getId() + "/file/preview?path=" + latin1Path + "&encoding=foo"),
|
||||
String.class
|
||||
));
|
||||
|
||||
@@ -810,14 +852,16 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/inputs.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "previewlocalfilefromexecution")
|
||||
void previewLocalFileFromExecution() throws TimeoutException, QueueException, IOException {
|
||||
String tenantId = "previewlocalfilefromexecution";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
HashMap<String, Object> newInputs = new HashMap<>(InputsTest.inputs);
|
||||
URI file = createFile();
|
||||
newInputs.put("file", file);
|
||||
|
||||
Execution execution = runnerUtils.runOne(
|
||||
MAIN_TENANT,
|
||||
tenantId,
|
||||
"io.kestra.tests",
|
||||
"inputs",
|
||||
null,
|
||||
@@ -827,7 +871,7 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
// get the metadata of the file
|
||||
FileMetas metas = client.retrieve(
|
||||
GET("/api/v1/main/executions/" + execution.getId() + "/file/metas?path=" + file),
|
||||
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file/metas?path=" + file),
|
||||
FileMetas.class
|
||||
).block();
|
||||
assertThat(metas).isNotNull();
|
||||
@@ -835,7 +879,7 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
// preview the file
|
||||
Map<String, Object> preview = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/" + execution.getId() + "/file/preview?path=" + file),
|
||||
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file/preview?path=" + file),
|
||||
Map.class
|
||||
);
|
||||
assertThat(preview).isNotNull();
|
||||
@@ -844,21 +888,23 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
// download the file
|
||||
String content = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/" + execution.getId() + "/file?path=" + file),
|
||||
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file?path=" + file),
|
||||
String.class
|
||||
);
|
||||
assertThat(content).isEqualTo("Hello World");
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/inputs.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/inputs.yaml"})
|
||||
void previewNsFileFromExecution() throws TimeoutException, QueueException, IOException, URISyntaxException {
|
||||
String tenantId = MAIN_TENANT;
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
HashMap<String, Object> newInputs = new HashMap<>(InputsTest.inputs);
|
||||
URI file = createNsFile(false);
|
||||
newInputs.put("file", file);
|
||||
|
||||
Execution execution = runnerUtils.runOne(
|
||||
MAIN_TENANT,
|
||||
tenantId,
|
||||
"io.kestra.tests",
|
||||
"inputs",
|
||||
null,
|
||||
@@ -868,7 +914,7 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
// get the metadata of the file
|
||||
FileMetas metas = client.retrieve(
|
||||
GET("/api/v1/main/executions/" + execution.getId() + "/file/metas?path=" + file),
|
||||
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file/metas?path=" + file),
|
||||
FileMetas.class
|
||||
).block();
|
||||
assertThat(metas).isNotNull();
|
||||
@@ -876,7 +922,7 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
// preview the file
|
||||
Map<String, Object> preview = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/" + execution.getId() + "/file/preview?path=" + file),
|
||||
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file/preview?path=" + file),
|
||||
Map.class
|
||||
);
|
||||
assertThat(preview).isNotNull();
|
||||
@@ -885,7 +931,7 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
// download the file
|
||||
String content = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/" + execution.getId() + "/file?path=" + file),
|
||||
GET("/api/v1/" + tenantId + "/executions/" + execution.getId() + "/file?path=" + file),
|
||||
String.class
|
||||
);
|
||||
assertThat(content).isEqualTo("Hello World");
|
||||
@@ -1109,17 +1155,19 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/pause-test.yaml"})
|
||||
void resumeExecutionByQuery() throws TimeoutException, InterruptedException, QueueException {
|
||||
Execution pausedExecution1 = runnerUtils.runOneUntilPaused(TENANT_ID, TESTS_FLOW_NS, "pause-test");
|
||||
Execution pausedExecution2 = runnerUtils.runOneUntilPaused(TENANT_ID, TESTS_FLOW_NS, "pause-test");
|
||||
@LoadFlows(value = {"flows/valids/pause-test.yaml"}, tenantId = "resumeexecutionbyquery")
|
||||
void resumeExecutionByQuery() throws TimeoutException, QueueException {
|
||||
String tenantId = "resumeexecutionbyquery";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
Execution pausedExecution1 = runnerUtils.runOneUntilPaused(tenantId, TESTS_FLOW_NS, "pause-test");
|
||||
Execution pausedExecution2 = runnerUtils.runOneUntilPaused(tenantId, TESTS_FLOW_NS, "pause-test");
|
||||
|
||||
assertThat(pausedExecution1.getState().isPaused()).isTrue();
|
||||
assertThat(pausedExecution2.getState().isPaused()).isTrue();
|
||||
|
||||
// resume executions
|
||||
BulkResponse resumeResponse = client.toBlocking().retrieve(
|
||||
HttpRequest.POST("/api/v1/main/executions/resume/by-query?namespace=" + TESTS_FLOW_NS, null),
|
||||
HttpRequest.POST("/api/v1/" + tenantId + "/executions/resume/by-query?namespace=" + TESTS_FLOW_NS, null),
|
||||
BulkResponse.class
|
||||
);
|
||||
assertThat(resumeResponse.getCount()).isEqualTo(2);
|
||||
@@ -1132,7 +1180,7 @@ class ExecutionControllerRunnerTest {
|
||||
HttpClientResponseException e = assertThrows(
|
||||
HttpClientResponseException.class,
|
||||
() -> client.toBlocking().retrieve(HttpRequest.POST(
|
||||
"/api/v1/main/executions/resume/by-query?namespace=" + TESTS_FLOW_NS, null
|
||||
"/api/v1/" + tenantId + "/executions/resume/by-query?namespace=" + TESTS_FLOW_NS, null
|
||||
))
|
||||
);
|
||||
assertThat(e.getStatus().getCode()).isEqualTo(HttpStatus.BAD_REQUEST.getCode());
|
||||
@@ -1371,32 +1419,34 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/inputs.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/inputs.yaml"}, tenantId = "searchexecutions")
|
||||
void searchExecutions() {
|
||||
String tenantId = "searchexecutions";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
PagedResults<?> executions = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/search"), PagedResults.class
|
||||
GET("/api/v1/" + tenantId + "/executions/search"), PagedResults.class
|
||||
);
|
||||
|
||||
assertThat(executions.getTotal()).isEqualTo(0L);
|
||||
|
||||
triggerExecutionInputsFlowExecution(false);
|
||||
triggerExecutionInputsFlowExecution(tenantId, false);
|
||||
|
||||
// + is there to simulate that a space was added (this can be the case from UI autocompletion for eg.)
|
||||
executions = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/search?page=1&size=25&filters[labels][EQUALS][url]="+ENCODED_URL_LABEL_VALUE), PagedResults.class
|
||||
GET("/api/v1/" + tenantId + "/executions/search?page=1&size=25&filters[labels][EQUALS][url]="+ENCODED_URL_LABEL_VALUE), PagedResults.class
|
||||
);
|
||||
|
||||
assertThat(executions.getTotal()).isEqualTo(1L);
|
||||
|
||||
executions = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/search?page=1&size=25&labels=url:"+ENCODED_URL_LABEL_VALUE), PagedResults.class
|
||||
GET("/api/v1/" + tenantId + "/executions/search?page=1&size=25&labels=url:"+ENCODED_URL_LABEL_VALUE), PagedResults.class
|
||||
);
|
||||
|
||||
assertThat(executions.getTotal()).isEqualTo(1L);
|
||||
|
||||
HttpClientResponseException e = assertThrows(
|
||||
HttpClientResponseException.class,
|
||||
() -> client.toBlocking().retrieve(GET("/api/v1/main/executions/search?filters[startDate][EQUALS]=2024-01-07T18:43:11.248%2B01:00&filters[timeRange][EQUALS]=PT12H"))
|
||||
() -> client.toBlocking().retrieve(GET("/api/v1/" + tenantId + "/executions/search?filters[startDate][EQUALS]=2024-01-07T18:43:11.248%2B01:00&filters[timeRange][EQUALS]=PT12H"))
|
||||
);
|
||||
|
||||
assertThat(e.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
|
||||
@@ -1404,39 +1454,33 @@ class ExecutionControllerRunnerTest {
|
||||
assertThat(e.getResponse().getBody(String.class).get()).contains("are mutually exclusive");
|
||||
|
||||
executions = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/search?filters[timeRange][EQUALS]=PT12H"), PagedResults.class
|
||||
GET("/api/v1/" + tenantId + "/executions/search?filters[timeRange][EQUALS]=PT12H"), PagedResults.class
|
||||
);
|
||||
|
||||
assertThat(executions.getTotal()).isEqualTo(1L);
|
||||
|
||||
executions = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/search?timeRange=PT12H"), PagedResults.class
|
||||
GET("/api/v1/" + tenantId + "/executions/search?timeRange=PT12H"), PagedResults.class
|
||||
);
|
||||
|
||||
assertThat(executions.getTotal()).isEqualTo(1L);
|
||||
|
||||
e = assertThrows(
|
||||
HttpClientResponseException.class,
|
||||
() -> client.toBlocking().retrieve(GET("/api/v1/main/executions/search?filters[timeRange][EQUALS]=P1Y"))
|
||||
() -> client.toBlocking().retrieve(GET("/api/v1/" + tenantId + "/executions/search?filters[timeRange][EQUALS]=P1Y"))
|
||||
);
|
||||
assertThat(e.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
|
||||
|
||||
e = assertThrows(
|
||||
HttpClientResponseException.class,
|
||||
() -> client.toBlocking().retrieve(GET("/api/v1/main/executions/search?timeRange=P1Y"))
|
||||
);
|
||||
assertThat(e.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
|
||||
|
||||
e = assertThrows(
|
||||
HttpClientResponseException.class,
|
||||
() -> client.toBlocking().retrieve(GET("/api/v1/main/executions/search?page=1&size=-1"))
|
||||
() -> client.toBlocking().retrieve(GET("/api/v1/" + tenantId + "/executions/search?page=1&size=-1"))
|
||||
);
|
||||
|
||||
assertThat(e.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
|
||||
|
||||
e = assertThrows(
|
||||
HttpClientResponseException.class,
|
||||
() -> client.toBlocking().retrieve(GET("/api/v1/main/executions/search?page=0"))
|
||||
() -> client.toBlocking().retrieve(GET("/api/v1/" + tenantId + "/executions/search?page=0"))
|
||||
);
|
||||
|
||||
assertThat(e.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
|
||||
@@ -1537,14 +1581,16 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/minimal.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/minimal.yaml"}, tenantId = "setlabelsonterminatedexecutionsbyquery")
|
||||
void setLabelsOnTerminatedExecutionsByQuery() throws TimeoutException, QueueException {
|
||||
Execution result1 = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "minimal");
|
||||
Execution result2 = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "minimal");
|
||||
Execution result3 = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "minimal");
|
||||
String tenantId = "setlabelsonterminatedexecutionsbyquery";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
Execution result1 = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "minimal");
|
||||
Execution result2 = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "minimal");
|
||||
Execution result3 = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "minimal");
|
||||
|
||||
BulkResponse response = client.toBlocking().retrieve(
|
||||
HttpRequest.POST("/api/v1/main/executions/labels/by-query?namespace=" + result1.getNamespace(),
|
||||
HttpRequest.POST("/api/v1/" + tenantId + "/executions/labels/by-query?namespace=" + result1.getNamespace(),
|
||||
List.of(new Label("key", "value"))
|
||||
),
|
||||
BulkResponse.class
|
||||
@@ -1555,7 +1601,7 @@ class ExecutionControllerRunnerTest {
|
||||
var exception = assertThrows(
|
||||
HttpClientResponseException.class,
|
||||
() -> client.toBlocking().exchange(HttpRequest.POST(
|
||||
"/api/v1/main/executions/labels/by-query?namespace=" + result1.getNamespace(),
|
||||
"/api/v1/" + tenantId + "/executions/labels/by-query?namespace=" + result1.getNamespace(),
|
||||
List.of(new Label(null, null)))
|
||||
)
|
||||
);
|
||||
@@ -1753,14 +1799,16 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-queue.yml",
|
||||
"flows/valids/minimal.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml",
|
||||
"flows/valids/minimal.yaml"}, tenantId = "shouldunqueueexecutionaqueuedflow")
|
||||
void shouldUnqueueExecutionAQueuedFlow() throws QueueException, TimeoutException {
|
||||
String tenantId = "shouldunqueueexecutionaqueuedflow";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
// run a first flow so the second is queued
|
||||
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
runnerUtils.runOneUntilRunning(tenantId, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result = runUntilQueued(tenantId, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
|
||||
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/unqueue", null));
|
||||
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/" + tenantId + "/executions/" + result.getId() + "/unqueue", null));
|
||||
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
|
||||
|
||||
// waiting for the flow to complete successfully
|
||||
@@ -1771,60 +1819,66 @@ class ExecutionControllerRunnerTest {
|
||||
);
|
||||
|
||||
|
||||
var notFound = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/notfound/unqueue", null)));
|
||||
var notFound = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.POST("/api/v1/" + tenantId + "/executions/notfound/unqueue", null)));
|
||||
assertThat(notFound.getStatus().getCode()).isEqualTo(HttpStatus.NOT_FOUND.getCode());
|
||||
|
||||
// pausing an already completed flow will result in errors
|
||||
Execution completed = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "minimal");
|
||||
Execution completed = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "minimal");
|
||||
|
||||
var notRunning = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + completed.getId() + "/unqueue", null)));
|
||||
var notRunning = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().exchange(HttpRequest.POST("/api/v1/" + tenantId + "/executions/" + completed.getId() + "/unqueue", null)));
|
||||
assertThat(notRunning.getStatus().getCode()).isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY.getCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-queue.yml",
|
||||
"flows/valids/minimal.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml",
|
||||
"flows/valids/minimal.yaml"}, tenantId = "shouldunqueueaqueuedflowtocancelledstate")
|
||||
void shouldUnqueueAQueuedFlowToCancelledState() throws QueueException, TimeoutException {
|
||||
String tenantId = "shouldunqueueaqueuedflowtocancelledstate";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
// run a first flow so the second is queued
|
||||
runnerUtils.runOneUntilRunning(TENANT_ID, "io.kestra.tests", "flow-concurrency-queue");
|
||||
Execution result1 = runUntilQueued("io.kestra.tests", "flow-concurrency-queue");
|
||||
runnerUtils.runOneUntilRunning(tenantId, "io.kestra.tests", "flow-concurrency-queue");
|
||||
Execution result1 = runUntilQueued(tenantId, "io.kestra.tests", "flow-concurrency-queue");
|
||||
|
||||
var cancelResponse = client.toBlocking().exchange(
|
||||
HttpRequest.POST("/api/v1/executions/" + result1.getId() + "/unqueue?state=CANCELLED", null)
|
||||
HttpRequest.POST("/api/v1/" + tenantId + "/executions/" + result1.getId() + "/unqueue?state=CANCELLED", null)
|
||||
);
|
||||
assertThat(cancelResponse.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
|
||||
|
||||
Optional<Execution> cancelledExecution = executionRepositoryInterface.findById(TENANT_ID, result1.getId());
|
||||
Optional<Execution> cancelledExecution = executionRepositoryInterface.findById(tenantId, result1.getId());
|
||||
assertThat(cancelledExecution.isPresent()).isTrue();
|
||||
assertThat(cancelledExecution.get().getState().getCurrent()).isEqualTo(State.Type.CANCELLED);
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-queue.yml"})
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = "shouldunqueueexecutionbyidsqueuedflows")
|
||||
void shouldUnqueueExecutionByIdsQueuedFlows() throws TimeoutException, QueueException {
|
||||
String tenantId = "shouldunqueueexecutionbyidsqueuedflows";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
// run a first flow so the others are queued
|
||||
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result1 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result2 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result3 = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
runnerUtils.runOneUntilRunning(tenantId, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result1 = runUntilQueued(tenantId, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result2 = runUntilQueued(tenantId, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result3 = runUntilQueued(tenantId, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
|
||||
BulkResponse response = client.toBlocking().retrieve(
|
||||
HttpRequest.POST("/api/v1/main/executions/unqueue/by-ids", List.of(result1.getId(), result2.getId(), result3.getId())),
|
||||
HttpRequest.POST("/api/v1/" + tenantId + "/executions/unqueue/by-ids", List.of(result1.getId(), result2.getId(), result3.getId())),
|
||||
BulkResponse.class
|
||||
);
|
||||
assertThat(response.getCount()).isEqualTo(3);
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-queue.yml"})
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = "shouldforcerunexecutionaqueuedflow")
|
||||
void shouldForceRunExecutionAQueuedFlow() throws QueueException, TimeoutException {
|
||||
String tenantId = "shouldforcerunexecutionaqueuedflow";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
// run a first flow so the second is queued
|
||||
runnerUtils.runOneUntilRunning(TENANT_ID, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result = runUntilQueued(TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
runnerUtils.runOneUntilRunning(tenantId, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
Execution result = runUntilQueued(tenantId, TESTS_FLOW_NS, "flow-concurrency-queue");
|
||||
|
||||
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/force-run", null));
|
||||
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/" + tenantId + "/executions/" + result.getId() + "/force-run", null));
|
||||
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
|
||||
Optional<Execution> forcedRun = executionRepositoryInterface.findById(TENANT_ID, result.getId());
|
||||
Optional<Execution> forcedRun = executionRepositoryInterface.findById(tenantId, result.getId());
|
||||
assertThat(forcedRun.isPresent()).isTrue();
|
||||
assertThat(forcedRun.get().getState().getCurrent()).isNotEqualTo(State.Type.QUEUED);
|
||||
|
||||
@@ -1850,14 +1904,16 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/minimal.yaml"})
|
||||
void shouldForceRunExecutionACreatedFlow() throws QueueException, TimeoutException {
|
||||
Execution result = this.createExecution(TESTS_FLOW_NS, "minimal");
|
||||
@LoadFlows(value = {"flows/valids/minimal.yaml"}, tenantId = "shouldforcerunexecutionacreatedflow")
|
||||
void shouldForceRunExecutionACreatedFlow() throws QueueException {
|
||||
String tenantId = "shouldforcerunexecutionacreatedflow";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
Execution result = this.createExecution(tenantId, TESTS_FLOW_NS, "minimal");
|
||||
this.executionQueue.emit(result);
|
||||
|
||||
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/force-run", null));
|
||||
var response = client.toBlocking().exchange(HttpRequest.POST("/api/v1/" + tenantId + "/executions/" + result.getId() + "/force-run", null));
|
||||
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
|
||||
Optional<Execution> forcedRun = executionRepositoryInterface.findById(TENANT_ID, result.getId());
|
||||
Optional<Execution> forcedRun = executionRepositoryInterface.findById(tenantId, result.getId());
|
||||
assertThat(forcedRun.isPresent()).isTrue();
|
||||
assertThat(forcedRun.get().getState().getCurrent()).isNotEqualTo(State.Type.CREATED);
|
||||
}
|
||||
@@ -1978,7 +2034,7 @@ class ExecutionControllerRunnerTest {
|
||||
return client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST(
|
||||
"/api/v1/main/executions/" + execution.getId() + "/eval/" + execution.getTaskRunList().get(index).getId(),
|
||||
"/api/v1/" + execution.getTenantId() + "/executions/" + execution.getId() + "/eval/" + execution.getTaskRunList().get(index).getId(),
|
||||
expression
|
||||
)
|
||||
.contentType(MediaType.TEXT_PLAIN_TYPE),
|
||||
@@ -1987,23 +2043,23 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
|
||||
private Execution triggerExecutionExecution(String namespace, String flowId, MultipartBody requestBody, Boolean wait) {
|
||||
return triggerExecutionExecution(namespace, flowId, requestBody, wait, null);
|
||||
private Execution triggerExecutionExecution(String tenantId, String namespace, String flowId, MultipartBody requestBody, Boolean wait) {
|
||||
return triggerExecutionExecution(tenantId, namespace, flowId, requestBody, wait, null);
|
||||
}
|
||||
|
||||
private Execution triggerExecutionExecution(String namespace, String flowId, MultipartBody requestBody, Boolean wait, String breakpoint) {
|
||||
private Execution triggerExecutionExecution(String tenantId, String namespace, String flowId, MultipartBody requestBody, Boolean wait, String breakpoint) {
|
||||
return client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST("/api/v1/main/executions/" + namespace + "/" + flowId + "?labels=a:label-1&labels=b:label-2&labels=url:" + ENCODED_URL_LABEL_VALUE + (wait ? "&wait=true" : "") + (breakpoint != null ? "&breakpoints=" + breakpoint : ""), requestBody)
|
||||
.POST("/api/v1/" + tenantId + "/executions/" + namespace + "/" + flowId + "?labels=a:label-1&labels=b:label-2&labels=url:" + ENCODED_URL_LABEL_VALUE + (wait ? "&wait=true" : "") + (breakpoint != null ? "&breakpoints=" + breakpoint : ""), requestBody)
|
||||
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
|
||||
Execution.class
|
||||
);
|
||||
}
|
||||
|
||||
private Execution triggerExecutionInputsFlowExecution(Boolean wait) {
|
||||
private Execution triggerExecutionInputsFlowExecution(String tenantId, Boolean wait) {
|
||||
MultipartBody requestBody = createExecutionInputsFlowBody();
|
||||
|
||||
return triggerExecutionExecution(TESTS_FLOW_NS, "inputs", requestBody, wait);
|
||||
return triggerExecutionExecution(tenantId, TESTS_FLOW_NS, "inputs", requestBody, wait);
|
||||
}
|
||||
|
||||
private MultipartBody createExecutionInputsFlowBody() {
|
||||
@@ -2031,17 +2087,17 @@ class ExecutionControllerRunnerTest {
|
||||
.build();
|
||||
}
|
||||
|
||||
private Execution runUntilQueued(String namespace, String flowId) throws TimeoutException, QueueException {
|
||||
return runUntilState(namespace, flowId, State.Type.QUEUED);
|
||||
private Execution runUntilQueued(String tenantId, String namespace, String flowId) throws TimeoutException, QueueException {
|
||||
return runUntilState(tenantId, namespace, flowId, State.Type.QUEUED);
|
||||
}
|
||||
|
||||
private Execution createExecution(String namespace, String flowId) {
|
||||
Flow flow = flowRepositoryInterface.findById(TENANT_ID, namespace, flowId).orElseThrow();
|
||||
private Execution createExecution(String tenantId, String namespace, String flowId) {
|
||||
Flow flow = flowRepositoryInterface.findById(tenantId, namespace, flowId).orElseThrow();
|
||||
return Execution.newExecution(flow, null);
|
||||
}
|
||||
|
||||
private Execution runUntilState(String namespace, String flowId, State.Type state) throws TimeoutException, QueueException {
|
||||
Execution execution = this.createExecution(namespace, flowId);
|
||||
private Execution runUntilState(String tenantId, String namespace, String flowId, State.Type state) throws TimeoutException, QueueException {
|
||||
Execution execution = this.createExecution(tenantId, namespace, flowId);
|
||||
return runnerUtils.awaitExecution(
|
||||
it -> execution.getId().equals(it.getId()) && it.getState().getCurrent() == state,
|
||||
throwRunnable(() -> this.executionQueue.emit(execution)),
|
||||
@@ -2086,14 +2142,16 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/minimal.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/minimal.yaml"}, tenantId = "shouldnotallowaddingsystemlabels")
|
||||
void shouldNotAllowAddingSystemLabels() throws QueueException, TimeoutException {
|
||||
Execution result = runnerUtils.runOne(TENANT_ID, TESTS_FLOW_NS, "minimal");
|
||||
String tenantId = "shouldnotallowaddingsystemlabels";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
Execution result = runnerUtils.runOne(tenantId, TESTS_FLOW_NS, "minimal");
|
||||
assertThat(result.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
|
||||
List<Label> systemLabels = List.of(new Label("system.key", "system-value"));
|
||||
HttpClientResponseException e = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(
|
||||
HttpRequest.POST("/api/v1/main/executions/" + result.getId() + "/labels", systemLabels),
|
||||
HttpRequest.POST("/api/v1/" + tenantId + "/executions/" + result.getId() + "/labels", systemLabels),
|
||||
Execution.class
|
||||
));
|
||||
|
||||
@@ -2102,9 +2160,11 @@ class ExecutionControllerRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/minimal.yaml"})
|
||||
void shouldSuspendAtBreakpointThenResume() throws QueueException, TimeoutException, InterruptedException {
|
||||
Execution execution = triggerExecutionExecution(TESTS_FLOW_NS, "minimal", null, false, "date");
|
||||
@LoadFlows(value = {"flows/valids/minimal.yaml"}, tenantId = "shouldsuspendatbreakpointthenresume")
|
||||
void shouldSuspendAtBreakpointThenResume() throws TimeoutException {
|
||||
String tenantId = "shouldsuspendatbreakpointthenresume";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
Execution execution = triggerExecutionExecution(tenantId, TESTS_FLOW_NS, "minimal", null, false, "date");
|
||||
assertThat(execution).isNotNull();
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.CREATED);
|
||||
|
||||
@@ -2115,7 +2175,7 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
// resume the suspended execution
|
||||
HttpResponse<Void> resume = client.toBlocking().exchange(
|
||||
HttpRequest.POST("/api/v1/main/executions/" + suspended.getId() + "/resume-from-breakpoint", null),
|
||||
HttpRequest.POST("/api/v1/" + tenantId + "/executions/" + suspended.getId() + "/resume-from-breakpoint", null),
|
||||
Void.class
|
||||
);
|
||||
assertThat(resume.getStatus().getCode()).isEqualTo(HttpStatus.OK.getCode());
|
||||
@@ -2132,15 +2192,20 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
@FlakyTest
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/subflow-parent.yaml", "flows/valids/subflow-child.yaml", "flows/valids/subflow-grand-child.yaml"})
|
||||
@LoadFlows(value = {"flows/valids/subflow-parent.yaml",
|
||||
"flows/valids/subflow-child.yaml",
|
||||
"flows/valids/subflow-grand-child.yaml"},
|
||||
tenantId = "triggerexecutionandfollowdependencies")
|
||||
void triggerExecutionAndFollowDependencies() throws InterruptedException {
|
||||
Execution result = triggerExecutionExecution(TESTS_FLOW_NS, "subflow-parent", null, true);
|
||||
String tenantId = "triggerexecutionandfollowdependencies";
|
||||
when(tenantService.resolveTenant()).thenReturn(tenantId);
|
||||
Execution result = triggerExecutionExecution(tenantId, TESTS_FLOW_NS, "subflow-parent", null, true);
|
||||
|
||||
// without this slight delay, the event stream may miss some 'end' events
|
||||
Thread.sleep(500);
|
||||
|
||||
List<Event<ExecutionStatusEvent>> results = sseClient
|
||||
.eventStream("/api/v1/main/executions/" + result.getId() + "/follow-dependencies?expandAll=true", ExecutionStatusEvent.class)
|
||||
.eventStream("/api/v1/" + tenantId + "/executions/" + result.getId() + "/follow-dependencies?expandAll=true", ExecutionStatusEvent.class)
|
||||
.collectList()
|
||||
.block();
|
||||
|
||||
@@ -2154,7 +2219,7 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
// check that a second call work: calling follow on an already terminated execution.
|
||||
results = sseClient
|
||||
.eventStream("/api/v1/main/executions/" + result.getId() + "/follow-dependencies?expandAll=true", ExecutionStatusEvent.class)
|
||||
.eventStream("/api/v1/" + tenantId + "/executions/" + result.getId() + "/follow-dependencies?expandAll=true", ExecutionStatusEvent.class)
|
||||
.collectList()
|
||||
.block();
|
||||
|
||||
@@ -2168,7 +2233,7 @@ class ExecutionControllerRunnerTest {
|
||||
|
||||
// check that a without expandAll it would return only the immediate dependencies.
|
||||
results = sseClient
|
||||
.eventStream("/api/v1/main/executions/" + result.getId() + "/follow-dependencies", ExecutionStatusEvent.class)
|
||||
.eventStream("/api/v1/" + tenantId + "/executions/" + result.getId() + "/follow-dependencies", ExecutionStatusEvent.class)
|
||||
.collectList()
|
||||
.block();
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.webserver.controllers.api;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
@@ -10,7 +11,6 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.TaskForExecution;
|
||||
import io.kestra.core.models.triggers.AbstractTriggerForExecution;
|
||||
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.jdbc.JdbcTestUtils;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import io.kestra.webserver.responses.BulkResponse;
|
||||
@@ -22,10 +22,8 @@ import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.client.multipart.MultipartBody;
|
||||
import io.micronaut.reactor.http.client.ReactorHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
|
||||
|
||||
@@ -68,15 +66,6 @@ class ExecutionControllerTest {
|
||||
public static final String TESTS_FLOW_NS = "io.kestra.tests";
|
||||
public static final String TESTS_WEBHOOK_KEY = "a-secret-key";
|
||||
|
||||
@SneakyThrows
|
||||
@BeforeEach
|
||||
protected void setup() {
|
||||
jdbcTestUtils.drop();
|
||||
jdbcTestUtils.migrate();
|
||||
|
||||
TestsUtils.loads(MAIN_TENANT, repositoryLoader);
|
||||
}
|
||||
|
||||
@Test
|
||||
void getExecutionNotFound() {
|
||||
HttpClientResponseException e = assertThrows(
|
||||
@@ -177,6 +166,7 @@ class ExecutionControllerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/webhook-dynamic-key.yaml"})
|
||||
void webhookDynamicKey() {
|
||||
Execution execution = client.toBlocking().retrieve(
|
||||
GET(
|
||||
@@ -190,6 +180,7 @@ class ExecutionControllerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/webhook-secret-key.yaml"})
|
||||
@EnabledIfEnvironmentVariable(named = "SECRET_WEBHOOK_KEY", matches = ".*")
|
||||
void webhookDynamicKeyFromASecret() {
|
||||
Execution execution = client.toBlocking().retrieve(
|
||||
@@ -204,6 +195,7 @@ class ExecutionControllerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/webhook-with-condition.yaml"})
|
||||
void webhookWithCondition() {
|
||||
record Hello(String hello) {}
|
||||
|
||||
@@ -232,6 +224,7 @@ class ExecutionControllerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/webhook-inputs.yaml"})
|
||||
void webhookWithInputs() {
|
||||
record Hello(String hello) {}
|
||||
|
||||
@@ -289,6 +282,7 @@ class ExecutionControllerTest {
|
||||
|
||||
@SuppressWarnings("DataFlowIssue")
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/full.yaml"})
|
||||
void getExecutionFlowForExecution() {
|
||||
FlowForExecution result = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/flows/io.kestra.tests/full"),
|
||||
@@ -301,6 +295,7 @@ class ExecutionControllerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/full.yaml"})
|
||||
void getExecutionFlowForExecutionWithOldUrl() {
|
||||
FlowForExecution result = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/flows/io.kestra.tests/full"),
|
||||
@@ -314,6 +309,7 @@ class ExecutionControllerTest {
|
||||
|
||||
@SuppressWarnings("DataFlowIssue")
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/webhook.yaml"})
|
||||
void getExecutionFlowForExecutionById() {
|
||||
Execution execution = client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
@@ -336,24 +332,26 @@ class ExecutionControllerTest {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/minimal.yaml"})
|
||||
void getExecutionDistinctNamespaceExecutables() {
|
||||
List<String> result = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/namespaces"),
|
||||
Argument.of(List.class, String.class)
|
||||
);
|
||||
|
||||
assertThat(result.size()).isGreaterThanOrEqualTo(5);
|
||||
assertThat(result.size()).isGreaterThanOrEqualTo(1);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/webhook.yaml", "flows/valids/minimal.yaml"})
|
||||
void getExecutionFlowFromNamespace() {
|
||||
List<FlowForExecution> result = client.toBlocking().retrieve(
|
||||
GET("/api/v1/main/executions/namespaces/io.kestra.tests/flows"),
|
||||
Argument.of(List.class, FlowForExecution.class)
|
||||
);
|
||||
|
||||
assertThat(result.size()).isGreaterThan(100);
|
||||
assertThat(result.size()).isGreaterThanOrEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -378,6 +376,7 @@ class ExecutionControllerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/inputs.yaml"})
|
||||
void commaInSingleLabelsValue() {
|
||||
String encodedCommaWithinLabel = URLEncoder.encode("project:foo,bar", StandardCharsets.UTF_8);
|
||||
|
||||
@@ -448,6 +447,7 @@ class ExecutionControllerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/minimal.yaml"})
|
||||
void scheduleDate() {
|
||||
// given
|
||||
ZonedDateTime now = ZonedDateTime.now().truncatedTo(ChronoUnit.SECONDS).plusSeconds(1);
|
||||
@@ -489,6 +489,7 @@ class ExecutionControllerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/minimal.yaml"})
|
||||
void shouldHaveAnUrlWhenCreated() {
|
||||
// ExecutionController.ExecutionResponse cannot be deserialized because it didn't have any default constructor.
|
||||
// adding it would mean updating the Execution itself, which is too annoying, so for the test we just deserialize to a Map.
|
||||
@@ -504,6 +505,7 @@ class ExecutionControllerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/minimal.yaml"})
|
||||
void shouldRefuseSystemLabelsWhenCreatingAnExecution() {
|
||||
var error = assertThrows(HttpClientResponseException.class, () -> client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
|
||||
Reference in New Issue
Block a user