mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
fix(system): compilation issue
This commit is contained in:
@@ -3,6 +3,7 @@ package io.kestra.core.runners;
|
||||
import io.kestra.core.junit.annotations.FlakyTest;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
@@ -89,4 +90,10 @@ public abstract class AbstractRunnerConcurrencyTest {
|
||||
void flowConcurrencyQueueKilled() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyQueueKilled("flow-concurrency-queue-killed");
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = "flow-concurrency-queued-protection")
|
||||
void flowConcurrencyQueuedProtection() throws QueueException, InterruptedException {
|
||||
flowConcurrencyCaseTest.flowConcurrencyQueuedProtection("flow-concurrency-queued-protection");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,9 +10,11 @@ import io.kestra.core.models.flows.State.Type;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.repositories.ConcurrencyLimitRepositoryInterface;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.services.ExecutionService;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.executor.ConcurrencyLimitStateStore;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
@@ -52,6 +54,9 @@ public class FlowConcurrencyCaseTest {
|
||||
@Inject
|
||||
private ExecutionService executionService;
|
||||
|
||||
@Inject
|
||||
private ConcurrencyLimitRepositoryInterface concurrencyLimitRepository;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.KILL_NAMED)
|
||||
protected QueueInterface<ExecutionKilled> killQueue;
|
||||
@@ -329,6 +334,33 @@ public class FlowConcurrencyCaseTest {
|
||||
}
|
||||
}
|
||||
|
||||
public void flowConcurrencyQueuedProtection(String tenantId) throws QueueException, InterruptedException {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
|
||||
assertThat(execution1.getState().isRunning()).isTrue();
|
||||
|
||||
Flow flow = flowRepository
|
||||
.findById(tenantId, NAMESPACE, "flow-concurrency-queue", Optional.empty())
|
||||
.orElseThrow();
|
||||
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().isQueued(), Execution.newExecution(flow, null, null, Optional.empty()));
|
||||
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.QUEUED);
|
||||
|
||||
// manually update the concurrency count so that queued protection kicks in and no new execution would be popped
|
||||
ConcurrencyLimit concurrencyLimit = concurrencyLimitRepository.findById(tenantId, NAMESPACE, "flow-concurrency-queue").orElseThrow();
|
||||
concurrencyLimit = concurrencyLimit.withRunning(concurrencyLimit.getRunning() + 1);
|
||||
concurrencyLimitRepository.update(concurrencyLimit);
|
||||
|
||||
Execution executionResult1 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(State.Type.SUCCESS), execution1);
|
||||
assertThat(executionResult1.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
|
||||
// we wait for a few ms and checked that the second execution is still queued
|
||||
Thread.sleep(500);
|
||||
Execution executionResult2 = runnerUtils.awaitExecution(e -> e.getState().isQueued(), execution2);
|
||||
assertThat(executionResult2.getState().getCurrent()).isEqualTo(State.Type.QUEUED);
|
||||
|
||||
// we manually reset the concurrency count to avoid messing with any other tests
|
||||
concurrencyLimitRepository.update(concurrencyLimit.withRunning(concurrencyLimit.getRunning() - 1));
|
||||
}
|
||||
|
||||
private URI storageUpload(String tenantId) throws URISyntaxException, IOException {
|
||||
File tempFile = File.createTempFile("file", ".txt");
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package io.kestra.runner.h2;
|
||||
|
||||
import io.kestra.jdbc.runner.JdbcConcurrencyRunnerTest;
|
||||
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
|
||||
|
||||
public class H2RunnerConcurrencyTest extends JdbcConcurrencyRunnerTest {
|
||||
public class H2RunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package io.kestra.runner.mysql;
|
||||
|
||||
import io.kestra.jdbc.runner.JdbcConcurrencyRunnerTest;
|
||||
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
|
||||
|
||||
public class MysqlRunnerConcurrencyTest extends JdbcConcurrencyRunnerTest {
|
||||
public class MysqlRunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package io.kestra.runner.postgres;
|
||||
|
||||
import io.kestra.jdbc.runner.JdbcConcurrencyRunnerTest;
|
||||
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
|
||||
|
||||
public class PostgresRunnerConcurrencyTest extends JdbcConcurrencyRunnerTest {
|
||||
public class PostgresRunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
|
||||
}
|
||||
|
||||
@@ -1,64 +0,0 @@
|
||||
package io.kestra.jdbc.runner;
|
||||
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
|
||||
import io.kestra.core.runners.ConcurrencyLimit;
|
||||
import io.kestra.core.runners.TestRunnerUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public abstract class JdbcConcurrencyRunnerTest extends AbstractRunnerConcurrencyTest {
|
||||
public static final String NAMESPACE = "io.kestra.tests";
|
||||
|
||||
@Inject
|
||||
private AbstractJdbcConcurrencyLimitStorage concurrencyLimitStorage;
|
||||
|
||||
@Inject
|
||||
private FlowRepositoryInterface flowRepository;
|
||||
|
||||
@Inject
|
||||
private ExecutionRepositoryInterface executionRepository;
|
||||
|
||||
@Inject
|
||||
private TestRunnerUtils runnerUtils;
|
||||
|
||||
@Test
|
||||
@LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = "flow-concurrency-queued-protection")
|
||||
void flowConcurrencyQueuedProtection() throws QueueException, InterruptedException {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning("flow-concurrency-queued-protection", NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
|
||||
assertThat(execution1.getState().isRunning()).isTrue();
|
||||
|
||||
Flow flow = flowRepository
|
||||
.findById("flow-concurrency-queued-protection", NAMESPACE, "flow-concurrency-queue", Optional.empty())
|
||||
.orElseThrow();
|
||||
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(State.Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
|
||||
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.QUEUED);
|
||||
|
||||
// manually update the concurrency count so that queued protection kicks in and no new execution would be popped
|
||||
ConcurrencyLimit concurrencyLimit = concurrencyLimitStorage.findById("flow-concurrency-queued-protection", NAMESPACE, "flow-concurrency-queue").orElseThrow();
|
||||
concurrencyLimit = concurrencyLimit.withRunning(concurrencyLimit.getRunning() + 1);
|
||||
concurrencyLimitStorage.update(concurrencyLimit);
|
||||
|
||||
Execution executionResult1 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(State.Type.SUCCESS), execution1);
|
||||
assertThat(executionResult1.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
|
||||
// we wait for a few ms and checked that the second execution is still queued
|
||||
Thread.sleep(500);
|
||||
Execution executionResult2 = executionRepository.findById("flow-concurrency-queued-protection", execution2.getId()).orElseThrow();
|
||||
assertThat(executionResult2.getState().getCurrent()).isEqualTo(State.Type.QUEUED);
|
||||
|
||||
// we manually reset the concurrency count to avoid messing with any other tests
|
||||
concurrencyLimitStorage.update(concurrencyLimit.withRunning(concurrencyLimit.getRunning() - 1));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user