chore(core): fix bean lifecycle to properly handle context close (#3318)

Fix: #3318
This commit is contained in:
Florian Hussonnois
2024-03-19 11:42:52 +01:00
committed by Florian Hussonnois
parent dd35a56505
commit 5e48602477
87 changed files with 596 additions and 424 deletions

View File

@@ -104,15 +104,14 @@ health:
# Kill Kestra running process
kill:
docker compose -f ./docker-compose-ci.yml down;
PID=$$(ps aux | grep java | grep 'kestra' | grep -v 'grep' | awk '{print $$2}'); \
if [ ! -z "$$PID" ]; then \
echo "Killing Kestra process (pid=$$PID)."; \
kill -9 $$PID; \
kill $$PID; \
else \
echo "No Kestra process to kill."; \
fi
docker compose -f ./docker-compose-ci.yml down;
# Default configuration for using Kestra with Postgres as backend.
define KESTRA_POSTGRES_CONFIGURATION =

View File

@@ -163,7 +163,6 @@ abstract public class AbstractCommand implements Callable<Integer> {
Runtime.getRuntime().addShutdownHook(new Thread(
() -> {
log.warn("Receiving shutdown ! Try to graceful exit");
try {
run.run();
} catch (Exception e) {

View File

@@ -1,6 +1,7 @@
package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.services.SkipExecutionService;
@@ -41,14 +42,13 @@ public class ExecutorCommand extends AbstractServerCommand {
this.skipExecutionService.setSkipExecutions(skipExecutions);
super.call();
this.shutdownHook(() -> KestraContext.getContext().shutdown());
ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
executorService.run();
log.info("Executor started");
this.shutdownHook(executorService::close);
Await.until(() -> !this.applicationContext.isRunning());
return 0;

View File

@@ -1,6 +1,7 @@
package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.IndexerInterface;
import io.kestra.core.utils.Await;
@@ -30,14 +31,13 @@ public class IndexerCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
super.call();
this.shutdownHook(() -> KestraContext.getContext().shutdown());
IndexerInterface indexer = applicationContext.getBean(IndexerInterface.class);
indexer.run();
log.info("Indexer started");
this.shutdownHook(indexer::close);
Await.until(() -> !this.applicationContext.isRunning());
return 0;

View File

@@ -1,6 +1,7 @@
package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.utils.Await;
@@ -30,14 +31,12 @@ public class SchedulerCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
super.call();
this.shutdownHook(() -> KestraContext.getContext().shutdown());
AbstractScheduler scheduler = applicationContext.getBean(AbstractScheduler.class);
scheduler.run();
log.info("Scheduler started");
this.shutdownHook(scheduler::close);
Await.until(() -> !this.applicationContext.isRunning());
return 0;

View File

@@ -1,6 +1,7 @@
package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.StandAloneRunner;
@@ -51,8 +52,8 @@ public class StandAloneCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
this.skipExecutionService.setSkipExecutions(skipExecutions);
super.call();
this.shutdownHook(() -> KestraContext.getContext().shutdown());
if (flowPath != null) {
try {
@@ -73,8 +74,6 @@ public class StandAloneCommand extends AbstractServerCommand {
standAloneRunner.run();
this.shutdownHook(standAloneRunner::close);
Await.until(() -> !this.applicationContext.isRunning());
return 0;

View File

@@ -1,6 +1,7 @@
package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;
@@ -29,15 +30,9 @@ public class WebServerCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
super.call();
log.info("Webserver started");
this.shutdownHook(() -> {
this.applicationContext.close();
});
this.shutdownHook(() -> KestraContext.getContext().shutdown());
Await.until(() -> !this.applicationContext.isRunning());
return 0;
}
}

View File

@@ -1,6 +1,7 @@
package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.Worker;
import io.kestra.core.utils.Await;
@@ -10,6 +11,7 @@ import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.util.Map;
import java.util.UUID;
@CommandLine.Command(
name = "worker",
@@ -17,6 +19,7 @@ import java.util.Map;
)
@Slf4j
public class WorkerCommand extends AbstractServerCommand {
@Inject
private ApplicationContext applicationContext;
@@ -36,12 +39,14 @@ public class WorkerCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
super.call();
this.shutdownHook(() -> KestraContext.getContext().shutdown());
if (this.workerGroupKey != null && !this.workerGroupKey.matches("[a-zA-Z0-9_-]+")) {
throw new IllegalArgumentException("The --worker-group option must match the [a-zA-Z0-9_-]+ pattern");
}
Worker worker = new Worker(applicationContext, this.thread, this.workerGroupKey);
// FIXME: For backward-compatibility with Kestra 0.15.x and earlier, we still use a UUID for the Worker ID instead of IdUtils
String workerID = UUID.randomUUID().toString();
Worker worker = applicationContext.createBean(Worker.class, workerID, this.thread, this.workerGroupKey);
applicationContext.registerSingleton(worker);
worker.run();
@@ -53,8 +58,6 @@ public class WorkerCommand extends AbstractServerCommand {
log.info("Worker started with {} thread(s)", this.thread);
}
this.shutdownHook(worker::close);
Await.until(() -> !this.applicationContext.isRunning());
return 0;

View File

@@ -1,8 +1,10 @@
package io.kestra.core.contexts;
import io.kestra.core.plugins.PluginRegistry;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.ApplicationContextConfiguration;
import io.micronaut.context.DefaultApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.io.service.SoftServiceLoader;
import io.micronaut.inject.BeanDefinitionReference;
@@ -17,15 +19,27 @@ import java.util.List;
public class KestraApplicationContext extends DefaultApplicationContext {
private final PluginRegistry pluginRegistry;
private ApplicationContext delegate;
public PluginRegistry getPluginRegistry() {
return pluginRegistry;
}
public KestraApplicationContext(@NonNull ApplicationContextConfiguration configuration, PluginRegistry pluginRegistry) {
public KestraApplicationContext(@NonNull ApplicationContext delegate,
@NonNull ApplicationContextConfiguration configuration,
PluginRegistry pluginRegistry) {
super(configuration);
this.delegate = delegate;
this.pluginRegistry = pluginRegistry;
}
/** {@inheritDoc} **/
@Override
public Environment getEnvironment() {
return delegate.getEnvironment();
}
/** {@inheritDoc} **/
@Override
protected @NonNull List<BeanDefinitionReference> resolveBeanDefinitionReferences() {
List<BeanDefinitionReference> resolvedBeanReferences = super.resolveBeanDefinitionReferences();
@@ -38,7 +52,6 @@ public class KestraApplicationContext extends DefaultApplicationContext {
definitions.collectAll(resolvedBeanReferences, BeanDefinitionReference::isPresent);
});
}
return resolvedBeanReferences;
}
}

View File

@@ -22,11 +22,6 @@ public class KestraApplicationContextBuilder extends DefaultApplicationContextBu
@Override
public ApplicationContext build() {
ApplicationContext defaultApplicationContext = super.build();
DefaultApplicationContext applicationContext = new KestraApplicationContext(this, this.pluginRegistry);
applicationContext.setEnvironment(defaultApplicationContext.getEnvironment());
return applicationContext;
return new KestraApplicationContext(super.build(), this, this.pluginRegistry);
}
}

View File

@@ -7,6 +7,8 @@ import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import jakarta.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
@@ -16,6 +18,8 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public abstract class KestraContext {
private static final Logger log = LoggerFactory.getLogger(KestraContext.class);
private static final AtomicReference<KestraContext> INSTANCE = new AtomicReference<>();
// Properties
@@ -56,9 +60,9 @@ public abstract class KestraContext {
public abstract String getVersion();
/**
* Stops Kestra.
* Shutdowns the Kestra application.
*/
public void exit(int status) {
public void shutdown() {
// noop
}
@@ -97,9 +101,10 @@ public abstract class KestraContext {
/** {@inheritDoc} **/
@Override
public void exit(int status) {
public void shutdown() {
log.info("Kestra server - Shutdown initiated");
applicationContext.close();
Runtime.getRuntime().exit(status);
log.info("Kestra server - Shutdown completed");
}
/** {@inheritDoc} **/

View File

@@ -9,6 +9,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@@ -34,12 +35,13 @@ public class StandAloneRunner implements RunnerInterface, AutoCloseable {
this.running = true;
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
if (workerEnabled) {
Worker worker = new Worker(applicationContext, workerThread, null);
applicationContext.registerSingleton(worker);
// FIXME: For backward-compatibility with Kestra 0.15.x and earlier, we still use a UUID for the Worker ID instead of IdUtils
String workerID = UUID.randomUUID().toString();
Worker worker = applicationContext.createBean(Worker.class, workerID, workerThread, null);
applicationContext.registerSingleton(worker); //
poolExecutor.execute(worker);
servers.add(worker);
}
@@ -63,7 +65,6 @@ public class StandAloneRunner implements RunnerInterface, AutoCloseable {
@Override
public void close() throws Exception {
this.servers.forEach(throwConsumer(AutoCloseable::close));
this.poolExecutor.shutdown();
}
}

View File

@@ -35,10 +35,14 @@ import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.utils.Hashing;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import lombok.Getter;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
@@ -80,20 +84,39 @@ public class Worker implements Service, Runnable, AutoCloseable {
private final static ObjectMapper MAPPER = JacksonMapper.ofJson();
private static final String SERVICE_PROPS_WORKER_GROUP = "worker.group";
private final ApplicationContext applicationContext;
private final WorkerJobQueueInterface workerJobQueue;
private final QueueInterface<WorkerTaskResult> workerTaskResultQueue;
private final QueueInterface<WorkerTriggerResult> workerTriggerResultQueue;
private final QueueInterface<ExecutionKilled> executionKilledQueue;
private final QueueInterface<MetricEntry> metricEntryQueue;
private final MetricRegistry metricRegistry;
private final ServerConfig serverConfig;
@Inject
private ApplicationContext applicationContext;
@Inject
private WorkerJobQueueInterface workerJobQueue;
@Inject
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
private QueueInterface<WorkerTaskResult> workerTaskResultQueue;
@Inject
@Named(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED)
private QueueInterface<WorkerTriggerResult> workerTriggerResultQueue;
@Inject
@Named(QueueFactoryInterface.KILL_NAMED)
private QueueInterface<ExecutionKilled> executionKilledQueue;
@Inject
@Named(QueueFactoryInterface.METRIC_QUEUE)
private QueueInterface<MetricEntry> metricEntryQueue;
@Inject
private MetricRegistry metricRegistry;
@Inject
private ServerConfig serverConfig;
@Inject
private LogService logService;
private final Set<String> killedExecution = ConcurrentHashMap.newKeySet();
// package private to allow its usage within tests
final ExecutorService executors;
@Getter
private final Map<Long, AtomicInteger> metricRunningCount = new ConcurrentHashMap<>();
@@ -110,52 +133,34 @@ public class Worker implements Service, Runnable, AutoCloseable {
@Getter
private final String workerGroup;
private final LogService logService;
private final String id;
// package private to allow its usage within tests
final ExecutorService executorService;
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicReference<ServiceState> state = new AtomicReference<>();
@SuppressWarnings("unchecked")
/**
* Creates a new {@link Worker} instance.
*
* @param workerId The worker service ID.
* @param numThreads The worker num threads.
* @param workerGroupKey The worker group (EE).
*/
@Inject
public Worker(ApplicationContext applicationContext, int thread, String workerGroupKey) {
// FIXME: For backward-compatibility with Kestra 0.15.x and earlier, we still use a UUID for the Worker ID instead of IdUtils
this(applicationContext, thread, workerGroupKey, UUID.randomUUID().toString());
}
@VisibleForTesting
public Worker(ApplicationContext applicationContext, int thread, String workerGroupKey, String id) {
this.id = id;
this.applicationContext = applicationContext;
this.workerJobQueue = applicationContext.getBean(WorkerJobQueueInterface.class);
this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.workerTaskResultQueue = (QueueInterface<WorkerTaskResult>) applicationContext.getBean(
QueueInterface.class,
Qualifiers.byName(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
);
this.workerTriggerResultQueue = (QueueInterface<WorkerTriggerResult>) applicationContext.getBean(
QueueInterface.class,
Qualifiers.byName(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED)
);
this.executionKilledQueue = (QueueInterface<ExecutionKilled>) applicationContext.getBean(
QueueInterface.class,
Qualifiers.byName(QueueFactoryInterface.KILL_NAMED)
);
this.metricEntryQueue = (QueueInterface<MetricEntry>) applicationContext.getBean(
QueueInterface.class,
Qualifiers.byName(QueueFactoryInterface.METRIC_QUEUE)
);
this.metricRegistry = applicationContext.getBean(MetricRegistry.class);
ExecutorsUtils executorsUtils = applicationContext.getBean(ExecutorsUtils.class);
this.executors = executorsUtils.maxCachedThreadPool(thread, "worker");
WorkerGroupService workerGroupService = applicationContext.getBean(WorkerGroupService.class);
public Worker(@Parameter String workerId,
@Parameter Integer numThreads,
@Nullable @Parameter String workerGroupKey,
ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher,
WorkerGroupService workerGroupService,
ExecutorsUtils executorsUtils
) {
this.id = workerId;
this.workerGroup = workerGroupService.resolveGroupFromKey(workerGroupKey);
this.logService = applicationContext.getBean(LogService.class);
this.serverConfig = applicationContext.getBean(ServerConfig.class);
this.eventPublisher = eventPublisher;
this.executorService = executorsUtils.maxCachedThreadPool(numThreads, "worker");
setState(ServiceState.CREATED);
}
@@ -184,7 +189,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
this.workerGroup,
Worker.class,
either -> {
executors.execute(() -> {
executorService.execute(() -> {
if (either.isRight()) {
log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
handleDeserializationError(either.getRight());
@@ -628,13 +633,17 @@ public class Worker implements Service, Runnable, AutoCloseable {
));
}
/** {@inheritDoc} **/
@PreDestroy
@Override
public void close() throws Exception {
closeWorker(serverConfig.terminationGracePeriod());
if (shutdown.compareAndSet(false, true)) {
closeWorker(serverConfig.terminationGracePeriod());
}
}
@VisibleForTesting
public void closeWorker(Duration timeout) throws Exception {
public void closeWorker(final Duration timeout) {
log.info("Terminating.");
setState(ServiceState.TERMINATING);
workerJobQueue.pause();
@@ -644,7 +653,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
terminatedGracefully = waitForTasksCompletion(timeout);
} else {
log.info("Terminating now and skip waiting for tasks completions.");
this.executors.shutdownNow();
this.executorService.shutdownNow();
closeWorkerTaskResultQueue();
terminatedGracefully = false;
}
@@ -658,8 +667,8 @@ public class Worker implements Service, Runnable, AutoCloseable {
new Thread(
() -> {
try {
this.executors.shutdown();
this.executors.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
this.executorService.shutdown();
this.executorService.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Fail to shutdown the worker", e);
}
@@ -670,8 +679,8 @@ public class Worker implements Service, Runnable, AutoCloseable {
final AtomicBoolean cleanShutdown = new AtomicBoolean(false);
Await.until(
() -> {
if (this.executors.isTerminated() || this.workerThreadReferences.isEmpty()) {
log.info("No more worker threads busy, shutting down!");
if (this.executorService.isTerminated() || this.workerThreadReferences.isEmpty()) {
log.info("All working threads are terminated.");
// we ensure that the last produced messages are sent
closeWorkerTaskResultQueue();
@@ -680,10 +689,9 @@ public class Worker implements Service, Runnable, AutoCloseable {
}
log.warn(
"Worker still has {} thread(s) running, waiting all threads to terminate before shutdown!",
"Waiting for all worker threads to terminate (remaining: {}).",
this.workerThreadReferences.size()
);
return false;
},
Duration.ofSeconds(1)
@@ -700,8 +708,8 @@ public class Worker implements Service, Runnable, AutoCloseable {
}
@VisibleForTesting
public void shutdown() throws IOException {
this.executors.shutdownNow();
public void shutdown() {
this.executorService.shutdownNow();
}
public List<WorkerTask> getWorkerThreadTasks() {

View File

@@ -45,6 +45,7 @@ import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -80,6 +81,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private volatile Map<String, FlowWithPollingTriggerNextDate> schedulableNextDate = new ConcurrentHashMap<>();
private final String id = IdUtils.create();
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicReference<ServiceState> state = new AtomicReference<>();
private final ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;
@@ -685,12 +689,17 @@ public abstract class AbstractScheduler implements Scheduler, Service {
this.workerTaskQueue.emit(workerGroupService.resolveGroupFromJob(workerTrigger), workerTrigger);
}
/** {@inheritDoc} **/
@Override
@PreDestroy
public void close() {
setState(ServiceState.TERMINATING);
this.scheduleExecutor.shutdown();
setState(ServiceState.TERMINATED_GRACEFULLY);
if (shutdown.compareAndSet(false, true)) {
log.info("Terminating.");
setState(ServiceState.TERMINATING);
this.scheduleExecutor.shutdown();
setState(ServiceState.TERMINATED_GRACEFULLY);
log.info("Scheduler closed ({}).", state.get().name().toLowerCase());
}
}
@SuperBuilder(toBuilder = true)

View File

@@ -338,7 +338,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
if (state.equals(Service.ServiceState.NOT_RUNNING) || state.equals(Service.ServiceState.EMPTY)) {
service.skipGracefulTermination(true);
}
KestraContext.getContext().exit(1);
KestraContext.getContext().shutdown();
return Optional.empty();
}

View File

@@ -1,26 +1,38 @@
package io.kestra.core.runners;
import io.micronaut.context.ApplicationContext;
import java.util.concurrent.TimeUnit;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.context.annotation.Prototype;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
/**
* This worker is a special worker which won't close every queue, allowing it to be run and closed within a test without
* preventing the Micronaut context from being used for further tests with queues still being up
*/
@Prototype
public class TestMethodScopedWorker extends Worker {
public TestMethodScopedWorker(ApplicationContext applicationContext, int thread, String workerGroupKey) {
super(applicationContext, thread, workerGroupKey);
applicationContext.registerSingleton(this);
@Inject
public TestMethodScopedWorker(@Parameter String workerId,
@Parameter Integer numThreads,
@Nullable @Parameter String workerGroupKey,
ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher,
WorkerGroupService workerGroupService,
ExecutorsUtils executorsUtils
) {
super(workerId, numThreads, workerGroupKey, eventPublisher, workerGroupService, executorsUtils);
}
/**
* Override is done to prevent closing the queue. However please note that this is not failsafe because we ideally need
* Override is done to prevent closing the queue. However, please note that this is not failsafe because we ideally need
* to stop the worker's subscriptions to every queue before cutting off the executors pool.
*/
@Override
public void close() throws InterruptedException {
this.executors.shutdownNow();
public void close() {
shutdown();
}
}

View File

@@ -63,7 +63,7 @@ class WorkerTest {
@Test
void success() throws TimeoutException {
Worker worker = new Worker(applicationContext, 8, null);
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 8, null);
worker.run();
AtomicReference<WorkerTaskResult> workerTaskResult = new AtomicReference<>(null);
@@ -82,13 +82,13 @@ class WorkerTest {
@Test
void workerGroup() {
Worker worker = new Worker(applicationContext, 8, "toto");
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 8, "toto");
assertThat(worker.getWorkerGroup(), nullValue());
}
@Test
void failOnWorkerTaskWithFlowable() throws TimeoutException, JsonProcessingException {
Worker worker = new Worker(applicationContext, 8, null);
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 8, null);
worker.run();
AtomicReference<WorkerTaskResult> workerTaskResult = new AtomicReference<>(null);
@@ -142,7 +142,7 @@ class WorkerTest {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
workerTaskLogQueue.receive(either -> logs.add(either.getLeft()));
Worker worker = new Worker(applicationContext, 8, null);
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 8, null);
worker.run();
List<WorkerTaskResult> workerTaskResult = new ArrayList<>();

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.triggers.types.Schedule;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.TestMethodScopedWorker;
import io.kestra.core.runners.Worker;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
@@ -79,7 +80,7 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
applicationContext,
flowListenersServiceSpy,
triggerState);
Worker worker = new TestMethodScopedWorker(applicationContext, 8, null)) {
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(SchedulerConditionTest.class, either -> {
Execution execution = either.getLeft();

View File

@@ -5,14 +5,15 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.TestMethodScopedWorker;
import io.kestra.core.runners.Worker;
import io.kestra.core.tasks.executions.Fail;
import io.kestra.core.tasks.test.PollingTrigger;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.IdUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
@@ -53,7 +54,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
try (
AbstractScheduler scheduler = scheduler(flowListenersServiceSpy);
Worker worker = new TestMethodScopedWorker(applicationContext, 8, null)
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
) {
AtomicReference<Execution> last = new AtomicReference<>();
@@ -93,7 +94,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
try (
AbstractScheduler scheduler = scheduler(flowListenersServiceSpy);
Worker worker = new TestMethodScopedWorker(applicationContext, 8, null)
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
) {
AtomicReference<Execution> last = new AtomicReference<>();
@@ -121,7 +122,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
// Assert that the trigger is now disabled.
// It needs to await on assertion as it will be disabled AFTER we receive a success execution.
Trigger trigger = Trigger.of(flow, pollingTrigger);
Await.until(() -> this.triggerState.findLast(trigger).map(t -> t.getDisabled()).orElse(false).booleanValue(), Duration.ofMillis(100), Duration.ofSeconds(10));
Await.until(() -> this.triggerState.findLast(trigger).map(TriggerContext::getDisabled).orElse(false).booleanValue(), Duration.ofMillis(100), Duration.ofSeconds(10));
}
}
@@ -147,7 +148,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
try (
AbstractScheduler scheduler = scheduler(flowListenersServiceSpy);
Worker worker = new TestMethodScopedWorker(applicationContext, 8, null)
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
) {
AtomicReference<Execution> last = new AtomicReference<>();

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.TestMethodScopedWorker;
import io.kestra.core.runners.Worker;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
@@ -84,7 +85,7 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
flowListenersServiceSpy,
triggerState
);
Worker worker = new TestMethodScopedWorker(applicationContext, 8, null);
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
) {
// start the worker as it execute polling triggers
worker.run();

View File

@@ -5,6 +5,7 @@ import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
@@ -14,8 +15,10 @@ import java.util.Map;
@H2RepositoryEnabled
public class H2ExecutionRepository extends AbstractJdbcExecutionRepository {
@Inject
public H2ExecutionRepository(ApplicationContext applicationContext, AbstractJdbcExecutorStateStorage executorStateStorage) {
super(new H2Repository<>(Execution.class, applicationContext), applicationContext, executorStateStorage);
public H2ExecutionRepository(@Named("executions") H2Repository<Execution> repository,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage) {
super(repository, applicationContext, executorStateStorage);
}
@Override

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.jdbc.repository.AbstractJdbcFlowRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
@@ -13,8 +14,9 @@ import java.util.Map;
@H2RepositoryEnabled
public class H2FlowRepository extends AbstractJdbcFlowRepository {
@Inject
public H2FlowRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(Flow.class, applicationContext), applicationContext);
public H2FlowRepository(@Named("flows") H2Repository<Flow> repository,
ApplicationContext applicationContext) {
super(repository, applicationContext);
}
@Override

View File

@@ -2,15 +2,15 @@ package io.kestra.repository.h2;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2RepositoryEnabled
public class H2FlowTopologyRepository extends AbstractJdbcFlowTopologyRepository {
@Inject
public H2FlowTopologyRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(FlowTopology.class, applicationContext));
public H2FlowTopologyRepository(@Named("flowtopologies") H2Repository<FlowTopology> repository) {
super(repository);
}
}

View File

@@ -2,8 +2,8 @@ package io.kestra.repository.h2;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.jdbc.repository.AbstractJdbcLogRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
@@ -13,8 +13,8 @@ import java.util.List;
@H2RepositoryEnabled
public class H2LogRepository extends AbstractJdbcLogRepository {
@Inject
public H2LogRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(LogEntry.class, applicationContext));
public H2LogRepository(@Named("logs") H2Repository<LogEntry> repository) {
super(repository);
}
@Override

View File

@@ -2,16 +2,16 @@ package io.kestra.repository.h2;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2RepositoryEnabled
public class H2MetricRepository extends AbstractJdbcMetricRepository {
@Inject
public H2MetricRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(MetricEntry.class, applicationContext));
public H2MetricRepository(@Named("metrics") H2Repository<MetricEntry> repository) {
super(repository);
}
}

View File

@@ -1,12 +1,23 @@
package io.kestra.repository.h2;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.jdbc.JdbcTableConfig;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
import lombok.SneakyThrows;
import org.jooq.*;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.LikeEscapeStep;
import org.jooq.Record;
import org.jooq.RecordMapper;
import org.jooq.Result;
import org.jooq.SelectConditionStep;
import org.jooq.impl.DSL;
import java.util.Arrays;
@@ -16,11 +27,18 @@ import java.util.Map;
import java.util.stream.Collectors;
import jakarta.annotation.Nullable;
@H2RepositoryEnabled
@EachBean(JdbcTableConfig.class)
public class H2Repository<T> extends io.kestra.jdbc.AbstractJdbcRepository<T> {
public H2Repository(Class<T> cls, ApplicationContext applicationContext) {
super(cls, applicationContext);
@Inject
public H2Repository(@Parameter JdbcTableConfig jdbcTableConfig,
QueueService queueService,
JooqDSLContextWrapper dslContextWrapper) {
super(jdbcTableConfig, queueService, dslContextWrapper);
}
@Override
@SneakyThrows
public void persist(T entity, DSLContext context, @Nullable Map<Field<Object>, Object> fields) {
Map<Field<Object>, Object> finalFields = fields == null ? this.persistFields(entity) : fields;

View File

@@ -2,15 +2,15 @@ package io.kestra.repository.h2;
import io.kestra.core.server.ServiceInstance;
import io.kestra.jdbc.repository.AbstractJdbcServiceInstanceRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2RepositoryEnabled
public class H2ServiceInstanceRepository extends AbstractJdbcServiceInstanceRepository {
@Inject
public H2ServiceInstanceRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(ServiceInstance.class, applicationContext));
public H2ServiceInstanceRepository(@Named("serviceinstance") H2Repository<ServiceInstance> repository) {
super(repository);
}
}

View File

@@ -4,13 +4,15 @@ import io.kestra.core.models.Setting;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2RepositoryEnabled
public class H2SettingRepository extends AbstractJdbcSettingRepository {
@Inject
public H2SettingRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(Setting.class, applicationContext), applicationContext);
public H2SettingRepository(@Named("settings") H2Repository<Setting> repository,
ApplicationContext applicationContext) {
super(repository, applicationContext);
}
}

View File

@@ -5,6 +5,7 @@ import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.jdbc.repository.AbstractJdbcTemplateRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
@@ -15,8 +16,9 @@ import java.util.List;
@TemplateEnabled
public class H2TemplateRepository extends AbstractJdbcTemplateRepository {
@Inject
public H2TemplateRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(Template.class, applicationContext), applicationContext);
public H2TemplateRepository(@Named("templates") H2Repository<Template> repository,
ApplicationContext applicationContext) {
super(repository, applicationContext);
}
@Override

View File

@@ -2,15 +2,15 @@ package io.kestra.repository.h2;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2RepositoryEnabled
public class H2TriggerRepository extends AbstractJdbcTriggerRepository {
@Inject
public H2TriggerRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(Trigger.class, applicationContext));
public H2TriggerRepository(@Named("triggers") H2Repository<Trigger> repository) {
super(repository);
}
}

View File

@@ -2,15 +2,15 @@ package io.kestra.repository.h2;
import io.kestra.core.runners.WorkerJobRunning;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2RepositoryEnabled
public class H2WorkerJobRunningRepository extends AbstractJdbcWorkerJobRunningRepository {
@Inject
public H2WorkerJobRunningRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(WorkerJobRunning.class, applicationContext));
public H2WorkerJobRunningRepository(@Named("workerjobrunning") H2Repository<WorkerJobRunning> repository) {
super(repository);
}
}

View File

@@ -4,12 +4,13 @@ import io.kestra.core.runners.ExecutionDelay;
import io.kestra.jdbc.runner.AbstractJdbcExecutionDelayStorage;
import io.kestra.repository.h2.H2Repository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2QueueEnabled
public class H2ExecutionDelayStorage extends AbstractJdbcExecutionDelayStorage {
public H2ExecutionDelayStorage(ApplicationContext applicationContext) {
super(new H2Repository<>(ExecutionDelay.class, applicationContext));
public H2ExecutionDelayStorage(@Named("executordelayed") H2Repository<ExecutionDelay> repository) {
super(repository);
}
}

View File

@@ -3,13 +3,13 @@ package io.kestra.runner.h2;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.kestra.repository.h2.H2Repository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2QueueEnabled
public class H2ExecutionQueuedStorage extends AbstractJdbcExecutionQueuedStorage {
public H2ExecutionQueuedStorage(ApplicationContext applicationContext) {
super(new H2Repository<>(ExecutionQueued.class, applicationContext));
public H2ExecutionQueuedStorage(@Named("executionqueued") H2Repository<ExecutionQueued> repository) {
super(repository);
}
}

View File

@@ -1,15 +1,15 @@
package io.kestra.runner.h2;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.core.runners.ExecutorState;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.repository.h2.H2Repository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2QueueEnabled
public class H2ExecutorStateStorage extends AbstractJdbcExecutorStateStorage {
public H2ExecutorStateStorage(ApplicationContext applicationContext) {
super(new H2Repository<>(ExecutorState.class, applicationContext));
public H2ExecutorStateStorage(@Named("executorstate") H2Repository<ExecutorState> repository) {
super(repository);
}
}

View File

@@ -4,12 +4,13 @@ import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.jdbc.runner.AbstractJdbcMultipleConditionStorage;
import io.kestra.repository.h2.H2Repository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2QueueEnabled
public class H2MultipleConditionStorage extends AbstractJdbcMultipleConditionStorage {
public H2MultipleConditionStorage(ApplicationContext applicationContext) {
super(new H2Repository<>(MultipleConditionWindow.class, applicationContext));
public H2MultipleConditionStorage(@Named("multipleconditions") H2Repository<MultipleConditionWindow> repository) {
super(repository);
}
}

View File

@@ -4,12 +4,13 @@ import io.kestra.core.runners.SubflowExecution;
import io.kestra.jdbc.runner.AbstractJdbcSubflowExecutionStorage;
import io.kestra.repository.h2.H2Repository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2QueueEnabled
public class H2SubflowExecutionStorage extends AbstractJdbcSubflowExecutionStorage {
public H2SubflowExecutionStorage(ApplicationContext applicationContext) {
super(new H2Repository<>(SubflowExecution.class, applicationContext));
public H2SubflowExecutionStorage(@Named("subflow-executions") H2Repository<SubflowExecution<?>> repository) {
super(repository);
}
}

View File

@@ -5,8 +5,10 @@ import io.kestra.core.models.triggers.multipleflows.AbstractMultipleConditionSto
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.repository.h2.H2Repository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.BeforeEach;
import java.util.List;
@@ -18,8 +20,12 @@ class H2MultipleConditionStorageTest extends AbstractMultipleConditionStorageTes
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject
@Named("multipleconditions")
H2Repository<MultipleConditionWindow> repository;
protected MultipleConditionStorageInterface multipleConditionStorage() {
return new H2MultipleConditionStorage(applicationContext);
return new H2MultipleConditionStorage(repository);
}
protected void save(MultipleConditionStorageInterface multipleConditionStorage, Flow flow, List<MultipleConditionWindow> multipleConditionWindows) {

View File

@@ -5,6 +5,7 @@ import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.Field;
@@ -16,8 +17,10 @@ import java.util.Map;
@MysqlRepositoryEnabled
public class MysqlExecutionRepository extends AbstractJdbcExecutionRepository {
@Inject
public MysqlExecutionRepository(ApplicationContext applicationContext, AbstractJdbcExecutorStateStorage executorStateStorage) {
super(new MysqlRepository<>(Execution.class, applicationContext), applicationContext, executorStateStorage);
public MysqlExecutionRepository(@Named("executions") MysqlRepository<Execution> repository,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage) {
super(repository, applicationContext, executorStateStorage);
}
@Override

View File

@@ -4,18 +4,19 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.jdbc.repository.AbstractJdbcFlowRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.*;
import org.jooq.impl.DSL;
import org.jooq.Condition;
import java.util.*;
import java.util.Map;
@Singleton
@MysqlRepositoryEnabled
public class MysqlFlowRepository extends AbstractJdbcFlowRepository {
@Inject
public MysqlFlowRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(Flow.class, applicationContext), applicationContext);
public MysqlFlowRepository(@Named("flows") MysqlRepository<Flow> repository,
ApplicationContext applicationContext) {
super(repository, applicationContext);
}
@Override

View File

@@ -2,15 +2,15 @@ package io.kestra.repository.mysql;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@MysqlRepositoryEnabled
public class MysqlFlowTopologyRepository extends AbstractJdbcFlowTopologyRepository {
@Inject
public MysqlFlowTopologyRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(FlowTopology.class, applicationContext));
public MysqlFlowTopologyRepository(@Named("flowtopologies") MysqlRepository<FlowTopology> repository) {
super(repository);
}
}

View File

@@ -2,8 +2,8 @@ package io.kestra.repository.mysql;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.jdbc.repository.AbstractJdbcLogRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
@@ -13,8 +13,8 @@ import java.util.Arrays;
@MysqlRepositoryEnabled
public class MysqlLogRepository extends AbstractJdbcLogRepository {
@Inject
public MysqlLogRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(LogEntry.class, applicationContext));
public MysqlLogRepository(@Named("logs") MysqlRepository<LogEntry> repository) {
super(repository);
}
@Override

View File

@@ -2,8 +2,8 @@ package io.kestra.repository.mysql;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Field;
@@ -13,8 +13,8 @@ import java.sql.Timestamp;
@MysqlRepositoryEnabled
public class MysqlMetricRepository extends AbstractJdbcMetricRepository {
@Inject
public MysqlMetricRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(MetricEntry.class, applicationContext));
public MysqlMetricRepository(@Named("metrics") MysqlRepository<MetricEntry> repository) {
super(repository);
}
@Override

View File

@@ -1,11 +1,20 @@
package io.kestra.repository.mysql;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.jdbc.AbstractJdbcRepository;
import io.micronaut.context.ApplicationContext;
import io.kestra.jdbc.JdbcTableConfig;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.data.model.Pageable;
import org.jooq.*;
import jakarta.inject.Inject;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.RecordMapper;
import org.jooq.SelectConditionStep;
import org.jooq.impl.DSL;
import java.sql.Timestamp;
@@ -13,13 +22,20 @@ import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class MysqlRepository<T> extends AbstractJdbcRepository<T> {
public MysqlRepository(Class<T> cls, ApplicationContext applicationContext) {
super(cls, applicationContext);
@MysqlRepositoryEnabled
@EachBean(JdbcTableConfig.class)
public class MysqlRepository<T> extends AbstractJdbcRepository<T> {
@Inject
public MysqlRepository(@Parameter JdbcTableConfig jdbcTableConfig,
QueueService queueService,
JooqDSLContextWrapper dslContextWrapper) {
super(jdbcTableConfig, queueService, dslContextWrapper);
this.table = DSL.table(DSL.quotedName(this.getTable().getName()));
}
/** {@inheritDoc} **/
@Override
public Condition fullTextCondition(List<String> fields, String query) {
if (query == null || query.equals("*")) {
return DSL.trueCondition();
@@ -31,7 +47,7 @@ public class MysqlRepository<T> extends AbstractJdbcRepository<T> {
.map(s -> "+" + s + "*")
.collect(Collectors.joining(" "));
if (match.length() == 0) {
if (match.isEmpty()) {
return DSL.falseCondition();
}

View File

@@ -2,16 +2,16 @@ package io.kestra.repository.mysql;
import io.kestra.core.server.ServiceInstance;
import io.kestra.jdbc.repository.AbstractJdbcServiceInstanceRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@MysqlRepositoryEnabled
public class MysqlServiceInstanceRepository extends AbstractJdbcServiceInstanceRepository {
@Inject
public MysqlServiceInstanceRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(ServiceInstance.class, applicationContext));
public MysqlServiceInstanceRepository(@Named("serviceinstance") MysqlRepository<ServiceInstance> repository) {
super(repository);
}
}

View File

@@ -1,17 +1,18 @@
package io.kestra.repository.mysql;
import io.kestra.core.models.Setting;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepository;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@MysqlRepositoryEnabled
public class MysqlSettingRepository extends AbstractJdbcSettingRepository {
@Inject
public MysqlSettingRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(Setting.class, applicationContext), applicationContext);
}}
public MysqlSettingRepository(@Named("settings") MysqlRepository<Setting> repository,
ApplicationContext applicationContext) {
super(repository, applicationContext);
}
}

View File

@@ -5,6 +5,7 @@ import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.jdbc.repository.AbstractJdbcTemplateRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
@@ -15,8 +16,9 @@ import java.util.Arrays;
@TemplateEnabled
public class MysqlTemplateRepository extends AbstractJdbcTemplateRepository {
@Inject
public MysqlTemplateRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(Template.class, applicationContext), applicationContext);
public MysqlTemplateRepository(@Named("templates") MysqlRepository<Template> repository,
ApplicationContext applicationContext) {
super(repository, applicationContext);
}
@Override

View File

@@ -2,8 +2,8 @@ package io.kestra.repository.mysql;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.impl.DSL;
@@ -14,8 +14,8 @@ import java.util.List;
@MysqlRepositoryEnabled
public class MysqlTriggerRepository extends AbstractJdbcTriggerRepository {
@Inject
public MysqlTriggerRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(Trigger.class, applicationContext));
public MysqlTriggerRepository(@Named("triggers") MysqlRepository<Trigger> repository) {
super(repository);
}
@Override

View File

@@ -2,15 +2,15 @@ package io.kestra.repository.mysql;
import io.kestra.core.runners.WorkerJobRunning;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@MysqlRepositoryEnabled
public class MysqlWorkerJobRunningRepository extends AbstractJdbcWorkerJobRunningRepository {
@Inject
public MysqlWorkerJobRunningRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(WorkerJobRunning.class, applicationContext));
public MysqlWorkerJobRunningRepository(@Named("workerjobrunning") MysqlRepository<WorkerJobRunning> repository) {
super(repository);
}
}

View File

@@ -4,12 +4,13 @@ import io.kestra.core.runners.ExecutionDelay;
import io.kestra.jdbc.runner.AbstractJdbcExecutionDelayStorage;
import io.kestra.repository.mysql.MysqlRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@MysqlQueueEnabled
public class MysqlExecutionDelayStorage extends AbstractJdbcExecutionDelayStorage {
public MysqlExecutionDelayStorage(ApplicationContext applicationContext) {
super(new MysqlRepository<>(ExecutionDelay.class, applicationContext));
public MysqlExecutionDelayStorage(@Named("executordelayed") MysqlRepository<ExecutionDelay> repository) {
super(repository);
}
}

View File

@@ -3,13 +3,13 @@ package io.kestra.runner.mysql;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.kestra.repository.mysql.MysqlRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@MysqlQueueEnabled
public class MysqlExecutionQueuedStorage extends AbstractJdbcExecutionQueuedStorage {
public MysqlExecutionQueuedStorage(ApplicationContext applicationContext) {
super(new MysqlRepository<>(ExecutionQueued.class, applicationContext));
public MysqlExecutionQueuedStorage(@Named("executionqueued") MysqlRepository<ExecutionQueued> repository) {
super(repository);
}
}

View File

@@ -4,12 +4,13 @@ import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.core.runners.ExecutorState;
import io.kestra.repository.mysql.MysqlRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@MysqlQueueEnabled
public class MysqlExecutorStateStorage extends AbstractJdbcExecutorStateStorage {
public MysqlExecutorStateStorage(ApplicationContext applicationContext) {
super(new MysqlRepository<>(ExecutorState.class, applicationContext));
public MysqlExecutorStateStorage(@Named("executorstate") MysqlRepository<ExecutorState> repository) {
super(repository);
}
}

View File

@@ -3,13 +3,13 @@ package io.kestra.runner.mysql;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.jdbc.runner.AbstractJdbcMultipleConditionStorage;
import io.kestra.repository.mysql.MysqlRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@MysqlQueueEnabled
public class MysqlMultipleConditionStorage extends AbstractJdbcMultipleConditionStorage {
public MysqlMultipleConditionStorage(ApplicationContext applicationContext) {
super(new MysqlRepository<>(MultipleConditionWindow.class, applicationContext));
public MysqlMultipleConditionStorage(@Named("multipleconditions") MysqlRepository<MultipleConditionWindow> repository) {
super(repository);
}
}

View File

@@ -4,12 +4,13 @@ import io.kestra.core.runners.SubflowExecution;
import io.kestra.jdbc.runner.AbstractJdbcSubflowExecutionStorage;
import io.kestra.repository.mysql.MysqlRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@MysqlQueueEnabled
public class MysqlSubflowExecutionStorage extends AbstractJdbcSubflowExecutionStorage {
public MysqlSubflowExecutionStorage(ApplicationContext applicationContext) {
super(new MysqlRepository<>(SubflowExecution.class, applicationContext));
public MysqlSubflowExecutionStorage(@Named("subflow-executions") MysqlRepository<SubflowExecution<?>> repository) {
super(repository);
}
}

View File

@@ -5,8 +5,10 @@ import io.kestra.core.models.triggers.multipleflows.AbstractMultipleConditionSto
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.repository.mysql.MysqlRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.BeforeEach;
import java.util.List;
@@ -18,8 +20,12 @@ class MysqlMultipleConditionStorageTest extends AbstractMultipleConditionStorage
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject
@Named("multipleconditions")
MysqlRepository<MultipleConditionWindow> repository;
protected MultipleConditionStorageInterface multipleConditionStorage() {
return new MysqlMultipleConditionStorage(applicationContext);
return new MysqlMultipleConditionStorage(repository);
}
protected void save(MultipleConditionStorageInterface multipleConditionStorage, Flow flow, List<MultipleConditionWindow> multipleConditionWindows) {

View File

@@ -6,6 +6,7 @@ import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.impl.DSL;
@@ -19,8 +20,10 @@ import java.util.stream.Collectors;
@PostgresRepositoryEnabled
public class PostgresExecutionRepository extends AbstractJdbcExecutionRepository {
@Inject
public PostgresExecutionRepository(ApplicationContext applicationContext, AbstractJdbcExecutorStateStorage executorStateStorage) {
super(new PostgresRepository<>(Execution.class, applicationContext), applicationContext, executorStateStorage);
public PostgresExecutionRepository(@Named("executions") PostgresRepository<Execution> repository,
ApplicationContext applicationContext,
AbstractJdbcExecutorStateStorage executorStateStorage) {
super(repository, applicationContext, executorStateStorage);
}
@Override

View File

@@ -3,7 +3,6 @@ package io.kestra.repository.postgres;
import io.kestra.core.models.executions.Execution;
import io.kestra.jdbc.AbstractJdbcRepository;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;
import java.util.ArrayList;

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.jdbc.repository.AbstractJdbcFlowRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
@@ -13,8 +14,9 @@ import java.util.Map;
@PostgresRepositoryEnabled
public class PostgresFlowRepository extends AbstractJdbcFlowRepository {
@Inject
public PostgresFlowRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(Flow.class, applicationContext), applicationContext);
public PostgresFlowRepository(@Named("flows") PostgresRepository<Flow> repository,
ApplicationContext applicationContext) {
super(repository, applicationContext);
}
@Override

View File

@@ -3,7 +3,6 @@ package io.kestra.repository.postgres;
import io.kestra.core.models.flows.Flow;
import io.kestra.jdbc.AbstractJdbcRepository;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.impl.DSL;
import java.util.ArrayList;

View File

@@ -2,15 +2,15 @@ package io.kestra.repository.postgres;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@PostgresRepositoryEnabled
public class PostgresFlowTopologyRepository extends AbstractJdbcFlowTopologyRepository {
@Inject
public PostgresFlowTopologyRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(FlowTopology.class, applicationContext));
public PostgresFlowTopologyRepository(@Named("flowtopologies") PostgresRepository<FlowTopology> repository) {
super(repository);
}
}

View File

@@ -1,10 +1,9 @@
package io.kestra.repository.postgres;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.jdbc.repository.AbstractJdbcLogRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.impl.DSL;
@@ -18,8 +17,8 @@ import java.util.stream.Collectors;
@PostgresRepositoryEnabled
public class PostgresLogRepository extends AbstractJdbcLogRepository {
@Inject
public PostgresLogRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(LogEntry.class, applicationContext));
public PostgresLogRepository(@Named("logs") PostgresRepository<LogEntry> repository) {
super(repository);
}
@Override

View File

@@ -2,16 +2,16 @@ package io.kestra.repository.postgres;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.jdbc.repository.AbstractJdbcMetricRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@PostgresRepositoryEnabled
public class PostgresMetricRepository extends AbstractJdbcMetricRepository {
@Inject
public PostgresMetricRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(MetricEntry.class, applicationContext));
public PostgresMetricRepository(@Named("metrics") PostgresRepository<MetricEntry> repository) {
super(repository);
}
}

View File

@@ -1,22 +1,39 @@
package io.kestra.repository.postgres;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.JdbcTableConfig;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
import lombok.SneakyThrows;
import org.jooq.*;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.JSONB;
import org.jooq.Record;
import org.jooq.RecordMapper;
import org.jooq.Result;
import org.jooq.SelectConditionStep;
import org.jooq.impl.DSL;
import java.util.List;
import java.util.Map;
import jakarta.annotation.Nullable;
@PostgresRepositoryEnabled
@EachBean(JdbcTableConfig.class)
public class PostgresRepository<T> extends io.kestra.jdbc.AbstractJdbcRepository<T> {
public PostgresRepository(Class<T> cls, ApplicationContext applicationContext) {
super(cls, applicationContext);
@Inject
public PostgresRepository(@Parameter JdbcTableConfig jdbcTableConfig,
QueueService queueService,
JooqDSLContextWrapper dslContextWrapper) {
super(jdbcTableConfig, queueService, dslContextWrapper);
}
@Override

View File

@@ -2,15 +2,15 @@ package io.kestra.repository.postgres;
import io.kestra.core.server.ServiceInstance;
import io.kestra.jdbc.repository.AbstractJdbcServiceInstanceRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@PostgresRepositoryEnabled
public class PostgresServiceInstanceRepository extends AbstractJdbcServiceInstanceRepository {
@Inject
public PostgresServiceInstanceRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(ServiceInstance.class, applicationContext));
public PostgresServiceInstanceRepository(@Named("serviceinstance") PostgresRepository<ServiceInstance> repository) {
super(repository);
}
}

View File

@@ -4,13 +4,15 @@ import io.kestra.core.models.Setting;
import io.kestra.jdbc.repository.AbstractJdbcSettingRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@PostgresRepositoryEnabled
public class PostgresSettingRepository extends AbstractJdbcSettingRepository {
@Inject
public PostgresSettingRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(Setting.class, applicationContext), applicationContext);
public PostgresSettingRepository(@Named("settings") PostgresRepository<Setting> repository,
ApplicationContext applicationContext) {
super(repository, applicationContext);
}
}

View File

@@ -5,6 +5,7 @@ import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.jdbc.repository.AbstractJdbcTemplateRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Condition;
@@ -15,8 +16,9 @@ import java.util.Collections;
@TemplateEnabled
public class PostgresTemplateRepository extends AbstractJdbcTemplateRepository {
@Inject
public PostgresTemplateRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(Template.class, applicationContext), applicationContext);
public PostgresTemplateRepository(@Named("templates") PostgresRepository<Template> repository,
ApplicationContext applicationContext) {
super(repository, applicationContext);
}
@Override

View File

@@ -1,17 +1,16 @@
package io.kestra.repository.postgres;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.jdbc.repository.AbstractJdbcTriggerRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@PostgresRepositoryEnabled
public class PostgresTriggerRepository extends AbstractJdbcTriggerRepository {
@Inject
public PostgresTriggerRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(Trigger.class, applicationContext));
public PostgresTriggerRepository(@Named("triggers") PostgresRepository<Trigger> repository) {
super(repository);
}
}

View File

@@ -2,15 +2,15 @@ package io.kestra.repository.postgres;
import io.kestra.core.runners.WorkerJobRunning;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@PostgresRepositoryEnabled
public class PostgresWorkerJobRunningRepository extends AbstractJdbcWorkerJobRunningRepository {
@Inject
public PostgresWorkerJobRunningRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(WorkerJobRunning.class, applicationContext));
public PostgresWorkerJobRunningRepository(@Named("workerjobrunning") PostgresRepository<WorkerJobRunning> repository) {
super(repository);
}
}

View File

@@ -3,13 +3,13 @@ package io.kestra.runner.postgres;
import io.kestra.core.runners.ExecutionDelay;
import io.kestra.jdbc.runner.AbstractJdbcExecutionDelayStorage;
import io.kestra.repository.postgres.PostgresRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@PostgresQueueEnabled
public class PostgresExecutionDelayStorage extends AbstractJdbcExecutionDelayStorage {
public PostgresExecutionDelayStorage(ApplicationContext applicationContext) {
super(new PostgresRepository<>(ExecutionDelay.class, applicationContext));
public PostgresExecutionDelayStorage(@Named("executordelayed") PostgresRepository<ExecutionDelay> repository) {
super(repository);
}
}

View File

@@ -3,13 +3,13 @@ package io.kestra.runner.postgres;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.kestra.repository.postgres.PostgresRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@PostgresQueueEnabled
public class PostgresExecutionQueuedStorage extends AbstractJdbcExecutionQueuedStorage {
public PostgresExecutionQueuedStorage(ApplicationContext applicationContext) {
super(new PostgresRepository<>(ExecutionQueued.class, applicationContext));
public PostgresExecutionQueuedStorage(@Named("executionqueued") PostgresRepository<ExecutionQueued> repository) {
super(repository);
}
}

View File

@@ -1,15 +1,15 @@
package io.kestra.runner.postgres;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.core.runners.ExecutorState;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.repository.postgres.PostgresRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@PostgresQueueEnabled
public class PostgresExecutorStateStorage extends AbstractJdbcExecutorStateStorage {
public PostgresExecutorStateStorage(ApplicationContext applicationContext) {
super(new PostgresRepository<>(ExecutorState.class, applicationContext));
public PostgresExecutorStateStorage(@Named("executorstate") PostgresRepository<ExecutorState> repository) {
super(repository);
}
}

View File

@@ -3,19 +3,13 @@ package io.kestra.runner.postgres;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.jdbc.runner.AbstractJdbcMultipleConditionStorage;
import io.kestra.repository.postgres.PostgresRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.Record1;
import org.jooq.SelectConditionStep;
import org.jooq.impl.DSL;
import java.time.ZonedDateTime;
import java.util.List;
@Singleton
@PostgresQueueEnabled
public class PostgresMultipleConditionStorage extends AbstractJdbcMultipleConditionStorage {
public PostgresMultipleConditionStorage(ApplicationContext applicationContext) {
super(new PostgresRepository<>(MultipleConditionWindow.class, applicationContext));
public PostgresMultipleConditionStorage(@Named("multipleconditions") PostgresRepository<MultipleConditionWindow> repository) {
super(repository);
}
}

View File

@@ -3,13 +3,13 @@ package io.kestra.runner.postgres;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.jdbc.runner.AbstractJdbcSubflowExecutionStorage;
import io.kestra.repository.postgres.PostgresRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@PostgresQueueEnabled
public class PostgresSubflowExecutionStorage extends AbstractJdbcSubflowExecutionStorage {
public PostgresSubflowExecutionStorage(ApplicationContext applicationContext) {
super(new PostgresRepository<>(SubflowExecution.class, applicationContext));
public PostgresSubflowExecutionStorage(@Named("subflow-executions") PostgresRepository<SubflowExecution<?>> repository) {
super(repository);
}
}

View File

@@ -5,8 +5,10 @@ import io.kestra.core.models.triggers.multipleflows.AbstractMultipleConditionSto
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.repository.postgres.PostgresRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.BeforeEach;
import java.util.List;
@@ -18,12 +20,16 @@ class PostgresMultipleConditionStorageTest extends AbstractMultipleConditionStor
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject
@Named("multipleconditions")
PostgresRepository<MultipleConditionWindow> repository;
protected MultipleConditionStorageInterface multipleConditionStorage() {
return new PostgresMultipleConditionStorage(applicationContext);
return new PostgresMultipleConditionStorage(repository);
}
protected void save(MultipleConditionStorageInterface multipleConditionStorage, Flow flow, List<MultipleConditionWindow> multipleConditionWindows) {
((PostgresMultipleConditionStorage) multipleConditionStorage).save(multipleConditionWindows);
multipleConditionStorage.save(multipleConditionWindows);
}

View File

@@ -7,7 +7,6 @@ import io.kestra.core.models.executions.metrics.MetricAggregation;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.utils.IdUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import lombok.Getter;
@@ -47,16 +46,13 @@ public abstract class AbstractJdbcRepository<T> {
protected Table<Record> table;
public AbstractJdbcRepository(
Class<T> cls,
ApplicationContext applicationContext
) {
this.cls = cls;
this.queueService = applicationContext.getBean(QueueService.class);
this.dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
JdbcConfiguration jdbcConfiguration = applicationContext.getBean(JdbcConfiguration.class);
this.table = DSL.table(jdbcConfiguration.tableConfig(cls).getTable());
JdbcTableConfig tableConfig,
QueueService queueService,
JooqDSLContextWrapper dslContextWrapper) {
this.cls = (Class<T>) tableConfig.cls();
this.queueService = queueService;
this.dslContextWrapper = dslContextWrapper;
this.table = DSL.table(tableConfig.table());
}
abstract public Condition fullTextCondition(List<String> fields, String query);

View File

@@ -2,16 +2,13 @@ package io.kestra.jdbc;
import io.micronaut.context.annotation.EachProperty;
import io.micronaut.context.annotation.Parameter;
import lombok.Getter;
import io.micronaut.core.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
@EachProperty("kestra.jdbc.tables")
@Getter
public class JdbcTableConfig {
String name;
Class<?> cls;
String table;
public JdbcTableConfig(@Parameter String name) {
this.name = name;
}
public record JdbcTableConfig(
@Parameter String name,
@Nullable Class<?> cls,
@NotNull String table
) {
}

View File

@@ -8,14 +8,14 @@ import java.util.List;
@ConfigurationProperties("kestra.jdbc")
@Getter
public class JdbcConfiguration {
public class JdbcTableConfigs {
@Inject
private List<JdbcTableConfig> tableConfigs;
public JdbcTableConfig tableConfig(String name) {
return this.tableConfigs
.stream()
.filter(tableConfig -> tableConfig.getName().equals(name))
.filter(tableConfig -> tableConfig.name().equals(name))
.findFirst()
.orElseThrow(() -> new IllegalStateException("Unable to find table config for name '" + name + "'"));
}
@@ -23,7 +23,7 @@ public class JdbcConfiguration {
public JdbcTableConfig tableConfig(Class<?> cls) {
return this.tableConfigs
.stream()
.filter(tableConfig -> tableConfig.getCls() == cls)
.filter(tableConfig -> tableConfig.cls() == cls)
.findFirst()
.orElseThrow(() -> new IllegalStateException("Unable to find table config for class '" + cls.getName() + "'"));
}

View File

@@ -1,7 +1,6 @@
package io.kestra.jdbc;
import io.micronaut.context.annotation.Factory;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.conf.RenderKeywordCase;
import org.jooq.conf.Settings;

View File

@@ -13,7 +13,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Singleton
public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRepository implements FlowTopologyRepositoryInterface, JdbcIndexerInterface<FlowTopology> {
protected final io.kestra.jdbc.AbstractJdbcRepository<FlowTopology> jdbcRepository;

View File

@@ -1,14 +1,14 @@
package io.kestra.jdbc.runner;
import io.kestra.core.queues.QueueException;
import io.kestra.jdbc.JdbcTableConfig;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.JdbcConfiguration;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.Requires;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -26,17 +26,17 @@ import java.time.ZonedDateTime;
public class JdbcCleaner {
private final JooqDSLContextWrapper dslContextWrapper;
private final Configuration configuration;
protected final Table<Record> queueTable;
@Inject
public JdbcCleaner(ApplicationContext applicationContext) {
JdbcConfiguration jdbcConfiguration = applicationContext.getBean(JdbcConfiguration.class);
public JdbcCleaner(@Named("queues") JdbcTableConfig jdbcTableConfig,
JooqDSLContextWrapper dslContextWrapper,
Configuration configuration
) {
this.dslContextWrapper = dslContextWrapper;
this.configuration = configuration;
this.dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
this.configuration = applicationContext.getBean(Configuration.class);
this.queueTable = DSL.table(jdbcConfiguration.tableConfig("queues").getTable());
this.queueTable = DSL.table(jdbcTableConfig.table());
}
public void deleteQueue() throws QueueException {

View File

@@ -1,6 +1,7 @@
package io.kestra.jdbc.runner;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
@@ -41,7 +42,14 @@ import io.kestra.core.runners.WorkerTrigger;
import io.kestra.core.runners.WorkerTriggerRunning;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.services.*;
import io.kestra.core.services.AbstractFlowTriggerService;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.LogService;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.TaskDefaultService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.tasks.flows.ForEachItem;
import io.kestra.core.tasks.flows.Template;
import io.kestra.core.topologies.FlowTopologyService;
@@ -55,6 +63,7 @@ import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@@ -73,6 +82,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -85,14 +95,9 @@ public class JdbcExecutor implements ExecutorInterface, Service {
private final ScheduledExecutorService scheduledDelay = Executors.newSingleThreadScheduledExecutor();
private volatile boolean isShutdown = false;
@Inject
private ApplicationContext applicationContext;
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private AbstractJdbcExecutionRepository executionRepository;
@@ -112,6 +117,18 @@ public class JdbcExecutor implements ExecutorInterface, Service {
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
private QueueInterface<LogEntry> logQueue;
@Inject
@Named(QueueFactoryInterface.FLOW_NAMED)
private QueueInterface<Flow> flowQueue;
@Inject
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;
@Inject
@Named(QueueFactoryInterface.SUBFLOWEXECUTIONRESULT_NAMED)
private QueueInterface<SubflowExecutionResult> subflowExecutionResultQueue;
@Inject
private RunContextFactory runContextFactory;
@@ -158,15 +175,8 @@ public class JdbcExecutor implements ExecutorInterface, Service {
@Inject
private FlowTopologyService flowTopologyService;
@Inject
private AbstractJdbcFlowTopologyRepository flowTopologyRepository;
protected List<Flow> allFlows;
@Inject
@Named(QueueFactoryInterface.FLOW_NAMED)
private QueueInterface<Flow> flowQueue;
@Inject
private WorkerGroupService workerGroupService;
@@ -176,26 +186,43 @@ public class JdbcExecutor implements ExecutorInterface, Service {
@Inject
private AbstractJdbcWorkerJobRunningRepository workerJobRunningRepository;
@Inject
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;
@Inject
@Named(QueueFactoryInterface.SUBFLOWEXECUTIONRESULT_NAMED)
private QueueInterface<SubflowExecutionResult> subflowExecutionResultQueue;
@Inject
private LogService logService;
@Inject
private JdbcServiceLivenessCoordinator serviceLivenessCoordinator;
private final FlowRepositoryInterface flowRepository;
@Inject
private ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;
private final JdbcServiceLivenessCoordinator serviceLivenessCoordinator;
private final ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;
private final AbstractJdbcFlowTopologyRepository flowTopologyRepository;
private final String id = IdUtils.create();
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicReference<ServiceState> state = new AtomicReference<>();
/**
* Creates a new {@link JdbcExecutor} instance. Both constructor and field injection are used
* to force Micronaut to respect order when invoking pre-destroy order.
*
* @param serviceLivenessCoordinator The {@link JdbcServiceLivenessCoordinator}.
* @param flowRepository The {@link FlowRepositoryInterface}.
* @param flowTopologyRepository The {@link AbstractJdbcFlowTopologyRepository}.
* @param eventPublisher The {@link ApplicationEventPublisher}.
*/
@Inject
public JdbcExecutor(final JdbcServiceLivenessCoordinator serviceLivenessCoordinator,
final FlowRepositoryInterface flowRepository,
final AbstractJdbcFlowTopologyRepository flowTopologyRepository,
final ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher) {
this.serviceLivenessCoordinator = serviceLivenessCoordinator;
this.flowRepository = flowRepository;
this.flowTopologyRepository = flowTopologyRepository;
this.eventPublisher = eventPublisher;
}
@SneakyThrows
@Override
public void run() {
@@ -230,13 +257,8 @@ public class JdbcExecutor implements ExecutorInterface, Service {
} catch (ExecutionException | InterruptedException e) {
if (e.getCause().getClass() != CannotCreateTransactionException.class) {
log.error("Executor fatal exception in the scheduledDelay thread", e);
try {
close();
applicationContext.stop();
} catch (IOException ioe) {
log.error("Unable to properly close the executor", ioe);
}
close();
KestraContext.getContext().shutdown();
}
}
},
@@ -818,14 +840,15 @@ public class JdbcExecutor implements ExecutorInterface, Service {
return taskDefaultService.injectDefaults(flow, execution);
}
/** ExecutionDelay is currently two type of execution :
/**
* ExecutionDelay is currently two type of execution :
* <br/>
* - Paused flow that will be restart after an interval/timeout
* - Paused flow that will be restarted after an interval/timeout
* <br/>
* - Failed flow that will be retried after an interval
**/
private void executionDelaySend() {
if (isShutdown) {
**/
private void executionDelaySend() {
if (shutdown.get()) {
return;
}
@@ -844,7 +867,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
);
executor = executor.withExecution(markAsExecution, "pausedRestart");
// Handle failed tasks
// Handle failed tasks
} else if (executor.getExecution().findTaskRunByTaskRunId(executionDelay.getTaskRunId()).getState().getCurrent().equals(State.Type.FAILED)) {
Execution newAttempt = executionService.retry(
pair.getKey(),
@@ -930,13 +953,19 @@ public class JdbcExecutor implements ExecutorInterface, Service {
return executor.withExecution(failedExecutionWithLog.getExecution(), "exception");
}
/** {@inheritDoc} **/
/**
* {@inheritDoc}
**/
@Override
public void close() throws IOException {
setState(ServiceState.TERMINATING);
isShutdown = true;
scheduledDelay.shutdown();
setState(ServiceState.TERMINATED_GRACEFULLY);
@PreDestroy
public void close() {
if (shutdown.compareAndSet(false, true)) {
log.info("Terminating.");
setState(ServiceState.TERMINATING);
scheduledDelay.shutdown();
setState(ServiceState.TERMINATED_GRACEFULLY);
log.info("Executor closed ({}).", state.get().name().toLowerCase());
}
}
private void setState(final ServiceState state) {
@@ -944,19 +973,25 @@ public class JdbcExecutor implements ExecutorInterface, Service {
eventPublisher.publishEvent(new ServiceStateChangeEvent(this));
}
/** {@inheritDoc} **/
/**
* {@inheritDoc}
**/
@Override
public String getId() {
return id;
}
/** {@inheritDoc} **/
/**
* {@inheritDoc}
**/
@Override
public ServiceType getType() {
return ServiceType.EXECUTOR;
}
/** {@inheritDoc} **/
/**
* {@inheritDoc}
**/
@Override
public ServiceState getState() {
return state.get();

View File

@@ -10,7 +10,7 @@ import io.kestra.core.queues.QueueService;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.JdbcConfiguration;
import io.kestra.jdbc.JdbcTableConfigs;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
@@ -20,11 +20,14 @@ import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.JSONB;
import org.jooq.Record;
import org.jooq.*;
import org.jooq.Result;
import org.jooq.Table;
import org.jooq.impl.DSL;
import javax.sql.DataSource;
import java.io.IOException;
import java.time.Duration;
import java.time.ZonedDateTime;
@@ -52,8 +55,6 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
protected final JooqDSLContextWrapper dslContextWrapper;
protected final DataSource dataSource;
protected final Configuration configuration;
protected final Table<Record> table;
@@ -69,12 +70,11 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
this.queueService = applicationContext.getBean(QueueService.class);
this.cls = cls;
this.dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
this.dataSource = applicationContext.getBean(DataSource.class);
this.configuration = applicationContext.getBean(Configuration.class);
JdbcConfiguration jdbcConfiguration = applicationContext.getBean(JdbcConfiguration.class);
JdbcTableConfigs jdbcTableConfigs = applicationContext.getBean(JdbcTableConfigs.class);
this.table = DSL.table(jdbcConfiguration.tableConfig("queues").getTable());
this.table = DSL.table(jdbcTableConfigs.tableConfig("queues").table());
this.jdbcQueueIndexer = applicationContext.getBean(JdbcQueueIndexer.class);
}

View File

@@ -3,7 +3,7 @@ package io.kestra.jdbc.runner;
import io.kestra.core.runners.DeserializationIssuesCaseTest;
import io.kestra.core.runners.StandAloneRunner;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.JdbcConfiguration;
import io.kestra.jdbc.JdbcTableConfigs;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
@@ -33,7 +33,7 @@ public abstract class AbstractJdbcDeserializationIssuesTest {
private JooqDSLContextWrapper dslContextWrapper;
@Inject
private JdbcConfiguration jdbcConfiguration;
private JdbcTableConfigs jdbcTableConfigs;
@Inject
private StandAloneRunner runner;
@@ -64,7 +64,7 @@ public abstract class AbstractJdbcDeserializationIssuesTest {
private void sendToQueue(DeserializationIssuesCaseTest.QueueMessage queueMessage) {
Table<Record> table = DSL.table(jdbcConfiguration.tableConfig("queues").getTable());
Table<Record> table = DSL.table(jdbcTableConfigs.tableConfig("queues").table());
Map<Field<Object>, Object> fields = fields(queueMessage);

View File

@@ -103,7 +103,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
CountDownLatch resubmitLatch = new CountDownLatch(1);
// create first worker
Worker worker = new Worker(applicationContext, 1, null, IdUtils.create());
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 1, null);
worker.run();
runner.setSchedulerEnabled(false);
@@ -127,7 +127,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
worker.shutdown(); // stop processing task
// create second worker (this will revoke previously one).
Worker newWorker = new Worker(applicationContext, 1, null, IdUtils.create());
Worker newWorker = applicationContext.createBean(Worker.class, IdUtils.create(), 1, null);
newWorker.run();
resubmitLatch.await(30, TimeUnit.SECONDS);
newWorker.shutdown();
@@ -138,8 +138,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
void taskResubmitSkipExecution() throws Exception {
CountDownLatch runningLatch = new CountDownLatch(1);
Worker worker = new Worker(applicationContext, 8, null);
applicationContext.registerSingleton(worker);
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 8, null);
worker.run();
runner.setSchedulerEnabled(false);
runner.setWorkerEnabled(false);
@@ -165,8 +164,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
runningLatch.await(2, TimeUnit.SECONDS);
worker.shutdown();
Worker newWorker = new Worker(applicationContext, 8, null);
applicationContext.registerSingleton(newWorker);
Worker newWorker = applicationContext.createBean(Worker.class, IdUtils.create(), 1, null);
newWorker.run();
// wait a little to be sure there is no resubmit
@@ -180,8 +178,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
void shouldReEmitTriggerWhenWorkerIsDetectedAsNonResponding() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
Worker worker = new Worker(applicationContext, 1, null);
applicationContext.registerSingleton(worker);
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 1, null);
worker.run();
runner.setSchedulerEnabled(false);
runner.setWorkerEnabled(false);
@@ -202,7 +199,7 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
);
worker.shutdown();
Worker newWorker = new Worker(applicationContext, 1, null);
Worker newWorker = applicationContext.createBean(Worker.class, IdUtils.create(), 1, null);
applicationContext.registerSingleton(newWorker);
newWorker.run();

View File

@@ -88,13 +88,13 @@ class JdbcServiceLivenessManagerTest {
Mockito.when(repository.mayTransitionServiceTo(any(ServiceInstance.class), any(Service.ServiceState.class))).thenThrow(new RuntimeException());
serviceLivenessManager.run(now.plus(Duration.ofSeconds(2))); // FAIL
Mockito.verify(context, Mockito.never()).exit(Mockito.eq(1));
Mockito.verify(context, Mockito.never()).shutdown();
serviceLivenessManager.run(now.plus(Duration.ofSeconds(4))); // FAIL
Mockito.verify(context, Mockito.never()).exit(Mockito.eq(1));
Mockito.verify(context, Mockito.never()).shutdown();
// Then
serviceLivenessManager.run(now.plus(Duration.ofSeconds(6))); // TIMEOUT
Mockito.verify(context, Mockito.times(1)).exit(Mockito.eq(1));
Mockito.verify(context, Mockito.times(1)).shutdown();
}
}

View File

@@ -1,22 +1,25 @@
package io.kestra.runner.memory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.hash.Hashing;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.utils.Either;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import io.kestra.core.queues.QueueService;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.ExecutorsUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
@@ -24,20 +27,19 @@ import java.util.function.Consumer;
@Slf4j
public class MemoryQueue<T> implements QueueInterface<T> {
private static final ObjectMapper mapper = JacksonMapper.ofJson();
private static ExecutorService poolExecutor;
private final ExecutorService executorService;
private final QueueService queueService;
private final Class<T> cls;
private final Map<String, List<Consumer<Either<T, DeserializationException>>>> queues = new ConcurrentHashMap<>();
public MemoryQueue(Class<T> cls, ApplicationContext applicationContext) {
if (poolExecutor == null) {
ExecutorsUtils executorsUtils = applicationContext.getBean(ExecutorsUtils.class);
poolExecutor = executorsUtils.cachedThreadPool("memory-queue");
}
this.queueService = applicationContext.getBean(QueueService.class);
public MemoryQueue(final Class<T> cls,
final QueueService queueService,
final ExecutorService executorService) {
this.executorService = executorService;
this.queueService = queueService;
this.cls = cls;
}
@@ -57,11 +59,11 @@ public class MemoryQueue<T> implements QueueInterface<T> {
this.queues
.forEach((consumerGroup, consumers) -> {
poolExecutor.execute(() -> {
executorService.execute(() -> {
Consumer<Either<T, DeserializationException>> consumer;
synchronized (this) {
if (consumers.size() == 0) {
if (consumers.isEmpty()) {
log.debug("No consumer connected on queue '" + this.cls.getName() + "'");
return;
} else {
@@ -124,7 +126,7 @@ public class MemoryQueue<T> implements QueueInterface<T> {
synchronized (this) {
this.queues.get(queueName).remove(index);
if (this.queues.get(queueName).size() == 0) {
if (this.queues.get(queueName).isEmpty()) {
this.queues.remove(queueName);
}
}
@@ -146,8 +148,8 @@ public class MemoryQueue<T> implements QueueInterface<T> {
@Override
public void close() throws IOException {
if (!poolExecutor.isShutdown()) {
poolExecutor.shutdown();
if (!executorService.isShutdown()) {
executorService.shutdown();
}
}
}

View File

@@ -1,8 +1,17 @@
package io.kestra.runner.memory;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.queues.QueueService;
import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.runners.WorkerInstance;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.runners.WorkerJobRunning;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.runners.WorkerTriggerResult;
import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Prototype;
@@ -16,7 +25,6 @@ import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.*;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -29,14 +37,23 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
@Factory
@MemoryQueueEnabled
public class MemoryQueueFactory implements QueueFactoryInterface {
private static final String QUEUE_NAME = "memory-queue";
@Inject
ApplicationContext applicationContext;
@Inject
ExecutorsUtils executorsUtils;
@Inject
QueueService queueService;
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_NAMED)
public QueueInterface<Execution> execution() {
return new MemoryQueue<>(Execution.class, applicationContext);
return createQueueForType(Execution.class);
}
@Override
@@ -50,77 +67,77 @@ public class MemoryQueueFactory implements QueueFactoryInterface {
@Singleton
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
public QueueInterface<WorkerJob> workerJob() {
return new MemoryQueue<>(WorkerJob.class, applicationContext);
return createQueueForType(WorkerJob.class);
}
@Override
@Singleton
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
public QueueInterface<WorkerTaskResult> workerTaskResult() {
return new MemoryQueue<>(WorkerTaskResult.class, applicationContext);
return createQueueForType(WorkerTaskResult.class);
}
@Override
@Singleton
@Named(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED)
public QueueInterface<WorkerTriggerResult> workerTriggerResult() {
return new MemoryQueue<>(WorkerTriggerResult.class, applicationContext);
return createQueueForType(WorkerTriggerResult.class);
}
@Override
@Singleton
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
public QueueInterface<LogEntry> logEntry() {
return new MemoryQueue<>(LogEntry.class, applicationContext);
return createQueueForType(LogEntry.class);
}
@Override
@Singleton
@Named(QueueFactoryInterface.METRIC_QUEUE)
public QueueInterface<MetricEntry> metricEntry() {
return new MemoryQueue<>(MetricEntry.class, applicationContext);
return createQueueForType(MetricEntry.class);
}
@Override
@Singleton
@Named(QueueFactoryInterface.FLOW_NAMED)
public QueueInterface<Flow> flow() {
return new MemoryQueue<>(Flow.class, applicationContext);
return createQueueForType(Flow.class);
}
@Override
@Singleton
@Named(QueueFactoryInterface.KILL_NAMED)
public QueueInterface<ExecutionKilled> kill() {
return new MemoryQueue<>(ExecutionKilled.class, applicationContext);
return createQueueForType(ExecutionKilled.class);
}
@Override
@Singleton
@Named(QueueFactoryInterface.TEMPLATE_NAMED)
public QueueInterface<Template> template() {
return new MemoryQueue<>(Template.class, applicationContext);
return createQueueForType(Template.class);
}
@Override
@Singleton
@Named(QueueFactoryInterface.WORKERINSTANCE_NAMED)
public QueueInterface<WorkerInstance> workerInstance() {
return new MemoryQueue<>(WorkerInstance.class, applicationContext);
return createQueueForType(WorkerInstance.class);
}
@Override
@Singleton
@Named(QueueFactoryInterface.WORKERJOBRUNNING_NAMED)
public QueueInterface<WorkerJobRunning> workerJobRunning() {
return new MemoryQueue<>(WorkerJobRunning.class, applicationContext);
return createQueueForType(WorkerJobRunning.class);
}
@Override
@Singleton
@Named(QueueFactoryInterface.TRIGGER_NAMED)
public QueueInterface<Trigger> trigger() {
return new MemoryQueue<>(Trigger.class, applicationContext);
return createQueueForType(Trigger.class);
}
@Override
@@ -139,11 +156,15 @@ public class MemoryQueueFactory implements QueueFactoryInterface {
@Singleton
@Named(QueueFactoryInterface.SUBFLOWEXECUTIONRESULT_NAMED)
public QueueInterface<SubflowExecutionResult> subflowExecutionResult() {
return new MemoryQueue<>(SubflowExecutionResult.class, applicationContext);
return createQueueForType(SubflowExecutionResult.class);
}
@PreDestroy
void closeAllQueue() throws IOException {
this.applicationContext.getBeansOfType(MemoryQueue.class).forEach(throwConsumer(queue -> queue.close()));
this.applicationContext.getBeansOfType(MemoryQueue.class).forEach(throwConsumer(MemoryQueue::close));
}
    // Factory helper shared by all producer methods above: builds a MemoryQueue for the
    // given payload type, wiring in the injected QueueService and a thread pool obtained
    // from ExecutorsUtils under the common QUEUE_NAME. Presumably the pool is shared/cached
    // across queues rather than created per call — TODO confirm against ExecutorsUtils.
    private <T> MemoryQueue<T> createQueueForType(final Class<T> type) {
        return new MemoryQueue<>(type, queueService, executorsUtils.cachedThreadPool(QUEUE_NAME));
    }
}

View File

@@ -66,10 +66,6 @@ public final class WebserverService implements Service {
    // Marks this service as TERMINATING when the embedded server begins shutting down.
    // NOTE(review): only the state transition happens here; actual resource cleanup is
    // presumably handled elsewhere in the lifecycle — confirm against the Service contract.
    @EventListener
    public void onServeShutdown(ServerShutdownEvent event) {
        setState(ServiceState.TERMINATING);
    }
    // Final lifecycle step: marks the service as gracefully terminated when the bean
    // is destroyed. NOTE(review): the enclosing diff hunk (-10,+6) suggests this method
    // is being removed by the commit — verify against the post-commit file.
    @PreDestroy
    public void preDestroy() {
        setState(ServiceState.TERMINATED_GRACEFULLY);
    }
}