mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
refactor(core): queue refactor
This commit is contained in:
@@ -2,10 +2,10 @@ package org.floworc.core.queues;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public interface QueueInterface <T> {
|
||||
boolean emit(QueueMessage<T> message);
|
||||
public interface QueueInterface<T> {
|
||||
void emit(T message);
|
||||
|
||||
void receive(Consumer<QueueMessage<T>> consumer);
|
||||
void receive(Class consumerGroup, Consumer<T> consumer);
|
||||
|
||||
void ack(QueueMessage<T> message);
|
||||
void ack(T message);
|
||||
}
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
package org.floworc.core.queues;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Value;
|
||||
|
||||
@Value
|
||||
@Builder
|
||||
public class QueueMessage <T> {
|
||||
private String key;
|
||||
|
||||
private T body;
|
||||
}
|
||||
@@ -1,11 +0,0 @@
|
||||
package org.floworc.core.queues;
|
||||
|
||||
public enum QueueName {
|
||||
WORKERS,
|
||||
EXECUTIONS,
|
||||
WORKERS_RESULT;
|
||||
|
||||
public boolean isPubSub() {
|
||||
return this == EXECUTIONS;
|
||||
}
|
||||
}
|
||||
@@ -2,60 +2,56 @@ package org.floworc.core.queues.types;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.floworc.core.queues.QueueInterface;
|
||||
import org.floworc.core.queues.QueueMessage;
|
||||
import org.floworc.core.queues.QueueName;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@Slf4j
|
||||
public class LocalQueue <T> implements QueueInterface<T> {
|
||||
private static ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
|
||||
private QueueName topic;
|
||||
private List<QueueMessage<T>> messages = new ArrayList<>();
|
||||
private List<Consumer<QueueMessage<T>>> consumers = new ArrayList<>();
|
||||
private Class<T> cls;
|
||||
private static ExecutorService poolExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
|
||||
private Map<String, List<Consumer<T>>> consumers = new HashMap<>();
|
||||
|
||||
public LocalQueue(QueueName topic) {
|
||||
this.topic = topic;
|
||||
public LocalQueue(Class<T> cls) {
|
||||
this.cls = cls;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean emit(QueueMessage<T> message) {
|
||||
public void emit(T message) {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("New message: topic '{}', key '{}', value {}", this.topic, message.getKey(), message.getBody());
|
||||
log.trace("New message: topic '{}', value {}", this.cls.getName(), message);
|
||||
}
|
||||
|
||||
this.messages.add(message);
|
||||
|
||||
if (this.consumers != null) {
|
||||
if (this.topic.isPubSub()) {
|
||||
this.consumers
|
||||
.forEach(consumers ->
|
||||
poolExecutor.execute(() ->
|
||||
consumers.accept(message)
|
||||
)
|
||||
);
|
||||
} else {
|
||||
this.consumers
|
||||
.forEach((consumerGroup, consumers) -> {
|
||||
poolExecutor.execute(() -> {
|
||||
this.consumers.get((new Random()).nextInt(this.consumers.size())).accept(message);
|
||||
consumers.get((new Random()).nextInt(consumers.size())).accept(message);
|
||||
});
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void receive(Class consumerGroup, Consumer<T> consumer) {
|
||||
if (!this.consumers.containsKey(consumerGroup.getName())) {
|
||||
this.consumers.put(consumerGroup.getName(), new ArrayList<>());
|
||||
}
|
||||
|
||||
return true;
|
||||
this.consumers.get(consumerGroup.getName()).add(consumer);
|
||||
}
|
||||
|
||||
public int getSubscribersCount() {
|
||||
return this.consumers
|
||||
.values()
|
||||
.stream()
|
||||
.map(List::size)
|
||||
.reduce(0, Integer::sum);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void receive(Consumer<QueueMessage<T>> consumer) {
|
||||
this.consumers.add(consumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void ack(QueueMessage<T> message) {
|
||||
this.messages.remove(message);
|
||||
public void ack(T message) {
|
||||
// no ack needed with local queues
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ import org.floworc.core.models.executions.Execution;
|
||||
import org.floworc.core.models.executions.TaskRun;
|
||||
import org.floworc.core.models.flows.State;
|
||||
import org.floworc.core.queues.QueueInterface;
|
||||
import org.floworc.core.queues.QueueMessage;
|
||||
import org.floworc.core.models.tasks.FlowableTask;
|
||||
import org.floworc.core.models.tasks.Task;
|
||||
|
||||
@@ -17,9 +16,9 @@ import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
public class ExecutionService {
|
||||
private QueueInterface<WorkerTask> workerTaskResultQueue;
|
||||
private QueueInterface<WorkerTaskResult> workerTaskResultQueue;
|
||||
|
||||
public ExecutionService(QueueInterface<WorkerTask> workerTaskResultQueue) {
|
||||
public ExecutionService(QueueInterface<WorkerTaskResult> workerTaskResultQueue) {
|
||||
this.workerTaskResultQueue = workerTaskResultQueue;
|
||||
}
|
||||
|
||||
@@ -127,19 +126,12 @@ public class ExecutionService {
|
||||
|
||||
// all childs are done, continue the main flow
|
||||
if (nexts.isEmpty()) {
|
||||
WorkerTask workerTask = WorkerTask.builder()
|
||||
.taskRun(execution
|
||||
this.workerTaskResultQueue.emit(new WorkerTaskResult(
|
||||
execution
|
||||
.findTaskRunById(parentTaskRun.getId())
|
||||
.withState(execution.hasFailed(childs.get()) ? State.Type.FAILED : State.Type.SUCCESS)
|
||||
)
|
||||
.task(parent)
|
||||
.build();
|
||||
|
||||
this.workerTaskResultQueue.emit(QueueMessage.<WorkerTask>builder()
|
||||
.key(workerTask.getTaskRun().getExecutionId())
|
||||
.body(workerTask)
|
||||
.build()
|
||||
);
|
||||
.withState(execution.hasFailed(childs.get()) ? State.Type.FAILED : State.Type.SUCCESS),
|
||||
parent
|
||||
));
|
||||
|
||||
return Optional.of(new ArrayList<>());
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.floworc.core.models.executions.Execution;
|
||||
import org.floworc.core.models.executions.TaskRun;
|
||||
import org.floworc.core.queues.QueueInterface;
|
||||
import org.floworc.core.queues.QueueMessage;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@@ -12,12 +11,12 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
public class ExecutionState implements Runnable {
|
||||
private final Object lock = new Object();
|
||||
private final QueueInterface<Execution> executionQueue;
|
||||
private final QueueInterface<WorkerTask> workerTaskResultQueue;
|
||||
private final QueueInterface<WorkerTaskResult> workerTaskResultQueue;
|
||||
private static ConcurrentHashMap<String, Execution> executions = new ConcurrentHashMap<>();
|
||||
|
||||
public ExecutionState(
|
||||
QueueInterface<Execution> executionQueue,
|
||||
QueueInterface<WorkerTask> workerTaskResultQueue
|
||||
QueueInterface<WorkerTaskResult> workerTaskResultQueue
|
||||
) {
|
||||
this.executionQueue = executionQueue;
|
||||
this.workerTaskResultQueue = workerTaskResultQueue;
|
||||
@@ -25,21 +24,19 @@ public class ExecutionState implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
this.executionQueue.receive(message -> {
|
||||
this.executionQueue.receive(ExecutionState.class, execution -> {
|
||||
synchronized (lock) {
|
||||
Execution execution = message.getBody();
|
||||
|
||||
if (execution.getState().isTerninated()) {
|
||||
executions.remove(message.getKey());
|
||||
executions.remove(execution.getId());
|
||||
} else {
|
||||
executions.put(message.getKey(), execution);
|
||||
executions.put(execution.getId(), execution);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
this.workerTaskResultQueue.receive(message -> {
|
||||
this.workerTaskResultQueue.receive(ExecutionState.class, message -> {
|
||||
synchronized (lock) {
|
||||
TaskRun taskRun = message.getBody().getTaskRun();
|
||||
TaskRun taskRun = message.getTaskRun();
|
||||
|
||||
if (!executions.containsKey(taskRun.getExecutionId())) {
|
||||
throw new RuntimeException("Unable to find execution '" + taskRun.getExecutionId() + "' on ExecutionState");
|
||||
@@ -48,12 +45,7 @@ public class ExecutionState implements Runnable {
|
||||
Execution execution = executions.get(taskRun.getExecutionId());
|
||||
Execution newExecution = execution.withTaskRun(taskRun);
|
||||
|
||||
this.executionQueue.emit(
|
||||
QueueMessage.<Execution>builder()
|
||||
.key(newExecution.getId())
|
||||
.body(newExecution)
|
||||
.build()
|
||||
);
|
||||
this.executionQueue.emit(newExecution);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@ import org.floworc.core.models.executions.TaskRun;
|
||||
import org.floworc.core.models.flows.Flow;
|
||||
import org.floworc.core.models.flows.State;
|
||||
import org.floworc.core.queues.QueueInterface;
|
||||
import org.floworc.core.queues.QueueMessage;
|
||||
import org.floworc.core.repositories.FlowRepositoryInterface;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@@ -33,9 +32,7 @@ public class Executor implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
this.executionQueue.receive(message -> {
|
||||
Execution execution = message.getBody();
|
||||
|
||||
this.executionQueue.receive(Executor.class, execution -> {
|
||||
if (execution.getState().isTerninated()) {
|
||||
return;
|
||||
}
|
||||
@@ -91,23 +88,14 @@ public class Executor implements Runnable {
|
||||
newExecution = newExecution.withState(State.Type.RUNNING);
|
||||
}
|
||||
|
||||
this.executionQueue.emit(
|
||||
QueueMessage.<Execution>builder()
|
||||
.key(newExecution.getId())
|
||||
.body(newExecution)
|
||||
.build()
|
||||
);
|
||||
this.executionQueue.emit(newExecution);
|
||||
|
||||
// submit TaskRun
|
||||
final Execution finalNewExecution = newExecution;
|
||||
nexts.forEach(taskRun -> this.workerTaskQueue.emit(QueueMessage.<WorkerTask>builder()
|
||||
.key(finalNewExecution.getId())
|
||||
.body(WorkerTask.builder()
|
||||
nexts.forEach(taskRun -> this.workerTaskQueue.emit(
|
||||
WorkerTask.builder()
|
||||
.taskRun(taskRun)
|
||||
.task(flow.findTaskById(taskRun.getTaskId()))
|
||||
.build()
|
||||
)
|
||||
.build()
|
||||
));
|
||||
}
|
||||
|
||||
@@ -123,11 +111,6 @@ public class Executor implements Runnable {
|
||||
newExecution.getState().humanDuration()
|
||||
);
|
||||
|
||||
this.executionQueue.emit(
|
||||
QueueMessage.<Execution>builder()
|
||||
.key(newExecution.getId())
|
||||
.body(newExecution)
|
||||
.build()
|
||||
);
|
||||
this.executionQueue.emit(newExecution);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,24 +1,21 @@
|
||||
package org.floworc.core.runners;
|
||||
|
||||
import org.floworc.core.models.flows.State;
|
||||
import org.floworc.core.queues.QueueInterface;
|
||||
import org.floworc.core.queues.QueueMessage;
|
||||
import org.floworc.core.models.tasks.RunnableTask;
|
||||
import org.floworc.core.queues.QueueInterface;
|
||||
|
||||
public class Worker implements Runnable {
|
||||
private QueueInterface<WorkerTask> workerTaskQueue;
|
||||
private QueueInterface<WorkerTask> workerTaskResultQueue;
|
||||
private QueueInterface<WorkerTaskResult> workerTaskResultQueue;
|
||||
|
||||
public Worker(QueueInterface<WorkerTask> workerTaskQueue, QueueInterface<WorkerTask> workerTaskResultQueue) {
|
||||
public Worker(QueueInterface<WorkerTask> workerTaskQueue, QueueInterface<WorkerTaskResult> workerTaskResultQueue) {
|
||||
this.workerTaskQueue = workerTaskQueue;
|
||||
this.workerTaskResultQueue = workerTaskResultQueue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
this.workerTaskQueue.receive(message -> {
|
||||
this.run(message.getBody());
|
||||
});
|
||||
this.workerTaskQueue.receive(Worker.class, this::run);
|
||||
}
|
||||
|
||||
public void run(WorkerTask workerTask) {
|
||||
@@ -29,10 +26,8 @@ public class Worker implements Runnable {
|
||||
workerTask.getTask().getClass().getSimpleName()
|
||||
);
|
||||
|
||||
this.workerTaskResultQueue.emit(QueueMessage.<WorkerTask>builder()
|
||||
.key(workerTask.getTaskRun().getExecutionId())
|
||||
.body(workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.RUNNING)))
|
||||
.build()
|
||||
this.workerTaskResultQueue.emit(
|
||||
new WorkerTaskResult(workerTask, State.Type.RUNNING)
|
||||
);
|
||||
|
||||
if (workerTask.getTask() instanceof RunnableTask) {
|
||||
@@ -40,19 +35,13 @@ public class Worker implements Runnable {
|
||||
try {
|
||||
task.run();
|
||||
|
||||
this.workerTaskResultQueue.emit(QueueMessage.<WorkerTask>builder()
|
||||
.key(workerTask.getTaskRun().getExecutionId())
|
||||
.body(workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.SUCCESS)))
|
||||
.build()
|
||||
this.workerTaskResultQueue.emit(
|
||||
new WorkerTaskResult(workerTask, State.Type.SUCCESS)
|
||||
);
|
||||
} catch (Exception e) {
|
||||
workerTask.logger().error("Failed task", e);
|
||||
|
||||
this.workerTaskResultQueue.emit(QueueMessage.<WorkerTask>builder()
|
||||
.key(workerTask.getTaskRun().getExecutionId())
|
||||
.body(workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.FAILED)))
|
||||
.build()
|
||||
);
|
||||
this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask, State.Type.FAILED));
|
||||
}
|
||||
|
||||
workerTask.logger().info(
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package org.floworc.core.runners;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Value;
|
||||
import lombok.Data;
|
||||
import lombok.experimental.Wither;
|
||||
import org.floworc.core.models.executions.TaskRun;
|
||||
import org.floworc.core.models.tasks.Task;
|
||||
@@ -10,7 +10,7 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
@Value
|
||||
@Data
|
||||
@Builder
|
||||
public class WorkerTask {
|
||||
@NotNull
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
package org.floworc.core.runners;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.ToString;
|
||||
import org.floworc.core.models.executions.TaskRun;
|
||||
import org.floworc.core.models.flows.State;
|
||||
import org.floworc.core.models.tasks.Task;
|
||||
|
||||
@ToString(callSuper = true)
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Data
|
||||
public class WorkerTaskResult extends WorkerTask {
|
||||
public WorkerTaskResult(TaskRun taskRun, Task task) {
|
||||
super(taskRun, task);
|
||||
}
|
||||
|
||||
public WorkerTaskResult(WorkerTask workerTask, State.Type state) {
|
||||
super(workerTask.getTaskRun().withState(state), workerTask.getTask());
|
||||
}
|
||||
}
|
||||
@@ -3,18 +3,11 @@ package org.floworc.core.runners.types;
|
||||
import com.devskiller.friendly_id.FriendlyId;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.floworc.core.models.executions.Execution;
|
||||
import org.floworc.core.runners.ExecutionService;
|
||||
import org.floworc.core.runners.WorkerTask;
|
||||
import org.floworc.core.models.flows.Flow;
|
||||
import org.floworc.core.models.flows.State;
|
||||
import org.floworc.core.queues.QueueMessage;
|
||||
import org.floworc.core.queues.QueueName;
|
||||
import org.floworc.core.queues.types.LocalQueue;
|
||||
import org.floworc.core.repositories.types.LocalFlowRepository;
|
||||
import org.floworc.core.runners.ExecutionState;
|
||||
import org.floworc.core.runners.Executor;
|
||||
import org.floworc.core.runners.RunnerInterface;
|
||||
import org.floworc.core.runners.Worker;
|
||||
import org.floworc.core.runners.*;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.concurrent.Executors;
|
||||
@@ -26,15 +19,15 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||
public class StandAloneRunner implements RunnerInterface {
|
||||
private LocalQueue<Execution> executionQueue;
|
||||
private LocalQueue<WorkerTask> workerTaskQueue;
|
||||
private LocalQueue<WorkerTask> workerTaskResultQueue;
|
||||
private LocalQueue<WorkerTaskResult> workerTaskResultQueue;
|
||||
private LocalFlowRepository localRepository;
|
||||
private ThreadPoolExecutor poolExecutor;
|
||||
private ExecutionService executionService;
|
||||
|
||||
public StandAloneRunner(File basePath) {
|
||||
this.executionQueue = new LocalQueue<>(QueueName.EXECUTIONS);
|
||||
this.workerTaskQueue = new LocalQueue<>(QueueName.WORKERS);
|
||||
this.workerTaskResultQueue = new LocalQueue<>(QueueName.WORKERS_RESULT);
|
||||
this.executionQueue = new LocalQueue<>(Execution.class);
|
||||
this.workerTaskQueue = new LocalQueue<>(WorkerTask.class);
|
||||
this.workerTaskResultQueue = new LocalQueue<>(WorkerTaskResult.class);
|
||||
|
||||
this.localRepository = new LocalFlowRepository(basePath);
|
||||
this.executionService = new ExecutionService(this.workerTaskResultQueue);
|
||||
@@ -42,27 +35,43 @@ public class StandAloneRunner implements RunnerInterface {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
poolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
|
||||
poolExecutor.execute(new Executor(
|
||||
this.executionQueue,
|
||||
this.workerTaskQueue,
|
||||
this.localRepository,
|
||||
this.executionService
|
||||
));
|
||||
int processors = Math.max(3, Runtime.getRuntime().availableProcessors());
|
||||
poolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(processors * 4);
|
||||
|
||||
poolExecutor.execute(new ExecutionState(
|
||||
this.executionQueue,
|
||||
this.workerTaskResultQueue
|
||||
));
|
||||
for (int i = 0; i < processors; i++) {
|
||||
poolExecutor.execute(new Executor(
|
||||
this.executionQueue,
|
||||
this.workerTaskQueue,
|
||||
this.localRepository,
|
||||
this.executionService
|
||||
));
|
||||
|
||||
poolExecutor.execute(new ExecutionState(
|
||||
this.executionQueue,
|
||||
this.workerTaskResultQueue
|
||||
));
|
||||
|
||||
while(poolExecutor.getActiveCount() != poolExecutor.getCorePoolSize()) {
|
||||
poolExecutor.execute(new Worker(
|
||||
this.workerTaskQueue,
|
||||
this.workerTaskResultQueue
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// @FIXME: Ugly hack to wait that all thread is created and ready to listen
|
||||
boolean isReady = false;
|
||||
do {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("Can't sleep", e);
|
||||
}
|
||||
|
||||
isReady = this.executionQueue.getSubscribersCount() == processors * 2 &&
|
||||
this.workerTaskQueue.getSubscribersCount() == processors &&
|
||||
this.workerTaskResultQueue.getSubscribersCount() == processors;
|
||||
}
|
||||
while (!isReady);
|
||||
}
|
||||
|
||||
public Execution run(Flow flow) throws InterruptedException {
|
||||
this.run();
|
||||
@@ -75,22 +84,17 @@ public class StandAloneRunner implements RunnerInterface {
|
||||
|
||||
AtomicReference<Execution> receive = new AtomicReference<>();
|
||||
|
||||
this.executionQueue.receive(message -> {
|
||||
if (message.getBody().getState().isTerninated()) {
|
||||
receive.set(message.getBody());
|
||||
this.executionQueue.receive(StandAloneRunner.class, current -> {
|
||||
if (current.getState().isTerninated()) {
|
||||
receive.set(current);
|
||||
|
||||
this.poolExecutor.shutdownNow();
|
||||
poolExecutor.shutdown();
|
||||
}
|
||||
});
|
||||
|
||||
this.executionQueue.emit(
|
||||
QueueMessage.<Execution>builder()
|
||||
.key(execution.getId())
|
||||
.body(execution)
|
||||
.build()
|
||||
);
|
||||
this.executionQueue.emit(execution);
|
||||
|
||||
this.poolExecutor.awaitTermination(1, TimeUnit.MINUTES);
|
||||
poolExecutor.awaitTermination(1, TimeUnit.MINUTES);
|
||||
|
||||
return receive.get();
|
||||
}
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
package org.floworc.core.tasks;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.floworc.core.runners.types.StandAloneRunner;
|
||||
import org.floworc.core.tasks.scripts.Bash;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class BashTest {
|
||||
|
||||
@Test
|
||||
void run() throws Exception {
|
||||
Bash bash = new Bash(
|
||||
|
||||
Reference in New Issue
Block a user