feat(core): zombie worker task detection & automatic resubmission (#2081)

close kestra-io/kestra#2055
This commit is contained in:
YannC
2023-09-28 14:36:20 +02:00
committed by GitHub
parent 44a8da8a41
commit c9e6145fc2
51 changed files with 1252 additions and 80 deletions

View File

@@ -46,7 +46,6 @@ jackson:
endpoints: endpoints:
all: all:
port: 8081
enabled: true enabled: true
sensitive: false sensitive: false
health: health:
@@ -127,6 +126,12 @@ kestra:
flowtopologies: flowtopologies:
table: "flow_topologies" table: "flow_topologies"
cls: io.kestra.core.models.topologies.FlowTopology cls: io.kestra.core.models.topologies.FlowTopology
workerinstance:
cls: io.kestra.core.runners.WorkerInstance
table: "worker_instance"
workerjobrunning:
cls: io.kestra.core.runners.WorkerJobRunning
table: "worker_job_running"
queues: queues:
min-poll-interval: 25ms min-poll-interval: 25ms
@@ -171,3 +176,7 @@ kestra:
uri: https://api.kestra.io/v1/reports/usages uri: https://api.kestra.io/v1/reports/usages
initial-delay: 5m initial-delay: 5m
fixed-delay: 1h fixed-delay: 1h
heartbeat:
frequency: 10s
heartbeat-missed: 3

View File

@@ -1,10 +1,7 @@
package io.kestra.core.queues; package io.kestra.core.queues;
import io.kestra.core.models.Setting; import io.kestra.core.models.Setting;
import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.*;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.templates.Template; import io.kestra.core.models.templates.Template;
import io.kestra.core.models.topologies.FlowTopology; import io.kestra.core.models.topologies.FlowTopology;

View File

@@ -11,4 +11,6 @@ public interface WorkerJobQueueInterface extends Closeable {
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer); Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer);
void pause(); void pause();
void cleanup();
} }

View File

@@ -0,0 +1,16 @@
package io.kestra.core.repositories;
import io.kestra.core.runners.WorkerInstance;
import java.util.List;
import java.util.Optional;
public interface WorkerInstanceRepositoryInterface {
Optional<WorkerInstance> findByWorkerUuid(String workerUuid);
List<WorkerInstance> findAll();
void delete(WorkerInstance workerInstance);
WorkerInstance save(WorkerInstance workerInstance);
}

View File

@@ -0,0 +1,13 @@
package io.kestra.core.repositories;
import io.kestra.core.runners.WorkerJobRunning;
import java.util.Optional;
public interface WorkerJobRunningRepositoryInterface {
Optional<WorkerJobRunning> findByTaskRunId(String taskRunId);
void deleteByTaskRunId(String taskRunId);
}

View File

@@ -1,22 +1,23 @@
package io.kestra.core.runners; package io.kestra.core.runners;
import io.micronaut.context.ApplicationContext;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface; import io.kestra.core.queues.QueueInterface;
import io.kestra.core.schedulers.AbstractScheduler; import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.utils.ExecutorsUtils; import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.context.ApplicationContext;
import java.io.Closeable;
import java.io.IOException;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Named; import jakarta.inject.Named;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@Slf4j @Slf4j
public class StandAloneRunner implements RunnerInterface, Closeable { public class StandAloneRunner implements RunnerInterface, AutoCloseable {
@Setter private java.util.concurrent.ExecutorService poolExecutor; @Setter private java.util.concurrent.ExecutorService poolExecutor;
@Setter protected int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors()); @Setter protected int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors());
@Setter protected boolean schedulerEnabled = true; @Setter protected boolean schedulerEnabled = true;
@@ -39,6 +40,8 @@ public class StandAloneRunner implements RunnerInterface, Closeable {
@Inject @Inject
private ApplicationContext applicationContext; private ApplicationContext applicationContext;
private final List<AutoCloseable> servers = new ArrayList<>();
private boolean running = false; private boolean running = false;
@Override @Override
@@ -52,13 +55,18 @@ public class StandAloneRunner implements RunnerInterface, Closeable {
Worker worker = new Worker(applicationContext, workerThread, null); Worker worker = new Worker(applicationContext, workerThread, null);
applicationContext.registerSingleton(worker); applicationContext.registerSingleton(worker);
poolExecutor.execute(worker); poolExecutor.execute(worker);
servers.add(worker);
if (schedulerEnabled) { if (schedulerEnabled) {
poolExecutor.execute(applicationContext.getBean(AbstractScheduler.class)); AbstractScheduler scheduler = applicationContext.getBean(AbstractScheduler.class);
poolExecutor.execute(scheduler);
servers.add(scheduler);
} }
if (applicationContext.containsBean(IndexerInterface.class)) { if (applicationContext.containsBean(IndexerInterface.class)) {
poolExecutor.execute(applicationContext.getBean(IndexerInterface.class)); IndexerInterface indexer = applicationContext.getBean(IndexerInterface.class);
poolExecutor.execute(indexer);
servers.add(indexer);
} }
} }
@@ -67,7 +75,8 @@ public class StandAloneRunner implements RunnerInterface, Closeable {
} }
@Override @Override
public void close() throws IOException { public void close() throws Exception {
this.servers.forEach(throwConsumer(AutoCloseable::close));
this.poolExecutor.shutdown(); this.poolExecutor.shutdown();
this.executionQueue.close(); this.executionQueue.close();
this.workerTaskQueue.close(); this.workerTaskQueue.close();

View File

@@ -44,6 +44,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@@ -565,6 +566,8 @@ public class Worker implements Runnable, AutoCloseable {
"worker-shutdown" "worker-shutdown"
).start(); ).start();
AtomicBoolean cleanShutdown = new AtomicBoolean(false);
Await.until( Await.until(
() -> { () -> {
if (this.executors.isTerminated() && this.workerThreadReferences.isEmpty()) { if (this.executors.isTerminated() && this.workerThreadReferences.isEmpty()) {
@@ -577,6 +580,7 @@ public class Worker implements Runnable, AutoCloseable {
log.error("Failed to close the workerTaskResultQueue", e); log.error("Failed to close the workerTaskResultQueue", e);
} }
cleanShutdown.set(true);;
return true; return true;
} }
@@ -590,6 +594,10 @@ public class Worker implements Runnable, AutoCloseable {
Duration.ofSeconds(1) Duration.ofSeconds(1)
); );
if (cleanShutdown.get()) {
workerJobQueue.cleanup();
}
workerJobQueue.close(); workerJobQueue.close();
executionKilledQueue.close(); executionKilledQueue.close();
workerTaskResultQueue.close(); workerTaskResultQueue.close();

View File

@@ -1,17 +1,20 @@
package io.kestra.core.runners; package io.kestra.core.runners;
import lombok.Builder; import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Data; import lombok.*;
import lombok.ToString; import lombok.experimental.SuperBuilder;
import javax.validation.constraints.NotNull;
import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import javax.validation.constraints.NotNull;
@Data @Data
@Builder @SuperBuilder(toBuilder = true)
@ToString @ToString
@NoArgsConstructor
@Getter
public class WorkerInstance { public class WorkerInstance {
@NotNull @NotNull
private UUID workerUuid; private UUID workerUuid;
@@ -25,4 +28,16 @@ public class WorkerInstance {
@Builder.Default @Builder.Default
private List<Integer> partitions = new ArrayList<>(); private List<Integer> partitions = new ArrayList<>();
@Builder.Default
@JsonInclude
private Status status = Status.UP;
@Builder.Default
private Instant heartbeatDate = Instant.now();
public enum Status {
UP, DEAD
}
} }

View File

@@ -12,4 +12,6 @@ public abstract class WorkerJob {
abstract public String getType(); abstract public String getType();
abstract public String uid(); abstract public String uid();
abstract public String taskRunId();
} }

View File

@@ -41,4 +41,9 @@ public class WorkerTask extends WorkerJob {
public String uid() { public String uid() {
return this.taskRun.getTaskId(); return this.taskRun.getTaskId();
} }
@Override
public String taskRunId() {
return this.taskRun.getId();
}
} }

View File

@@ -31,4 +31,9 @@ public class WorkerTrigger extends WorkerJob {
public String uid() { public String uid() {
return triggerContext.uid(); return triggerContext.uid();
} }
@Override
public String taskRunId() {
return triggerContext.uid();
}
} }

View File

@@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.conditions.ConditionContext; import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext; import io.kestra.core.models.triggers.TriggerContext;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;

View File

@@ -11,6 +11,8 @@ import java.util.concurrent.TimeUnit;
public class TestMethodScopedWorker extends Worker { public class TestMethodScopedWorker extends Worker {
public TestMethodScopedWorker(ApplicationContext applicationContext, int thread, String workerGroupKey) { public TestMethodScopedWorker(ApplicationContext applicationContext, int thread, String workerGroupKey) {
super(applicationContext, thread, workerGroupKey); super(applicationContext, thread, workerGroupKey);
applicationContext.registerSingleton(this);
} }
/** /**

View File

@@ -0,0 +1,16 @@
package io.kestra.repository.h2;
import io.kestra.core.runners.WorkerInstance;
import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
@H2RepositoryEnabled
public class H2WorkerInstanceRepository extends AbstractJdbcWorkerInstanceRepository {
@Inject
public H2WorkerInstanceRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(WorkerInstance.class, applicationContext));
}
}

View File

@@ -0,0 +1,16 @@
package io.kestra.repository.h2;
import io.kestra.core.runners.WorkerJobRunning;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
@H2RepositoryEnabled
public class H2WorkerJobRunningRepository extends AbstractJdbcWorkerJobRunningRepository {
@Inject
public H2WorkerJobRunningRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(WorkerJobRunning.class, applicationContext));
}
}

View File

@@ -1,39 +1,40 @@
package io.kestra.runner.h2; package io.kestra.runner.h2;
import io.kestra.core.exceptions.DeserializationException; import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.WorkerJobQueueInterface; import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.runners.WorkerJob; import io.kestra.core.runners.WorkerJob;
import io.kestra.core.utils.Either; import io.kestra.core.utils.Either;
import io.kestra.jdbc.JdbcWorkerJobQueueService;
import io.micronaut.context.ApplicationContext; import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers; import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer; import java.util.function.Consumer;
@Slf4j
public class H2WorkerJobQueue implements WorkerJobQueueInterface { public class H2WorkerJobQueue implements WorkerJobQueueInterface {
QueueInterface<WorkerJob> workerTaskQueue; private final JdbcWorkerJobQueueService jdbcworkerjobQueueService;
@SuppressWarnings("unchecked")
public H2WorkerJobQueue(ApplicationContext applicationContext) { public H2WorkerJobQueue(ApplicationContext applicationContext) {
this.workerTaskQueue = (QueueInterface<WorkerJob>) applicationContext.getBean( this.jdbcworkerjobQueueService = applicationContext.getBean(JdbcWorkerJobQueueService.class);
QueueInterface.class,
Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED)
);
} }
@Override @Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) { public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
return workerTaskQueue.receive(consumerGroup, queueType, consumer); return jdbcworkerjobQueueService.receive(consumerGroup, queueType, consumer);
} }
@Override @Override
public void pause() { public void pause() {
jdbcworkerjobQueueService.pause();
}
@Override
public void cleanup() {
jdbcworkerjobQueueService.cleanup();
} }
@Override @Override
public void close() { public void close() {
jdbcworkerjobQueueService.close();
} }
} }

View File

@@ -0,0 +1,22 @@
/* ----------------------- workerInstance ----------------------- */
CREATE TABLE IF NOT EXISTS worker_instance (
"key" VARCHAR(250) NOT NULL PRIMARY KEY,
"value" TEXT NOT NULL,
"worker_uuid" VARCHAR(36) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value",'.workerUuid')),
"hostname" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value",'.hostname')),
"port" INT GENERATED ALWAYS AS (JQ_INTEGER("value",'.port')),
"management_port" INT GENERATED ALWAYS AS (JQ_INTEGER("value",'.managementPort')),
"worker_group" VARCHAR(150) GENERATED ALWAYS AS (JQ_STRING("value",'.workerGroup')),
"status" VARCHAR(10) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value",'.status')),
"heartbeat_date" TIMESTAMP NOT NULL GENERATED ALWAYS AS (PARSEDATETIME(JQ_STRING("value", '.heartbeatDate'), 'yyyy-MM-dd''T''HH:mm:ss.SSSXXX'))
);
/* ----------------------- worker_job_running ----------------------- */
CREATE TABLE IF NOT EXISTS worker_job_running (
"key" VARCHAR(250) NOT NULL PRIMARY KEY,
"value" TEXT NOT NULL,
"worker_uuid" VARCHAR(36) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value",'.workerInstance.workerUuid')),
"taskrun_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value",'.taskRun.id'))
);
CREATE INDEX IF NOT EXISTS worker_job_running_worker_uuid ON worker_job_running ("worker_uuid");

View File

@@ -0,0 +1,6 @@
package io.kestra.repository.h2;
import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepositoryTest;
public class H2WorkerInstanceRepositoryTest extends AbstractJdbcWorkerInstanceRepositoryTest {
}

View File

@@ -15,6 +15,10 @@ flyway:
out-of-order: true out-of-order: true
kestra: kestra:
server-type: STANDALONE
heartbeat:
frequency: 10s
heartbeat-missed: 3
queue: queue:
type: h2 type: h2
repository: repository:
@@ -64,6 +68,12 @@ kestra:
flowtopologies: flowtopologies:
table: "flow_topologies" table: "flow_topologies"
cls: io.kestra.core.models.topologies.FlowTopology cls: io.kestra.core.models.topologies.FlowTopology
workerinstance:
cls: io.kestra.core.runners.WorkerInstance
table: "worker_instance"
workerjobrunning:
cls: io.kestra.core.runners.WorkerJobRunning
table: "worker_job_running"
queues: queues:
min-poll-interval: 10ms min-poll-interval: 10ms

View File

@@ -0,0 +1,16 @@
package io.kestra.repository.mysql;
import io.kestra.core.runners.WorkerInstance;
import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
@MysqlRepositoryEnabled
public class MysqlWorkerInstanceRepository extends AbstractJdbcWorkerInstanceRepository {
@Inject
public MysqlWorkerInstanceRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(WorkerInstance.class, applicationContext));
}
}

View File

@@ -0,0 +1,16 @@
package io.kestra.repository.mysql;
import io.kestra.core.runners.WorkerJobRunning;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
@MysqlRepositoryEnabled
public class MysqlWorkerJobRunningRepository extends AbstractJdbcWorkerJobRunningRepository {
@Inject
public MysqlWorkerJobRunningRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(WorkerJobRunning.class, applicationContext));
}
}

View File

@@ -1,39 +1,40 @@
package io.kestra.runner.mysql; package io.kestra.runner.mysql;
import io.kestra.core.exceptions.DeserializationException; import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.WorkerJobQueueInterface; import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.runners.WorkerJob; import io.kestra.core.runners.WorkerJob;
import io.kestra.core.utils.Either; import io.kestra.core.utils.Either;
import io.kestra.jdbc.JdbcWorkerJobQueueService;
import io.micronaut.context.ApplicationContext; import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers; import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer; import java.util.function.Consumer;
@Slf4j
public class MysqlWorkerJobQueue implements WorkerJobQueueInterface { public class MysqlWorkerJobQueue implements WorkerJobQueueInterface {
QueueInterface<WorkerJob> workerTaskQueue; private final JdbcWorkerJobQueueService jdbcworkerjobQueueService;
@SuppressWarnings("unchecked")
public MysqlWorkerJobQueue(ApplicationContext applicationContext) { public MysqlWorkerJobQueue(ApplicationContext applicationContext) {
this.workerTaskQueue = (QueueInterface<WorkerJob>) applicationContext.getBean( this.jdbcworkerjobQueueService = applicationContext.getBean(JdbcWorkerJobQueueService.class);
QueueInterface.class,
Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED)
);
} }
@Override @Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) { public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
return workerTaskQueue.receive(consumerGroup, queueType, consumer); return jdbcworkerjobQueueService.receive(consumerGroup, queueType, consumer);
} }
@Override @Override
public void pause() { public void pause() {
jdbcworkerjobQueueService.pause();
}
@Override
public void cleanup() {
jdbcworkerjobQueueService.cleanup();
} }
@Override @Override
public void close() { public void close() {
jdbcworkerjobQueueService.close();
} }
} }

View File

@@ -0,0 +1,21 @@
/* ----------------------- workerInstance ----------------------- */
CREATE TABLE IF NOT EXISTS worker_instance (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL,
`worker_uuid` VARCHAR(36) GENERATED ALWAYS AS (value ->> '$.workerUuid') STORED NOT NULL,
`hostname` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.hostname') STORED NOT NULL,
`port` INT UNSIGNED GENERATED ALWAYS AS (value ->> '$.port') STORED,
`management_port` INT UNSIGNED GENERATED ALWAYS AS (value ->> '$.managementPort') STORED,
`worker_group` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.workerGroup') STORED,
`status` VARCHAR(10)GENERATED ALWAYS AS (value ->> '$.status') STORED NOT NULL,
`heartbeat_date` DATETIME(6) GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.heartbeatDate' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED NOT NULL
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
/* ----------------------- worker_job_running ----------------------- */
CREATE TABLE IF NOT EXISTS worker_job_running (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL,
`worker_uuid` VARCHAR(36) GENERATED ALWAYS AS (value ->> '$.workerInstance.workerUuid') STORED NOT NULL,
`taskrun_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.taskRun.id') STORED NOT NULL,
INDEX ix_worker_uuid (worker_uuid)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

View File

@@ -0,0 +1,7 @@
package io.kestra.repository.mysql;
import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepositoryTest;
public class MysqlWorkerInstanceRepositoryTest extends AbstractJdbcWorkerInstanceRepositoryTest {
}

View File

@@ -16,6 +16,7 @@ flyway:
out-of-order: true out-of-order: true
kestra: kestra:
server-type: STANDALONE
queue: queue:
type: mysql type: mysql
repository: repository:
@@ -24,6 +25,9 @@ kestra:
type: local type: local
local: local:
base-path: /tmp/unittest base-path: /tmp/unittest
heartbeat:
frequency: 10s
heartbeat-missed: 3
jdbc: jdbc:
tables: tables:
@@ -65,6 +69,12 @@ kestra:
flowtopologies: flowtopologies:
table: "flow_topologies" table: "flow_topologies"
cls: io.kestra.core.models.topologies.FlowTopology cls: io.kestra.core.models.topologies.FlowTopology
workerinstance:
cls: io.kestra.core.runners.WorkerInstance
table: "worker_instance"
workerjobrunning:
cls: io.kestra.core.runners.WorkerJobRunning
table: "worker_job_running"
queues: queues:
min-poll-interval: 10ms min-poll-interval: 10ms

View File

@@ -0,0 +1,16 @@
package io.kestra.repository.postgres;
import io.kestra.core.runners.WorkerInstance;
import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
@PostgresRepositoryEnabled
public class PostgresWorkerInstanceRepository extends AbstractJdbcWorkerInstanceRepository {
@Inject
public PostgresWorkerInstanceRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(WorkerInstance.class, applicationContext));
}
}

View File

@@ -0,0 +1,16 @@
package io.kestra.repository.postgres;
import io.kestra.core.runners.WorkerJobRunning;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
@PostgresRepositoryEnabled
public class PostgresWorkerJobRunningRepository extends AbstractJdbcWorkerJobRunningRepository {
@Inject
public PostgresWorkerJobRunningRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(WorkerJobRunning.class, applicationContext));
}
}

View File

@@ -1,39 +1,40 @@
package io.kestra.runner.postgres; package io.kestra.runner.postgres;
import io.kestra.core.exceptions.DeserializationException; import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.WorkerJobQueueInterface; import io.kestra.core.queues.WorkerJobQueueInterface;
import io.kestra.core.runners.WorkerJob; import io.kestra.core.runners.WorkerJob;
import io.kestra.core.utils.Either; import io.kestra.core.utils.Either;
import io.kestra.jdbc.JdbcWorkerJobQueueService;
import io.micronaut.context.ApplicationContext; import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers; import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer; import java.util.function.Consumer;
@Slf4j
public class PostgresWorkerJobQueue implements WorkerJobQueueInterface { public class PostgresWorkerJobQueue implements WorkerJobQueueInterface {
QueueInterface<WorkerJob> workerTaskQueue; private final JdbcWorkerJobQueueService jdbcworkerjobQueueService;
@SuppressWarnings("unchecked")
public PostgresWorkerJobQueue(ApplicationContext applicationContext) { public PostgresWorkerJobQueue(ApplicationContext applicationContext) {
this.workerTaskQueue = (QueueInterface<WorkerJob>) applicationContext.getBean( this.jdbcworkerjobQueueService = applicationContext.getBean(JdbcWorkerJobQueueService.class);
QueueInterface.class,
Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED)
);
} }
@Override @Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) { public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
return workerTaskQueue.receive(consumerGroup, queueType, consumer); return jdbcworkerjobQueueService.receive(consumerGroup, queueType, consumer);
} }
@Override @Override
public void pause() { public void pause() {
jdbcworkerjobQueueService.pause();
}
@Override
public void cleanup() {
jdbcworkerjobQueueService.cleanup();
} }
@Override @Override
public void close() { public void close() {
jdbcworkerjobQueueService.close();
} }
} }

View File

@@ -0,0 +1,22 @@
/* ----------------------- workerInstance ----------------------- */
CREATE TABLE IF NOT EXISTS worker_instance (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL,
worker_uuid VARCHAR(36) NOT NULL GENERATED ALWAYS AS (value ->> 'workerUuid') STORED,
hostname VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'hostname') STORED,
port INTEGER GENERATED ALWAYS AS (CAST(value ->> 'port' AS INTEGER)) STORED,
management_port INTEGER GENERATED ALWAYS AS (CAST(value ->> 'managementPort'AS INTEGER)) STORED,
worker_group VARCHAR(150) GENERATED ALWAYS AS (value ->> 'workerGroup') STORED,
status VARCHAR(10) NOT NULL GENERATED ALWAYS AS (value ->> 'status') STORED,
heartbeat_date TIMESTAMPTZ NOT NULL GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value ->> 'heartbeatDate')) STORED
);
/* ----------------------- worker_job_running ----------------------- */
CREATE TABLE IF NOT EXISTS worker_job_running (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL,
worker_uuid VARCHAR(36) NOT NULL GENERATED ALWAYS AS (value -> 'workerInstance' ->> 'workerUuid') STORED,
taskrun_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value -> 'taskRun' ->> 'id') STORED
);
CREATE INDEX IF NOT EXISTS worker_job_running_worker_uuid ON worker_job_running (worker_uuid);

View File

@@ -0,0 +1,7 @@
package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepositoryTest;
public class PostgresWorkerInstanceRepositoryTest extends AbstractJdbcWorkerInstanceRepositoryTest {
}

View File

@@ -17,6 +17,7 @@ flyway:
out-of-order: true out-of-order: true
kestra: kestra:
server-type: STANDALONE
queue: queue:
type: postgres type: postgres
repository: repository:
@@ -25,6 +26,9 @@ kestra:
type: local type: local
local: local:
base-path: /tmp/unittest base-path: /tmp/unittest
heartbeat:
frequency: 10s
heartbeat-missed: 3
jdbc: jdbc:
tables: tables:
@@ -66,6 +70,12 @@ kestra:
flowtopologies: flowtopologies:
table: "flow_topologies" table: "flow_topologies"
cls: io.kestra.core.models.topologies.FlowTopology cls: io.kestra.core.models.topologies.FlowTopology
workerinstance:
cls: io.kestra.core.runners.WorkerInstance
table: "worker_instance"
workerjobrunning:
cls: io.kestra.core.runners.WorkerJobRunning
table: "worker_job_running"
queues: queues:
min-poll-interval: 10ms min-poll-interval: 10ms

View File

@@ -0,0 +1,106 @@
package io.kestra.jdbc;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.WorkerInstanceRepositoryInterface;
import io.kestra.core.runners.*;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.jdbc.runner.JdbcHeartbeat;
import io.kestra.jdbc.runner.JdbcQueue;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;
@Singleton
@Slf4j
public class JdbcWorkerJobQueueService {
private final JdbcQueue<WorkerJob> workerTaskQueue;
private final JdbcHeartbeat jdbcHeartbeat;
private final AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository;
private final WorkerInstanceRepositoryInterface workerInstanceRepository;
private Runnable queueStop;
@SuppressWarnings("unchecked")
public JdbcWorkerJobQueueService(ApplicationContext applicationContext) {
this.workerTaskQueue = (JdbcQueue<WorkerJob>) applicationContext.getBean(
QueueInterface.class,
Qualifiers.byName(QueueFactoryInterface.WORKERJOB_NAMED)
);
this.jdbcHeartbeat = applicationContext.getBean(JdbcHeartbeat.class);
this.jdbcWorkerJobRunningRepository = applicationContext.getBean(AbstractJdbcWorkerJobRunningRepository.class);
this.workerInstanceRepository = applicationContext.getBean(WorkerInstanceRepositoryInterface.class);
}
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<WorkerJob, DeserializationException>> consumer) {
this.queueStop = workerTaskQueue.receiveTransaction(consumerGroup, queueType, (dslContext, eithers) -> {
WorkerInstance workerInstance = jdbcHeartbeat.get();
eithers.forEach(either -> {
if (either.isRight()) {
log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
return;
}
WorkerJob workerJob = either.getLeft();
WorkerJobRunning workerJobRunning;
if (workerJob instanceof WorkerTask workerTask) {
workerJobRunning = WorkerTaskRunning.of(
workerTask,
workerInstance,
0
);
} else if (workerJob instanceof WorkerTrigger workerTrigger) {
workerJobRunning = WorkerTriggerRunning.of(
workerTrigger,
workerInstance,
0
);
} else {
throw new IllegalArgumentException("Message is of type " + workerJob.getClass() + " which should never occurs");
}
jdbcWorkerJobRunningRepository.save(workerJobRunning, dslContext);
if (log.isTraceEnabled()) {
log.trace("Sending a workerJobRunning: {}", workerJobRunning);
}
});
eithers.forEach(consumer);
});
return this.queueStop;
}
public void pause() {
this.stopQueue();
}
private void stopQueue() {
synchronized (this) {
if (this.queueStop != null) {
this.queueStop.run();
this.queueStop = null;
}
}
}
public void cleanup() {
if (jdbcHeartbeat.get() != null) {
synchronized (this) {
workerInstanceRepository.delete(jdbcHeartbeat.get());
}
}
}
public void close() {
this.stopQueue();
}
}

View File

@@ -0,0 +1,166 @@
package io.kestra.jdbc.repository;
import io.kestra.core.repositories.WorkerInstanceRepositoryInterface;
import io.kestra.core.runners.WorkerInstance;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Singleton;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jooq.*;
import org.jooq.impl.DSL;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
@Singleton
@Getter
@Slf4j
public abstract class AbstractJdbcWorkerInstanceRepository extends AbstractJdbcRepository implements WorkerInstanceRepositoryInterface {
protected io.kestra.jdbc.AbstractJdbcRepository<WorkerInstance> jdbcRepository;
public AbstractJdbcWorkerInstanceRepository(io.kestra.jdbc.AbstractJdbcRepository<WorkerInstance> jdbcRepository) {
this.jdbcRepository = jdbcRepository;
}
@Value("${kestra.heartbeat.frequency}")
private Duration frequency;
@Value("${kestra.heartbeat.heartbeat-missed}")
private Integer nbMissed;
@Override
public Optional<WorkerInstance> findByWorkerUuid(String workerUuid) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = this.heartbeatSelectQuery(DSL.using(configuration), workerUuid);
return this.jdbcRepository.fetchOne(select);
});
}
public Optional<WorkerInstance> heartbeatCheckUp(String workerUuid) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectForUpdateOfStep<Record1<Object>> select =
this.heartbeatSelectQuery(DSL.using(configuration), workerUuid)
.forUpdate();
Optional<WorkerInstance> workerInstance = this.jdbcRepository.fetchOne(select);
if (workerInstance.isPresent()) {
WorkerInstance updatedWorker = workerInstance.get().toBuilder().status(WorkerInstance.Status.UP).heartbeatDate(Instant.now()).build();
this.save(updatedWorker);
return Optional.of(updatedWorker);
}
return Optional.empty();
});
}
public void heartbeatStatusUpdate(String workerUuid, DSLContext context) {
SelectForUpdateOfStep<Record1<Object>> select =
this.heartbeatSelectQuery(context, workerUuid)
.and(field("status").eq(WorkerInstance.Status.UP.toString()))
// We consider a heartbeat dead if it's older than heartbeat missed times the frequency
.and(field("heartbeat_date").lessThan(Instant.now().minusSeconds(getNbMissed() * getFrequency().getSeconds())))
.forUpdate();
Optional<WorkerInstance> workerInstance = this.jdbcRepository.fetchOne(select);
workerInstance.ifPresent(heartbeat -> {
heartbeat.setStatus(WorkerInstance.Status.DEAD);
log.warn("Detected evicted worker: {}", heartbeat);
this.jdbcRepository.persist(heartbeat, context, this.jdbcRepository.persistFields(heartbeat));
});
}
public void heartbeatsStatusUpdate(DSLContext context) {
this.findAllAlive(context).forEach(heartbeat -> {
this.heartbeatStatusUpdate(heartbeat.getWorkerUuid().toString(), context);
}
);
}
public void lockedWorkersUpdate(Function<DSLContext, Void> function) {
this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
// Update all workers status
heartbeatsStatusUpdate(context);
function.apply(context);
return null;
});
}
public List<WorkerInstance> findAll(DSLContext context) {
return this.jdbcRepository.fetch(this.heartbeatSelectAllQuery(context));
}
public List<WorkerInstance> findAll() {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
return this.jdbcRepository.fetch(this.heartbeatSelectAllQuery(context));
});
}
public List<WorkerInstance> findAllAlive(DSLContext context) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> this.jdbcRepository.fetch(
this.heartbeatSelectAllQuery(context)
.where(field("status").eq(WorkerInstance.Status.UP.toString()))
));
}
public List<WorkerInstance> findAllToDelete(DSLContext context) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> this.jdbcRepository.fetch(
this.heartbeatSelectAllQuery(context)
.where(field("status").eq(WorkerInstance.Status.DEAD.toString()))
.and(field("heartbeat_date").lessThan(Instant.now().minusSeconds(2 * getNbMissed() * getFrequency().getSeconds())))
));
}
public void delete(DSLContext context, WorkerInstance workerInstance) {
this.jdbcRepository.delete(context, workerInstance);
}
@Override
public void delete(WorkerInstance workerInstance) {
this.jdbcRepository.delete(workerInstance);
}
@Override
public WorkerInstance save(WorkerInstance workerInstance) {
this.jdbcRepository.persist(workerInstance, this.jdbcRepository.persistFields(workerInstance));
return workerInstance;
}
private SelectConditionStep<Record1<Object>> heartbeatSelectQuery(DSLContext context, String workerUuid) {
return context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(
field("worker_uuid").eq(workerUuid)
);
}
private SelectJoinStep<Record1<Object>> heartbeatSelectAllQuery(DSLContext dsl) {
return dsl.select(field("value"))
.from(this.jdbcRepository.getTable());
}
}

View File

@@ -0,0 +1,63 @@
package io.kestra.jdbc.repository;
import io.kestra.core.repositories.WorkerJobRunningRepositoryInterface;
import io.kestra.core.runners.WorkerJobRunning;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.jooq.DSLContext;
import org.jooq.Record1;
import org.jooq.SelectConditionStep;
import org.jooq.impl.DSL;
import java.util.List;
import java.util.Optional;
@Singleton
@Slf4j
public abstract class AbstractJdbcWorkerJobRunningRepository extends AbstractJdbcRepository implements WorkerJobRunningRepositoryInterface {
protected io.kestra.jdbc.AbstractJdbcRepository<WorkerJobRunning> jdbcRepository;
public AbstractJdbcWorkerJobRunningRepository(io.kestra.jdbc.AbstractJdbcRepository<WorkerJobRunning> jdbcRepository) {
this.jdbcRepository = jdbcRepository;
}
public WorkerJobRunning save(WorkerJobRunning workerJobRunning, DSLContext context) {
this.jdbcRepository.persist(workerJobRunning, context, this.jdbcRepository.persistFields(workerJobRunning));
return workerJobRunning;
}
@Override
public void deleteByTaskRunId(String taskRunId) {
Optional<WorkerJobRunning> workerJobRunning = this.findByTaskRunId(taskRunId);
workerJobRunning.ifPresent(jobRunning -> this.jdbcRepository.delete(jobRunning));
}
@Override
public Optional<WorkerJobRunning> findByTaskRunId(String taskRunId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select((field("value")))
.from(this.jdbcRepository.getTable())
.where(
field("taskrun_id").eq(taskRunId)
);
return this.jdbcRepository.fetchOne(select);
});
}
public List<WorkerJobRunning> getWorkerJobWithWorkerDead(DSLContext context, List<String> workersToDelete) {
return context
.select(field("value"))
.from(this.jdbcRepository.getTable())
.where(field("worker_uuid").in(workersToDelete))
.forUpdate()
.fetch()
.map(r -> this.jdbcRepository.deserialize(r.get("value").toString())
);
}
}

View File

@@ -14,9 +14,9 @@ import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInte
import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface; import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.*;
import io.kestra.core.runners.Executor; import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorService; import io.kestra.core.runners.ExecutorService;
import io.kestra.core.runners.*;
import io.kestra.core.services.*; import io.kestra.core.services.*;
import io.kestra.core.tasks.flows.Template; import io.kestra.core.tasks.flows.Template;
import io.kestra.core.topologies.FlowTopologyService; import io.kestra.core.topologies.FlowTopologyService;
@@ -24,7 +24,10 @@ import io.kestra.core.utils.Await;
import io.kestra.core.utils.Either; import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository; import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository; import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepository;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.micronaut.context.ApplicationContext; import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
import io.micronaut.transaction.exceptions.CannotCreateTransactionException; import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Named; import jakarta.inject.Named;
@@ -32,6 +35,7 @@ import jakarta.inject.Singleton;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.jooq.DSLContext;
import org.slf4j.event.Level; import org.slf4j.event.Level;
import java.io.IOException; import java.io.IOException;
@@ -48,6 +52,8 @@ import java.util.stream.Stream;
public class JdbcExecutor implements ExecutorInterface { public class JdbcExecutor implements ExecutorInterface {
private final ScheduledExecutorService schedulerDelay = Executors.newSingleThreadScheduledExecutor(); private final ScheduledExecutorService schedulerDelay = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService schedulerHeartbeat = Executors.newSingleThreadScheduledExecutor();
private Boolean isShutdown = false; private Boolean isShutdown = false;
@Inject @Inject
@@ -120,6 +126,9 @@ public class JdbcExecutor implements ExecutorInterface {
@Inject @Inject
private AbstractJdbcFlowTopologyRepository flowTopologyRepository; private AbstractJdbcFlowTopologyRepository flowTopologyRepository;
@Inject
private AbstractJdbcWorkerInstanceRepository workerInstanceRepository;
protected List<Flow> allFlows; protected List<Flow> allFlows;
@Inject @Inject
@@ -132,6 +141,12 @@ public class JdbcExecutor implements ExecutorInterface {
@Inject @Inject
private SkipExecutionService skipExecutionService; private SkipExecutionService skipExecutionService;
@Inject
private AbstractJdbcWorkerJobRunningRepository workerJobRunningRepository;
@Value("${kestra.heartbeat.frequency}")
private Duration frequency;
@SneakyThrows @SneakyThrows
@Override @Override
public void run() { public void run() {
@@ -152,6 +167,13 @@ public class JdbcExecutor implements ExecutorInterface {
TimeUnit.SECONDS TimeUnit.SECONDS
); );
schedulerHeartbeat.scheduleAtFixedRate(
this::workersUpdate,
frequency.toSeconds(),
frequency.toSeconds(),
TimeUnit.SECONDS
);
// look at exception on the main thread // look at exception on the main thread
Thread schedulerDelayThread = new Thread( Thread schedulerDelayThread = new Thread(
() -> { () -> {
@@ -196,6 +218,57 @@ public class JdbcExecutor implements ExecutorInterface {
); );
} }
); );
}
protected void workersUpdate() {
workerInstanceRepository.lockedWorkersUpdate(context -> {
List<WorkerInstance> workersToDelete = workerInstanceRepository
.findAllToDelete(context);
List<String> workersToDeleteUuids = workersToDelete.stream().map(worker -> worker.getWorkerUuid().toString()).collect(Collectors.toList());
// Before deleting a worker, we resubmit all his tasks
workerJobRunningRepository.getWorkerJobWithWorkerDead(context, workersToDeleteUuids)
.forEach(workerJobRunning -> {
if (workerJobRunning instanceof WorkerTaskRunning workerTaskRunning) {
workerTaskQueue.emit(WorkerTask.builder()
.taskRun(workerTaskRunning.getTaskRun())
.task(workerTaskRunning.getTask())
.runContext(workerTaskRunning.getRunContext())
.build()
);
log.warn(
"[namespace: {}] [flow: {}] [execution: {}] [taskrun: {}] WorkerTask is being resend",
workerTaskRunning.getTaskRun().getNamespace(),
workerTaskRunning.getTaskRun().getFlowId(),
workerTaskRunning.getTaskRun().getExecutionId(),
workerTaskRunning.getTaskRun().getId()
);
} else if (workerJobRunning instanceof WorkerTriggerRunning workerTriggerRunning) {
workerTaskQueue.emit(WorkerTrigger.builder()
.trigger(workerTriggerRunning.getTrigger())
.conditionContext(workerTriggerRunning.getConditionContext())
.triggerContext(workerTriggerRunning.getTriggerContext())
.build());
log.warn(
"[namespace: {}] [flow: {}] [trigger: {}] WorkerTrigger is being resend",
workerTriggerRunning.getTriggerContext().getNamespace(),
workerTriggerRunning.getTriggerContext().getFlowId(),
workerTriggerRunning.getTriggerContext().getTriggerId()
);
} else {
throw new IllegalArgumentException("Object is of type " + workerJobRunning.getClass() + " which should never occurs");
}
});
workersToDelete.forEach(worker -> {
workerInstanceRepository.delete(context, worker);
});
return null;
});
} }
private void executionQueue(Either<Execution, DeserializationException> either) { private void executionQueue(Either<Execution, DeserializationException> either) {
@@ -303,7 +376,7 @@ public class JdbcExecutor implements ExecutorInterface {
// multiple condition // multiple condition
if ( if (
conditionService.isTerminatedWithListeners(flow, execution) && conditionService.isTerminatedWithListeners(flow, execution) &&
this.deduplicateFlowTrigger(execution, executorState) this.deduplicateFlowTrigger(execution, executorState)
) { ) {
flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows, Optional.of(multipleConditionStorage)) flowTriggerService.computeExecutionsFromFlowTriggers(execution, allFlows, Optional.of(multipleConditionStorage))
.forEach(this.executionQueue::emit); .forEach(this.executionQueue::emit);
@@ -336,7 +409,6 @@ public class JdbcExecutor implements ExecutorInterface {
} }
} }
private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) { private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) {
if (either.isRight()) { if (either.isRight()) {
log.error("Unable to deserialize a worker task result: {}", either.getRight().getMessage()); log.error("Unable to deserialize a worker task result: {}", either.getRight().getMessage());
@@ -362,6 +434,9 @@ public class JdbcExecutor implements ExecutorInterface {
metricRegistry metricRegistry
.timer(MetricRegistry.EXECUTOR_TASKRUN_ENDED_DURATION, metricRegistry.tags(message)) .timer(MetricRegistry.EXECUTOR_TASKRUN_ENDED_DURATION, metricRegistry.tags(message))
.record(message.getTaskRun().getState().getDuration()); .record(message.getTaskRun().getState().getDuration());
log.trace("TaskRun terminated: {}", message.getTaskRun());
workerJobRunningRepository.deleteByTaskRunId(message.getTaskRun().getId());
} }
Executor executor = executionRepository.lock(message.getTaskRun().getExecutionId(), pair -> { Executor executor = executionRepository.lock(message.getTaskRun().getExecutionId(), pair -> {
@@ -448,7 +523,7 @@ public class JdbcExecutor implements ExecutorInterface {
(namespace, id) -> templateExecutorInterface.get().findById(namespace, id).orElse(null) (namespace, id) -> templateExecutorInterface.get().findById(namespace, id).orElse(null)
); );
} catch (InternalException e) { } catch (InternalException e) {
log.warn("Failed to inject template", e); log.warn("Failed to inject template", e);
} }
} }
@@ -562,6 +637,7 @@ public class JdbcExecutor implements ExecutorInterface {
public void close() throws IOException { public void close() throws IOException {
isShutdown = true; isShutdown = true;
schedulerDelay.shutdown(); schedulerDelay.shutdown();
schedulerHeartbeat.shutdown();
executionQueue.close(); executionQueue.close();
workerTaskQueue.close(); workerTaskQueue.close();
workerTaskResultQueue.close(); workerTaskResultQueue.close();

View File

@@ -0,0 +1,90 @@
package io.kestra.jdbc.runner;
import io.kestra.core.runners.Worker;
import io.kestra.core.runners.WorkerInstance;
import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;
@Singleton
@JdbcRunnerEnabled
@Requires(property = "kestra.server-type", pattern = "(WORKER|STANDALONE)")
@Slf4j
public class JdbcHeartbeat {
private static final String HOSTNAME;
static {
try {
HOSTNAME = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
@Inject
AbstractJdbcWorkerInstanceRepository workerInstanceRepository;
private volatile WorkerInstance workerInstance;
private final ApplicationContext applicationContext;
public JdbcHeartbeat(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
private void registerWorkerInstance(Worker worker) {
synchronized (this) {
if (workerInstance == null) {
this.workerInstance = WorkerInstance.builder()
.workerUuid(UUID.randomUUID())
.hostname(HOSTNAME)
.port(applicationContext.getEnvironment().getProperty("micronaut.server.port", Integer.class).orElse(8080))
.managementPort(applicationContext.getEnvironment().getProperty("endpoints.all.port", Integer.class).orElse(8081))
.workerGroup(worker.getWorkerGroup())
.build();
if (log.isDebugEnabled()) {
log.debug("Registered WorkerInstance of: {}", workerInstance.getWorkerUuid());
}
this.workerInstanceRepository.save(workerInstance);
}
}
}
@Scheduled(fixedDelay = "${kestra.heartbeat.frequency}")
public void updateHeartbeat() {
if (applicationContext.containsBean(Worker.class) && !applicationContext.getEnvironment().getActiveNames().contains(Environment.TEST)) {
if (workerInstance == null) {
registerWorkerInstance(applicationContext.getBean(Worker.class));
}
if (log.isTraceEnabled()) {
log.trace("Heartbeat of: {}", workerInstance.getWorkerUuid());
}
if (workerInstanceRepository.heartbeatCheckUp(workerInstance.getWorkerUuid().toString()).isEmpty()) {
log.error("heartbeatCheckUp failed, unable to find current instance '{}', Shutting down now!", workerInstance.getWorkerUuid());
Runtime.getRuntime().exit(1);
}
}
}
public WorkerInstance get() {
if (workerInstance == null) {
registerWorkerInstance(applicationContext.getBean(Worker.class));
}
return workerInstance;
}
}

View File

@@ -11,8 +11,8 @@ import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.Either; import io.kestra.core.utils.Either;
import io.kestra.core.utils.ExecutorsUtils; import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.JdbcConfiguration; import io.kestra.jdbc.JdbcConfiguration;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.repository.AbstractJdbcRepository; import io.kestra.jdbc.repository.AbstractJdbcRepository;
import io.micronaut.context.ApplicationContext; import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.ConfigurationProperties; import io.micronaut.context.annotation.ConfigurationProperties;
@@ -20,10 +20,11 @@ import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import lombok.Getter; import lombok.Getter;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jooq.*;
import org.jooq.Record; import org.jooq.Record;
import org.jooq.*;
import org.jooq.impl.DSL; import org.jooq.impl.DSL;
import javax.sql.DataSource;
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
@@ -35,9 +36,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;
import javax.sql.DataSource;
@Slf4j @Slf4j
public abstract class JdbcQueue<T> implements QueueInterface<T> { public abstract class JdbcQueue<T> implements QueueInterface<T> {
@@ -45,7 +46,7 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
private static ExecutorService poolExecutor; private static ExecutorService poolExecutor;
private final QueueService queueService; protected final QueueService queueService;
protected final Class<T> cls; protected final Class<T> cls;
@@ -183,6 +184,31 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
@Override @Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer) { public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer) {
return this.receiveImpl(
consumerGroup,
queueType,
(dslContext, eithers) -> {
eithers.forEach(consumer);
},
false
);
}
public Runnable receiveTransaction(String consumerGroup, Class<?> queueType, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer) {
return this.receiveImpl(
consumerGroup,
queueType,
consumer,
true
);
}
public Runnable receiveImpl(
String consumerGroup,
Class<?> queueType,
BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer,
Boolean inTransaction
) {
String queueName = queueName(queueType); String queueName = queueName(queueType);
return this.poll(() -> { return this.poll(() -> {
@@ -192,6 +218,9 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
Result<Record> result = this.receiveFetch(ctx, consumerGroup, queueName); Result<Record> result = this.receiveFetch(ctx, consumerGroup, queueName);
if (!result.isEmpty()) { if (!result.isEmpty()) {
if (inTransaction) {
consumer.accept(ctx, this.map(result));
}
this.updateGroupOffsets( this.updateGroupOffsets(
ctx, ctx,
@@ -204,13 +233,15 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
return result; return result;
}); });
this.send(fetch, consumer); if (!inTransaction) {
consumer.accept(null, this.map(fetch));
}
return fetch.size(); return fetch.size();
}); });
} }
private String queueName(Class<?> queueType) { protected String queueName(Class<?> queueType) {
return CaseFormat.UPPER_CAMEL.to( return CaseFormat.UPPER_CAMEL.to(
CaseFormat.LOWER_UNDERSCORE, CaseFormat.LOWER_UNDERSCORE,
queueType.getSimpleName() queueType.getSimpleName()
@@ -218,7 +249,7 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
} }
@SuppressWarnings("BusyWait") @SuppressWarnings("BusyWait")
private Runnable poll(Supplier<Integer> runnable) { protected Runnable poll(Supplier<Integer> runnable) {
AtomicBoolean running = new AtomicBoolean(true); AtomicBoolean running = new AtomicBoolean(true);
AtomicLong sleep = new AtomicLong(configuration.getMaxPollInterval().toMillis()); AtomicLong sleep = new AtomicLong(configuration.getMaxPollInterval().toMillis());
AtomicReference<ZonedDateTime> lastPoll = new AtomicReference<>(ZonedDateTime.now()); AtomicReference<ZonedDateTime> lastPoll = new AtomicReference<>(ZonedDateTime.now());
@@ -254,15 +285,19 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
}; };
} }
private void send(Result<Record> fetch, Consumer<Either<T, DeserializationException>> consumer) { protected List<Either<T, DeserializationException>> map(Result<Record> fetch) {
fetch return fetch
.map(record -> { .map(record -> {
try { try {
return Either.<T, DeserializationException>left(JacksonMapper.ofJson().readValue(record.get("value", String.class), cls)); return Either.left(JacksonMapper.ofJson().readValue(record.get("value", String.class), cls));
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
return Either.<T, DeserializationException>right(new DeserializationException(e, record.get("value", String.class))); return Either.right(new DeserializationException(e, record.get("value", String.class)));
} }
}) });
}
protected void send(Result<Record> fetch, Consumer<Either<T, DeserializationException>> consumer) {
this.map(fetch)
.forEach(consumer); .forEach(consumer);
} }

View File

@@ -0,0 +1,147 @@
package io.kestra.jdbc.repository;
import io.kestra.core.runners.WorkerInstance;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
@MicronautTest(transactional = false)
public abstract class AbstractJdbcWorkerInstanceRepositoryTest {
@Inject
protected AbstractJdbcWorkerInstanceRepository workerInstanceRepository;
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject
protected JooqDSLContextWrapper dslContextWrapper;
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
@Test
protected void save() {
WorkerInstance workerInstance = createWorkerInstance(UUID.randomUUID().toString());
workerInstanceRepository.save(workerInstance);
Optional<WorkerInstance> find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString());
assertThat(find.isPresent(), is(true));
assertThat(find.get().getWorkerUuid(), is(workerInstance.getWorkerUuid()));
}
@Test
protected void delete() {
WorkerInstance workerInstance = createWorkerInstance(UUID.randomUUID().toString());
workerInstanceRepository.save(workerInstance);
dslContextWrapper.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
Optional<WorkerInstance> find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString());
assertThat(find.isPresent(), is(true));
assertThat(find.get().getWorkerUuid(), is(workerInstance.getWorkerUuid()));
workerInstanceRepository.delete(context, workerInstance);
find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString());
assertThat(find.isPresent(), is(false));
});
}
@Test
protected void findAll() {
WorkerInstance workerInstance = createWorkerInstance(UUID.randomUUID().toString());
WorkerInstance workerInstanceAlive = createWorkerInstance(UUID.randomUUID().toString());
WorkerInstance workerInstanceDead = createWorkerInstance(UUID.randomUUID().toString(), false);
workerInstanceRepository.save(workerInstance);
workerInstanceRepository.save(workerInstanceAlive);
workerInstanceRepository.save(workerInstanceDead);
dslContextWrapper.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
List<WorkerInstance> finds = workerInstanceRepository.findAll(context);
assertThat(finds.size(), is(3));
finds = workerInstanceRepository.findAllToDelete(context);
assertThat(finds.size(), is(1));
finds = workerInstanceRepository.findAllAlive(context);
assertThat(finds.size(), is(2));
});
}
@Test
protected void find() {
WorkerInstance workerInstance = createWorkerInstance(UUID.randomUUID().toString());
workerInstanceRepository.save(workerInstance);
Optional<WorkerInstance> find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString());
assertThat(find.isPresent(), is(true));
assertThat(find.get().getWorkerUuid(), is(workerInstance.getWorkerUuid()));
}
@Test
protected void heartbeatCheckup() throws InterruptedException {
WorkerInstance workerInstance = createWorkerInstance(UUID.randomUUID().toString());
workerInstanceRepository.save(workerInstance);
CountDownLatch queueCount = new CountDownLatch(1);
queueCount.await(15, TimeUnit.SECONDS);
Optional<WorkerInstance> updatedWorkerInstance = workerInstanceRepository.heartbeatCheckUp(workerInstance.getWorkerUuid().toString());
assertThat(updatedWorkerInstance.isPresent(), is(true));
assertThat(updatedWorkerInstance.get().getHeartbeatDate(), greaterThan(workerInstance.getHeartbeatDate()));
}
@Test
protected void heartbeatsStatusUpdate() {
WorkerInstance workerInstance = createWorkerInstance(UUID.randomUUID().toString());
workerInstance.setHeartbeatDate(Instant.now().minusSeconds(3600));
workerInstanceRepository.save(workerInstance);
dslContextWrapper.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
workerInstanceRepository.heartbeatsStatusUpdate(context);
Optional<WorkerInstance> find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString());
assertThat(find.isPresent(), is(true));
assertThat(find.get().getStatus(), is(WorkerInstance.Status.DEAD));
});
}
private WorkerInstance createWorkerInstance(String workerUuid, Boolean alive) {
return WorkerInstance.builder()
.workerUuid(UUID.fromString(workerUuid))
.workerGroup(null)
.managementPort(0)
.hostname("kestra.io")
.partitions(null)
.port(0)
.status(alive ? WorkerInstance.Status.UP : WorkerInstance.Status.DEAD)
.heartbeatDate(alive ? Instant.now() : Instant.now().minusSeconds(3600))
.build();
}
private WorkerInstance createWorkerInstance(String workerUuid) {
return createWorkerInstance(workerUuid, true);
}
}

View File

@@ -32,6 +32,11 @@ public class MemoryWorkerJobQueue implements WorkerJobQueueInterface {
} }
@Override
public void cleanup() {
}
@Override @Override
public void close() { public void close() {

View File

@@ -0,0 +1,62 @@
<template>
<div>
<nav>
<collapse>
<el-form-item>
<refresh-button class="float-right" @refresh="loadData" />
</el-form-item>
</collapse>
</nav>
<el-table
:data="workers"
ref="table"
:default-sort="{prop: 'hostname', order: 'ascending'}"
stripe
table-layout="auto"
fixed
>
<el-table-column prop="workerUuid" sortable :sort-orders="['ascending', 'descending']" :label="$t('id')" />
<el-table-column prop="hostname" sortable :sort-orders="['ascending', 'descending']" :label="$t('hostname')" />
<el-table-column prop="workerGroup" sortable :sort-orders="['ascending', 'descending']" :label="$t('worker group')" />
<el-table-column prop="status" sortable :sort-orders="['ascending', 'descending']" :label="$t('state')" />
<el-table-column prop="heartbeatDate" sortable :sort-orders="['ascending', 'descending']" :label="$t('date')">
<template #default="scope">
<date-ago class-name="text-muted small" :inverted="true" :date="scope.row.heartbeatDate" />
</template>
</el-table-column>
</el-table>
</div>
</template>
<script>
import RouteContext from "../../mixins/routeContext";
import RefreshButton from "../../components/layout/RefreshButton.vue";
import Collapse from "../../components/layout/Collapse.vue";
import DateAgo from "../layout/DateAgo.vue";
export default {
mixins: [RouteContext],
components: {DateAgo, RefreshButton, Collapse},
data() {
return {
workers: undefined,
};
},
created() {
this.loadData();
},
methods: {
loadData() {
this.$store.dispatch("worker/findAll").then(workers => {
this.workers = workers;
});
}
},
computed: {
routeInfo() {
return {
title: this.$t("workers")
}
}
}
};
</script>

View File

@@ -47,6 +47,7 @@
import ViewDashboardVariantOutline from "vue-material-design-icons/ViewDashboardVariantOutline.vue"; import ViewDashboardVariantOutline from "vue-material-design-icons/ViewDashboardVariantOutline.vue";
import TimerCogOutline from "vue-material-design-icons/TimerCogOutline.vue"; import TimerCogOutline from "vue-material-design-icons/TimerCogOutline.vue";
import {mapState} from "vuex"; import {mapState} from "vuex";
import AccountHardHatOutline from "vue-material-design-icons/AccountHardHatOutline.vue";
export default { export default {
components: { components: {
@@ -203,6 +204,14 @@
element: TimerCogOutline, element: TimerCogOutline,
class: "menu-icon" class: "menu-icon"
} }
},
{
href: "/admin/workers",
title: this.$t("workers"),
icon: {
element: AccountHardHatOutline,
class: "menu-icon"
},
} }
] ]
}, },

View File

@@ -17,6 +17,8 @@ import FlowMetrics from "../components/flows/FlowMetrics.vue";
import Blueprints from "override/components/flows/blueprints/Blueprints.vue"; import Blueprints from "override/components/flows/blueprints/Blueprints.vue";
import BlueprintDetail from "../components/flows/blueprints/BlueprintDetail.vue"; import BlueprintDetail from "../components/flows/blueprints/BlueprintDetail.vue";
import Triggers from "../components/admin/Triggers.vue"; import Triggers from "../components/admin/Triggers.vue";
import Workers from "../components/admin/Workers.vue";
export default [ export default [
//Flows //Flows
@@ -56,6 +58,7 @@ export default [
//Admin //Admin
{name: "admin/triggers", path: "/:tenant?/admin/triggers", component: Triggers}, {name: "admin/triggers", path: "/:tenant?/admin/triggers", component: Triggers},
{name: "admin/workers", path: "/:tenant?/admin/workers", component: Workers},
//Errors //Errors
{name: "errors/404-wildcard", path: "/:pathMatch(.*)", component: Errors, props: {code: 404}}, {name: "errors/404-wildcard", path: "/:pathMatch(.*)", component: Errors, props: {code: 404}},

View File

@@ -13,6 +13,7 @@ import stat from "./stat"
import template from "./template" import template from "./template"
import taskrun from "./taskruns" import taskrun from "./taskruns"
import trigger from "./trigger"; import trigger from "./trigger";
import worker from "./worker";
export default { export default {
modules: { modules: {
@@ -30,6 +31,7 @@ export default {
graph, graph,
plugin, plugin,
taskrun, taskrun,
trigger trigger,
worker
} }
} }

13
ui/src/stores/worker.js Normal file
View File

@@ -0,0 +1,13 @@
import {apiUrl} from "override/utils/route";
export default {
namespaced: true,
actions: {
findAll(_, __) {
return this.$http.get(`${apiUrl(this)}/workers`).then(response => {
return response.data;
})
}
}
}

View File

@@ -499,7 +499,12 @@
"add task": "Add a task", "add task": "Add a task",
"unable to generate graph": "An issue occurred while generating the graph which prevent the topology to be displayed.", "unable to generate graph": "An issue occurred while generating the graph which prevent the topology to be displayed.",
"attempts": "Attempt(s)", "attempts": "Attempt(s)",
"flow deleted, you can restore it": "Flow has been deleted and this is a read-only view, but you can still restore it." "flow deleted, you can restore it": "Flow has been deleted and this is a read-only view, but you can still restore it.",
"workers": "Workers",
"worker group": "Worker Group",
"hostname": "Hostname",
"port": "Port",
"management port": "Management port"
}, },
"fr": { "fr": {
"id": "Identifiant", "id": "Identifiant",
@@ -993,7 +998,12 @@
"add task": "Ajouter une tâche", "add task": "Ajouter une tâche",
"unable to generate graph": "Une erreur est survenue lors de la génération du graphe empêchant l'affichage de la topologie.", "unable to generate graph": "Une erreur est survenue lors de la génération du graphe empêchant l'affichage de la topologie.",
"attempts": "Tentative(s)", "attempts": "Tentative(s)",
"flow deleted, you can restore it": "Le flow a été supprimé et ceci est une vue en lecture seule, mais vous pouvez toujours le restaurer." "flow deleted, you can restore it": "Le flow a été supprimé et ceci est une vue en lecture seule, mais vous pouvez toujours le restaurer.",
"workers": "Workers",
"worker group": "Worker Group",
"hostname": "Nom d'hôte",
"port": "Port",
"management port": "Port de gestion"
} }
} }

View File

@@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.collectors.Usage; import io.kestra.core.models.collectors.Usage;
import io.kestra.core.repositories.ExecutionRepositoryInterface; import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.TemplateRepositoryInterface; import io.kestra.core.repositories.TemplateRepositoryInterface;
import io.kestra.core.repositories.WorkerInstanceRepositoryInterface;
import io.kestra.core.services.CollectorService; import io.kestra.core.services.CollectorService;
import io.kestra.core.services.InstanceService; import io.kestra.core.services.InstanceService;
import io.kestra.core.utils.VersionProvider; import io.kestra.core.utils.VersionProvider;
@@ -67,7 +68,6 @@ public class MiscController {
.version(versionProvider.getVersion()) .version(versionProvider.getVersion())
.isTaskRunEnabled(executionRepository.isTaskRunEnabled()) .isTaskRunEnabled(executionRepository.isTaskRunEnabled())
.isAnonymousUsageEnabled(this.isAnonymousUsageEnabled) .isAnonymousUsageEnabled(this.isAnonymousUsageEnabled)
.isWorkerInstanceEnabled(false)
.isTemplateEnabled(templateRepository.isPresent()); .isTemplateEnabled(templateRepository.isPresent());
if (this.environmentName != null || this.environmentColor != null) { if (this.environmentName != null || this.environmentColor != null) {
@@ -102,9 +102,6 @@ public class MiscController {
@JsonInclude @JsonInclude
Boolean isAnonymousUsageEnabled; Boolean isAnonymousUsageEnabled;
@JsonInclude
Boolean isWorkerInstanceEnabled;
@JsonInclude @JsonInclude
Boolean isTemplateEnabled; Boolean isTemplateEnabled;

View File

@@ -0,0 +1,29 @@
package io.kestra.webserver.controllers;
import io.kestra.core.repositories.WorkerInstanceRepositoryInterface;
import io.kestra.core.runners.WorkerInstance;
import io.micronaut.context.annotation.Requires;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.exceptions.HttpStatusException;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.swagger.v3.oas.annotations.Operation;
import jakarta.inject.Inject;
import java.util.List;
@Controller("/api/v1/workers")
@Requires(bean = WorkerInstanceRepositoryInterface.class)
public class WorkerInstanceController {
@Inject
private WorkerInstanceRepositoryInterface workerInstanceRepositoryInterface;
@ExecuteOn(TaskExecutors.IO)
@Get(produces = MediaType.TEXT_JSON)
@Operation(tags = {"Workers"}, summary = "Get all workers")
public List<WorkerInstance> findAll() throws HttpStatusException {
return workerInstanceRepositoryInterface.findAll();
}
}

View File

@@ -433,7 +433,7 @@ class ExecutionControllerTest extends JdbcH2ControllerTest {
FileMetas.class FileMetas.class
).blockingFirst(); ).blockingFirst();
assertThat(metas.getSize(), equalTo(2466L)); assertThat(metas.getSize(), equalTo(2767L));
String newExecutionId = IdUtils.create(); String newExecutionId = IdUtils.create();

View File

@@ -30,6 +30,5 @@ class MiscControllerTest extends JdbcH2ControllerTest {
assertThat(response.getUuid(), notNullValue()); assertThat(response.getUuid(), notNullValue());
assertThat(response.getIsTaskRunEnabled(), is(false)); assertThat(response.getIsTaskRunEnabled(), is(false));
assertThat(response.getIsAnonymousUsageEnabled(), is(true)); assertThat(response.getIsAnonymousUsageEnabled(), is(true));
assertThat(response.getIsWorkerInstanceEnabled(), is(false));
} }
} }

View File

@@ -0,0 +1,54 @@
package io.kestra.webserver.controllers;
import io.kestra.core.runners.WorkerInstance;
import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepository;
import io.kestra.webserver.controllers.h2.JdbcH2ControllerTest;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.rxjava2.http.client.RxHttpClient;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.UUID;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
class WorkerInstanceControllerTest extends JdbcH2ControllerTest {
@Inject
@Client("/")
RxHttpClient client;
@Inject
AbstractJdbcWorkerInstanceRepository jdbcWorkerInstanceRepository;
@BeforeEach
protected void init() {
super.setup();
}
@SuppressWarnings("unchecked")
@Test
void list() {
WorkerInstance workerInstance = WorkerInstance.builder()
.workerUuid(UUID.randomUUID())
.workerGroup(null)
.managementPort(0)
.hostname("kestra.io")
.partitions(null)
.port(0)
.status(WorkerInstance.Status.UP)
.build();
jdbcWorkerInstanceRepository.save(workerInstance);
List<WorkerInstance> find = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/workers"), Argument.of(List.class, WorkerInstance.class));
assertThat(find.size(), is(1));
assertThat(find.get(0).getWorkerUuid(), is(workerInstance.getWorkerUuid()));
}
}

View File

@@ -18,10 +18,14 @@ jackson:
FAIL_ON_UNKNOWN_PROPERTIES: false FAIL_ON_UNKNOWN_PROPERTIES: false
kestra: kestra:
server-type: STANDALONE
storage: storage:
type: local type: local
local: local:
base-path: /tmp/unittest base-path: /tmp/unittest
heartbeat:
frequency: 10s
heartbeat-missed: 3
anonymous-usage-report: anonymous-usage-report:
enabled: true enabled: true
uri: https://api.kestra.io/v1/reports/usages uri: https://api.kestra.io/v1/reports/usages
@@ -81,6 +85,12 @@ kestra:
flowtopologies: flowtopologies:
table: "flow_topologies" table: "flow_topologies"
cls: io.kestra.core.models.topologies.FlowTopology cls: io.kestra.core.models.topologies.FlowTopology
workerinstance:
cls: io.kestra.core.runners.WorkerInstance
table: "worker_instance"
workerjobrunning:
cls: io.kestra.core.runners.WorkerJobRunning
table: "worker_job_running"
datasources: datasources:
h2: h2:
url: jdbc:h2:mem:public;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE url: jdbc:h2:mem:public;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE