fix(tests): use a different tenant for each concurrency test

This commit is contained in:
Loïc Mathieu
2025-12-11 10:28:17 +01:00
parent 8eae8aba72
commit 687cefdfb9
2 changed files with 64 additions and 64 deletions

View File

@@ -10,82 +10,83 @@ import org.junit.jupiter.api.TestInstance;
@KestraTest(startRunner = true)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractRunnerConcurrencyTest {
public static final String TENANT_1 = "tenant1";
@Inject
protected FlowConcurrencyCaseTest flowConcurrencyCaseTest;
@Test
@LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-cancel.yml"}, tenantId = "concurrency-cancel")
void concurrencyCancel() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyCancel();
flowConcurrencyCaseTest.flowConcurrencyCancel("concurrency-cancel");
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-fail.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-fail.yml"}, tenantId = "concurrency-fail")
void concurrencyFail() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyFail();
flowConcurrencyCaseTest.flowConcurrencyFail("concurrency-fail");
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = "concurrency-queue")
void concurrencyQueue() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueue();
flowConcurrencyCaseTest.flowConcurrencyQueue("concurrency-queue");
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-pause.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-pause.yml"}, tenantId = "concurrency-queue-pause")
protected void concurrencyQueuePause() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueuePause();
flowConcurrencyCaseTest.flowConcurrencyQueuePause("concurrency-queue-pause");
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-cancel-pause.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-cancel-pause.yml"}, tenantId = "concurrency-cancel-pause")
protected void concurrencyCancelPause() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyCancelPause();
flowConcurrencyCaseTest.flowConcurrencyCancelPause("concurrency-cancel-pause");
}
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = TENANT_1)
@LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = "flow-concurrency-with-for-each-item")
protected void flowConcurrencyWithForEachItem() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem(TENANT_1);
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem("flow-concurrency-with-for-each-item");
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-fail.yml"}, tenantId = "concurrency-queue-fail")
protected void concurrencyQueueRestarted() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted("concurrency-queue-fail");
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-after-execution.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-after-execution.yml"}, tenantId = "concurrency-queue-after-execution")
void concurrencyQueueAfterExecution() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution();
flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution("concurrency-queue-after-execution");
}
@Test
@LoadFlows(value = {"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"}, tenantId = TENANT_1)
@LoadFlows(value = {"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"}, tenantId = "flow-concurrency-subflow")
void flowConcurrencySubflow() throws Exception {
flowConcurrencyCaseTest.flowConcurrencySubflow(TENANT_1);
flowConcurrencyCaseTest.flowConcurrencySubflow("flow-concurrency-subflow");
}
@Test
@FlakyTest(description = "Only flaky in CI")
@LoadFlows({"flows/valids/flow-concurrency-parallel-subflow-kill.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-child.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-grandchild.yaml"})
@LoadFlows(
value = {"flows/valids/flow-concurrency-parallel-subflow-kill.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-child.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-grandchild.yaml"},
tenantId = "flow-concurrency-parallel-subflow-kill"
)
protected void flowConcurrencyParallelSubflowKill() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyParallelSubflowKill();
flowConcurrencyCaseTest.flowConcurrencyParallelSubflowKill("flow-concurrency-parallel-subflow-kill");
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-killed.yml"}, tenantId = "flow-concurrency-killed")
void flowConcurrencyKilled() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyKilled();
flowConcurrencyCaseTest.flowConcurrencyKilled("flow-concurrency-killed");
}
@Test
@FlakyTest(description = "Only flaky in CI")
@LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-killed.yml"}, tenantId = "flow-concurrency-queue-killed")
void flowConcurrencyQueueKilled() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueKilled();
flowConcurrencyCaseTest.flowConcurrencyQueueKilled("flow-concurrency-queue-killed");
}
}

View File

@@ -31,7 +31,6 @@ import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@Singleton
@@ -57,12 +56,12 @@ public class FlowConcurrencyCaseTest {
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;
public void flowConcurrencyCancel() throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
public void flowConcurrencyCancel(String tenantId) throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
try {
List<Execution> shouldFailExecutions = List.of(
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel"),
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel")
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-cancel"),
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-cancel")
);
assertThat(execution1.getState().isRunning()).isTrue();
@@ -73,12 +72,12 @@ public class FlowConcurrencyCaseTest {
}
}
public void flowConcurrencyFail() throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
public void flowConcurrencyFail(String tenantId) throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
try {
List<Execution> shouldFailExecutions = List.of(
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail"),
runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail")
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-fail"),
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-fail")
);
assertThat(execution1.getState().isRunning()).isTrue();
@@ -89,10 +88,10 @@ public class FlowConcurrencyCaseTest {
}
}
public void flowConcurrencyQueue() throws QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
public void flowConcurrencyQueue(String tenantId) throws QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue", Optional.empty())
.findById(tenantId, NAMESPACE, "flow-concurrency-queue", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
@@ -108,10 +107,10 @@ public class FlowConcurrencyCaseTest {
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
public void flowConcurrencyQueuePause() throws QueueException {
Execution execution1 = runnerUtils.runOneUntilPaused(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-pause");
public void flowConcurrencyQueuePause(String tenantId) throws QueueException {
Execution execution1 = runnerUtils.runOneUntilPaused(tenantId, NAMESPACE, "flow-concurrency-queue-pause");
Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-pause", Optional.empty())
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-pause", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
@@ -127,10 +126,10 @@ public class FlowConcurrencyCaseTest {
assertThat(secondExecutionResult.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
public void flowConcurrencyCancelPause() throws QueueException {
Execution execution1 = runnerUtils.runOneUntilPaused(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel-pause");
public void flowConcurrencyCancelPause(String tenantId) throws QueueException {
Execution execution1 = runnerUtils.runOneUntilPaused(tenantId, NAMESPACE, "flow-concurrency-cancel-pause");
Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel-pause", Optional.empty())
.findById(tenantId, NAMESPACE, "flow-concurrency-cancel-pause", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.CANCELLED), execution2);
@@ -166,11 +165,11 @@ public class FlowConcurrencyCaseTest {
.toList()).contains(Type.QUEUED);
}
public void flowConcurrencyQueueRestarted() throws Exception {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE,
public void flowConcurrencyQueueRestarted(String tenantId) throws Exception {
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE,
"flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-fail", Optional.empty())
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-fail", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.RUNNING), execution2);
@@ -193,10 +192,10 @@ public class FlowConcurrencyCaseTest {
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
public void flowConcurrencyQueueAfterExecution() throws QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
public void flowConcurrencyQueueAfterExecution(String tenantId) throws QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-after-execution", Optional.empty())
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-after-execution", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
@@ -222,9 +221,9 @@ public class FlowConcurrencyCaseTest {
runnerUtils.awaitFlowExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), tenantId, NAMESPACE, "flow-concurrency-cancel");
}
public void flowConcurrencyParallelSubflowKill() throws QueueException {
Execution parent = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-parallel-subflow-kill", null, null, Duration.ofSeconds(30));
Execution queued = runnerUtils.awaitFlowExecution(e -> e.getState().isQueued(), MAIN_TENANT, NAMESPACE, "flow-concurrency-parallel-subflow-kill-child");
public void flowConcurrencyParallelSubflowKill(String tenantId) throws QueueException {
Execution parent = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-parallel-subflow-kill", null, null, Duration.ofSeconds(30));
Execution queued = runnerUtils.awaitFlowExecution(e -> e.getState().isQueued(), tenantId, NAMESPACE, "flow-concurrency-parallel-subflow-kill-child");
// Kill the parent
killQueue.emit(ExecutionKilledExecution
@@ -232,7 +231,7 @@ public class FlowConcurrencyCaseTest {
.state(ExecutionKilled.State.REQUESTED)
.executionId(parent.getId())
.isOnKillCascade(true)
.tenantId(MAIN_TENANT)
.tenantId(tenantId)
.build()
);
@@ -242,11 +241,11 @@ public class FlowConcurrencyCaseTest {
assertThat(terminated.getTaskRunList()).isNull();
}
public void flowConcurrencyKilled() throws QueueException, InterruptedException {
public void flowConcurrencyKilled(String tenantId) throws QueueException, InterruptedException {
Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
.orElseThrow();
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
@@ -261,7 +260,7 @@ public class FlowConcurrencyCaseTest {
.state(ExecutionKilled.State.REQUESTED)
.executionId(execution1.getId())
.isOnKillCascade(true)
.tenantId(MAIN_TENANT)
.tenantId(tenantId)
.build()
);
@@ -284,15 +283,15 @@ public class FlowConcurrencyCaseTest {
runnerUtils.killExecution(execution3);
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed");
runnerUtils.awaitFlowExecutionNumber(3, tenantId, NAMESPACE, "flow-concurrency-queue-killed");
}
}
public void flowConcurrencyQueueKilled() throws QueueException, InterruptedException {
public void flowConcurrencyQueueKilled(String tenantId) throws QueueException, InterruptedException {
Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
.findById(tenantId, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
.orElseThrow();
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
@@ -307,7 +306,7 @@ public class FlowConcurrencyCaseTest {
.state(ExecutionKilled.State.REQUESTED)
.executionId(execution2.getId())
.isOnKillCascade(true)
.tenantId(MAIN_TENANT)
.tenantId(tenantId)
.build()
);
@@ -326,7 +325,7 @@ public class FlowConcurrencyCaseTest {
runnerUtils.killExecution(execution3);
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed");
runnerUtils.awaitFlowExecutionNumber(3, tenantId, NAMESPACE, "flow-concurrency-queue-killed");
}
}