fix(test): use a separate tenant for each test

This commit is contained in:
Loïc Mathieu
2025-12-12 11:03:26 +01:00
parent c0f6298484
commit eae5eb80cb
4 changed files with 9 additions and 10 deletions

View File

@@ -50,9 +50,9 @@ public abstract class AbstractRunnerConcurrencyTest {
} }
@Test @Test
@LoadFlows(value = {"flows/valids/flow-concurrency-queue-fail.yml"}, tenantId = "concurrency-queue-fail") @LoadFlows(value = {"flows/valids/flow-concurrency-queue-fail.yml"}, tenantId = "concurrency-queue-restarted")
protected void concurrencyQueueRestarted() throws Exception { protected void concurrencyQueueRestarted() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted("concurrency-queue-fail"); flowConcurrencyCaseTest.flowConcurrencyQueueRestarted("concurrency-queue-restarted");
} }
@Test @Test

View File

@@ -215,7 +215,7 @@ public class FlowConcurrencyCaseTest {
List<Execution> subFlowExecs = runnerUtils.awaitFlowExecutionNumber(2, tenantId, NAMESPACE, "flow-concurrency-cancel"); List<Execution> subFlowExecs = runnerUtils.awaitFlowExecutionNumber(2, tenantId, NAMESPACE, "flow-concurrency-cancel");
assertThat(subFlowExecs).extracting(e -> e.getState().getCurrent()).containsExactlyInAnyOrder(Type.SUCCESS, Type.CANCELLED); assertThat(subFlowExecs).extracting(e -> e.getState().getCurrent()).containsExactlyInAnyOrder(Type.SUCCESS, Type.CANCELLED);
// run another execution to be sure that everything work (purge is correctly done) // run another execution to be sure that everything works (purge is correctly done)
Execution execution3 = runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-subflow"); Execution execution3 = runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-subflow");
assertThat(execution3.getState().getCurrent()).isEqualTo(Type.SUCCESS); assertThat(execution3.getState().getCurrent()).isEqualTo(Type.SUCCESS);
runnerUtils.awaitFlowExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), tenantId, NAMESPACE, "flow-concurrency-cancel"); runnerUtils.awaitFlowExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), tenantId, NAMESPACE, "flow-concurrency-cancel");

View File

@@ -8,4 +8,4 @@ concurrency:
tasks: tasks:
- id: sleep - id: sleep
type: io.kestra.plugin.core.flow.Sleep type: io.kestra.plugin.core.flow.Sleep
duration: PT10S duration: PT2S

View File

@@ -16,7 +16,6 @@ import org.junit.jupiter.api.Test;
import java.time.Duration; import java.time.Duration;
import java.util.Optional; import java.util.Optional;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
public abstract class JdbcConcurrencyRunnerTest extends AbstractRunnerConcurrencyTest { public abstract class JdbcConcurrencyRunnerTest extends AbstractRunnerConcurrencyTest {
@@ -35,19 +34,19 @@ public abstract class JdbcConcurrencyRunnerTest extends AbstractRunnerConcurrenc
private TestRunnerUtils runnerUtils; private TestRunnerUtils runnerUtils;
@Test @Test
@LoadFlows({"flows/valids/flow-concurrency-queue.yml"}) @LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = "flow-concurrency-queued-protection")
void flowConcurrencyQueuedProtection() throws QueueException, InterruptedException { void flowConcurrencyQueuedProtection() throws QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30)); Execution execution1 = runnerUtils.runOneUntilRunning("flow-concurrency-queued-protection", NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
assertThat(execution1.getState().isRunning()).isTrue(); assertThat(execution1.getState().isRunning()).isTrue();
Flow flow = flowRepository Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue", Optional.empty()) .findById("flow-concurrency-queued-protection", NAMESPACE, "flow-concurrency-queue", Optional.empty())
.orElseThrow(); .orElseThrow();
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(State.Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty())); Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(State.Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.QUEUED); assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.QUEUED);
// manually update the concurrency count so that queued protection kicks in and no new execution would be popped // manually update the concurrency count so that queued protection kicks in and no new execution would be popped
ConcurrencyLimit concurrencyLimit = concurrencyLimitStorage.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue").orElseThrow(); ConcurrencyLimit concurrencyLimit = concurrencyLimitStorage.findById("flow-concurrency-queued-protection", NAMESPACE, "flow-concurrency-queue").orElseThrow();
concurrencyLimit = concurrencyLimit.withRunning(concurrencyLimit.getRunning() + 1); concurrencyLimit = concurrencyLimit.withRunning(concurrencyLimit.getRunning() + 1);
concurrencyLimitStorage.update(concurrencyLimit); concurrencyLimitStorage.update(concurrencyLimit);
@@ -56,7 +55,7 @@ public abstract class JdbcConcurrencyRunnerTest extends AbstractRunnerConcurrenc
// we wait for a few ms and checked that the second execution is still queued // we wait for a few ms and checked that the second execution is still queued
Thread.sleep(500); Thread.sleep(500);
Execution executionResult2 = executionRepository.findById(MAIN_TENANT, execution2.getId()).orElseThrow(); Execution executionResult2 = executionRepository.findById("flow-concurrency-queued-protection", execution2.getId()).orElseThrow();
assertThat(executionResult2.getState().getCurrent()).isEqualTo(State.Type.QUEUED); assertThat(executionResult2.getState().getCurrent()).isEqualTo(State.Type.QUEUED);
// we manually reset the concurrency count to avoid messing with any other tests // we manually reset the concurrency count to avoid messing with any other tests