mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
fix(jdbc): handle trigger in jdbc heartbeat/resubmit (#2240)
This fix follow the JDBC heartbeat & task resubmit feature recently released in the 0.12
This commit is contained in:
@@ -52,4 +52,6 @@ public interface QueueFactoryInterface {
|
||||
|
||||
WorkerJobQueueInterface workerJobQueue();
|
||||
|
||||
WorkerTriggerResultQueueInterface workerTriggerResultQueue();
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
package io.kestra.core.queues;
|
||||
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.runners.WorkerTriggerResult;
|
||||
import io.kestra.core.utils.Either;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/*
|
||||
* Required for the QueueFactory, to have common interface with JDBC & Kafka
|
||||
*/
|
||||
public interface WorkerTriggerResultQueueInterface extends Closeable {
|
||||
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer);
|
||||
|
||||
void pause();
|
||||
|
||||
void cleanup();
|
||||
}
|
||||
@@ -6,8 +6,8 @@ import io.kestra.core.runners.WorkerJobRunning;
|
||||
import java.util.Optional;
|
||||
|
||||
public interface WorkerJobRunningRepositoryInterface {
|
||||
Optional<WorkerJobRunning> findByTaskRunId(String taskRunId);
|
||||
Optional<WorkerJobRunning> findByKey(String uid);
|
||||
|
||||
void deleteByTaskRunId(String taskRunId);
|
||||
void deleteByKey(String uid);
|
||||
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ public class StandAloneRunner implements RunnerInterface, AutoCloseable {
|
||||
@Setter private java.util.concurrent.ExecutorService poolExecutor;
|
||||
@Setter protected int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors());
|
||||
@Setter protected boolean schedulerEnabled = true;
|
||||
@Setter protected boolean workerEnabled = true;
|
||||
|
||||
@Inject
|
||||
private ExecutorsUtils executorsUtils;
|
||||
@@ -52,10 +53,12 @@ public class StandAloneRunner implements RunnerInterface, AutoCloseable {
|
||||
|
||||
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
|
||||
|
||||
Worker worker = new Worker(applicationContext, workerThread, null);
|
||||
applicationContext.registerSingleton(worker);
|
||||
poolExecutor.execute(worker);
|
||||
servers.add(worker);
|
||||
if(workerEnabled) {
|
||||
Worker worker = new Worker(applicationContext, workerThread, null);
|
||||
applicationContext.registerSingleton(worker);
|
||||
poolExecutor.execute(worker);
|
||||
servers.add(worker);
|
||||
}
|
||||
|
||||
if (schedulerEnabled) {
|
||||
AbstractScheduler scheduler = applicationContext.getBean(AbstractScheduler.class);
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.runners;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
@@ -555,13 +556,18 @@ public class Worker implements Runnable, AutoCloseable {
|
||||
@SuppressWarnings("ResultOfMethodCallIgnored")
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
closeWorker(Duration.ofMinutes(5));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void closeWorker(Duration awaitDuration) throws Exception {
|
||||
workerJobQueue.pause();
|
||||
executionKilledQueue.pause();
|
||||
new Thread(
|
||||
() -> {
|
||||
try {
|
||||
this.executors.shutdown();
|
||||
this.executors.awaitTermination(5, TimeUnit.MINUTES);
|
||||
this.executors.awaitTermination(awaitDuration.toMillis(), TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("Fail to shutdown the worker", e);
|
||||
}
|
||||
@@ -573,7 +579,7 @@ public class Worker implements Runnable, AutoCloseable {
|
||||
|
||||
Await.until(
|
||||
() -> {
|
||||
if (this.executors.isTerminated() && this.workerThreadReferences.isEmpty()) {
|
||||
if (this.executors.isTerminated() || this.workerThreadReferences.isEmpty()) {
|
||||
log.info("No more worker threads busy, shutting down!");
|
||||
|
||||
// we ensure that last produce message are send
|
||||
@@ -607,6 +613,11 @@ public class Worker implements Runnable, AutoCloseable {
|
||||
metricEntryQueue.close();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void shutdown() throws IOException {
|
||||
this.executors.shutdownNow();
|
||||
}
|
||||
|
||||
public List<WorkerTask> getWorkerThreadTasks() {
|
||||
return this.workerThreadReferences.stream().map(thread -> thread.workerTask).toList();
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package io.kestra.core.runners;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
@@ -30,7 +30,7 @@ public class WorkerTaskRunning extends WorkerJobRunning {
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return this.taskRun.getTaskId();
|
||||
return this.taskRun.getId();
|
||||
}
|
||||
|
||||
public static WorkerTaskRunning of(WorkerTask workerTask, WorkerInstance workerInstance, int partition) {
|
||||
|
||||
@@ -13,26 +13,22 @@ import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.models.triggers.types.Schedule;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.runners.WorkerJob;
|
||||
import io.kestra.core.runners.WorkerTrigger;
|
||||
import io.kestra.core.runners.WorkerTriggerResult;
|
||||
import io.kestra.core.services.ConditionService;
|
||||
import io.kestra.core.services.FlowListenersInterface;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.kestra.core.services.TaskDefaultService;
|
||||
import io.kestra.core.services.WorkerGroupService;
|
||||
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.kestra.core.services.*;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.inject.qualifiers.Qualifiers;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
@@ -41,9 +37,6 @@ import java.time.temporal.ChronoUnit;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.stream.Stream;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
@Slf4j
|
||||
@Singleton
|
||||
@@ -52,7 +45,7 @@ public abstract class AbstractScheduler implements Scheduler {
|
||||
private final QueueInterface<Execution> executionQueue;
|
||||
private final QueueInterface<Trigger> triggerQueue;
|
||||
private final QueueInterface<WorkerJob> workerTaskQueue;
|
||||
private final QueueInterface<WorkerTriggerResult> workerTriggerResultQueue;
|
||||
private final WorkerTriggerResultQueueInterface workerTriggerResultQueue;
|
||||
protected final FlowListenersInterface flowListeners;
|
||||
private final RunContextFactory runContextFactory;
|
||||
private final MetricRegistry metricRegistry;
|
||||
@@ -85,7 +78,7 @@ public abstract class AbstractScheduler implements Scheduler {
|
||||
this.executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
|
||||
this.triggerQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.TRIGGER_NAMED));
|
||||
this.workerTaskQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED));
|
||||
this.workerTriggerResultQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED));
|
||||
this.workerTriggerResultQueue = applicationContext.getBean(WorkerTriggerResultQueueInterface.class);
|
||||
this.flowListeners = flowListeners;
|
||||
this.runContextFactory = applicationContext.getBean(RunContextFactory.class);
|
||||
this.metricRegistry = applicationContext.getBean(MetricRegistry.class);
|
||||
@@ -152,6 +145,7 @@ public abstract class AbstractScheduler implements Scheduler {
|
||||
|
||||
// listen to WorkerTriggerResult from polling triggers
|
||||
this.workerTriggerResultQueue.receive(
|
||||
null,
|
||||
Scheduler.class,
|
||||
either -> {
|
||||
if (either.isRight()) {
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
package io.kestra.core.tasks.test;
|
||||
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.PollingTriggerInterface;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.time.Duration;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* This trigger is used in unit tests where we need a task that wait a little to be able to check the resubmit of triggers.
|
||||
*/
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public class SleepTrigger extends AbstractTrigger implements PollingTriggerInterface {
|
||||
|
||||
@PluginProperty
|
||||
@NotNull
|
||||
private Long duration;
|
||||
|
||||
@Override
|
||||
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) {
|
||||
// Try catch to avoid flakky test
|
||||
try {
|
||||
Thread.sleep(duration);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Duration getInterval() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.queues.WorkerJobQueueInterface;
|
||||
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.annotation.Factory;
|
||||
@@ -121,4 +122,10 @@ public class H2QueueFactory implements QueueFactoryInterface {
|
||||
public WorkerJobQueueInterface workerJobQueue() {
|
||||
return new H2WorkerJobQueue(applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
public WorkerTriggerResultQueueInterface workerTriggerResultQueue() {
|
||||
return new H2WorkerTriggerResultQueue(applicationContext);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
package io.kestra.runner.h2;
|
||||
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||
import io.kestra.core.runners.WorkerTriggerResult;
|
||||
import io.kestra.core.utils.Either;
|
||||
import io.kestra.jdbc.JdbcWorkerTriggerResultQueueService;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@Slf4j
|
||||
public class H2WorkerTriggerResultQueue extends H2Queue<WorkerTriggerResult> implements WorkerTriggerResultQueueInterface {
|
||||
private final JdbcWorkerTriggerResultQueueService jdbcWorkerTriggerResultQueueService;
|
||||
|
||||
public H2WorkerTriggerResultQueue(ApplicationContext applicationContext) {
|
||||
super(WorkerTriggerResult.class, applicationContext);
|
||||
this.jdbcWorkerTriggerResultQueueService = applicationContext.getBean(JdbcWorkerTriggerResultQueueService.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
|
||||
return jdbcWorkerTriggerResultQueueService.receive(consumerGroup, queueType, consumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pause() {
|
||||
jdbcWorkerTriggerResultQueueService.pause();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup() {
|
||||
jdbcWorkerTriggerResultQueueService.cleanup();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
jdbcWorkerTriggerResultQueueService.close();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,2 @@
|
||||
ALTER TABLE worker_job_running
|
||||
DROP COLUMN "taskrun_id";
|
||||
@@ -0,0 +1,7 @@
|
||||
package io.kestra.runner.h2;
|
||||
|
||||
import io.kestra.jdbc.runner.JdbcHeartbeatTest;
|
||||
|
||||
class H2HeartbeatTest extends JdbcHeartbeatTest {
|
||||
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.queues.WorkerJobQueueInterface;
|
||||
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.annotation.Factory;
|
||||
@@ -121,4 +122,10 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
|
||||
public WorkerJobQueueInterface workerJobQueue() {
|
||||
return new MysqlWorkerJobQueue(applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
public WorkerTriggerResultQueueInterface workerTriggerResultQueue() {
|
||||
return new MysqlWorkerTriggerResultQueue(applicationContext);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
package io.kestra.runner.mysql;
|
||||
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||
import io.kestra.core.runners.WorkerTriggerResult;
|
||||
import io.kestra.core.utils.Either;
|
||||
import io.kestra.jdbc.JdbcWorkerTriggerResultQueueService;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@Slf4j
|
||||
public class MysqlWorkerTriggerResultQueue extends MysqlQueue<WorkerTriggerResult> implements WorkerTriggerResultQueueInterface {
|
||||
private final JdbcWorkerTriggerResultQueueService jdbcWorkerTriggerResultQueueService;
|
||||
|
||||
public MysqlWorkerTriggerResultQueue(ApplicationContext applicationContext) {
|
||||
super(WorkerTriggerResult.class, applicationContext);
|
||||
this.jdbcWorkerTriggerResultQueueService = applicationContext.getBean(JdbcWorkerTriggerResultQueueService.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
|
||||
return jdbcWorkerTriggerResultQueueService.receive(consumerGroup, queueType, consumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pause() {
|
||||
jdbcWorkerTriggerResultQueueService.pause();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup() {
|
||||
jdbcWorkerTriggerResultQueueService.cleanup();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
jdbcWorkerTriggerResultQueueService.close();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,2 @@
|
||||
ALTER TABLE worker_job_running
|
||||
DROP COLUMN taskrun_id;
|
||||
@@ -0,0 +1,7 @@
|
||||
package io.kestra.runner.mysql;
|
||||
|
||||
import io.kestra.jdbc.runner.JdbcHeartbeatTest;
|
||||
|
||||
class MysqlHeartbeatTest extends JdbcHeartbeatTest {
|
||||
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.queues.WorkerJobQueueInterface;
|
||||
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.annotation.Factory;
|
||||
@@ -121,4 +122,10 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
|
||||
public WorkerJobQueueInterface workerJobQueue() {
|
||||
return new PostgresWorkerJobQueue(applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
public WorkerTriggerResultQueueInterface workerTriggerResultQueue() {
|
||||
return new PostgresWorkerTriggerResultQueue(applicationContext);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
package io.kestra.runner.postgres;
|
||||
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||
import io.kestra.core.runners.WorkerTriggerResult;
|
||||
import io.kestra.core.utils.Either;
|
||||
import io.kestra.jdbc.JdbcWorkerTriggerResultQueueService;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@Slf4j
|
||||
public class PostgresWorkerTriggerResultQueue implements WorkerTriggerResultQueueInterface {
|
||||
private final JdbcWorkerTriggerResultQueueService jdbcWorkerTriggerResultQueueService;
|
||||
|
||||
public PostgresWorkerTriggerResultQueue(ApplicationContext applicationContext) {
|
||||
this.jdbcWorkerTriggerResultQueueService = applicationContext.getBean(JdbcWorkerTriggerResultQueueService.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
|
||||
return jdbcWorkerTriggerResultQueueService.receive(consumerGroup, queueType, consumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pause() {
|
||||
jdbcWorkerTriggerResultQueueService.pause();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup() {
|
||||
jdbcWorkerTriggerResultQueueService.cleanup();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
jdbcWorkerTriggerResultQueueService.close();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,2 @@
|
||||
ALTER TABLE worker_job_running
|
||||
DROP COLUMN taskrun_id;
|
||||
@@ -0,0 +1,7 @@
|
||||
package io.kestra.runner.postgres;
|
||||
|
||||
import io.kestra.jdbc.runner.JdbcHeartbeatTest;
|
||||
|
||||
class PostgresHeartbeatTest extends JdbcHeartbeatTest {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
package io.kestra.jdbc;
|
||||
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.runners.WorkerTriggerResult;
|
||||
import io.kestra.core.utils.Either;
|
||||
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
|
||||
import io.kestra.jdbc.runner.JdbcQueue;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.inject.qualifiers.Qualifiers;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class JdbcWorkerTriggerResultQueueService {
|
||||
private final JdbcQueue<WorkerTriggerResult> workerTriggerResultQueue;
|
||||
@Inject
|
||||
private AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository;
|
||||
private Runnable queueStop;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public JdbcWorkerTriggerResultQueueService(ApplicationContext applicationContext) {
|
||||
this.workerTriggerResultQueue = (JdbcQueue<WorkerTriggerResult>) applicationContext.getBean(
|
||||
QueueInterface.class,
|
||||
Qualifiers.byName(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED));
|
||||
}
|
||||
|
||||
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
|
||||
this.queueStop = workerTriggerResultQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> {
|
||||
eithers.forEach(either -> {
|
||||
if (either.isRight()) {
|
||||
log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
WorkerTriggerResult workerTriggerResult = either.getLeft();
|
||||
jdbcWorkerJobRunningRepository.deleteByKey(workerTriggerResult.getTriggerContext().uid());
|
||||
|
||||
});
|
||||
eithers.forEach(consumer);
|
||||
});
|
||||
return this.queueStop;
|
||||
}
|
||||
|
||||
public void pause() {
|
||||
this.stopQueue();
|
||||
}
|
||||
|
||||
private void stopQueue() {
|
||||
synchronized (this) {
|
||||
if (this.queueStop != null) {
|
||||
this.queueStop.run();
|
||||
this.queueStop = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void cleanup() { }
|
||||
|
||||
public void close() {
|
||||
this.stopQueue();
|
||||
}
|
||||
}
|
||||
@@ -27,13 +27,13 @@ public abstract class AbstractJdbcWorkerJobRunningRepository extends AbstractJdb
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteByTaskRunId(String taskRunId) {
|
||||
Optional<WorkerJobRunning> workerJobRunning = this.findByTaskRunId(taskRunId);
|
||||
public void deleteByKey(String uid) {
|
||||
Optional<WorkerJobRunning> workerJobRunning = this.findByKey(uid);
|
||||
workerJobRunning.ifPresent(jobRunning -> this.jdbcRepository.delete(jobRunning));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<WorkerJobRunning> findByTaskRunId(String taskRunId) {
|
||||
public Optional<WorkerJobRunning> findByKey(String uid) {
|
||||
return this.jdbcRepository
|
||||
.getDslContextWrapper()
|
||||
.transactionResult(configuration -> {
|
||||
@@ -42,7 +42,7 @@ public abstract class AbstractJdbcWorkerJobRunningRepository extends AbstractJdb
|
||||
.select((field("value")))
|
||||
.from(this.jdbcRepository.getTable())
|
||||
.where(
|
||||
field("taskrun_id").eq(taskRunId)
|
||||
field("key").eq(uid)
|
||||
);
|
||||
|
||||
return this.jdbcRepository.fetchOne(select);
|
||||
|
||||
@@ -35,7 +35,6 @@ import jakarta.inject.Singleton;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.jooq.DSLContext;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -477,6 +476,9 @@ public class JdbcExecutor implements ExecutorInterface {
|
||||
metricRegistry
|
||||
.timer(MetricRegistry.EXECUTOR_TASKRUN_ENDED_DURATION, metricRegistry.tags(message))
|
||||
.record(message.getTaskRun().getState().getDuration());
|
||||
|
||||
log.trace("TaskRun terminated: {}", message.getTaskRun());
|
||||
workerJobRunningRepository.deleteByKey(message.getTaskRun().getId());
|
||||
}
|
||||
|
||||
// join worker result
|
||||
|
||||
@@ -70,7 +70,7 @@ public class JdbcHeartbeat {
|
||||
}
|
||||
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Heartbeat of: {}", workerInstance.getWorkerUuid());
|
||||
log.error("Heartbeat of: {}", workerInstance.getWorkerUuid());
|
||||
}
|
||||
|
||||
if (workerInstanceRepository.heartbeatCheckUp(workerInstance.getWorkerUuid().toString()).isEmpty()) {
|
||||
|
||||
201
jdbc/src/test/java/io/kestra/jdbc/runner/JdbcHeartbeatTest.java
Normal file
201
jdbc/src/test/java/io/kestra/jdbc/runner/JdbcHeartbeatTest.java
Normal file
@@ -0,0 +1,201 @@
|
||||
package io.kestra.jdbc.runner;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.State.Type;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.kestra.core.tasks.test.Sleep;
|
||||
import io.kestra.core.tasks.test.SleepTrigger;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.jdbc.JdbcTestUtils;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.annotation.Property;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
@MicronautTest(transactional = false, environments = "heartbeat")
|
||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // must be per-class to allow calling once init() which took a lot of time
|
||||
@Property(name = "kestra.server-type", value = "EXECUTOR")
|
||||
public abstract class JdbcHeartbeatTest {
|
||||
@Inject
|
||||
private StandAloneRunner runner;
|
||||
|
||||
@Inject
|
||||
private RunnerUtils runnerUtils;
|
||||
|
||||
@Inject
|
||||
private LocalFlowRepositoryLoader repositoryLoader;
|
||||
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@Inject
|
||||
JdbcTestUtils jdbcTestUtils;
|
||||
|
||||
@Inject
|
||||
RunContextFactory runContextFactory;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
|
||||
QueueInterface<WorkerJob> workerJobQueue;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
|
||||
QueueInterface<WorkerTaskResult> workerTaskResultQueue;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED)
|
||||
QueueInterface<WorkerTriggerResult> workerTriggerResultQueue;
|
||||
|
||||
@BeforeAll
|
||||
void init() throws IOException, URISyntaxException {
|
||||
jdbcTestUtils.drop();
|
||||
jdbcTestUtils.migrate();
|
||||
|
||||
TestsUtils.loads(repositoryLoader);
|
||||
}
|
||||
|
||||
@Test
|
||||
void taskResubmit() throws Exception {
|
||||
CountDownLatch runningLatch = new CountDownLatch(1);
|
||||
CountDownLatch resubmitLatch = new CountDownLatch(1);
|
||||
|
||||
Worker worker = new Worker(applicationContext, 8, null);
|
||||
applicationContext.registerSingleton(worker);
|
||||
worker.run();
|
||||
runner.setSchedulerEnabled(false);
|
||||
runner.setWorkerEnabled(false);
|
||||
runner.run();
|
||||
|
||||
AtomicReference<WorkerTaskResult> workerTaskResult = new AtomicReference<>(null);
|
||||
workerTaskResultQueue.receive(either -> {
|
||||
workerTaskResult.set(either.getLeft());
|
||||
|
||||
if (either.getLeft().getTaskRun().getState().getCurrent() == Type.SUCCESS) {
|
||||
resubmitLatch.countDown();
|
||||
}
|
||||
|
||||
if (either.getLeft().getTaskRun().getState().getCurrent() == Type.RUNNING) {
|
||||
runningLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
workerJobQueue.emit(workerTask(1500));
|
||||
runningLatch.await(2, TimeUnit.SECONDS);
|
||||
worker.shutdown();
|
||||
|
||||
Worker newWorker = new Worker(applicationContext, 8, null);
|
||||
applicationContext.registerSingleton(newWorker);
|
||||
newWorker.run();
|
||||
resubmitLatch.await(15, TimeUnit.SECONDS);
|
||||
|
||||
assertThat(workerTaskResult.get().getTaskRun().getState().getCurrent(), is(Type.SUCCESS));
|
||||
}
|
||||
|
||||
@Test
|
||||
void triggerResubmit() throws Exception {
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
Worker worker = new Worker(applicationContext, 8, null);
|
||||
applicationContext.registerSingleton(worker);
|
||||
worker.run();
|
||||
runner.setSchedulerEnabled(false);
|
||||
runner.setWorkerEnabled(false);
|
||||
runner.run();
|
||||
|
||||
AtomicReference<WorkerTriggerResult> workerTriggerResult = new AtomicReference<>(null);
|
||||
workerTriggerResultQueue.receive(either -> {
|
||||
workerTriggerResult.set(either.getLeft());
|
||||
});
|
||||
|
||||
workerJobQueue.emit(workerTrigger(7000));
|
||||
countDownLatch.await(2, TimeUnit.SECONDS);
|
||||
worker.shutdown();
|
||||
|
||||
Worker newWorker = new Worker(applicationContext, 8, null);
|
||||
applicationContext.registerSingleton(newWorker);
|
||||
newWorker.run();
|
||||
countDownLatch.await(12, TimeUnit.SECONDS);
|
||||
|
||||
assertThat(workerTriggerResult.get().getSuccess(), is(true));
|
||||
}
|
||||
|
||||
private WorkerTask workerTask(long sleepDuration) {
|
||||
Sleep bash = Sleep.builder()
|
||||
.type(Sleep.class.getName())
|
||||
.id("unit-test")
|
||||
.duration(sleepDuration)
|
||||
.build();
|
||||
|
||||
Execution execution = TestsUtils.mockExecution(flowBuilder(sleepDuration), ImmutableMap.of());
|
||||
|
||||
ResolvedTask resolvedTask = ResolvedTask.of(bash);
|
||||
|
||||
return WorkerTask.builder()
|
||||
.runContext(runContextFactory.of(ImmutableMap.of("key", "value")))
|
||||
.task(bash)
|
||||
.taskRun(TaskRun.of(execution, resolvedTask))
|
||||
.build();
|
||||
}
|
||||
|
||||
private WorkerTrigger workerTrigger(long sleepDuration) {
|
||||
SleepTrigger trigger = SleepTrigger.builder()
|
||||
.type(SleepTrigger.class.getName())
|
||||
.id("unit-test")
|
||||
.duration(sleepDuration)
|
||||
.build();
|
||||
|
||||
Map.Entry<ConditionContext, TriggerContext> mockedTrigger = TestsUtils.mockTrigger(runContextFactory, trigger);
|
||||
|
||||
return WorkerTrigger.builder()
|
||||
.trigger(trigger)
|
||||
.triggerContext(mockedTrigger.getValue())
|
||||
.conditionContext(mockedTrigger.getKey())
|
||||
.build();
|
||||
}
|
||||
|
||||
private Flow flowBuilder(long sleepDuration) {
|
||||
Sleep bash = Sleep.builder()
|
||||
.type(Sleep.class.getName())
|
||||
.id("unit-test")
|
||||
.duration(sleepDuration)
|
||||
.build();
|
||||
|
||||
SleepTrigger trigger = SleepTrigger.builder()
|
||||
.type(SleepTrigger.class.getName())
|
||||
.id("unit-test")
|
||||
.duration(sleepDuration)
|
||||
.build();
|
||||
|
||||
return Flow.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unit-test")
|
||||
.tasks(Collections.singletonList(bash))
|
||||
.triggers(Collections.singletonList(trigger))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
4
jdbc/src/test/resources/application-heartbeat.yml
Normal file
4
jdbc/src/test/resources/application-heartbeat.yml
Normal file
@@ -0,0 +1,4 @@
|
||||
kestra:
|
||||
heartbeat:
|
||||
frequency: 3s
|
||||
heartbeat-missed: 1
|
||||
@@ -2,6 +2,7 @@ package io.kestra.runner.memory;
|
||||
|
||||
import io.kestra.core.models.executions.MetricEntry;
|
||||
import io.kestra.core.queues.WorkerJobQueueInterface;
|
||||
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.annotation.Factory;
|
||||
import io.micronaut.context.annotation.Prototype;
|
||||
@@ -122,4 +123,10 @@ public class MemoryQueueFactory implements QueueFactoryInterface {
|
||||
public WorkerJobQueueInterface workerJobQueue() {
|
||||
return new MemoryWorkerJobQueue(applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
public WorkerTriggerResultQueueInterface workerTriggerResultQueue() {
|
||||
return new MemoryWorkerTriggerResultQueue(applicationContext);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
package io.kestra.runner.memory;
|
||||
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.runners.StandAloneRunner;
|
||||
import io.kestra.core.runners.WorkerJob;
|
||||
import io.kestra.core.runners.WorkerTaskResult;
|
||||
import io.kestra.core.utils.Await;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.Duration;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Slf4j
|
||||
@Singleton
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
package io.kestra.runner.memory;
|
||||
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.queues.WorkerTriggerResultQueueInterface;
|
||||
import io.kestra.core.runners.WorkerTriggerResult;
|
||||
import io.kestra.core.utils.Either;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.inject.qualifiers.Qualifiers;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public class MemoryWorkerTriggerResultQueue implements WorkerTriggerResultQueueInterface {
|
||||
QueueInterface<WorkerTriggerResult> workerTriggerResultQueue;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public MemoryWorkerTriggerResultQueue(ApplicationContext applicationContext) {
|
||||
this.workerTriggerResultQueue = (QueueInterface<WorkerTriggerResult>) applicationContext.getBean(
|
||||
QueueInterface.class,
|
||||
Qualifiers.byName(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED)
|
||||
);
|
||||
}
|
||||
|
||||
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerTriggerResult, DeserializationException>> consumer) {
|
||||
return workerTriggerResultQueue.receive(consumerGroup, queueType, consumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pause() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user