mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 14:00:23 -05:00
Compare commits
1 Commits
dependabot
...
feat/worke
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ea8d86516 |
@@ -34,6 +34,7 @@ dependencies {
|
|||||||
implementation project(":storage-local")
|
implementation project(":storage-local")
|
||||||
|
|
||||||
implementation project(":webserver")
|
implementation project(":webserver")
|
||||||
|
implementation project(":worker")
|
||||||
|
|
||||||
//test
|
//test
|
||||||
testImplementation "org.wiremock:wiremock-jetty12"
|
testImplementation "org.wiremock:wiremock-jetty12"
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ import picocli.CommandLine;
|
|||||||
WebServerCommand.class,
|
WebServerCommand.class,
|
||||||
WorkerCommand.class,
|
WorkerCommand.class,
|
||||||
LocalCommand.class,
|
LocalCommand.class,
|
||||||
|
WorkerAgentCommand.class,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import io.kestra.core.runners.StandAloneRunner;
|
|||||||
import io.kestra.core.services.SkipExecutionService;
|
import io.kestra.core.services.SkipExecutionService;
|
||||||
import io.kestra.core.services.StartExecutorService;
|
import io.kestra.core.services.StartExecutorService;
|
||||||
import io.kestra.core.utils.Await;
|
import io.kestra.core.utils.Await;
|
||||||
|
import io.kestra.controller.Controller;
|
||||||
import io.micronaut.context.ApplicationContext;
|
import io.micronaut.context.ApplicationContext;
|
||||||
import jakarta.annotation.Nullable;
|
import jakarta.annotation.Nullable;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
@@ -110,6 +111,8 @@ public class StandAloneCommand extends AbstractServerCommand {
|
|||||||
}
|
}
|
||||||
|
|
||||||
StandAloneRunner standAloneRunner = applicationContext.getBean(StandAloneRunner.class);
|
StandAloneRunner standAloneRunner = applicationContext.getBean(StandAloneRunner.class);
|
||||||
|
|
||||||
|
Controller controller = applicationContext.getBean(Controller.class);
|
||||||
|
|
||||||
if (this.workerThread == 0) {
|
if (this.workerThread == 0) {
|
||||||
standAloneRunner.setWorkerEnabled(false);
|
standAloneRunner.setWorkerEnabled(false);
|
||||||
|
|||||||
@@ -0,0 +1,59 @@
|
|||||||
|
package io.kestra.cli.commands.servers;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import io.kestra.core.contexts.KestraContext;
|
||||||
|
import io.kestra.core.models.ServerType;
|
||||||
|
import io.kestra.core.utils.Await;
|
||||||
|
import io.kestra.worker.Worker;
|
||||||
|
import io.micronaut.context.ApplicationContext;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import picocli.CommandLine;
|
||||||
|
import picocli.CommandLine.Option;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@CommandLine.Command(
|
||||||
|
name = "worker-agent",
|
||||||
|
description = "Start the Kestra worker"
|
||||||
|
)
|
||||||
|
public class WorkerAgentCommand extends AbstractServerCommand {
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private ApplicationContext applicationContext;
|
||||||
|
|
||||||
|
@Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to four times the number of available processors")
|
||||||
|
private int thread = defaultWorkerThread();
|
||||||
|
|
||||||
|
@Option(names = {"-g", "--worker-group"}, description = "The worker group key, must match the regex [a-zA-Z0-9_-]+ (EE only)")
|
||||||
|
private String workerGroupKey = null;
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
public static Map<String, Object> propertiesOverrides() {
|
||||||
|
return ImmutableMap.of(
|
||||||
|
"kestra.server-type", ServerType.WORKER_AGENT
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer call() throws Exception {
|
||||||
|
|
||||||
|
KestraContext.getContext().injectWorkerConfigs(thread, workerGroupKey);
|
||||||
|
|
||||||
|
super.call();
|
||||||
|
|
||||||
|
if (this.workerGroupKey != null && !this.workerGroupKey.matches("[a-zA-Z0-9_-]+")) {
|
||||||
|
throw new IllegalArgumentException("The --worker-group option must match the [a-zA-Z0-9_-]+ pattern");
|
||||||
|
}
|
||||||
|
|
||||||
|
Worker worker = applicationContext.getBean(Worker.class);
|
||||||
|
worker.start(thread, workerGroupKey);
|
||||||
|
|
||||||
|
Await.until(() -> !this.applicationContext.isRunning());
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String workerGroupKey() {
|
||||||
|
return workerGroupKey;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -50,7 +50,6 @@ micronaut:
|
|||||||
caches:
|
caches:
|
||||||
default:
|
default:
|
||||||
maximum-weight: 10485760
|
maximum-weight: 10485760
|
||||||
|
|
||||||
http:
|
http:
|
||||||
client:
|
client:
|
||||||
read-idle-timeout: 60s
|
read-idle-timeout: 60s
|
||||||
@@ -93,7 +92,13 @@ jackson:
|
|||||||
deserialization:
|
deserialization:
|
||||||
FAIL_ON_UNKNOWN_PROPERTIES: false
|
FAIL_ON_UNKNOWN_PROPERTIES: false
|
||||||
|
|
||||||
endpoints:
|
# Disable Micronaut GRPC
|
||||||
|
grpc:
|
||||||
|
server:
|
||||||
|
enabled: false
|
||||||
|
health:
|
||||||
|
enabled: false
|
||||||
|
endpoints:
|
||||||
all:
|
all:
|
||||||
port: 8081
|
port: 8081
|
||||||
enabled: true
|
enabled: true
|
||||||
|
|||||||
@@ -15,6 +15,8 @@ import jakarta.inject.Singleton;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.ArrayUtils;
|
import org.apache.commons.lang3.ArrayUtils;
|
||||||
|
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class MetricRegistry {
|
public class MetricRegistry {
|
||||||
@@ -180,11 +182,26 @@ public class MetricRegistry {
|
|||||||
* statement.
|
* statement.
|
||||||
*/
|
*/
|
||||||
public <T extends Number> T gauge(String name, String description, T number, String... tags) {
|
public <T extends Number> T gauge(String name, String description, T number, String... tags) {
|
||||||
Gauge.builder(metricName(name), () -> number)
|
registerGauge(name, description, () -> number, tags);
|
||||||
|
return number;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register a gauge that reports the value of the {@link Number}.
|
||||||
|
*
|
||||||
|
* @param name Name of the gauge being registered.
|
||||||
|
* @param description The metric description
|
||||||
|
* @param supplier A function that yields a double value for the gauge.
|
||||||
|
* @param tags Sequence of dimensions for breaking down the name.
|
||||||
|
* @param <T> The type of the number from which the gauge value is extracted.
|
||||||
|
* @return The number that was passed in so the registration can be done as part of an assignment
|
||||||
|
* statement.
|
||||||
|
*/
|
||||||
|
public <T extends Number> Gauge registerGauge(String name, String description, Supplier<T> supplier, String... tags) {
|
||||||
|
return Gauge.builder(metricName(name),supplier)
|
||||||
.description(description)
|
.description(description)
|
||||||
.tags(tags)
|
.tags(tags)
|
||||||
.register(this.meterRegistry);
|
.register(this.meterRegistry);
|
||||||
return number;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -1,10 +1,11 @@
|
|||||||
package io.kestra.core.models;
|
package io.kestra.core.models;
|
||||||
|
|
||||||
public enum ServerType {
|
public enum ServerType {
|
||||||
EXECUTOR,
|
EXECUTOR,
|
||||||
INDEXER,
|
INDEXER,
|
||||||
SCHEDULER,
|
SCHEDULER,
|
||||||
STANDALONE,
|
STANDALONE,
|
||||||
WEBSERVER,
|
WEBSERVER,
|
||||||
WORKER,
|
WORKER,
|
||||||
|
WORKER_AGENT,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,4 +9,5 @@ import java.util.function.Consumer;
|
|||||||
public interface WorkerJobQueueInterface extends QueueInterface<WorkerJob> {
|
public interface WorkerJobQueueInterface extends QueueInterface<WorkerJob> {
|
||||||
|
|
||||||
Runnable subscribe(String workerId, String workerGroup, Consumer<Either<WorkerJob, DeserializationException>> consumer);
|
Runnable subscribe(String workerId, String workerGroup, Consumer<Either<WorkerJob, DeserializationException>> consumer);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import io.kestra.core.utils.Exceptions;
|
|||||||
import io.opentelemetry.api.trace.Span;
|
import io.opentelemetry.api.trace.Span;
|
||||||
import io.opentelemetry.api.trace.StatusCode;
|
import io.opentelemetry.api.trace.StatusCode;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
import lombok.Synchronized;
|
import lombok.Synchronized;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
@@ -32,6 +33,7 @@ public abstract class AbstractWorkerCallable implements Callable<State.Type> {
|
|||||||
String uid;
|
String uid;
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
|
@Setter
|
||||||
Throwable exception;
|
Throwable exception;
|
||||||
|
|
||||||
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
|
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
|
||||||
@@ -80,7 +82,7 @@ public abstract class AbstractWorkerCallable implements Callable<State.Type> {
|
|||||||
*
|
*
|
||||||
* @see WorkerJobLifecycle#stop()
|
* @see WorkerJobLifecycle#stop()
|
||||||
*/
|
*/
|
||||||
protected abstract void signalStop();
|
public abstract void signalStop();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wait for this worker task to complete stopping.
|
* Wait for this worker task to complete stopping.
|
||||||
|
|||||||
@@ -17,4 +17,6 @@ public interface WorkerJobRunningStateStore {
|
|||||||
* @param key the key of the worker job to be deleted.
|
* @param key the key of the worker job to be deleted.
|
||||||
*/
|
*/
|
||||||
void deleteByKey(String key);
|
void deleteByKey(String key);
|
||||||
|
|
||||||
|
void put(WorkerJobRunning workerJobRunning);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ public class WorkerTaskCallable extends AbstractWorkerCallable {
|
|||||||
@Getter
|
@Getter
|
||||||
Output taskOutput;
|
Output taskOutput;
|
||||||
|
|
||||||
WorkerTaskCallable(WorkerTask workerTask, RunnableTask<?> task, RunContext runContext, MetricRegistry metricRegistry) {
|
public WorkerTaskCallable(WorkerTask workerTask, RunnableTask<?> task, RunContext runContext, MetricRegistry metricRegistry) {
|
||||||
super(runContext, task.getClass().getName(), workerTask.uid(), task.getClass().getClassLoader());
|
super(runContext, task.getClass().getName(), workerTask.uid(), task.getClass().getClassLoader());
|
||||||
this.workerTask = workerTask;
|
this.workerTask = workerTask;
|
||||||
this.task = task;
|
this.task = task;
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ public class WorkerTriggerCallable extends AbstractWorkerTriggerCallable {
|
|||||||
@Getter
|
@Getter
|
||||||
Optional<Execution> evaluate;
|
Optional<Execution> evaluate;
|
||||||
|
|
||||||
WorkerTriggerCallable(RunContext runContext, WorkerTrigger workerTrigger, PollingTriggerInterface pollingTrigger) {
|
public WorkerTriggerCallable(RunContext runContext, WorkerTrigger workerTrigger, PollingTriggerInterface pollingTrigger) {
|
||||||
super(runContext, pollingTrigger.getClass().getName(), workerTrigger);
|
super(runContext, pollingTrigger.getClass().getName(), workerTrigger);
|
||||||
this.pollingTrigger = pollingTrigger;
|
this.pollingTrigger = pollingTrigger;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ public class WorkerTriggerRealtimeCallable extends AbstractWorkerTriggerCallable
|
|||||||
Consumer<? super Throwable> onError;
|
Consumer<? super Throwable> onError;
|
||||||
Consumer<Execution> onNext;
|
Consumer<Execution> onNext;
|
||||||
|
|
||||||
WorkerTriggerRealtimeCallable(
|
public WorkerTriggerRealtimeCallable(
|
||||||
RunContext runContext,
|
RunContext runContext,
|
||||||
WorkerTrigger workerTrigger,
|
WorkerTrigger workerTrigger,
|
||||||
RealtimeTriggerInterface realtimeTrigger,
|
RealtimeTriggerInterface realtimeTrigger,
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ import java.util.stream.Collectors;
|
|||||||
/**
|
/**
|
||||||
* Runtime information about a Kestra's service (e.g., WORKER, EXECUTOR, etc.).
|
* Runtime information about a Kestra's service (e.g., WORKER, EXECUTOR, etc.).
|
||||||
*
|
*
|
||||||
* @param uid The service unique identifier.
|
* @param uid The service unique identifier.
|
||||||
* @param type The service type.
|
* @param type The service type.
|
||||||
* @param state The state of the service.
|
* @param state The state of the service.
|
||||||
* @param server The server running this service.
|
* @param server The server running this service.
|
||||||
|
|||||||
@@ -12,6 +12,8 @@ public enum ServiceType {
|
|||||||
SCHEDULER,
|
SCHEDULER,
|
||||||
WEBSERVER,
|
WEBSERVER,
|
||||||
WORKER,
|
WORKER,
|
||||||
|
WORKER_AGENT,
|
||||||
|
CONTROLLER,
|
||||||
INVALID;
|
INVALID;
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package io.kestra.runner.postgres;
|
package io.kestra.runner.postgres;
|
||||||
|
|
||||||
import io.kestra.core.exceptions.DeserializationException;
|
import io.kestra.core.exceptions.DeserializationException;
|
||||||
|
import io.kestra.core.queues.WorkerJobQueueInterface;
|
||||||
import io.kestra.core.runners.WorkerJob;
|
import io.kestra.core.runners.WorkerJob;
|
||||||
import io.kestra.core.queues.WorkerJobQueueInterface;
|
import io.kestra.core.queues.WorkerJobQueueInterface;
|
||||||
import io.kestra.core.utils.Either;
|
import io.kestra.core.utils.Either;
|
||||||
@@ -17,7 +18,7 @@ import java.util.function.Consumer;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class PostgresWorkerJobQueue extends PostgresQueue<WorkerJob> implements WorkerJobQueueInterface {
|
public class PostgresWorkerJobQueue extends PostgresQueue<WorkerJob> implements WorkerJobQueueInterface {
|
||||||
private final JdbcWorkerJobQueueService jdbcWorkerJobQueueService;
|
private final JdbcWorkerJobQueueService jdbcWorkerJobQueueService;
|
||||||
|
|
||||||
public PostgresWorkerJobQueue(ApplicationContext applicationContext) {
|
public PostgresWorkerJobQueue(ApplicationContext applicationContext) {
|
||||||
super(WorkerJob.class, applicationContext);
|
super(WorkerJob.class, applicationContext);
|
||||||
this.jdbcWorkerJobQueueService = applicationContext.getBean(JdbcWorkerJobQueueService.class);
|
this.jdbcWorkerJobQueueService = applicationContext.getBean(JdbcWorkerJobQueueService.class);
|
||||||
|
|||||||
@@ -37,6 +37,11 @@ public abstract class AbstractJdbcWorkerJobRunningRepository extends AbstractJdb
|
|||||||
.execute()
|
.execute()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(WorkerJobRunning workerJobRunning) {
|
||||||
|
this.jdbcRepository.persist(workerJobRunning);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<WorkerJobRunning> findByKey(String uid) {
|
public Optional<WorkerJobRunning> findByKey(String uid) {
|
||||||
|
|||||||
@@ -23,5 +23,6 @@ include 'model'
|
|||||||
include 'processor'
|
include 'processor'
|
||||||
include 'script'
|
include 'script'
|
||||||
include 'e2e-tests'
|
include 'e2e-tests'
|
||||||
|
include 'worker'
|
||||||
|
|
||||||
include 'jmh-benchmarks'
|
include 'jmh-benchmarks'
|
||||||
42
worker/build.gradle
Normal file
42
worker/build.gradle
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
plugins {
|
||||||
|
id 'com.google.protobuf' version '0.9.4'
|
||||||
|
}
|
||||||
|
|
||||||
|
ext {
|
||||||
|
grpcVersion = '1.71.0'
|
||||||
|
protobufVersion = '3.25.1'
|
||||||
|
}
|
||||||
|
|
||||||
|
configurations {
|
||||||
|
tests
|
||||||
|
implementation.extendsFrom(micronaut)
|
||||||
|
}
|
||||||
|
|
||||||
|
dependencies {
|
||||||
|
// Kestra
|
||||||
|
implementation project(':core')
|
||||||
|
annotationProcessor project(':processor')
|
||||||
|
|
||||||
|
// gRPC
|
||||||
|
implementation("io.micronaut.grpc:micronaut-grpc-server-runtime")
|
||||||
|
implementation("io.micronaut.grpc:micronaut-grpc-client-runtime")
|
||||||
|
}
|
||||||
|
|
||||||
|
protobuf {
|
||||||
|
protoc {
|
||||||
|
artifact = "com.google.protobuf:protoc:$protobufVersion"
|
||||||
|
}
|
||||||
|
plugins {
|
||||||
|
grpc {
|
||||||
|
artifact = "io.grpc:protoc-gen-grpc-java:$grpcVersion"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
generateProtoTasks {
|
||||||
|
all()*.plugins {
|
||||||
|
grpc {
|
||||||
|
// avoid issues javax packages
|
||||||
|
option '@generated=omit'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
71
worker/src/main/java/io/kestra/controller/Controller.java
Normal file
71
worker/src/main/java/io/kestra/controller/Controller.java
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
package io.kestra.controller;
|
||||||
|
|
||||||
|
import io.grpc.Grpc;
|
||||||
|
import io.grpc.InsecureServerCredentials;
|
||||||
|
import io.grpc.Server;
|
||||||
|
import io.kestra.controller.grpc.server.LivenessControllerService;
|
||||||
|
import io.kestra.controller.grpc.server.WorkerControllerService;
|
||||||
|
import io.kestra.core.server.Service;
|
||||||
|
import io.kestra.core.server.ServiceStateChangeEvent;
|
||||||
|
import io.kestra.core.server.ServiceType;
|
||||||
|
import io.kestra.server.AbstractService;
|
||||||
|
import io.micronaut.context.event.ApplicationEventPublisher;
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import jakarta.inject.Singleton;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@Singleton
|
||||||
|
public class Controller extends AbstractService implements Service {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(Controller.class);
|
||||||
|
|
||||||
|
private Server server;
|
||||||
|
|
||||||
|
private final WorkerControllerService workerControllerService;
|
||||||
|
private final LivenessControllerService livenessControllerService;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public Controller(
|
||||||
|
WorkerControllerService workerControllerService,
|
||||||
|
LivenessControllerService livenessControllerService,
|
||||||
|
ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher) {
|
||||||
|
super(ServiceType.CONTROLLER, eventPublisher);
|
||||||
|
this.workerControllerService = workerControllerService;
|
||||||
|
this.livenessControllerService = livenessControllerService;
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void start() throws IOException {
|
||||||
|
if (getState() != ServiceState.CREATED) {
|
||||||
|
throw new IllegalStateException("Controller is already started or stopped");
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("Starting Controller");
|
||||||
|
/* The port on which the server should run */
|
||||||
|
int port = 9096; // TODO to externalize
|
||||||
|
server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
|
||||||
|
.addService(workerControllerService)
|
||||||
|
.addService(livenessControllerService)
|
||||||
|
.build()
|
||||||
|
.start();
|
||||||
|
log.info("Controller started, listening on {}", port);
|
||||||
|
setState(ServiceState.RUNNING);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ServiceState doStop() throws InterruptedException {
|
||||||
|
if (server != null && !server.isTerminated()) {
|
||||||
|
shutdownServerAndWait();
|
||||||
|
}
|
||||||
|
return ServiceState.TERMINATED_GRACEFULLY;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void shutdownServerAndWait() throws InterruptedException {
|
||||||
|
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,60 @@
|
|||||||
|
package io.kestra.controller;
|
||||||
|
|
||||||
|
import io.kestra.controller.grpc.HeartbeatRequest;
|
||||||
|
import io.kestra.controller.grpc.HeartbeatResponse;
|
||||||
|
import io.kestra.controller.grpc.LivenessControllerServiceGrpc.LivenessControllerServiceBlockingStub;
|
||||||
|
import io.kestra.controller.messages.HeartbeatMessage;
|
||||||
|
import io.kestra.controller.messages.HeartbeatMessageReply;
|
||||||
|
import io.kestra.core.contexts.KestraContext;
|
||||||
|
import io.kestra.core.server.Service;
|
||||||
|
import io.kestra.core.server.ServiceInstance;
|
||||||
|
import io.kestra.core.server.ServiceLivenessUpdater;
|
||||||
|
import io.kestra.core.server.ServiceStateTransition;
|
||||||
|
import io.kestra.server.grpc.RequestOrResponseHeader;
|
||||||
|
import io.kestra.server.internals.MessageFormat;
|
||||||
|
import io.kestra.server.internals.MessageFormats;
|
||||||
|
import io.micronaut.context.annotation.Secondary;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import jakarta.inject.Singleton;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
public class GrpcServiceLivenessUpdater implements ServiceLivenessUpdater {
|
||||||
|
|
||||||
|
private final LivenessControllerServiceBlockingStub client;
|
||||||
|
|
||||||
|
public GrpcServiceLivenessUpdater(final LivenessControllerServiceBlockingStub client) {
|
||||||
|
this.client = Objects.requireNonNull(client, "client must not be null.");
|
||||||
|
}
|
||||||
|
|
||||||
|
/** {@inheritDoc} **/
|
||||||
|
@Override
|
||||||
|
public void update(ServiceInstance service) {
|
||||||
|
update(service, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** {@inheritDoc} **/
|
||||||
|
@Override
|
||||||
|
public ServiceStateTransition.Response update(ServiceInstance instance, Service.ServiceState newState, String reason) {
|
||||||
|
HeartbeatResponse response = client.heartbeat(HeartbeatRequest
|
||||||
|
.newBuilder()
|
||||||
|
.setHeader(RequestOrResponseHeader
|
||||||
|
.newBuilder()
|
||||||
|
.setClientId(instance.uid())
|
||||||
|
.setClientVersion(KestraContext.getContext().getVersion())
|
||||||
|
.setMessageFormat(MessageFormats.JSON.name())
|
||||||
|
.setCorrelationId(UUID.randomUUID().toString())
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
.setMessage(MessageFormats.JSON.toByteString(new HeartbeatMessage(instance, newState, reason)))
|
||||||
|
.build()
|
||||||
|
);
|
||||||
|
|
||||||
|
HeartbeatMessageReply messageReply = MessageFormat
|
||||||
|
.resolve(response.getHeader().getMessageFormat())
|
||||||
|
.fromByteString(response.getMessage(), HeartbeatMessageReply.class);
|
||||||
|
|
||||||
|
return new ServiceStateTransition.Response(messageReply.result(), messageReply.instance());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,18 @@
|
|||||||
|
package io.kestra.controller.grpc.client;
|
||||||
|
|
||||||
|
import io.kestra.controller.grpc.LivenessControllerServiceGrpc;
|
||||||
|
import io.kestra.server.GrpcChannelProvider;
|
||||||
|
import io.micronaut.context.annotation.Bean;
|
||||||
|
import io.micronaut.context.annotation.Factory;
|
||||||
|
import jakarta.inject.Singleton;
|
||||||
|
|
||||||
|
@Factory
|
||||||
|
public class GrpcClientBeanFactory {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@Singleton
|
||||||
|
public LivenessControllerServiceGrpc.LivenessControllerServiceBlockingStub workerServiceStub(GrpcChannelProvider grpcChannelProvider) {
|
||||||
|
return LivenessControllerServiceGrpc.newBlockingStub(grpcChannelProvider.createOrGetDefault());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,54 @@
|
|||||||
|
package io.kestra.controller.grpc.server;
|
||||||
|
|
||||||
|
import io.grpc.stub.StreamObserver;
|
||||||
|
import io.kestra.controller.grpc.HeartbeatRequest;
|
||||||
|
import io.kestra.controller.grpc.HeartbeatResponse;
|
||||||
|
import io.kestra.controller.grpc.LivenessControllerServiceGrpc;
|
||||||
|
import io.kestra.controller.messages.HeartbeatMessage;
|
||||||
|
import io.kestra.controller.messages.HeartbeatMessageReply;
|
||||||
|
import io.kestra.core.server.ServiceLivenessUpdater;
|
||||||
|
import io.kestra.core.server.ServiceStateTransition;
|
||||||
|
import io.kestra.server.internals.MessageFormat;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import jakarta.inject.Singleton;
|
||||||
|
|
||||||
|
@Singleton
|
||||||
|
public class LivenessControllerService extends LivenessControllerServiceGrpc.LivenessControllerServiceImplBase {
|
||||||
|
|
||||||
|
private final ServiceLivenessUpdater serviceLivenessUpdater;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public LivenessControllerService(ServiceLivenessUpdater serviceLivenessUpdater) {
|
||||||
|
this.serviceLivenessUpdater = serviceLivenessUpdater;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void heartbeat(HeartbeatRequest request, StreamObserver<HeartbeatResponse> responseObserver) {
|
||||||
|
final MessageFormat messageFormat = MessageFormat.resolve(request.getHeader().getMessageFormat());
|
||||||
|
|
||||||
|
HeartbeatMessage message = messageFormat
|
||||||
|
.fromByteString(request.getMessage(), HeartbeatMessage.class);
|
||||||
|
|
||||||
|
ServiceStateTransition.Response response;
|
||||||
|
if (message.newState() != null) {
|
||||||
|
response = serviceLivenessUpdater.update(message.instance(), message.newState(), message.reason());
|
||||||
|
} else {
|
||||||
|
serviceLivenessUpdater.update(message.instance());
|
||||||
|
response = new ServiceStateTransition.Response(ServiceStateTransition.Result.SUCCEEDED, message.instance());
|
||||||
|
}
|
||||||
|
|
||||||
|
responseObserver.onNext(HeartbeatResponse
|
||||||
|
.newBuilder()
|
||||||
|
.setHeader(request.getHeader())
|
||||||
|
.setMessage(messageFormat.toByteString(new HeartbeatMessageReply(
|
||||||
|
response.instance(),
|
||||||
|
response.result()
|
||||||
|
)))
|
||||||
|
.build()
|
||||||
|
);
|
||||||
|
responseObserver.onCompleted();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,130 @@
|
|||||||
|
package io.kestra.controller.grpc.server;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import io.grpc.stub.ServerCallStreamObserver;
|
||||||
|
import io.grpc.stub.StreamObserver;
|
||||||
|
import io.kestra.core.queues.QueueException;
|
||||||
|
import io.kestra.core.queues.QueueFactoryInterface;
|
||||||
|
import io.kestra.core.queues.QueueInterface;
|
||||||
|
import io.kestra.core.queues.WorkerJobQueueInterface;
|
||||||
|
import io.kestra.core.runners.Worker;
|
||||||
|
import io.kestra.core.runners.WorkerInstance;
|
||||||
|
import io.kestra.core.runners.WorkerJob;
|
||||||
|
import io.kestra.core.runners.WorkerJobRunningStateStore;
|
||||||
|
import io.kestra.core.runners.WorkerTask;
|
||||||
|
import io.kestra.core.runners.WorkerTaskResult;
|
||||||
|
import io.kestra.core.runners.WorkerTaskRunning;
|
||||||
|
import io.kestra.core.runners.WorkerTrigger;
|
||||||
|
import io.kestra.core.runners.WorkerTriggerRunning;
|
||||||
|
import io.kestra.server.internals.BatchMessage;
|
||||||
|
import io.kestra.server.internals.MessageFormat;
|
||||||
|
import io.kestra.worker.grpc.FetchWorkerJobRequest;
|
||||||
|
import io.kestra.worker.grpc.FetchWorkerJobResponse;
|
||||||
|
import io.kestra.worker.grpc.WorkerControllerServiceGrpc;
|
||||||
|
import io.kestra.worker.grpc.WorkerJobResultsRequest;
|
||||||
|
import io.kestra.worker.grpc.WorkerJobResultsResponse;
|
||||||
|
import io.kestra.worker.messages.FetchWorkerJobMessage;
|
||||||
|
import io.kestra.worker.messages.WorkerJobBatchMessage;
|
||||||
|
import jakarta.annotation.PreDestroy;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import jakarta.inject.Named;
|
||||||
|
import jakarta.inject.Singleton;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
@Singleton
|
||||||
|
@Slf4j
|
||||||
|
public class WorkerControllerService extends WorkerControllerServiceGrpc.WorkerControllerServiceImplBase {
|
||||||
|
|
||||||
|
public static final TypeReference<BatchMessage<WorkerTaskResult>> WORKER_TASK_RESULT_BATCH_MESSAGE_TYPE_REFERENCE = new TypeReference<>() {
|
||||||
|
};
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
|
||||||
|
private WorkerJobQueueInterface workerJobQueue;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
|
||||||
|
private QueueInterface<WorkerTaskResult> workerTaskResultQueue;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private WorkerJobRunningStateStore workerJobRunningStateStore;
|
||||||
|
|
||||||
|
private final ConcurrentHashMap<String, Runnable> disposables = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void fetchWorkerJobsStream(FetchWorkerJobRequest request, StreamObserver<FetchWorkerJobResponse> responseObserver) {
|
||||||
|
final MessageFormat messageFormat = MessageFormat.resolve(request.getHeader().getMessageFormat());
|
||||||
|
|
||||||
|
FetchWorkerJobMessage message = messageFormat.fromByteString(request.getMessage(), FetchWorkerJobMessage.class);
|
||||||
|
|
||||||
|
|
||||||
|
ServerCallStreamObserver<FetchWorkerJobResponse> serverObserver = (ServerCallStreamObserver<FetchWorkerJobResponse>) responseObserver;
|
||||||
|
|
||||||
|
log.info("Received worker-job request from worker [{}]", message.workerId());
|
||||||
|
serverObserver.setOnCancelHandler(() -> {
|
||||||
|
log.info("Worker [{}] disconnected or cancelled", message.workerId());
|
||||||
|
Optional.ofNullable(disposables.remove(message.workerId())).ifPresent(Runnable::run);
|
||||||
|
});
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
// Currently consumer thread is managed directly by the WorkerJobQueue.
|
||||||
|
// It could be preferable that the WorkerControllerServer start a polling thread
|
||||||
|
// for consuming the workerJobQueue (e.g., via a poll method) to be able to manage it more properly on cancel.
|
||||||
|
Runnable stopReceiving = this.workerJobQueue.receive(message.workerGroup(), Worker.class, either -> {
|
||||||
|
if (either.isRight()) {
|
||||||
|
log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
WorkerJob job = either.getLeft();
|
||||||
|
log.info("Sending job [{}] to worker [{}]", job.uid(), message.workerId()); // TODO change to debug
|
||||||
|
serverObserver.onNext(FetchWorkerJobResponse
|
||||||
|
.newBuilder()
|
||||||
|
.setHeader(request.getHeader())
|
||||||
|
.setMessage(messageFormat.toByteString(new WorkerJobBatchMessage(List.of(job))))
|
||||||
|
.build()
|
||||||
|
);
|
||||||
|
|
||||||
|
WorkerInstance workerInstance = new WorkerInstance(message.workerId(), message.workerGroup());
|
||||||
|
if (job instanceof WorkerTask workerTask) {
|
||||||
|
workerJobRunningStateStore.put(WorkerTaskRunning.of(workerTask, workerInstance, -1));
|
||||||
|
} else if (job instanceof WorkerTrigger workerTrigger) {
|
||||||
|
workerJobRunningStateStore.put(WorkerTriggerRunning.of(workerTrigger, workerInstance, -1));
|
||||||
|
} else {
|
||||||
|
log.error("Message is of type [{}] which should never occurs", job);
|
||||||
|
}
|
||||||
|
|
||||||
|
}, false);
|
||||||
|
disposables.put(message.workerId(), () -> {
|
||||||
|
stopReceiving.run();
|
||||||
|
serverObserver.onCompleted();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sendWorkerJobResults(WorkerJobResultsRequest request, StreamObserver<WorkerJobResultsResponse> responseObserver) {
|
||||||
|
final MessageFormat messageFormat = MessageFormat.resolve(request.getHeader().getMessageFormat());
|
||||||
|
BatchMessage<WorkerTaskResult> message = messageFormat.fromByteString(request.getMessage(), WORKER_TASK_RESULT_BATCH_MESSAGE_TYPE_REFERENCE);
|
||||||
|
message.records().forEach(workerTaskResult -> {
|
||||||
|
try {
|
||||||
|
workerTaskResultQueue.emit(workerTaskResult);
|
||||||
|
} catch (QueueException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
responseObserver.onNext(WorkerJobResultsResponse
|
||||||
|
.newBuilder()
|
||||||
|
.setHeader(request.getHeader())
|
||||||
|
.build()
|
||||||
|
);
|
||||||
|
responseObserver.onCompleted();
|
||||||
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
public void close() {
|
||||||
|
this.disposables.values().forEach(Runnable::run);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,14 @@
|
|||||||
|
package io.kestra.controller.messages;
|
||||||
|
|
||||||
|
import io.kestra.core.server.Service;
|
||||||
|
import io.kestra.core.server.ServiceInstance;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Message for {@link io.kestra.controller.grpc.HeartbeatRequest}.
|
||||||
|
*/
|
||||||
|
public record HeartbeatMessage(
|
||||||
|
ServiceInstance instance,
|
||||||
|
Service.ServiceState newState,
|
||||||
|
String reason
|
||||||
|
) {
|
||||||
|
}
|
||||||
@@ -0,0 +1,14 @@
|
|||||||
|
package io.kestra.controller.messages;
|
||||||
|
|
||||||
|
|
||||||
|
import io.kestra.core.server.ServiceInstance;
|
||||||
|
import io.kestra.core.server.ServiceStateTransition;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Message for {@link io.kestra.controller.grpc.HeartbeatResponse}.
|
||||||
|
*/
|
||||||
|
public record HeartbeatMessageReply(
|
||||||
|
ServiceInstance instance,
|
||||||
|
ServiceStateTransition.Result result
|
||||||
|
) {
|
||||||
|
}
|
||||||
87
worker/src/main/java/io/kestra/server/AbstractService.java
Normal file
87
worker/src/main/java/io/kestra/server/AbstractService.java
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
package io.kestra.server;
|
||||||
|
|
||||||
|
import io.kestra.core.server.Service;
|
||||||
|
import io.kestra.core.server.ServiceStateChangeEvent;
|
||||||
|
import io.kestra.core.server.ServiceType;
|
||||||
|
import io.kestra.core.utils.IdUtils;
|
||||||
|
import io.micronaut.context.event.ApplicationEventPublisher;
|
||||||
|
import jakarta.annotation.PreDestroy;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
// TODO add it kestra 1.x
|
||||||
|
@Slf4j
|
||||||
|
public class AbstractService implements Service {
|
||||||
|
|
||||||
|
private final String id;
|
||||||
|
private final ServiceType serviceType;
|
||||||
|
private final ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;
|
||||||
|
|
||||||
|
private final AtomicReference<ServiceState> state = new AtomicReference<>();
|
||||||
|
private final AtomicBoolean stopped = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
public AbstractService(ServiceType serviceType, ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher) {
|
||||||
|
this.id = IdUtils.create();
|
||||||
|
this.serviceType = serviceType;
|
||||||
|
this.eventPublisher = eventPublisher;
|
||||||
|
setState(ServiceState.CREATED);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setState(final ServiceState state) {
|
||||||
|
this.state.set(state);
|
||||||
|
this.eventPublisher.publishEvent(new ServiceStateChangeEvent(this, getProperties()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServiceType getType() {
|
||||||
|
return serviceType;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServiceState getState() {
|
||||||
|
return state.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Map<String, Object> getProperties() {
|
||||||
|
return Map.of();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
public void stop() {
|
||||||
|
if (stopped.compareAndSet(false, true)) {
|
||||||
|
setState(ServiceState.TERMINATING);
|
||||||
|
log.info("Terminating");
|
||||||
|
try {
|
||||||
|
ServiceState serviceState = doStop();
|
||||||
|
setState(serviceState);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("Error while stopping service [{}]", this.getClass().getSimpleName(), e);
|
||||||
|
if (e instanceof InterruptedException) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
setState(ServiceState.TERMINATED_FORCED);
|
||||||
|
}
|
||||||
|
log.info("Service [{}] stopped {}", this.getClass().getSimpleName(), getState());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ServiceState doStop() throws Exception {
|
||||||
|
return ServiceState.TERMINATED_GRACEFULLY;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,78 @@
|
|||||||
|
package io.kestra.server;
|
||||||
|
|
||||||
|
import io.grpc.Channel;
|
||||||
|
import io.grpc.ManagedChannel;
|
||||||
|
import io.grpc.ManagedChannelBuilder;
|
||||||
|
import io.kestra.core.contexts.KestraContext;
|
||||||
|
import jakarta.annotation.PreDestroy;
|
||||||
|
import jakarta.inject.Singleton;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
@Singleton
|
||||||
|
@Slf4j
|
||||||
|
public class GrpcChannelProvider {
|
||||||
|
|
||||||
|
private volatile ManagedChannel defaultChannel;
|
||||||
|
private volatile ExecutorService defaultExecutorService;
|
||||||
|
|
||||||
|
private final AtomicBoolean stopped = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a shared gRPC Channel.
|
||||||
|
* <p>
|
||||||
|
* This method will create the channel if necessary.
|
||||||
|
*
|
||||||
|
* @return the {@link Channel}
|
||||||
|
*/
|
||||||
|
public Channel createOrGetDefault() {
|
||||||
|
// TODO externalize all config
|
||||||
|
if (this.defaultChannel == null) {
|
||||||
|
synchronized (this) {
|
||||||
|
if (this.defaultChannel == null) {
|
||||||
|
defaultExecutorService = Executors.newSingleThreadExecutor();
|
||||||
|
defaultChannel = ManagedChannelBuilder.forAddress("localhost", 9096)
|
||||||
|
.usePlaintext()
|
||||||
|
.enableRetry()
|
||||||
|
.maxRetryAttempts(10)
|
||||||
|
.userAgent(getUserAgent())
|
||||||
|
.keepAliveTime(1, TimeUnit.HOURS)
|
||||||
|
.keepAliveWithoutCalls(true)
|
||||||
|
.executor(defaultExecutorService)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return defaultChannel;
|
||||||
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
public void close() {
|
||||||
|
if (!stopped.compareAndSet(false, true)) {
|
||||||
|
return; // Method called twice
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.defaultChannel != null && !this.defaultChannel.isShutdown()) {
|
||||||
|
try {
|
||||||
|
shutdownServerAndWait();
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("Error while stopping default gRPC channel", e);
|
||||||
|
if (e instanceof InterruptedException)
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
this.defaultExecutorService.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void shutdownServerAndWait() throws InterruptedException {
|
||||||
|
this.defaultChannel.shutdown().awaitTermination(30, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String getUserAgent() {
|
||||||
|
return "Kestra/" + KestraContext.getContext().getVersion();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,24 @@
|
|||||||
|
package io.kestra.server.internals;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A generic bath message.
|
||||||
|
*
|
||||||
|
* @param records the records of the batch.
|
||||||
|
* @param <T> the record type.
|
||||||
|
*/
|
||||||
|
public record BatchMessage<T>(
|
||||||
|
List<T> records
|
||||||
|
) {
|
||||||
|
|
||||||
|
public static <T> BatchMessage<T> of(T... records) {
|
||||||
|
return new BatchMessage<>(Arrays.asList(records));
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<T> records() {
|
||||||
|
return Optional.ofNullable(records).orElse(List.of());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,27 @@
|
|||||||
|
package io.kestra.server.internals;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
import io.kestra.core.utils.Enums;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a specific message format.
|
||||||
|
* <p>
|
||||||
|
* Each gRPC message contain a generic byte array `message` field.
|
||||||
|
*/
|
||||||
|
public interface MessageFormat {
|
||||||
|
|
||||||
|
<T> T fromByteString(ByteString data, Class<T> type);
|
||||||
|
|
||||||
|
<T> T fromByteString(ByteString data, TypeReference<T> type);
|
||||||
|
|
||||||
|
ByteString toByteString(Object value);
|
||||||
|
|
||||||
|
static MessageFormat resolve(final String format) {
|
||||||
|
try {
|
||||||
|
return Enums.getForNameIgnoreCase(format, MessageFormats.class);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
return MessageFormats.JSON; // default
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,64 @@
|
|||||||
|
package io.kestra.server.internals;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
import io.kestra.core.serializers.JacksonMapper;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Supported formats for serialized messages in Protocol Buffer message.
|
||||||
|
*/
|
||||||
|
public enum MessageFormats implements MessageFormat{
|
||||||
|
|
||||||
|
JSON() {
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson(false);
|
||||||
|
|
||||||
|
/** {@inheritDoc} **/
|
||||||
|
@Override
|
||||||
|
public <T> T fromByteString(ByteString data, Class<T> type) {
|
||||||
|
byte[] bytes = toByteArray(data);
|
||||||
|
if (bytes == null || bytes.length == 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return OBJECT_MAPPER.readValue(bytes, type);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> T fromByteString(ByteString data, TypeReference<T> type) {
|
||||||
|
byte[] bytes = toByteArray(data);
|
||||||
|
if (bytes == null || bytes.length == 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return OBJECT_MAPPER.readValue(bytes, type);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** {@inheritDoc} **/
|
||||||
|
@Override
|
||||||
|
public ByteString toByteString(Object value) {
|
||||||
|
if (value == null) {
|
||||||
|
return ByteString.EMPTY;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return ByteString.copyFrom(OBJECT_MAPPER.writeValueAsBytes(value));
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
private static byte[] toByteArray(ByteString data) {
|
||||||
|
return Optional.ofNullable(data).map(ByteString::toByteArray).orElse(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
33
worker/src/main/java/io/kestra/worker/BeanFactory.java
Normal file
33
worker/src/main/java/io/kestra/worker/BeanFactory.java
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
package io.kestra.worker;
|
||||||
|
|
||||||
|
import io.kestra.controller.GrpcServiceLivenessUpdater;
|
||||||
|
import io.kestra.controller.grpc.LivenessControllerServiceGrpc;
|
||||||
|
import io.kestra.core.server.ServiceLivenessUpdater;
|
||||||
|
import io.kestra.server.GrpcChannelProvider;
|
||||||
|
import io.kestra.worker.grpc.WorkerControllerServiceGrpc;
|
||||||
|
import io.micronaut.context.annotation.Bean;
|
||||||
|
import io.micronaut.context.annotation.Factory;
|
||||||
|
import io.micronaut.context.annotation.Primary;
|
||||||
|
import jakarta.inject.Singleton;
|
||||||
|
|
||||||
|
@Factory
|
||||||
|
public class BeanFactory {
|
||||||
|
|
||||||
|
@Singleton
|
||||||
|
@Primary
|
||||||
|
public ServiceLivenessUpdater serviceLivenessUpdater(LivenessControllerServiceGrpc.LivenessControllerServiceBlockingStub client) {
|
||||||
|
return new GrpcServiceLivenessUpdater(client);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@Singleton
|
||||||
|
public WorkerControllerServiceGrpc.WorkerControllerServiceBlockingStub blockingWorkerServiceStub(GrpcChannelProvider grpcChannelProvider) {
|
||||||
|
return WorkerControllerServiceGrpc.newBlockingStub(grpcChannelProvider.createOrGetDefault());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@Singleton
|
||||||
|
public WorkerControllerServiceGrpc.WorkerControllerServiceStub asyncWorkerServiceStub(GrpcChannelProvider grpcChannelProvider) {
|
||||||
|
return WorkerControllerServiceGrpc.newStub(grpcChannelProvider.createOrGetDefault());
|
||||||
|
}
|
||||||
|
}
|
||||||
210
worker/src/main/java/io/kestra/worker/Worker.java
Normal file
210
worker/src/main/java/io/kestra/worker/Worker.java
Normal file
@@ -0,0 +1,210 @@
|
|||||||
|
package io.kestra.worker;
|
||||||
|
|
||||||
|
import io.kestra.core.metrics.MetricRegistry;
|
||||||
|
import io.kestra.core.server.Metric;
|
||||||
|
import io.kestra.core.server.ServerConfig;
|
||||||
|
import io.kestra.core.server.Service;
|
||||||
|
import io.kestra.core.server.ServiceStateChangeEvent;
|
||||||
|
import io.kestra.core.server.ServiceType;
|
||||||
|
import io.kestra.core.services.WorkerGroupService;
|
||||||
|
import io.kestra.core.utils.Await;
|
||||||
|
import io.kestra.server.AbstractService;
|
||||||
|
import io.micronaut.context.annotation.Prototype;
|
||||||
|
import io.micronaut.context.event.ApplicationEventPublisher;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.time.temporal.ChronoUnit;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static io.kestra.core.server.Service.ServiceState.TERMINATED_FORCED;
|
||||||
|
import static io.kestra.core.server.Service.ServiceState.TERMINATED_GRACEFULLY;
|
||||||
|
|
||||||
|
@SuppressWarnings("this-escape")
|
||||||
|
@Slf4j
|
||||||
|
@Prototype
|
||||||
|
public class Worker extends AbstractService implements Service {
|
||||||
|
|
||||||
|
private static final String SERVICE_PROPS_WORKER_GROUP = "worker.group";
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private MetricRegistry metricRegistry;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private ServerConfig serverConfig;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
private final Map<Long, AtomicInteger> metricRunningCount = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final AtomicBoolean skipGracefulTermination = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
private final WorkerGroupService workerGroupService;
|
||||||
|
|
||||||
|
private final AtomicBoolean initialized = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
private final AtomicInteger pendingJobCount = new AtomicInteger(0);
|
||||||
|
private final AtomicInteger runningJobCount = new AtomicInteger(0);
|
||||||
|
|
||||||
|
private final WorkerJobExecutor workerJobExecutor;
|
||||||
|
private final WorkerJobFetcher workerJobFetcher;
|
||||||
|
private final WorkerTaskResultSender workerTaskResultSender;
|
||||||
|
|
||||||
|
private String workerGroup;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new {@link Worker} instance.
|
||||||
|
*/
|
||||||
|
@Inject
|
||||||
|
public Worker(
|
||||||
|
ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher,
|
||||||
|
WorkerGroupService workerGroupService,
|
||||||
|
WorkerJobExecutor workerJobExecutor,
|
||||||
|
WorkerJobFetcher workerJobFetcher,
|
||||||
|
WorkerTaskResultSender workerTaskResultSender
|
||||||
|
) {
|
||||||
|
super(ServiceType.WORKER_AGENT, eventPublisher);
|
||||||
|
this.workerGroupService = workerGroupService;
|
||||||
|
this.workerJobExecutor = workerJobExecutor;
|
||||||
|
this.workerJobFetcher = workerJobFetcher;
|
||||||
|
this.workerTaskResultSender = workerTaskResultSender;
|
||||||
|
this.setState(ServiceState.CREATED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Metric> getMetrics() {
|
||||||
|
if (this.metricRegistry == null) {
|
||||||
|
// can arrive if called before the instance is fully created
|
||||||
|
return Collections.emptySet();
|
||||||
|
}
|
||||||
|
|
||||||
|
Stream<String> metrics = Stream.of(
|
||||||
|
MetricRegistry.METRIC_WORKER_JOB_THREAD_COUNT,
|
||||||
|
MetricRegistry.METRIC_WORKER_JOB_PENDING_COUNT,
|
||||||
|
MetricRegistry.METRIC_WORKER_JOB_RUNNING_COUNT
|
||||||
|
);
|
||||||
|
|
||||||
|
return metrics
|
||||||
|
.flatMap(metric -> Optional.ofNullable(metricRegistry.findGauge(metric)).stream())
|
||||||
|
.map(Metric::of)
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start(int numThreads, String workerGroupKey) {
|
||||||
|
if (!this.initialized.compareAndSet(false, true)) {
|
||||||
|
throw new IllegalStateException("Worker already started");
|
||||||
|
}
|
||||||
|
|
||||||
|
this.workerGroup = workerGroupService.resolveGroupFromKey(workerGroupKey);
|
||||||
|
|
||||||
|
String[] tags = workerGroup == null ? new String[0] : new String[]{MetricRegistry.TAG_WORKER_GROUP, workerGroup};
|
||||||
|
// create metrics to store thread count, pending jobs and running jobs, so we can have autoscaling easily
|
||||||
|
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_THREAD_COUNT, MetricRegistry.METRIC_WORKER_JOB_THREAD_COUNT_DESCRIPTION, numThreads, tags);
|
||||||
|
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_PENDING_COUNT, MetricRegistry.METRIC_WORKER_JOB_PENDING_COUNT_DESCRIPTION, pendingJobCount, tags);
|
||||||
|
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_RUNNING_COUNT, MetricRegistry.METRIC_WORKER_JOB_RUNNING_COUNT_DESCRIPTION, runningJobCount, tags);
|
||||||
|
|
||||||
|
workerTaskResultSender.start(getId(), workerGroup);
|
||||||
|
workerJobFetcher.start(getId(), workerGroup);
|
||||||
|
workerJobExecutor.start(getId(), workerGroup, numThreads);
|
||||||
|
|
||||||
|
if (workerGroupKey != null) {
|
||||||
|
log.info("Worker started with {} thread(s) in group '{}'", numThreads, workerGroupKey);
|
||||||
|
} else {
|
||||||
|
log.info("Worker started with {} thread(s)", numThreads);
|
||||||
|
}
|
||||||
|
setState(ServiceState.RUNNING);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
**/
|
||||||
|
@Override
|
||||||
|
protected Map<String, Object> getProperties() {
|
||||||
|
Map<String, Object> properties = new HashMap<>();
|
||||||
|
properties.put(SERVICE_PROPS_WORKER_GROUP, workerGroup);
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ServiceState doStop() {
|
||||||
|
this.workerJobFetcher.stop();
|
||||||
|
this.workerJobExecutor.pause();
|
||||||
|
this.workerTaskResultSender.stop();
|
||||||
|
|
||||||
|
final boolean terminatedGracefully;
|
||||||
|
if (!skipGracefulTermination.get()) {
|
||||||
|
terminatedGracefully = waitForTasksCompletion(serverConfig.terminationGracePeriod());
|
||||||
|
} else {
|
||||||
|
log.info("Terminating now and skip waiting for tasks completions.");
|
||||||
|
this.workerJobExecutor.shutdownNow();
|
||||||
|
terminatedGracefully = false;
|
||||||
|
}
|
||||||
|
return terminatedGracefully ? TERMINATED_GRACEFULLY : TERMINATED_FORCED;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private boolean waitForTasksCompletion(final Duration timeout) {
|
||||||
|
final Instant deadline = Instant.now().plus(timeout);
|
||||||
|
|
||||||
|
AtomicReference<ServiceState> shutdownState = new AtomicReference<>();
|
||||||
|
// start shutdown
|
||||||
|
Thread.ofVirtual().name("worker-shutdown").start(
|
||||||
|
() -> {
|
||||||
|
try {
|
||||||
|
long remaining = Math.max(0, Instant.now().until(deadline, ChronoUnit.MILLIS));
|
||||||
|
boolean gracefullyShutdown = this.workerJobExecutor.shutdown(Duration.ofMillis(remaining));
|
||||||
|
shutdownState.set(gracefullyShutdown ? TERMINATED_GRACEFULLY : TERMINATED_FORCED);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.error("Failed to shutdown. Thread was interrupted");
|
||||||
|
shutdownState.set(TERMINATED_FORCED);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// wait for task completion
|
||||||
|
Await.until(
|
||||||
|
() -> {
|
||||||
|
ServiceState serviceState = shutdownState.get();
|
||||||
|
if (serviceState == TERMINATED_FORCED || serviceState == TERMINATED_GRACEFULLY) {
|
||||||
|
log.info("All working threads are terminated.");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
long runningJobs = this.workerJobExecutor.getRunningJobCount();
|
||||||
|
if (runningJobs == 0) {
|
||||||
|
log.debug("All worker threads is terminated.");
|
||||||
|
} else {
|
||||||
|
log.warn("Waiting for all worker threads to terminate (remaining: {}).", runningJobs);
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
},
|
||||||
|
Duration.ofSeconds(1)
|
||||||
|
);
|
||||||
|
|
||||||
|
return shutdownState.get() == TERMINATED_GRACEFULLY;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specify whether to skip graceful termination on shutdown.
|
||||||
|
*
|
||||||
|
* @param skipGracefulTermination {@code true} to skip graceful termination on shutdown.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void skipGracefulTermination(final boolean skipGracefulTermination) {
|
||||||
|
this.skipGracefulTermination.set(skipGracefulTermination);
|
||||||
|
}
|
||||||
|
}
|
||||||
95
worker/src/main/java/io/kestra/worker/WorkerIOThread.java
Normal file
95
worker/src/main/java/io/kestra/worker/WorkerIOThread.java
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
package io.kestra.worker;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A WorkerIO thread is responsible for processing incoming/outgoing data from and to the worker.
|
||||||
|
* <p>
|
||||||
|
* A WorkerIO mostly does network operations.
|
||||||
|
*/
|
||||||
|
public abstract class WorkerIOThread implements Runnable {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(WorkerIOThread.class);
|
||||||
|
|
||||||
|
private final String name;
|
||||||
|
protected String workerId;
|
||||||
|
protected String workerGroup;
|
||||||
|
|
||||||
|
private volatile Thread thread;
|
||||||
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||||
|
private final CountDownLatch stopped = new CountDownLatch(1);
|
||||||
|
|
||||||
|
public WorkerIOThread(final String name) {
|
||||||
|
this.name = Objects.requireNonNull(name, "name must not be null");
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void start(final String workerId, final String workerGroup) {
|
||||||
|
if (!running.compareAndSet(false, true)) {
|
||||||
|
throw new IllegalStateException("[%s] already started".formatted(getClass().getSimpleName()));
|
||||||
|
}
|
||||||
|
|
||||||
|
this.workerId = workerId;
|
||||||
|
this.workerGroup = workerGroup;
|
||||||
|
// TODO could be probably replace with a virtual-thread executor
|
||||||
|
this.thread = new Thread(this, "worker-" + this.name + "-" + workerId);
|
||||||
|
this.thread.setDaemon(false);
|
||||||
|
this.thread.start();
|
||||||
|
|
||||||
|
LOG.info("[{}] started with workerId={} group={}", getClass().getSimpleName(), workerId, workerGroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
while (running.get()) {
|
||||||
|
try {
|
||||||
|
doOnLoop();
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
LOG.info("[{}] interrupted, stopping", getClass().getSimpleName());
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
break; // exit loop
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error in IO worker loop", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
stopped.countDown();
|
||||||
|
LOG.info("[{}] stopped", getClass().getSimpleName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void doOnLoop() throws Exception;
|
||||||
|
|
||||||
|
protected void doOnStop() {
|
||||||
|
//noop
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void stop() {
|
||||||
|
if (!running.compareAndSet(true, false)) {
|
||||||
|
LOG.debug("[{}] stop() called but not running", getClass().getSimpleName());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (thread != null) {
|
||||||
|
try {
|
||||||
|
doOnStop();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error in IO worker loop", e);
|
||||||
|
}
|
||||||
|
thread.interrupt();
|
||||||
|
try {
|
||||||
|
if (!stopped.await(1, TimeUnit.MINUTES)) {
|
||||||
|
LOG.warn("Timeout while waiting for {} to complete", thread.getName());
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
272
worker/src/main/java/io/kestra/worker/WorkerJobExecutor.java
Normal file
272
worker/src/main/java/io/kestra/worker/WorkerJobExecutor.java
Normal file
@@ -0,0 +1,272 @@
|
|||||||
|
package io.kestra.worker;
|
||||||
|
|
||||||
|
import io.kestra.core.runners.WorkerJob;
|
||||||
|
import io.kestra.core.utils.ExecutorsUtils;
|
||||||
|
import io.kestra.worker.processors.WorkerJobProcessor;
|
||||||
|
import io.kestra.worker.processors.WorkerJobProcessorFactory;
|
||||||
|
import io.kestra.worker.queues.WorkerJobQueue;
|
||||||
|
import io.kestra.worker.queues.WorkerQueueFactory;
|
||||||
|
import io.micronaut.context.annotation.Prototype;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.concurrent.locks.Condition;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Components responsible for executing {@link io.kestra.core.runners.WorkerJob}
|
||||||
|
*/
|
||||||
|
@Prototype
|
||||||
|
@Slf4j
|
||||||
|
public class WorkerJobExecutor {
|
||||||
|
|
||||||
|
private static final String EXECUTOR_NAME = "worker";
|
||||||
|
|
||||||
|
private final WorkerQueueFactory workerQueueFactory;
|
||||||
|
private final WorkerJobProcessorFactory workerJobProcessorFactory;
|
||||||
|
private final ExecutorsUtils executorsUtils;
|
||||||
|
|
||||||
|
private ExecutorService executorService;
|
||||||
|
private List<WorkerJobConsumer> workerJobConsumers;
|
||||||
|
|
||||||
|
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public WorkerJobExecutor(final WorkerQueueFactory workerQueueFactory,
|
||||||
|
final ExecutorsUtils executorsUtils,
|
||||||
|
final WorkerJobProcessorFactory workerJobProcessorFactory) {
|
||||||
|
this.workerJobProcessorFactory = workerJobProcessorFactory;
|
||||||
|
this.workerQueueFactory = workerQueueFactory;
|
||||||
|
this.executorsUtils = executorsUtils;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start(final String workerId,
|
||||||
|
final String workerGroup,
|
||||||
|
int threads) {
|
||||||
|
WorkerJobQueue workerJobQueue = new WorkerJobQueue.Default(workerQueueFactory.getOrCreate(workerId, WorkerJob.class));
|
||||||
|
if (this.started.compareAndSet(false, true)) {
|
||||||
|
this.executorService = executorsUtils.maxCachedThreadPool(threads, EXECUTOR_NAME);
|
||||||
|
this.workerJobConsumers = new ArrayList<>(threads);
|
||||||
|
for (int i = 0; i < threads; i++) {
|
||||||
|
WorkerJobConsumer consumer = new WorkerJobConsumer(
|
||||||
|
workerJobQueue,
|
||||||
|
workerJobProcessorFactory,
|
||||||
|
workerId,
|
||||||
|
workerGroup
|
||||||
|
);
|
||||||
|
this.workerJobConsumers.add(consumer);
|
||||||
|
executorService.submit(consumer);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new IllegalStateException("already started");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of running a job.
|
||||||
|
*
|
||||||
|
* @return the number of job being processed
|
||||||
|
*/
|
||||||
|
public long getRunningJobCount() {
|
||||||
|
return workerJobConsumers.stream()
|
||||||
|
.filter(WorkerJobConsumer::isProcessing)
|
||||||
|
.count();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Notify all underlying WorkerJob consumers to pause.
|
||||||
|
*/
|
||||||
|
public void pause() {
|
||||||
|
workerJobConsumers.forEach(WorkerJobConsumer::pause);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Notify all underlying WorkerJob consumers to resume.
|
||||||
|
*/
|
||||||
|
public void resume() {
|
||||||
|
checkIsStarted();
|
||||||
|
workerJobConsumers.forEach(WorkerJobConsumer::resume);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkIsStarted() {
|
||||||
|
if (!this.started.get()) {
|
||||||
|
throw new IllegalStateException("WorkerJobExecutor not started");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Immediately initiates shutdown of all consumers and halts the processing of waiting jobs.
|
||||||
|
* <p>
|
||||||
|
* This is a convenience method that calls {@link #shutdown(Duration)} with {@code Duration.ZERO}
|
||||||
|
* and ignores any {@link InterruptedException} by resetting the interrupt flag.
|
||||||
|
*/
|
||||||
|
public void shutdownNow() {
|
||||||
|
try {
|
||||||
|
shutdown(Duration.ZERO);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initiates a graceful shutdown by notifying all consumers to stop and waiting for termination.
|
||||||
|
* <p>
|
||||||
|
* If the specified {@code terminationGracePeriod} is {@code null} or {@code Duration.ZERO},
|
||||||
|
* the executor will skip graceful shutdown and immediately attempt to forcefully stop all
|
||||||
|
* running tasks.
|
||||||
|
*
|
||||||
|
* @param terminationGracePeriod the maximum duration to wait for graceful shutdown
|
||||||
|
* @return {@code true} if the executor terminated within the timeout; {@code false} if forced shutdown was required
|
||||||
|
* @throws InterruptedException if the current thread is interrupted while waiting
|
||||||
|
*/
|
||||||
|
public boolean shutdown(final Duration terminationGracePeriod) throws InterruptedException {
|
||||||
|
if (!this.started.compareAndSet(true, false)) {
|
||||||
|
return true; // Already shut down or not started.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initiate graceful shutdown
|
||||||
|
this.executorService.shutdown();
|
||||||
|
|
||||||
|
// Notify all WorkerJobConsumers to stop
|
||||||
|
this.workerJobConsumers.forEach(WorkerJobConsumer::stop);
|
||||||
|
|
||||||
|
if (terminationGracePeriod == null || terminationGracePeriod.equals(Duration.ZERO)) {
|
||||||
|
this.executorService.shutdownNow();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all WorkerJobConsumers to terminate
|
||||||
|
boolean terminated = this.executorService.awaitTermination(
|
||||||
|
terminationGracePeriod.toMillis(), TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
if (!terminated) {
|
||||||
|
log.warn("Worker still has pending jobs after the termination grace period. Forcing shutdown.");
|
||||||
|
this.executorService.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
return terminated;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class WorkerJobConsumer implements Runnable {
|
||||||
|
|
||||||
|
private final AtomicBoolean stopped = new AtomicBoolean(false);
|
||||||
|
private final AtomicBoolean paused = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
private final ReentrantLock pauseLock = new ReentrantLock();
|
||||||
|
private final Condition unpaused = pauseLock.newCondition();
|
||||||
|
|
||||||
|
private final AtomicReference<WorkerJobProcessor<WorkerJob>> running = new AtomicReference<>(null);
|
||||||
|
|
||||||
|
private final WorkerJobQueue workerJobQueue;
|
||||||
|
private final WorkerJobProcessorFactory workerJobProcessorFactory;
|
||||||
|
private final String workerId;
|
||||||
|
private final String workerGroup;
|
||||||
|
|
||||||
|
public WorkerJobConsumer(WorkerJobQueue workerJobQueue,
|
||||||
|
WorkerJobProcessorFactory workerJobProcessorFactory,
|
||||||
|
String workerId,
|
||||||
|
String workerGroup) {
|
||||||
|
this.workerJobQueue = workerJobQueue;
|
||||||
|
this.workerJobProcessorFactory = workerJobProcessorFactory;
|
||||||
|
this.workerId = workerId;
|
||||||
|
this.workerGroup = workerGroup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Continuously polls for new {@link WorkerJob} and processes them sequentially.
|
||||||
|
* <p>
|
||||||
|
* It blocks while waiting for new jobs and ensures that only one job is processed
|
||||||
|
* at a time. This method will not return unless interrupted or explicitly stopped.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
while (!stopped.get()) {
|
||||||
|
waitIfPaused();
|
||||||
|
|
||||||
|
WorkerJob job = workerJobQueue.poll(Duration.ofSeconds(1));
|
||||||
|
if (job == null || stopped.get()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
WorkerJobProcessor<WorkerJob> processor =
|
||||||
|
workerJobProcessorFactory.create(workerId, workerGroup, job);
|
||||||
|
|
||||||
|
running.set(processor);
|
||||||
|
processor.process(job);
|
||||||
|
} finally {
|
||||||
|
running.set(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether a job is currently being processed
|
||||||
|
*
|
||||||
|
* @return {@code true} if a {@link WorkerJob} is actively being processed; {@code false} otherwise.
|
||||||
|
*/
|
||||||
|
public boolean isProcessing() {
|
||||||
|
return running.get() != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitIfPaused() throws InterruptedException {
|
||||||
|
pauseLock.lock();
|
||||||
|
try {
|
||||||
|
while (paused.get() && !stopped.get()) {
|
||||||
|
unpaused.await(); // Wait until resume() signals
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
pauseLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pauses polling for new {@link WorkerJob} instances.
|
||||||
|
* <p>
|
||||||
|
* If a job is currently running, it will continue to completion.
|
||||||
|
* No new jobs will be polled until resumed.
|
||||||
|
*/
|
||||||
|
public void pause() {
|
||||||
|
paused.set(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resumes polling and processing of {@link WorkerJob} instances if currently paused.
|
||||||
|
*/
|
||||||
|
public void resume() {
|
||||||
|
pauseLock.lock();
|
||||||
|
try {
|
||||||
|
if (paused.compareAndSet(true, false)) {
|
||||||
|
unpaused.signalAll();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
pauseLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stops polling and processing of {@link WorkerJob} instances.
|
||||||
|
*/
|
||||||
|
public void stop() {
|
||||||
|
if (this.stopped.compareAndSet(false, true)) {
|
||||||
|
resume(); // In case it's paused and blocked
|
||||||
|
|
||||||
|
WorkerJobProcessor<WorkerJob> processor = running.get();
|
||||||
|
if (processor != null) {
|
||||||
|
processor.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
122
worker/src/main/java/io/kestra/worker/WorkerJobFetcher.java
Normal file
122
worker/src/main/java/io/kestra/worker/WorkerJobFetcher.java
Normal file
@@ -0,0 +1,122 @@
|
|||||||
|
package io.kestra.worker;
|
||||||
|
|
||||||
|
import io.grpc.stub.ClientCallStreamObserver;
|
||||||
|
import io.grpc.stub.ClientResponseObserver;
|
||||||
|
import io.kestra.core.contexts.KestraContext;
|
||||||
|
import io.kestra.core.runners.WorkerJob;
|
||||||
|
import io.kestra.server.grpc.RequestOrResponseHeader;
|
||||||
|
import io.kestra.server.internals.MessageFormat;
|
||||||
|
import io.kestra.server.internals.MessageFormats;
|
||||||
|
import io.kestra.worker.grpc.FetchWorkerJobRequest;
|
||||||
|
import io.kestra.worker.grpc.FetchWorkerJobResponse;
|
||||||
|
import io.kestra.worker.grpc.WorkerControllerServiceGrpc.WorkerControllerServiceStub;
|
||||||
|
import io.kestra.worker.messages.FetchWorkerJobMessage;
|
||||||
|
import io.kestra.worker.messages.WorkerJobBatchMessage;
|
||||||
|
import io.kestra.worker.queues.WorkerJobQueue;
|
||||||
|
import io.kestra.worker.queues.WorkerQueueFactory;
|
||||||
|
import io.micronaut.context.annotation.Prototype;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Component responsible for fetching worker jobs.
|
||||||
|
*/
|
||||||
|
@Prototype
|
||||||
|
@Slf4j
|
||||||
|
public class WorkerJobFetcher extends WorkerIOThread {
|
||||||
|
|
||||||
|
private final WorkerControllerServiceStub workerControllerServiceStub;
|
||||||
|
private final WorkerQueueFactory workerQueueFactory;
|
||||||
|
private WorkerJobQueue workerJobQueue;
|
||||||
|
|
||||||
|
private final AtomicReference<ClientCallStreamObserver<FetchWorkerJobRequest>> currentStreamObserver = new AtomicReference<>();
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public WorkerJobFetcher(
|
||||||
|
final WorkerControllerServiceStub workerControllerServiceStub,
|
||||||
|
final WorkerQueueFactory workerQueueFactory) {
|
||||||
|
super(WorkerJobFetcher.class.getSimpleName());
|
||||||
|
this.workerQueueFactory = workerQueueFactory;
|
||||||
|
this.workerControllerServiceStub = workerControllerServiceStub;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
**/
|
||||||
|
@Override
|
||||||
|
public synchronized void start(String workerId, String workerGroup) {
|
||||||
|
this.workerJobQueue = new WorkerJobQueue.Default(workerQueueFactory.getOrCreate(workerId, WorkerJob.class));
|
||||||
|
super.start(workerId, workerGroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
**/
|
||||||
|
@Override
|
||||||
|
protected void doOnLoop() throws Exception {
|
||||||
|
|
||||||
|
FetchWorkerJobRequest request = FetchWorkerJobRequest.newBuilder()
|
||||||
|
.setHeader(newRequestOrResponseHeader())
|
||||||
|
.setMessage(MessageFormats.JSON.toByteString(new FetchWorkerJobMessage(workerId, workerGroup)))
|
||||||
|
.build();
|
||||||
|
CountDownLatch completed = new CountDownLatch(1);
|
||||||
|
|
||||||
|
// Start the streaming call
|
||||||
|
ClientResponseObserver<FetchWorkerJobRequest, FetchWorkerJobResponse> streamCompleted = new ClientResponseObserver<>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void beforeStart(ClientCallStreamObserver<FetchWorkerJobRequest> requestStream) {
|
||||||
|
currentStreamObserver.set(requestStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onNext(FetchWorkerJobResponse response) {
|
||||||
|
log.info("Stream onNext: {}", response);
|
||||||
|
String messageFormat = response.getHeader().getMessageFormat();
|
||||||
|
WorkerJobBatchMessage workerJobBatch = MessageFormat
|
||||||
|
.resolve(messageFormat)
|
||||||
|
.fromByteString(response.getMessage(), WorkerJobBatchMessage.class);
|
||||||
|
|
||||||
|
if (workerJobBatch != null && !workerJobBatch.jobs().isEmpty()) {
|
||||||
|
workerJobBatch.jobs().forEach(workerJobQueue::put);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable t) {
|
||||||
|
log.error("Stream error: {}", t.getMessage(), t);
|
||||||
|
completed.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCompleted() {
|
||||||
|
log.error("Stream completed");
|
||||||
|
completed.countDown();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
workerControllerServiceStub.fetchWorkerJobsStream(request, streamCompleted);
|
||||||
|
completed.await(); // Block until stream ends
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doOnStop() {
|
||||||
|
ClientCallStreamObserver<FetchWorkerJobRequest> active = currentStreamObserver.getAndSet(null);
|
||||||
|
if (active != null) {
|
||||||
|
active.cancel("Worker stopping", null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private RequestOrResponseHeader newRequestOrResponseHeader() {
|
||||||
|
return RequestOrResponseHeader
|
||||||
|
.newBuilder()
|
||||||
|
.setClientId(workerId)
|
||||||
|
.setClientVersion(KestraContext.getContext().getVersion())
|
||||||
|
.setMessageFormat(MessageFormats.JSON.name())
|
||||||
|
.setCorrelationId(UUID.randomUUID().toString())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,95 @@
|
|||||||
|
package io.kestra.worker;
|
||||||
|
|
||||||
|
import io.grpc.stub.StreamObserver;
|
||||||
|
import io.kestra.core.contexts.KestraContext;
|
||||||
|
import io.kestra.core.runners.WorkerJob;
|
||||||
|
import io.kestra.core.runners.WorkerTaskResult;
|
||||||
|
import io.kestra.server.GrpcChannelProvider;
|
||||||
|
import io.kestra.server.grpc.RequestOrResponseHeader;
|
||||||
|
import io.kestra.server.internals.BatchMessage;
|
||||||
|
import io.kestra.server.internals.MessageFormats;
|
||||||
|
import io.kestra.worker.grpc.WorkerControllerServiceGrpc;
|
||||||
|
import io.kestra.worker.grpc.WorkerJobResultsRequest;
|
||||||
|
import io.kestra.worker.grpc.WorkerJobResultsResponse;
|
||||||
|
import io.kestra.worker.queues.WorkerJobQueue;
|
||||||
|
import io.kestra.worker.queues.WorkerQueueFactory;
|
||||||
|
import io.kestra.worker.queues.WorkerTaskResultQueue;
|
||||||
|
import io.kestra.worker.queues.WorkerTriggerResultQueue;
|
||||||
|
import io.micronaut.context.annotation.Prototype;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Component responsible for fetching worker jobs.
|
||||||
|
*/
|
||||||
|
@Prototype
|
||||||
|
@Slf4j
|
||||||
|
public class WorkerTaskResultSender extends WorkerIOThread {
|
||||||
|
|
||||||
|
private final WorkerControllerServiceGrpc.WorkerControllerServiceStub controllerServiceStub;
|
||||||
|
private final WorkerQueueFactory workerQueueFactory;
|
||||||
|
private WorkerTaskResultQueue queue;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public WorkerTaskResultSender(
|
||||||
|
final WorkerControllerServiceGrpc.WorkerControllerServiceStub controllerServiceStub,
|
||||||
|
final WorkerQueueFactory workerQueueFactory) {
|
||||||
|
super(WorkerTaskResultSender.class.getSimpleName());
|
||||||
|
this.workerQueueFactory = workerQueueFactory;
|
||||||
|
this.controllerServiceStub = controllerServiceStub;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
**/
|
||||||
|
@Override
|
||||||
|
public synchronized void start(String workerId, String workerGroup) {
|
||||||
|
this.queue = new WorkerTaskResultQueue.Default(workerQueueFactory.getOrCreate(workerId, WorkerTaskResult.class));
|
||||||
|
super.start(workerId, workerGroup);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void doOnLoop() throws Exception {
|
||||||
|
WorkerTaskResult result = queue.poll(Duration.ofMillis(Long.MAX_VALUE));
|
||||||
|
if (result == null) return;
|
||||||
|
|
||||||
|
WorkerJobResultsRequest request = WorkerJobResultsRequest
|
||||||
|
.newBuilder()
|
||||||
|
.setHeader(newRequestOrResponseHeader())
|
||||||
|
.setMessage(MessageFormats.JSON.toByteString(BatchMessage.of(result)))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
controllerServiceStub.sendWorkerJobResults(request, new StreamObserver<>() {
|
||||||
|
@Override
|
||||||
|
public void onNext(WorkerJobResultsResponse value) {
|
||||||
|
log.info("onNext {}", value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable t) {
|
||||||
|
log.error("Error while sending worker job results", t);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCompleted() {
|
||||||
|
log.info("onCompleted");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private RequestOrResponseHeader newRequestOrResponseHeader() {
|
||||||
|
return RequestOrResponseHeader
|
||||||
|
.newBuilder()
|
||||||
|
.setClientId(workerId)
|
||||||
|
.setClientVersion(KestraContext.getContext().getVersion())
|
||||||
|
.setMessageFormat(MessageFormats.JSON.name())
|
||||||
|
.setCorrelationId(UUID.randomUUID().toString())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,7 @@
|
|||||||
|
package io.kestra.worker.messages;
|
||||||
|
|
||||||
|
public record FetchWorkerJobMessage(
|
||||||
|
String workerId,
|
||||||
|
String workerGroup
|
||||||
|
) {
|
||||||
|
}
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
package io.kestra.worker.messages;
|
||||||
|
|
||||||
|
import io.kestra.core.runners.WorkerJob;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
public record WorkerJobBatchMessage(
|
||||||
|
List<WorkerJob> jobs
|
||||||
|
) {
|
||||||
|
|
||||||
|
public List<WorkerJob> getJobs() {
|
||||||
|
return Optional.ofNullable(jobs).orElse(List.of());
|
||||||
|
}
|
||||||
|
}
|
||||||
6
worker/src/main/java/io/kestra/worker/package-info.java
Normal file
6
worker/src/main/java/io/kestra/worker/package-info.java
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
@Configuration
|
||||||
|
@Requires(property = "kestra.server-type", pattern = "WORKER_AGENT")
|
||||||
|
package io.kestra.worker;
|
||||||
|
|
||||||
|
import io.micronaut.context.annotation.Configuration;
|
||||||
|
import io.micronaut.context.annotation.Requires;
|
||||||
@@ -0,0 +1,89 @@
|
|||||||
|
package io.kestra.worker.processors;
|
||||||
|
|
||||||
|
import io.kestra.core.metrics.MetricRegistry;
|
||||||
|
import io.kestra.core.models.flows.State;
|
||||||
|
import io.kestra.core.runners.AbstractWorkerCallable;
|
||||||
|
import io.kestra.core.runners.WorkerJob;
|
||||||
|
import io.kestra.core.runners.WorkerSecurityService;
|
||||||
|
import io.kestra.core.services.LogService;
|
||||||
|
import io.kestra.core.trace.TraceUtils;
|
||||||
|
import io.kestra.core.trace.Tracer;
|
||||||
|
import io.opentelemetry.api.common.Attributes;
|
||||||
|
|
||||||
|
import java.util.ConcurrentModificationException;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
public abstract class AbstractWorkerJobProcessor<T extends WorkerJob> implements WorkerJobProcessor<T> {
|
||||||
|
|
||||||
|
protected final String workerGroup;
|
||||||
|
protected final MetricRegistry metricRegistry;
|
||||||
|
protected final LogService logService;
|
||||||
|
|
||||||
|
private final WorkerSecurityService workerSecurityService;
|
||||||
|
private final Tracer tracer;
|
||||||
|
|
||||||
|
private final AtomicReference<WorkerJob> currentWorkerJob = new AtomicReference<>();
|
||||||
|
private final AtomicReference<AbstractWorkerCallable> currentWorkerCallable = new AtomicReference<>();
|
||||||
|
|
||||||
|
private final AtomicBoolean stopped = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
public AbstractWorkerJobProcessor(String workerGroup,
|
||||||
|
LogService logService,
|
||||||
|
MetricRegistry metricRegistry,
|
||||||
|
WorkerSecurityService workerSecurityService,
|
||||||
|
Tracer tracer) {
|
||||||
|
this.workerGroup = workerGroup;
|
||||||
|
this.tracer = tracer;
|
||||||
|
this.metricRegistry = metricRegistry;
|
||||||
|
this.logService = logService;
|
||||||
|
this.workerSecurityService = workerSecurityService;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(final T job) {
|
||||||
|
if (currentWorkerJob.compareAndSet(null, job)) {
|
||||||
|
try {
|
||||||
|
doProcess(job);
|
||||||
|
} finally {
|
||||||
|
currentWorkerJob.set(null);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// avoid miss-use of this class
|
||||||
|
throw new ConcurrentModificationException("Processor can only process one job at a time.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void doProcess(final T job);
|
||||||
|
|
||||||
|
protected io.kestra.core.models.flows.State.Type callJob(AbstractWorkerCallable workerJobCallable) {
|
||||||
|
this.currentWorkerCallable.set(workerJobCallable);
|
||||||
|
try {
|
||||||
|
return tracer.inCurrentContext(
|
||||||
|
workerJobCallable.getRunContext(),
|
||||||
|
workerJobCallable.getType(),
|
||||||
|
Attributes.of(TraceUtils.ATTR_UID, workerJobCallable.getUid()),
|
||||||
|
() -> workerSecurityService.callInSecurityContext(workerJobCallable)
|
||||||
|
);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// should only occur if it fails in the tracing code which should be unexpected
|
||||||
|
// we add the exception to have some log in that case
|
||||||
|
workerJobCallable.setException(e);
|
||||||
|
return State.Type.FAILED;
|
||||||
|
} finally {
|
||||||
|
this.currentWorkerCallable.set(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
if (this.stopped.compareAndSet(false, true)) {
|
||||||
|
Optional.ofNullable(currentWorkerCallable.get()).ifPresent(AbstractWorkerCallable::signalStop);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean isStopped() {
|
||||||
|
return this.stopped.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,30 @@
|
|||||||
|
package io.kestra.worker.processors;
|
||||||
|
|
||||||
|
import io.kestra.core.runners.WorkerJob;
|
||||||
|
import io.micronaut.core.annotation.Blocking;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A processor responsible for executing a specific {@link WorkerJob}.
|
||||||
|
*
|
||||||
|
* @param <T> the type of {@link WorkerJob} to be processed
|
||||||
|
*/
|
||||||
|
public interface WorkerJobProcessor<T extends WorkerJob> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Processes the given {@link WorkerJob}.
|
||||||
|
* <p>
|
||||||
|
* This method will block the calling thread until the job has been completed or terminated.
|
||||||
|
* Only one job may be processed at a time per {@code WorkerJobProcessor} instance.
|
||||||
|
*
|
||||||
|
* @param workerJob the {@link WorkerJob} to be executed
|
||||||
|
*/
|
||||||
|
@Blocking
|
||||||
|
void process(T workerJob);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Signals the currently running job to stop, if any.
|
||||||
|
* <p>
|
||||||
|
* If no job is currently running, the method returns immediately without any side effects.
|
||||||
|
*/
|
||||||
|
void stop();
|
||||||
|
}
|
||||||
@@ -0,0 +1,92 @@
|
|||||||
|
package io.kestra.worker.processors;
|
||||||
|
|
||||||
|
import io.kestra.core.metrics.MetricRegistry;
|
||||||
|
import io.kestra.core.models.executions.LogEntry;
|
||||||
|
import io.kestra.core.models.executions.MetricEntry;
|
||||||
|
import io.kestra.core.runners.RunContextInitializer;
|
||||||
|
import io.kestra.core.runners.RunContextLoggerFactory;
|
||||||
|
import io.kestra.core.runners.Worker;
|
||||||
|
import io.kestra.core.runners.WorkerJob;
|
||||||
|
import io.kestra.core.runners.WorkerSecurityService;
|
||||||
|
import io.kestra.core.runners.WorkerTask;
|
||||||
|
import io.kestra.core.runners.WorkerTaskResult;
|
||||||
|
import io.kestra.core.runners.WorkerTrigger;
|
||||||
|
import io.kestra.core.runners.WorkerTriggerResult;
|
||||||
|
import io.kestra.core.server.Metric;
|
||||||
|
import io.kestra.core.services.LogService;
|
||||||
|
import io.kestra.core.services.VariablesService;
|
||||||
|
import io.kestra.core.trace.Tracer;
|
||||||
|
import io.kestra.core.trace.TracerFactory;
|
||||||
|
import io.kestra.worker.queues.WorkerLogQueue;
|
||||||
|
import io.kestra.worker.queues.WorkerMetricQueue;
|
||||||
|
import io.kestra.worker.queues.WorkerQueueFactory;
|
||||||
|
import io.kestra.worker.queues.WorkerTaskResultQueue;
|
||||||
|
import io.kestra.worker.queues.WorkerTriggerResultQueue;
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import jakarta.inject.Singleton;
|
||||||
|
|
||||||
|
@Singleton
|
||||||
|
public class WorkerJobProcessorFactory {
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private LogService logService;
|
||||||
|
@Inject
|
||||||
|
private MetricRegistry metricRegistry;
|
||||||
|
@Inject
|
||||||
|
private WorkerSecurityService workerSecurityService;
|
||||||
|
@Inject
|
||||||
|
private RunContextInitializer runContextInitializer;
|
||||||
|
@Inject
|
||||||
|
private RunContextLoggerFactory runContextLoggerFactory;
|
||||||
|
@Inject
|
||||||
|
private VariablesService variablesService;
|
||||||
|
|
||||||
|
// QUEUES
|
||||||
|
@Inject
|
||||||
|
private WorkerQueueFactory workerQueueFactory;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private TracerFactory tracerFactory;
|
||||||
|
private Tracer tracer;
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init() {
|
||||||
|
this.tracer = tracerFactory.getTracer(Worker.class, "WORKER");
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public <T extends WorkerJob> WorkerJobProcessor<T> create(String workerId,
|
||||||
|
String workerGroup,
|
||||||
|
T job) {
|
||||||
|
if (job instanceof WorkerTask) {
|
||||||
|
return (WorkerJobProcessor<T>) new WorkerTaskProcessor(
|
||||||
|
workerId,
|
||||||
|
workerGroup,
|
||||||
|
logService,
|
||||||
|
metricRegistry,
|
||||||
|
workerSecurityService,
|
||||||
|
tracer,
|
||||||
|
variablesService,
|
||||||
|
runContextInitializer,
|
||||||
|
runContextLoggerFactory,
|
||||||
|
new WorkerTaskResultQueue.Default(workerQueueFactory.getOrCreate(workerId, WorkerTaskResult.class)),
|
||||||
|
new WorkerMetricQueue.Default(workerQueueFactory.getOrCreate(workerId, MetricEntry.class))
|
||||||
|
);
|
||||||
|
} else if (job instanceof WorkerTrigger) {
|
||||||
|
return (WorkerJobProcessor<T>) new WorkerTriggerProcessor(
|
||||||
|
workerGroup,
|
||||||
|
logService,
|
||||||
|
metricRegistry,
|
||||||
|
workerSecurityService,
|
||||||
|
tracer,
|
||||||
|
runContextInitializer,
|
||||||
|
new WorkerLogQueue.Default(workerQueueFactory.getOrCreate(workerId, LogEntry.class)),
|
||||||
|
new WorkerTriggerResultQueue.Default(workerQueueFactory.getOrCreate(workerId, WorkerTriggerResult.class))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new IllegalArgumentException("Unsupported worker job type [" + job.getClass().getName() + "]");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,447 @@
|
|||||||
|
package io.kestra.worker.processors;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||||
|
import io.kestra.core.metrics.MetricRegistry;
|
||||||
|
import io.kestra.core.models.executions.MetricEntry;
|
||||||
|
import io.kestra.core.models.executions.TaskRun;
|
||||||
|
import io.kestra.core.models.executions.TaskRunAttempt;
|
||||||
|
import io.kestra.core.models.executions.Variables;
|
||||||
|
import io.kestra.core.models.flows.State;
|
||||||
|
import io.kestra.core.models.tasks.RunnableTask;
|
||||||
|
import io.kestra.core.models.tasks.Task;
|
||||||
|
import io.kestra.core.runners.DefaultRunContext;
|
||||||
|
import io.kestra.core.runners.RunContext;
|
||||||
|
import io.kestra.core.runners.RunContextInitializer;
|
||||||
|
import io.kestra.core.runners.RunContextLogger;
|
||||||
|
import io.kestra.core.runners.RunContextLoggerFactory;
|
||||||
|
import io.kestra.core.runners.WorkerSecurityService;
|
||||||
|
import io.kestra.core.runners.WorkerTask;
|
||||||
|
import io.kestra.core.runners.WorkerTaskCallable;
|
||||||
|
import io.kestra.core.runners.WorkerTaskResult;
|
||||||
|
import io.kestra.core.serializers.JacksonMapper;
|
||||||
|
import io.kestra.core.services.LogService;
|
||||||
|
import io.kestra.core.services.VariablesService;
|
||||||
|
import io.kestra.core.storages.StorageContext;
|
||||||
|
import io.kestra.core.trace.Tracer;
|
||||||
|
import io.kestra.core.utils.Hashing;
|
||||||
|
import io.kestra.core.utils.TruthUtils;
|
||||||
|
import io.kestra.plugin.core.flow.WorkingDirectory;
|
||||||
|
import io.kestra.worker.queues.WorkerMetricQueue;
|
||||||
|
import io.kestra.worker.queues.WorkerTaskResultQueue;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.event.Level;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.security.MessageDigest;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HexFormat;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.zip.ZipEntry;
|
||||||
|
import java.util.zip.ZipInputStream;
|
||||||
|
import java.util.zip.ZipOutputStream;
|
||||||
|
|
||||||
|
import static io.kestra.core.models.flows.State.Type.CREATED;
|
||||||
|
import static io.kestra.core.models.flows.State.Type.RUNNING;
|
||||||
|
import static io.kestra.core.models.flows.State.Type.SKIPPED;
|
||||||
|
import static io.kestra.core.models.flows.State.Type.SUCCESS;
|
||||||
|
import static io.kestra.core.models.flows.State.Type.WARNING;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class WorkerTaskProcessor extends AbstractWorkerJobProcessor<WorkerTask> {
|
||||||
|
|
||||||
|
private final RunContextInitializer runContextInitializer;
|
||||||
|
private final RunContextLoggerFactory runContextLoggerFactory;
|
||||||
|
private final String workerId;
|
||||||
|
private final String workerGroup;
|
||||||
|
private final VariablesService variablesService;
|
||||||
|
|
||||||
|
private final Map<Long, AtomicInteger> metricRunningCount = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final WorkerTaskResultQueue workerTaskResultQueue;
|
||||||
|
private final WorkerMetricQueue workerMetricQueue;
|
||||||
|
|
||||||
|
public WorkerTaskProcessor(final String workerId,
|
||||||
|
final String workerGroup,
|
||||||
|
final LogService logService,
|
||||||
|
final MetricRegistry metricRegistry,
|
||||||
|
final WorkerSecurityService workerSecurityService,
|
||||||
|
final Tracer tracer,
|
||||||
|
final VariablesService variablesService,
|
||||||
|
final RunContextInitializer runContextInitializer,
|
||||||
|
final RunContextLoggerFactory runContextLoggerFactory,
|
||||||
|
final WorkerTaskResultQueue workerTaskResultQueue,
|
||||||
|
final WorkerMetricQueue workerMetricQueue) {
|
||||||
|
super(workerGroup, logService, metricRegistry, workerSecurityService, tracer);
|
||||||
|
this.runContextInitializer = runContextInitializer;
|
||||||
|
this.runContextLoggerFactory = runContextLoggerFactory;
|
||||||
|
this.workerGroup = workerGroup;
|
||||||
|
this.workerId = workerId;
|
||||||
|
this.variablesService = variablesService;
|
||||||
|
this.workerTaskResultQueue = workerTaskResultQueue;
|
||||||
|
this.workerMetricQueue = workerMetricQueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doProcess(final WorkerTask workerTask) {
|
||||||
|
Task task = workerTask.getTask();
|
||||||
|
if (task instanceof RunnableTask) {
|
||||||
|
runTask(workerTask, true);
|
||||||
|
} else if (task instanceof WorkingDirectory workingDirectory) {
|
||||||
|
runWorkingDirectory(workerTask, workingDirectory);
|
||||||
|
} else {
|
||||||
|
throw new IllegalArgumentException("Unable to process the task '" + task.getId() + "' as it's not a runnable task");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runWorkingDirectory(WorkerTask workerTask, WorkingDirectory workingDirectory) {
|
||||||
|
DefaultRunContext runContext = runContextInitializer.forWorkingDirectory(((DefaultRunContext) workerTask.getRunContext()), workerTask);
|
||||||
|
final RunContext workingDirectoryRunContext = runContext.clone();
|
||||||
|
|
||||||
|
try {
|
||||||
|
// preExecuteTasks
|
||||||
|
try {
|
||||||
|
workingDirectory.preExecuteTasks(workingDirectoryRunContext, workerTask.getTaskRun());
|
||||||
|
} catch (Exception e) {
|
||||||
|
workingDirectoryRunContext.logger().error("Failed preExecuteTasks on WorkingDirectory: {}", e.getMessage(), e);
|
||||||
|
workerTask = workerTask.withTaskRun(workerTask.fail());
|
||||||
|
workerTaskResultQueue.put(new WorkerTaskResult(workerTask.getTaskRun()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// execute all tasks
|
||||||
|
for (Task currentTask : workingDirectory.getTasks()) {
|
||||||
|
if (Boolean.TRUE.equals(currentTask.getDisabled())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
WorkerTask currentWorkerTask = workingDirectory.workerTask(
|
||||||
|
workerTask.getTaskRun(),
|
||||||
|
currentTask,
|
||||||
|
runContextInitializer.forPlugin(runContext, currentTask)
|
||||||
|
);
|
||||||
|
|
||||||
|
// all tasks will be handled immediately by the worker
|
||||||
|
WorkerTaskResult workerTaskResult = null;
|
||||||
|
try {
|
||||||
|
if (!TruthUtils.isTruthy(runContext.render(currentWorkerTask.getTask().getRunIf()))) {
|
||||||
|
workerTaskResult = new WorkerTaskResult(currentWorkerTask.getTaskRun().withState(SKIPPED));
|
||||||
|
workerTaskResultQueue.put(workerTaskResult);
|
||||||
|
} else {
|
||||||
|
workerTaskResult = this.runTask(currentWorkerTask, false);
|
||||||
|
}
|
||||||
|
} catch (IllegalVariableEvaluationException e) {
|
||||||
|
RunContextLogger contextLogger = runContextLoggerFactory.create(currentWorkerTask);
|
||||||
|
contextLogger.logger().error("Failed evaluating runIf: {}", e.getMessage(), e);
|
||||||
|
workerTaskResultQueue.put(new WorkerTaskResult(workerTask.fail()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (workerTaskResult == null || workerTaskResult.getTaskRun().getState().isFailed() && !currentWorkerTask.getTask().isAllowFailure()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// create the next RunContext populated with the previous WorkerTaskResult
|
||||||
|
runContext = runContextInitializer.forWorker(runContext.clone(), workerTaskResult, workerTask.getTaskRun());
|
||||||
|
}
|
||||||
|
|
||||||
|
// postExecuteTasks
|
||||||
|
try {
|
||||||
|
workingDirectory.postExecuteTasks(workingDirectoryRunContext, workerTask.getTaskRun());
|
||||||
|
} catch (Exception e) {
|
||||||
|
workingDirectoryRunContext.logger().error("Failed postExecuteTasks on WorkingDirectory: {}", e.getMessage(), e);
|
||||||
|
workerTaskResultQueue.put(new WorkerTaskResult(workerTask.fail()));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
this.logTerminated(workerTask);
|
||||||
|
runContext.cleanup();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private WorkerTaskResult runTask(WorkerTask workerTask, boolean cleanUp) {
|
||||||
|
String[] metricTags = metricRegistry.tags(workerTask, workerGroup);
|
||||||
|
|
||||||
|
this.metricRegistry
|
||||||
|
.counter(MetricRegistry.METRIC_WORKER_STARTED_COUNT, MetricRegistry.METRIC_WORKER_STARTED_COUNT_DESCRIPTION, metricTags)
|
||||||
|
.increment();
|
||||||
|
|
||||||
|
if (workerTask.getTaskRun().getState().getCurrent() == CREATED) {
|
||||||
|
this.metricRegistry
|
||||||
|
.timer(MetricRegistry.METRIC_WORKER_QUEUED_DURATION, MetricRegistry.METRIC_WORKER_QUEUED_DURATION_DESCRIPTION, metricTags)
|
||||||
|
.record(Duration.between(
|
||||||
|
workerTask.getTaskRun().getState().getStartDate(), Instant.now()
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// TODO
|
||||||
|
/**
|
||||||
|
if (!Boolean.TRUE.equals(workerTask.getTaskRun().getForceExecution()) && killedExecution.contains(workerTask.getTaskRun().getExecutionId())) {
|
||||||
|
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun().withState(KILLED));
|
||||||
|
workerTaskResultQueue.produce(workerTaskResult);
|
||||||
|
|
||||||
|
// We cannot remove the execution ID from the killedExecution in case the worker is processing multiple tasks of the execution
|
||||||
|
// which can happens due to parallel processing.
|
||||||
|
return workerTaskResult;
|
||||||
|
}
|
||||||
|
**/
|
||||||
|
|
||||||
|
logService.logTaskRun(
|
||||||
|
workerTask.getTaskRun(),
|
||||||
|
Level.INFO,
|
||||||
|
"Type {} started",
|
||||||
|
workerTask.getTask().getClass().getSimpleName()
|
||||||
|
);
|
||||||
|
|
||||||
|
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(RUNNING));
|
||||||
|
|
||||||
|
DefaultRunContext runContext = runContextInitializer.forWorker((DefaultRunContext) workerTask.getRunContext(), workerTask);
|
||||||
|
Optional<String> hash = Optional.empty();
|
||||||
|
|
||||||
|
if (workerTask.getTask().getTaskCache() != null && workerTask.getTask().getTaskCache().getEnabled()) {
|
||||||
|
runContext.logger().debug("Task output caching is enabled for task '{}''", workerTask.getTask().getId());
|
||||||
|
hash = hashTask(runContext, workerTask.getTask());
|
||||||
|
if (hash.isPresent()) {
|
||||||
|
try {
|
||||||
|
Optional<InputStream> cacheFile = runContext.storage().getCacheFile(hash.get(), workerTask.getTaskRun().getValue(), workerTask.getTask().getTaskCache().getTtl());
|
||||||
|
if (cacheFile.isPresent()) {
|
||||||
|
runContext.logger().info("Skipping task execution for task '{}' as there is an existing cache entry for it", workerTask.getTask().getId());
|
||||||
|
try (ZipInputStream archive = new ZipInputStream(cacheFile.get())) {
|
||||||
|
if (archive.getNextEntry() != null) {
|
||||||
|
byte[] cache = archive.readAllBytes();
|
||||||
|
Map<String, Object> outputMap = JacksonMapper.ofIon().readValue(cache, JacksonMapper.MAP_TYPE_REFERENCE);
|
||||||
|
Variables variables = variablesService.of(StorageContext.forTask(workerTask.getTaskRun()), outputMap);
|
||||||
|
|
||||||
|
TaskRunAttempt attempt = TaskRunAttempt.builder()
|
||||||
|
.state(new io.kestra.core.models.flows.State().withState(SUCCESS))
|
||||||
|
.workerId(this.workerId)
|
||||||
|
.build();
|
||||||
|
List<TaskRunAttempt> attempts = this.addAttempt(workerTask, attempt);
|
||||||
|
TaskRun taskRun = workerTask.getTaskRun().withAttempts(attempts).withOutputs(variables).withState(SUCCESS);
|
||||||
|
WorkerTaskResult workerTaskResult = new WorkerTaskResult(taskRun);
|
||||||
|
workerTaskResultQueue.put(workerTaskResult);
|
||||||
|
return workerTaskResult;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException | RuntimeException e) {
|
||||||
|
// in case of any exception, log an error and continue
|
||||||
|
runContext.logger().error("Unexpected exception while loading the cache for task '{}', the task will be executed instead.", workerTask.getTask().getId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// run
|
||||||
|
workerTask = this.runAttempt(runContext, workerTask);
|
||||||
|
|
||||||
|
// get last state
|
||||||
|
TaskRunAttempt lastAttempt = workerTask.getTaskRun().lastAttempt();
|
||||||
|
if (lastAttempt == null) {
|
||||||
|
throw new IllegalStateException("Can find lastAttempt on taskRun '" +
|
||||||
|
workerTask.getTaskRun().toString(true) + "'"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
io.kestra.core.models.flows.State.Type state = lastAttempt.getState().getCurrent();
|
||||||
|
|
||||||
|
if (workerTask.getTask().getRetry() != null &&
|
||||||
|
workerTask.getTask().getRetry().getWarningOnRetry() &&
|
||||||
|
workerTask.getTaskRun().attemptNumber() > 1 &&
|
||||||
|
state == SUCCESS
|
||||||
|
) {
|
||||||
|
state = WARNING;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (workerTask.getTask().isAllowFailure() && !workerTask.getTaskRun().shouldBeRetried(workerTask.getTask().getRetry()) && state.isFailed()) {
|
||||||
|
state = WARNING;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (workerTask.getTask().isAllowWarning() && WARNING.equals(state)) {
|
||||||
|
state = SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
// emit
|
||||||
|
List<WorkerTaskResult> dynamicWorkerResults = workerTask.getRunContext().dynamicWorkerResults();
|
||||||
|
List<TaskRun> dynamicTaskRuns = dynamicWorkerResults(dynamicWorkerResults);
|
||||||
|
|
||||||
|
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));
|
||||||
|
|
||||||
|
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);
|
||||||
|
workerTaskResultQueue.put(workerTaskResult);
|
||||||
|
|
||||||
|
// upload the cache file, hash may not be present if we didn't succeed in computing it
|
||||||
|
if (workerTask.getTask().getTaskCache() != null && workerTask.getTask().getTaskCache().getEnabled() && hash.isPresent() &&
|
||||||
|
(state == State.Type.SUCCESS || state == State.Type.WARNING)) {
|
||||||
|
runContext.logger().info("Uploading a cache entry for task '{}'", workerTask.getTask().getId());
|
||||||
|
|
||||||
|
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||||
|
ZipOutputStream archive = new ZipOutputStream(bos)) {
|
||||||
|
var zipEntry = new ZipEntry("outputs.ion");
|
||||||
|
archive.putNextEntry(zipEntry);
|
||||||
|
archive.write(JacksonMapper.ofIon().writeValueAsBytes(workerTask.getTaskRun().getOutputs()));
|
||||||
|
archive.closeEntry();
|
||||||
|
archive.finish();
|
||||||
|
Path archiveFile = runContext.workingDir().createTempFile(".zip");
|
||||||
|
Files.write(archiveFile, bos.toByteArray());
|
||||||
|
URI uri = runContext.storage().putCacheFile(archiveFile.toFile(), hash.get(), workerTask.getTaskRun().getValue());
|
||||||
|
runContext.logger().debug("Caching entry uploaded in URI {}", uri);
|
||||||
|
} catch (IOException | RuntimeException e) {
|
||||||
|
// in case of any exception, log an error and continue
|
||||||
|
runContext.logger().error("Unexpected exception while uploading the cache entry for task '{}', the task not be cached.", workerTask.getTask().getId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return workerTaskResult;
|
||||||
|
} finally {
|
||||||
|
this.logTerminated(workerTask);
|
||||||
|
|
||||||
|
// remove tmp directory
|
||||||
|
if (cleanUp) {
|
||||||
|
workerTask.getRunContext().cleanup();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void logTerminated(WorkerTask workerTask) {
|
||||||
|
final String[] tags = metricRegistry.tags(workerTask, workerGroup);
|
||||||
|
|
||||||
|
metricRegistry
|
||||||
|
.counter(MetricRegistry.METRIC_WORKER_ENDED_COUNT, MetricRegistry.METRIC_WORKER_ENDED_COUNT_DESCRIPTION, tags)
|
||||||
|
.increment();
|
||||||
|
|
||||||
|
metricRegistry
|
||||||
|
.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, MetricRegistry.METRIC_WORKER_ENDED_DURATION_DESCRIPTION, tags)
|
||||||
|
.record(workerTask.getTaskRun().getState().getDuration());
|
||||||
|
|
||||||
|
logService.logTaskRun(
|
||||||
|
workerTask.getTaskRun(),
|
||||||
|
Level.INFO,
|
||||||
|
"Type {} with state {} completed in {}",
|
||||||
|
workerTask.getTask().getClass().getSimpleName(),
|
||||||
|
workerTask.getTaskRun().getState().getCurrent(),
|
||||||
|
workerTask.getTaskRun().getState().humanDuration()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private WorkerTask runAttempt(final RunContext runContext, final WorkerTask workerTask) {
|
||||||
|
Logger logger = runContext.logger();
|
||||||
|
|
||||||
|
if (!(workerTask.getTask() instanceof RunnableTask<?> task)) {
|
||||||
|
// This should never happen but better to deal with it than crashing the Worker
|
||||||
|
var state = State.Type.fail(workerTask.getTask());
|
||||||
|
TaskRunAttempt attempt = TaskRunAttempt.builder()
|
||||||
|
.state(new io.kestra.core.models.flows.State().withState(state))
|
||||||
|
.workerId(this.workerId)
|
||||||
|
.build();
|
||||||
|
List<TaskRunAttempt> attempts = this.addAttempt(workerTask, attempt);
|
||||||
|
TaskRun taskRun = workerTask.getTaskRun().withAttempts(attempts);
|
||||||
|
logger.error("Unable to execute the task '{}': only runnable tasks can be executed by the worker but the task is of type {}", workerTask.getTask().getId(), workerTask.getTask().getClass());
|
||||||
|
return workerTask.withTaskRun(taskRun);
|
||||||
|
}
|
||||||
|
|
||||||
|
TaskRunAttempt.TaskRunAttemptBuilder builder = TaskRunAttempt.builder()
|
||||||
|
.state(new io.kestra.core.models.flows.State().withState(RUNNING))
|
||||||
|
.workerId(this.workerId);
|
||||||
|
|
||||||
|
// emit the attempt so the execution knows that the task is in RUNNING
|
||||||
|
workerTaskResultQueue.put(new WorkerTaskResult(
|
||||||
|
workerTask.getTaskRun()
|
||||||
|
.withAttempts(this.addAttempt(workerTask, builder.build()))
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
AtomicInteger metricRunningCount = getMetricRunningCount(workerTask);
|
||||||
|
metricRunningCount.incrementAndGet();
|
||||||
|
|
||||||
|
// run it
|
||||||
|
WorkerTaskCallable workerTaskCallable = new WorkerTaskCallable(workerTask, task, runContext, metricRegistry);
|
||||||
|
io.kestra.core.models.flows.State.Type state = callJob(workerTaskCallable);
|
||||||
|
|
||||||
|
metricRunningCount.decrementAndGet();
|
||||||
|
|
||||||
|
// attempt
|
||||||
|
TaskRunAttempt taskRunAttempt = builder
|
||||||
|
.build()
|
||||||
|
.withState(state)
|
||||||
|
.withLogFile(runContext.logFileURI());
|
||||||
|
|
||||||
|
// metrics
|
||||||
|
runContext.metrics()
|
||||||
|
.stream()
|
||||||
|
.map(metric -> MetricEntry.of(workerTask.getTaskRun(), metric, workerTask.getExecutionKind()))
|
||||||
|
.forEach(workerMetricQueue::put);
|
||||||
|
|
||||||
|
// save outputs
|
||||||
|
List<TaskRunAttempt> attempts = this.addAttempt(workerTask, taskRunAttempt);
|
||||||
|
|
||||||
|
TaskRun taskRun = workerTask.getTaskRun()
|
||||||
|
.withAttempts(attempts);
|
||||||
|
|
||||||
|
try {
|
||||||
|
Variables variables = variablesService.of(StorageContext.forTask(taskRun), workerTaskCallable.getTaskOutput());
|
||||||
|
taskRun = taskRun.withOutputs(variables);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn("Unable to save output on taskRun '{}'", taskRun, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return workerTask
|
||||||
|
.withTaskRun(taskRun);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<TaskRunAttempt> addAttempt(WorkerTask workerTask, TaskRunAttempt taskRunAttempt) {
|
||||||
|
return ImmutableList.<TaskRunAttempt>builder()
|
||||||
|
.addAll(workerTask.getTaskRun().getAttempts() == null ? new ArrayList<>() : workerTask.getTaskRun().getAttempts())
|
||||||
|
.add(taskRunAttempt)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Optional<String> hashTask(RunContext runContext, Task task) {
|
||||||
|
try {
|
||||||
|
var map = JacksonMapper.toMap(task);
|
||||||
|
var rMap = runContext.render(map);
|
||||||
|
var json = JacksonMapper.ofJson().writeValueAsBytes(rMap);
|
||||||
|
MessageDigest digest = MessageDigest.getInstance("SHA-256");
|
||||||
|
digest.update(json);
|
||||||
|
byte[] bytes = digest.digest();
|
||||||
|
return Optional.of(HexFormat.of().formatHex(bytes));
|
||||||
|
} catch (RuntimeException | IllegalVariableEvaluationException | JsonProcessingException |
|
||||||
|
NoSuchAlgorithmException e) {
|
||||||
|
runContext.logger().error("Unable to create the cache key for the task '{}'", task.getId(), e);
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<TaskRun> dynamicWorkerResults(List<WorkerTaskResult> dynamicWorkerResults) {
|
||||||
|
return dynamicWorkerResults
|
||||||
|
.stream()
|
||||||
|
.map(WorkerTaskResult::getTaskRun)
|
||||||
|
.map(taskRun -> taskRun.withDynamic(true))
|
||||||
|
.toList();
|
||||||
|
}
|
||||||
|
|
||||||
|
public AtomicInteger getMetricRunningCount(final WorkerTask workerTask) {
|
||||||
|
String[] tags = this.metricRegistry.tags(workerTask, workerGroup);
|
||||||
|
Arrays.sort(tags);
|
||||||
|
|
||||||
|
long index = Hashing.hashToLong(String.join("-", tags));
|
||||||
|
|
||||||
|
return this.metricRunningCount
|
||||||
|
.computeIfAbsent(index, l -> metricRegistry.gauge(
|
||||||
|
MetricRegistry.METRIC_WORKER_RUNNING_COUNT,
|
||||||
|
MetricRegistry.METRIC_WORKER_RUNNING_COUNT_DESCRIPTION,
|
||||||
|
new AtomicInteger(0),
|
||||||
|
tags
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,268 @@
|
|||||||
|
package io.kestra.worker.processors;
|
||||||
|
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
|
import io.kestra.core.metrics.MetricRegistry;
|
||||||
|
import io.kestra.core.models.Label;
|
||||||
|
import io.kestra.core.models.executions.Execution;
|
||||||
|
import io.kestra.core.models.executions.LogEntry;
|
||||||
|
import io.kestra.core.models.tasks.Output;
|
||||||
|
import io.kestra.core.models.triggers.PollingTriggerInterface;
|
||||||
|
import io.kestra.core.models.triggers.RealtimeTriggerInterface;
|
||||||
|
import io.kestra.core.models.triggers.TriggerService;
|
||||||
|
import io.kestra.core.runners.DefaultRunContext;
|
||||||
|
import io.kestra.core.runners.RunContextInitializer;
|
||||||
|
import io.kestra.core.runners.RunContextLogger;
|
||||||
|
import io.kestra.core.runners.WorkerSecurityService;
|
||||||
|
import io.kestra.core.runners.WorkerTrigger;
|
||||||
|
import io.kestra.core.runners.WorkerTriggerCallable;
|
||||||
|
import io.kestra.core.runners.WorkerTriggerRealtimeCallable;
|
||||||
|
import io.kestra.core.runners.WorkerTriggerResult;
|
||||||
|
import io.kestra.core.services.LabelService;
|
||||||
|
import io.kestra.core.services.LogService;
|
||||||
|
import io.kestra.core.trace.Tracer;
|
||||||
|
import io.kestra.worker.queues.WorkerLogQueue;
|
||||||
|
import io.kestra.worker.queues.WorkerTriggerResultQueue;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.lang3.time.DurationFormatUtils;
|
||||||
|
import org.apache.commons.lang3.time.StopWatch;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.event.Level;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import static io.kestra.core.models.flows.State.Type.FAILED;
|
||||||
|
import static io.kestra.core.models.flows.State.Type.SUCCESS;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class WorkerTriggerProcessor extends AbstractWorkerJobProcessor<WorkerTrigger> {
|
||||||
|
|
||||||
|
private final Map<String, AtomicInteger> evaluateTriggerRunningCount = new ConcurrentHashMap<>();
|
||||||
|
private final WorkerLogQueue workerLogQueue;
|
||||||
|
private final WorkerTriggerResultQueue workerTriggerResultQueue;
|
||||||
|
private final RunContextInitializer runContextInitializer;
|
||||||
|
|
||||||
|
public WorkerTriggerProcessor(String workerGroup,
|
||||||
|
LogService logService,
|
||||||
|
MetricRegistry metricRegistry,
|
||||||
|
WorkerSecurityService workerSecurityService,
|
||||||
|
Tracer tracer,
|
||||||
|
final RunContextInitializer runContextInitializer,
|
||||||
|
WorkerLogQueue workerLogQueue,
|
||||||
|
WorkerTriggerResultQueue workerTriggerResultQueue) {
|
||||||
|
super(workerGroup, logService, metricRegistry, workerSecurityService, tracer);
|
||||||
|
this.workerLogQueue = workerLogQueue;
|
||||||
|
this.workerTriggerResultQueue = workerTriggerResultQueue;
|
||||||
|
this.runContextInitializer = runContextInitializer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doProcess(WorkerTrigger workerTrigger) {
|
||||||
|
final String[] metricsTags = metricRegistry.tags(workerTrigger, workerGroup);
|
||||||
|
|
||||||
|
this.metricRegistry
|
||||||
|
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_STARTED_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_STARTED_COUNT_DESCRIPTION, metricsTags)
|
||||||
|
.increment();
|
||||||
|
|
||||||
|
this.metricRegistry
|
||||||
|
.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, MetricRegistry.METRIC_WORKER_TRIGGER_DURATION_DESCRIPTION, metricsTags)
|
||||||
|
.record(() -> {
|
||||||
|
StopWatch stopWatch = new StopWatch();
|
||||||
|
stopWatch.start();
|
||||||
|
|
||||||
|
this.evaluateTriggerRunningCount.computeIfAbsent(workerTrigger.getTriggerContext().uid(), s -> metricRegistry
|
||||||
|
.gauge(MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT_DESCRIPTION, new AtomicInteger(0), metricsTags));
|
||||||
|
|
||||||
|
this.evaluateTriggerRunningCount.get(workerTrigger.getTriggerContext().uid()).addAndGet(1);
|
||||||
|
|
||||||
|
DefaultRunContext runContext = (DefaultRunContext) workerTrigger.getConditionContext().getRunContext();
|
||||||
|
runContextInitializer.forWorker(runContext, workerTrigger);
|
||||||
|
try {
|
||||||
|
|
||||||
|
logService.logTrigger(
|
||||||
|
workerTrigger.getTriggerContext(),
|
||||||
|
runContext.logger(),
|
||||||
|
Level.INFO,
|
||||||
|
"Type {} started",
|
||||||
|
workerTrigger.getTrigger().getType()
|
||||||
|
);
|
||||||
|
|
||||||
|
if (workerTrigger.getTrigger() instanceof PollingTriggerInterface pollingTrigger) {
|
||||||
|
WorkerTriggerCallable workerCallable = new WorkerTriggerCallable(runContext, workerTrigger, pollingTrigger);
|
||||||
|
io.kestra.core.models.flows.State.Type state = callJob(workerCallable);
|
||||||
|
|
||||||
|
if (workerCallable.getException() != null || !state.equals(SUCCESS)) {
|
||||||
|
this.handleTriggerError(workerTrigger, workerCallable.getException());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!state.equals(FAILED)) {
|
||||||
|
this.publishTriggerExecution(workerTrigger, workerCallable.getEvaluate());
|
||||||
|
}
|
||||||
|
} else if (workerTrigger.getTrigger() instanceof RealtimeTriggerInterface streamingTrigger) {
|
||||||
|
WorkerTriggerRealtimeCallable workerCallable = new WorkerTriggerRealtimeCallable(
|
||||||
|
runContext,
|
||||||
|
workerTrigger,
|
||||||
|
streamingTrigger,
|
||||||
|
throwable -> this.handleTriggerError(workerTrigger, throwable),
|
||||||
|
execution -> this.publishTriggerExecution(workerTrigger, Optional.of(execution))
|
||||||
|
);
|
||||||
|
io.kestra.core.models.flows.State.Type state = callJob(workerCallable);
|
||||||
|
|
||||||
|
// here the realtime trigger fail before the publisher being call so we create a fail execution
|
||||||
|
if (workerCallable.getException() != null || !state.equals(SUCCESS)) {
|
||||||
|
this.handleRealtimeTriggerError(workerTrigger, workerCallable.getException());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
this.handleTriggerError(workerTrigger, e);
|
||||||
|
} finally {
|
||||||
|
logService.logTrigger(
|
||||||
|
workerTrigger.getTriggerContext(),
|
||||||
|
runContext.logger(),
|
||||||
|
Level.INFO,
|
||||||
|
"Type {} completed in {}",
|
||||||
|
workerTrigger.getTrigger().getType(),
|
||||||
|
DurationFormatUtils.formatDurationHMS(stopWatch.getTime(TimeUnit.MILLISECONDS))
|
||||||
|
);
|
||||||
|
|
||||||
|
workerTrigger.getConditionContext().getRunContext().cleanup();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.evaluateTriggerRunningCount.get(workerTrigger.getTriggerContext().uid()).addAndGet(-1);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
metricRegistry
|
||||||
|
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ENDED_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_ENDED_COUNT_DESCRIPTION, metricsTags)
|
||||||
|
.increment();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleTriggerError(WorkerTrigger workerTrigger, Throwable e) {
|
||||||
|
String[] tags = metricRegistry.tags(workerTrigger, workerGroup);
|
||||||
|
|
||||||
|
metricRegistry
|
||||||
|
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT_DESCRIPTION, tags)
|
||||||
|
.increment();
|
||||||
|
|
||||||
|
logError(workerTrigger, e);
|
||||||
|
Execution execution = workerTrigger.getTrigger().isFailOnTriggerError() ? TriggerService.generateExecution(workerTrigger.getTrigger(), workerTrigger.getConditionContext(), workerTrigger.getTriggerContext(), (Output) null)
|
||||||
|
.withState(FAILED) : null;
|
||||||
|
if (execution != null) {
|
||||||
|
RunContextLogger.logEntries(Execution.loggingEventFromException(e), LogEntry.of(execution)).forEach(workerLogQueue::put);
|
||||||
|
}
|
||||||
|
this.workerTriggerResultQueue.put(
|
||||||
|
WorkerTriggerResult.builder()
|
||||||
|
.triggerContext(workerTrigger.getTriggerContext())
|
||||||
|
.trigger(workerTrigger.getTrigger())
|
||||||
|
.execution(Optional.ofNullable(execution))
|
||||||
|
.build()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleRealtimeTriggerError(WorkerTrigger workerTrigger, Throwable e) {
|
||||||
|
String[] tags = metricRegistry.tags(workerTrigger, workerGroup);
|
||||||
|
|
||||||
|
this.metricRegistry
|
||||||
|
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT_DESCRIPTION, tags)
|
||||||
|
.increment();
|
||||||
|
|
||||||
|
// We create a FAILED execution, so the user is aware that the realtime trigger failed to be created
|
||||||
|
var execution = TriggerService
|
||||||
|
.generateRealtimeExecution(workerTrigger.getTrigger(), workerTrigger.getConditionContext(), workerTrigger.getTriggerContext(), null)
|
||||||
|
.withState(FAILED);
|
||||||
|
|
||||||
|
// We create an ERROR log attached to the execution
|
||||||
|
Logger logger = workerTrigger.getConditionContext().getRunContext().logger();
|
||||||
|
logService.logExecution(
|
||||||
|
execution,
|
||||||
|
logger,
|
||||||
|
Level.ERROR,
|
||||||
|
"[date: {}] Realtime trigger failed to be created in the worker with error: {}",
|
||||||
|
workerTrigger.getTriggerContext().getDate(),
|
||||||
|
e != null ? e.getMessage() : "unknown",
|
||||||
|
e
|
||||||
|
);
|
||||||
|
if (logger.isTraceEnabled() && e != null) {
|
||||||
|
logger.trace(Throwables.getStackTraceAsString(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
this.workerTriggerResultQueue.put(
|
||||||
|
WorkerTriggerResult.builder()
|
||||||
|
.execution(Optional.of(execution))
|
||||||
|
.triggerContext(workerTrigger.getTriggerContext())
|
||||||
|
.trigger(workerTrigger.getTrigger())
|
||||||
|
.build()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void publishTriggerExecution(WorkerTrigger workerTrigger, Optional<Execution> evaluate) {
|
||||||
|
metricRegistry
|
||||||
|
.counter(
|
||||||
|
MetricRegistry.METRIC_WORKER_TRIGGER_EXECUTION_COUNT,
|
||||||
|
MetricRegistry.METRIC_WORKER_TRIGGER_EXECUTION_COUNT_DESCRIPTION,
|
||||||
|
metricRegistry.tags(workerTrigger, workerGroup)
|
||||||
|
).increment();
|
||||||
|
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
logService.logTrigger(
|
||||||
|
workerTrigger.getTriggerContext(),
|
||||||
|
Level.DEBUG,
|
||||||
|
"[type: {}] {}",
|
||||||
|
workerTrigger.getTrigger().getType(),
|
||||||
|
evaluate.map(execution -> "New execution '" + execution.getId() + "'").orElse("Empty evaluation")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
var flow = workerTrigger.getConditionContext().getFlow();
|
||||||
|
if (flow.getLabels() != null) {
|
||||||
|
evaluate = evaluate.map(execution -> {
|
||||||
|
List<Label> executionLabels = execution.getLabels() != null ? execution.getLabels() : new ArrayList<>();
|
||||||
|
executionLabels.addAll(LabelService.labelsExcludingSystem(flow));
|
||||||
|
return execution.withLabels(executionLabels);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.workerTriggerResultQueue.put(
|
||||||
|
WorkerTriggerResult.builder()
|
||||||
|
.execution(evaluate)
|
||||||
|
.triggerContext(workerTrigger.getTriggerContext())
|
||||||
|
.trigger(workerTrigger.getTrigger())
|
||||||
|
.build()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void logError(WorkerTrigger workerTrigger, Throwable e) {
|
||||||
|
Logger logger = workerTrigger.getConditionContext().getRunContext().logger();
|
||||||
|
|
||||||
|
if (e instanceof InterruptedException || (e != null && e.getCause() instanceof InterruptedException)) {
|
||||||
|
logService.logTrigger(
|
||||||
|
workerTrigger.getTriggerContext(),
|
||||||
|
logger,
|
||||||
|
Level.WARN,
|
||||||
|
"[date: {}] Trigger evaluation interrupted in the worker",
|
||||||
|
workerTrigger.getTriggerContext().getDate()
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
logService.logTrigger(
|
||||||
|
workerTrigger.getTriggerContext(),
|
||||||
|
logger,
|
||||||
|
Level.WARN,
|
||||||
|
"[date: {}] Trigger evaluation failed in the worker with error: {}",
|
||||||
|
workerTrigger.getTriggerContext().getDate(),
|
||||||
|
e != null ? e.getMessage() : "unknown",
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled() && e != null) {
|
||||||
|
logger.trace(Throwables.getStackTraceAsString(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,53 @@
|
|||||||
|
package io.kestra.worker.queues;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
public abstract class AbstractDelegateWorkerQueue<T> implements WorkerQueue<T> {
|
||||||
|
|
||||||
|
private final WorkerQueue<T> queue;
|
||||||
|
|
||||||
|
public AbstractDelegateWorkerQueue(final WorkerQueue<T> queue) {
|
||||||
|
this.queue = Objects.requireNonNull(queue, "queue must not be null.");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public T poll(Duration timeout) throws InterruptedException {
|
||||||
|
return queue.poll(timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void put(T event) {
|
||||||
|
queue.put(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int remainingCapacity() {
|
||||||
|
return queue.remainingCapacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int capacity() {
|
||||||
|
return queue.capacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int size() {
|
||||||
|
return queue.size();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,61 @@
|
|||||||
|
package io.kestra.worker.queues;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class InMemoryWorkerQueue<T> implements WorkerQueue<T> {
|
||||||
|
|
||||||
|
private final int capacity;
|
||||||
|
private final LinkedBlockingQueue<T> queue;
|
||||||
|
|
||||||
|
public InMemoryWorkerQueue(int capacity) {
|
||||||
|
this.capacity = capacity;
|
||||||
|
this.queue = new LinkedBlockingQueue<>(capacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public T poll(Duration timeout) throws InterruptedException {
|
||||||
|
return queue.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void put(T event) {
|
||||||
|
try {
|
||||||
|
this.queue.put(event);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int remainingCapacity() {
|
||||||
|
return this.queue.remainingCapacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int capacity() {
|
||||||
|
return this.capacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int size() {
|
||||||
|
return this.queue.size();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,48 @@
|
|||||||
|
package io.kestra.worker.queues;
|
||||||
|
|
||||||
|
import io.kestra.core.metrics.MetricRegistry;
|
||||||
|
import io.micrometer.core.instrument.Counter;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
|
||||||
|
public class MonitoredWorkerQueue<T> extends AbstractDelegateWorkerQueue<T> {
|
||||||
|
|
||||||
|
public static final String QUEUE_SIZE = "queue.size";
|
||||||
|
public static final String QUEUE_REMAINING_CAPACITY = "queue.remaining.capacity";
|
||||||
|
public static final String QUEUE_ENQUEUED = "queue.enqueued";
|
||||||
|
public static final String QUEUE_DEQUEUED = "queue.dequeued";
|
||||||
|
private final MetricRegistry metricRegistry;
|
||||||
|
|
||||||
|
private final Counter enqueuedCounter;
|
||||||
|
private final Counter dequeuedCounter;
|
||||||
|
|
||||||
|
public MonitoredWorkerQueue(MetricRegistry metricRegistry, String queueName, WorkerQueue<T> queue) {
|
||||||
|
super(queue);
|
||||||
|
this.metricRegistry = metricRegistry;
|
||||||
|
|
||||||
|
String[] tags = new String[]{"name", queueName};
|
||||||
|
this.metricRegistry.registerGauge(QUEUE_SIZE, "Current number of items in the queue", this::size, tags);
|
||||||
|
this.metricRegistry.registerGauge(QUEUE_REMAINING_CAPACITY, "Remaining capacity in the queue", this::size, tags);
|
||||||
|
this.enqueuedCounter = this.metricRegistry.counter(QUEUE_ENQUEUED, "Number of items enqueued", tags);
|
||||||
|
this.dequeuedCounter = this.metricRegistry.counter(QUEUE_DEQUEUED, "Number of items dequeued", tags);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public T poll(Duration timeout) throws InterruptedException {
|
||||||
|
T item = super.poll(timeout);
|
||||||
|
dequeuedCounter.increment();
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void put(T event) {
|
||||||
|
super.put(event);
|
||||||
|
enqueuedCounter.increment();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,18 @@
|
|||||||
|
package io.kestra.worker.queues;
|
||||||
|
|
||||||
|
import io.kestra.core.runners.WorkerJob;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public interface WorkerJobQueue extends WorkerQueue<WorkerJob> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The default {@link WorkerJob} implementation
|
||||||
|
*/
|
||||||
|
class Default extends AbstractDelegateWorkerQueue<WorkerJob> implements WorkerJobQueue {
|
||||||
|
public Default(final WorkerQueue<WorkerJob> queue) {
|
||||||
|
super(queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,20 @@
|
|||||||
|
package io.kestra.worker.queues;
|
||||||
|
|
||||||
|
import io.kestra.core.models.executions.LogEntry;
|
||||||
|
import io.kestra.core.models.executions.MetricEntry;
|
||||||
|
import io.kestra.core.runners.WorkerTriggerResult;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
public interface WorkerLogQueue extends WorkerQueue<LogEntry>{
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The default {@link LogEntry} implementation
|
||||||
|
*/
|
||||||
|
class Default extends AbstractDelegateWorkerQueue<LogEntry> implements WorkerLogQueue {
|
||||||
|
public Default(final WorkerQueue<LogEntry> queue) {
|
||||||
|
super(queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
package io.kestra.worker.queues;
|
||||||
|
|
||||||
|
import io.kestra.core.models.executions.MetricEntry;
|
||||||
|
|
||||||
|
public interface WorkerMetricQueue extends WorkerQueue<MetricEntry> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The default {@link MetricEntry} implementation
|
||||||
|
*/
|
||||||
|
class Default extends AbstractDelegateWorkerQueue<MetricEntry> implements WorkerMetricQueue {
|
||||||
|
public Default(final WorkerQueue<MetricEntry> queue) {
|
||||||
|
super(queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,23 @@
|
|||||||
|
package io.kestra.worker.queues;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents an event queue used for worker intra-processes communication.
|
||||||
|
* <p>
|
||||||
|
* Implementations of this interface are expected to be in-memory oriented.
|
||||||
|
*
|
||||||
|
* @param <T> type of the queue.
|
||||||
|
*/
|
||||||
|
public interface WorkerQueue<T> {
|
||||||
|
|
||||||
|
T poll(Duration timeout) throws InterruptedException;
|
||||||
|
|
||||||
|
void put(T event);
|
||||||
|
|
||||||
|
int remainingCapacity();
|
||||||
|
|
||||||
|
int capacity();
|
||||||
|
|
||||||
|
int size();
|
||||||
|
}
|
||||||
@@ -0,0 +1,37 @@
|
|||||||
|
package io.kestra.worker.queues;
|
||||||
|
|
||||||
|
import io.kestra.core.metrics.MetricRegistry;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import jakarta.inject.Singleton;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
@Singleton
|
||||||
|
public class WorkerQueueFactory {
|
||||||
|
|
||||||
|
public static final int DEFAULT_QUEUE_SIZE = 5000;
|
||||||
|
|
||||||
|
private final Map<QueueKey, WorkerQueue<?>> queues;
|
||||||
|
|
||||||
|
private final MetricRegistry metricRegistry;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public WorkerQueueFactory(final MetricRegistry metricRegistry) {
|
||||||
|
this.queues = new ConcurrentHashMap<>();
|
||||||
|
this.metricRegistry = metricRegistry;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public synchronized <T> WorkerQueue<T> getOrCreate(final String workerId, final Class<T> type) {
|
||||||
|
QueueKey key = new QueueKey(workerId, type);
|
||||||
|
return (WorkerQueue<T>) queues.computeIfAbsent(key, unused ->
|
||||||
|
new MonitoredWorkerQueue<T>(metricRegistry, type.getSimpleName().toLowerCase(),
|
||||||
|
new InMemoryWorkerQueue<>(DEFAULT_QUEUE_SIZE)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private record QueueKey(String workerId, Class<?> type) {
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
package io.kestra.worker.queues;
|
||||||
|
|
||||||
|
import io.kestra.core.runners.WorkerTaskResult;
|
||||||
|
|
||||||
|
public interface WorkerTaskResultQueue extends WorkerQueue<WorkerTaskResult> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The default {@link WorkerTaskResultQueue} implementation
|
||||||
|
*/
|
||||||
|
class Default extends AbstractDelegateWorkerQueue<WorkerTaskResult> implements WorkerTaskResultQueue {
|
||||||
|
public Default(final WorkerQueue<WorkerTaskResult> queue) {
|
||||||
|
super(queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,18 @@
|
|||||||
|
package io.kestra.worker.queues;
|
||||||
|
|
||||||
|
import io.kestra.core.runners.WorkerTriggerResult;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Typed worker queue for {@link WorkerTriggerResult}.
|
||||||
|
*/
|
||||||
|
public interface WorkerTriggerResultQueue extends WorkerQueue<WorkerTriggerResult> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The default {@link WorkerTriggerResultQueue} implementation
|
||||||
|
*/
|
||||||
|
class Default extends AbstractDelegateWorkerQueue<WorkerTriggerResult> implements WorkerTriggerResultQueue{
|
||||||
|
public Default(final WorkerQueue<WorkerTriggerResult> queue) {
|
||||||
|
super(queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
20
worker/src/main/proto/liveness_controller.proto
Normal file
20
worker/src/main/proto/liveness_controller.proto
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
option java_package = "io.kestra.controller.grpc";
|
||||||
|
option java_multiple_files = true;
|
||||||
|
|
||||||
|
import "request.proto";
|
||||||
|
|
||||||
|
service LivenessControllerService {
|
||||||
|
rpc heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
|
||||||
|
}
|
||||||
|
|
||||||
|
message HeartbeatRequest {
|
||||||
|
RequestOrResponseHeader header = 1;
|
||||||
|
bytes message = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message HeartbeatResponse {
|
||||||
|
RequestOrResponseHeader header = 1;
|
||||||
|
bytes message = 2;
|
||||||
|
}
|
||||||
16
worker/src/main/proto/request.proto
Normal file
16
worker/src/main/proto/request.proto
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
option java_package = "io.kestra.server.grpc";
|
||||||
|
option java_multiple_files = true;
|
||||||
|
|
||||||
|
// Common request and response header
|
||||||
|
message RequestOrResponseHeader {
|
||||||
|
// The client ID string.
|
||||||
|
string clientId = 1;
|
||||||
|
// The client version.
|
||||||
|
string clientVersion = 2;
|
||||||
|
// The correlation ID of this request.
|
||||||
|
string correlationId = 3;
|
||||||
|
// The format of the message
|
||||||
|
string messageFormat = 4;
|
||||||
|
}
|
||||||
34
worker/src/main/proto/worker_controller.proto
Normal file
34
worker/src/main/proto/worker_controller.proto
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
option java_package = "io.kestra.worker.grpc";
|
||||||
|
option java_multiple_files = true;
|
||||||
|
|
||||||
|
import "request.proto";
|
||||||
|
|
||||||
|
service WorkerControllerService {
|
||||||
|
rpc fetchWorkerJobs(FetchWorkerJobRequest) returns (FetchWorkerJobResponse);
|
||||||
|
|
||||||
|
rpc fetchWorkerJobsStream(FetchWorkerJobRequest) returns (stream FetchWorkerJobResponse);
|
||||||
|
|
||||||
|
rpc sendWorkerJobResults(WorkerJobResultsRequest) returns (WorkerJobResultsResponse);
|
||||||
|
}
|
||||||
|
|
||||||
|
message FetchWorkerJobRequest {
|
||||||
|
RequestOrResponseHeader header = 1;
|
||||||
|
bytes message = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message FetchWorkerJobResponse {
|
||||||
|
RequestOrResponseHeader header = 1;
|
||||||
|
bytes message = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message WorkerJobResultsRequest {
|
||||||
|
RequestOrResponseHeader header = 1;
|
||||||
|
bytes message = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message WorkerJobResultsResponse {
|
||||||
|
RequestOrResponseHeader header = 1;
|
||||||
|
bytes message = 2;
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user