refactor(core): separate to different module for runner-memory & local repository

This commit is contained in:
tchiotludo
2019-09-23 20:34:50 +02:00
parent c24d8bd697
commit 83c11ee119
24 changed files with 383 additions and 148 deletions

View File

@@ -46,6 +46,7 @@ allprojects {
mavenCentral()
maven { url "https://jcenter.bintray.com" }
maven { url "http://packages.confluent.io/maven/" }
maven { url 'https://jitpack.io' }
}
// micronaut

View File

@@ -8,4 +8,9 @@ dependencies {
// modules
compile project(":core")
compile project(":repository-local")
compile project(":runner-memory")
compile project(":runner-kafka")
}

View File

@@ -1,6 +1,8 @@
package org.floworc.core;
import io.micronaut.configuration.picocli.PicocliRunner;
import org.floworc.core.commands.TestCommand;
import org.floworc.core.commands.WorkerCommand;
import picocli.CommandLine;
import java.util.concurrent.Callable;
@@ -8,10 +10,15 @@ import java.util.concurrent.Callable;
@CommandLine.Command(
name = "floworc",
version = "v0.1",
header = "floworc client",
parameterListHeading = "%nParameters:%n",
optionListHeading = "%nOptions:%n",
commandListHeading = "%nCommands:%n",
mixinStandardHelpOptions = true,
subcommands = {
TestCommand.class,
WorkerCommand.class
}
)
public class App implements Callable<Object> {

View File

@@ -0,0 +1,35 @@
package org.floworc.core.commands;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.models.flows.Flow;
import org.floworc.core.serializers.YamlFlowParser;
import org.floworc.runner.memory.MemoryRunner;
import picocli.CommandLine;
import java.io.File;
import java.io.IOException;
@CommandLine.Command(
name = "test",
description = "test a flow"
)
@Slf4j
public class TestCommand implements Runnable {
@CommandLine.Parameters(description = "the flow file to test")
private File file;
private static final YamlFlowParser yamlFlowParser = new YamlFlowParser();
public void run() {
MemoryRunner runner = new MemoryRunner();
Flow flow = null;
try {
flow = yamlFlowParser.parse(file);
Execution execution = runner.run(flow);
} catch (IOException | InterruptedException e) {
log.error("Failed flow", e);
}
}
}

View File

@@ -0,0 +1,31 @@
package org.floworc.core.commands;
import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.runners.Worker;
import picocli.CommandLine;
import javax.inject.Inject;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@CommandLine.Command(
name = "worker",
description = "start a worker"
)
@Slf4j
public class WorkerCommand implements Runnable {
private ExecutorService poolExecutor = Executors.newCachedThreadPool();
@Inject
private ApplicationContext applicationContext;
@CommandLine.Option(names = {"-t", "--thread"}, description = "the number of concurrent threads to launch")
private int thread = Runtime.getRuntime().availableProcessors();
public void run() {
for (int i = 0; i < thread; i++) {
poolExecutor.execute(applicationContext.getBean(Worker.class));
}
}
}

View File

@@ -7,7 +7,13 @@ dependencies {
// yaml
compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.9.2'
compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.9.9'
compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-parameter-names', version: '2.9.9'
// validations
compile group: 'org.hibernate.validator', name: 'hibernate-validator', version: '6.0.17.Final'
// redis
implementation 'com.github.lettuce-io:lettuce-core:master'
}

View File

@@ -0,0 +1,13 @@
package org.floworc.core.queues;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.runners.WorkerTask;
import org.floworc.core.runners.WorkerTaskResult;
public interface QueueFactoryInterface {
QueueInterface<Execution> execution();
QueueInterface<WorkerTask> workerTask();
QueueInterface<WorkerTaskResult> workerTaskResult();
}

View File

@@ -1,8 +1,9 @@
package org.floworc.core.queues;
import java.io.Closeable;
import java.util.function.Consumer;
public interface QueueInterface<T> {
public interface QueueInterface<T> extends Closeable {
void emit(T message);
void receive(Class consumerGroup, Consumer<T> consumer);

View File

@@ -0,0 +1,7 @@
package org.floworc.core.runners;
import io.micronaut.context.annotation.Prototype;
@Prototype
public interface ExecutionStateInterface extends Runnable {
}

View File

@@ -0,0 +1,49 @@
package org.floworc.core.runners;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Prototype;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.queues.QueueInterface;
import org.floworc.core.repositories.FlowRepositoryInterface;
import javax.inject.Inject;
import javax.inject.Named;
@Factory
public class RunnerProcessFactory {
@Prototype
@Inject
public Worker worker(
@Named("executionQueue") QueueInterface<Execution> executionQueue,
@Named("workerTaskQueue") QueueInterface<WorkerTask> workerTaskQueue,
@Named("workerTaskResultQueue") QueueInterface<WorkerTaskResult> workerTaskResultQueue
) {
return new Worker(
workerTaskQueue,
workerTaskResultQueue
);
}
@Prototype
public Executor executor(
@Named("executionQueue") QueueInterface<Execution> executionQueue,
@Named("workerTaskQueue") QueueInterface<WorkerTask> workerTaskQueue,
@Named("workerTaskResultQueue") QueueInterface<WorkerTaskResult> workerTaskResultQueue,
FlowRepositoryInterface flowRepository,
ExecutionService executionService
) {
return new Executor(
executionQueue,
workerTaskQueue,
flowRepository,
executionService
);
}
@Prototype
public ExecutionService executionService(
@Named("workerTaskResultQueue") QueueInterface<WorkerTaskResult> workerTaskResultQueue
) {
return new ExecutionService(workerTaskResultQueue);
}
}

View File

@@ -0,0 +1,80 @@
package org.floworc.core.runners;
import com.devskiller.friendly_id.FriendlyId;
import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.models.flows.Flow;
import org.floworc.core.models.flows.State;
import org.floworc.core.queues.QueueInterface;
import javax.inject.Inject;
import javax.inject.Named;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class StandAloneRunner implements RunnerInterface {
private ExecutorService poolExecutor = Executors.newCachedThreadPool();
@Inject
@Named("executionQueue")
protected QueueInterface<Execution> executionQueue;
@Inject
@Named("workerTaskQueue")
protected QueueInterface<WorkerTask> workerTaskQueue;
@Inject
@Named("workerTaskResultQueue")
protected QueueInterface<WorkerTaskResult> workerTaskResultQueue;
@Inject
private ApplicationContext applicationContext;
@Override
public void run() {
int processors = Math.max(3, Runtime.getRuntime().availableProcessors());
for (int i = 0; i < processors; i++) {
poolExecutor.execute(applicationContext.getBean(Executor.class));
poolExecutor.execute(applicationContext.getBean(ExecutionStateInterface.class));
poolExecutor.execute(applicationContext.getBean(Worker.class));
}
}
public Execution runOne(Flow flow) throws InterruptedException, IOException {
this.run();
Execution execution = Execution.builder()
.id(FriendlyId.createFriendlyId())
.flowId(flow.getId())
.state(new State())
.build();
AtomicReference<Execution> receive = new AtomicReference<>();
this.executionQueue.receive(StandAloneRunner.class, current -> {
if (current.getState().isTerninated()) {
receive.set(current);
poolExecutor.shutdown();
}
});
this.executionQueue.emit(execution);
poolExecutor.awaitTermination(1, TimeUnit.MINUTES);
this.executionQueue.close();
this.workerTaskQueue.close();
this.workerTaskResultQueue.close();
return receive.get();
}
}

View File

@@ -1,21 +1,24 @@
package org.floworc.core.runners;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.AllArgsConstructor;
import lombok.Value;
import org.floworc.core.models.executions.TaskRun;
import org.floworc.core.models.flows.State;
import org.floworc.core.models.tasks.Task;
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@Data
public class WorkerTaskResult extends WorkerTask {
public WorkerTaskResult(TaskRun taskRun, Task task) {
super(taskRun, task);
}
import javax.validation.constraints.NotNull;
@Value
@AllArgsConstructor
public class WorkerTaskResult {
@NotNull
private TaskRun taskRun;
@NotNull
private Task task;
public WorkerTaskResult(WorkerTask workerTask, State.Type state) {
super(workerTask.getTaskRun().withState(state), workerTask.getTask());
this.taskRun = workerTask.getTaskRun().withState(state);
this.task = workerTask.getTask();
}
}

View File

@@ -1,102 +0,0 @@
package org.floworc.core.runners.types;
import com.devskiller.friendly_id.FriendlyId;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.models.flows.Flow;
import org.floworc.core.models.flows.State;
import org.floworc.core.queues.types.LocalQueue;
import org.floworc.core.repositories.types.LocalFlowRepository;
import org.floworc.core.runners.*;
import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class StandAloneRunner implements RunnerInterface {
private LocalQueue<Execution> executionQueue;
private LocalQueue<WorkerTask> workerTaskQueue;
private LocalQueue<WorkerTaskResult> workerTaskResultQueue;
private LocalFlowRepository localRepository;
private ThreadPoolExecutor poolExecutor;
private ExecutionService executionService;
public StandAloneRunner(File basePath) {
this.executionQueue = new LocalQueue<>(Execution.class);
this.workerTaskQueue = new LocalQueue<>(WorkerTask.class);
this.workerTaskResultQueue = new LocalQueue<>(WorkerTaskResult.class);
this.localRepository = new LocalFlowRepository(basePath);
this.executionService = new ExecutionService(this.workerTaskResultQueue);
}
@Override
public void run() {
int processors = Math.max(3, Runtime.getRuntime().availableProcessors());
poolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(processors * 4);
for (int i = 0; i < processors; i++) {
poolExecutor.execute(new Executor(
this.executionQueue,
this.workerTaskQueue,
this.localRepository,
this.executionService
));
poolExecutor.execute(new ExecutionState(
this.executionQueue,
this.workerTaskResultQueue
));
poolExecutor.execute(new Worker(
this.workerTaskQueue,
this.workerTaskResultQueue
));
}
// @FIXME: Ugly hack to wait that all thread is created and ready to listen
boolean isReady = false;
do {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
log.error("Can't sleep", e);
}
isReady = this.executionQueue.getSubscribersCount() == processors * 2 &&
this.workerTaskQueue.getSubscribersCount() == processors &&
this.workerTaskResultQueue.getSubscribersCount() == processors;
}
while (!isReady);
}
public Execution run(Flow flow) throws InterruptedException {
this.run();
Execution execution = Execution.builder()
.id(FriendlyId.createFriendlyId())
.flowId(flow.getId())
.state(new State())
.build();
AtomicReference<Execution> receive = new AtomicReference<>();
this.executionQueue.receive(StandAloneRunner.class, current -> {
if (current.getState().isTerninated()) {
receive.set(current);
poolExecutor.shutdown();
}
});
this.executionQueue.emit(execution);
poolExecutor.awaitTermination(1, TimeUnit.MINUTES);
return receive.get();
}
}

View File

@@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.1.1-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.2-all.zip

View File

@@ -0,0 +1,7 @@
sourceCompatibility = 11
dependencies {
compile project(":core")
testCompile project(':core').sourceSets.test.output
}

View File

@@ -1,9 +1,11 @@
package org.floworc.core.repositories.types;
package org.floworc.repository.local;
import io.micronaut.context.annotation.Value;
import org.floworc.core.models.flows.Flow;
import org.floworc.core.repositories.FlowRepositoryInterface;
import org.floworc.core.serializers.YamlFlowParser;
import javax.inject.Singleton;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
@@ -11,13 +13,12 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@Singleton
public class LocalFlowRepository implements FlowRepositoryInterface {
@Value("${floworc.repository.local.base-path}")
private File basePath;
private static final YamlFlowParser yamlFlowParser = new YamlFlowParser();
public LocalFlowRepository(File basePath) {
this.basePath = basePath;
}
private static final YamlFlowParser yamlFlowParser = new YamlFlowParser();
@Override
public Optional<Flow> getFlowById(String id) {

View File

@@ -0,0 +1,8 @@
sourceCompatibility = 11
dependencies {
compile project(":core")
testCompile project(':core').sourceSets.test.output
testCompile project(':repository-local').sourceSets.main.output
}

View File

@@ -1,22 +1,26 @@
package org.floworc.core.runners;
package org.floworc.runner.memory;
import io.micronaut.context.annotation.Prototype;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.models.executions.TaskRun;
import org.floworc.core.queues.QueueInterface;
import org.floworc.core.runners.ExecutionStateInterface;
import org.floworc.core.runners.WorkerTaskResult;
import javax.inject.Named;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class ExecutionState implements Runnable {
public class MemoryExecutionState implements ExecutionStateInterface {
private final Object lock = new Object();
private final QueueInterface<Execution> executionQueue;
private final QueueInterface<WorkerTaskResult> workerTaskResultQueue;
private static ConcurrentHashMap<String, Execution> executions = new ConcurrentHashMap<>();
public ExecutionState(
QueueInterface<Execution> executionQueue,
QueueInterface<WorkerTaskResult> workerTaskResultQueue
public MemoryExecutionState(
@Named("executionQueue") QueueInterface<Execution> executionQueue,
@Named("workerTaskResultQueue") QueueInterface<WorkerTaskResult> workerTaskResultQueue
) {
this.executionQueue = executionQueue;
this.workerTaskResultQueue = workerTaskResultQueue;
@@ -24,7 +28,7 @@ public class ExecutionState implements Runnable {
@Override
public void run() {
this.executionQueue.receive(ExecutionState.class, execution -> {
this.executionQueue.receive(MemoryExecutionState.class, execution -> {
synchronized (lock) {
if (execution.getState().isTerninated()) {
executions.remove(execution.getId());
@@ -34,7 +38,7 @@ public class ExecutionState implements Runnable {
}
});
this.workerTaskResultQueue.receive(ExecutionState.class, message -> {
this.workerTaskResultQueue.receive(MemoryExecutionState.class, message -> {
synchronized (lock) {
TaskRun taskRun = message.getTaskRun();

View File

@@ -1,20 +1,21 @@
package org.floworc.core.queues.types;
package org.floworc.runner.memory;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.queues.QueueInterface;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
@Slf4j
public class LocalQueue <T> implements QueueInterface<T> {
public class MemoryQueue<T> implements QueueInterface<T> {
private Class<T> cls;
private static ExecutorService poolExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private Map<String, List<Consumer<T>>> consumers = new HashMap<>();
public LocalQueue(Class<T> cls) {
public MemoryQueue(Class<T> cls) {
this.cls = cls;
}
@@ -54,4 +55,11 @@ public class LocalQueue <T> implements QueueInterface<T> {
public void ack(T message) {
// no ack needed with local queues
}
@Override
public void close() throws IOException {
if (!poolExecutor.isShutdown()) {
poolExecutor.shutdown();
}
}
}

View File

@@ -0,0 +1,31 @@
package org.floworc.runner.memory;
import io.micronaut.context.annotation.Factory;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.queues.QueueFactoryInterface;
import org.floworc.core.queues.QueueInterface;
import org.floworc.core.runners.*;
import javax.inject.Named;
import javax.inject.Singleton;
@Factory
public class MemoryQueueFactory implements QueueFactoryInterface {
@Singleton
@Named("executionQueue")
public QueueInterface<Execution> execution() {
return new MemoryQueue<>(Execution.class);
}
@Singleton
@Named("workerTaskQueue")
public QueueInterface<WorkerTask> workerTask() {
return new MemoryQueue<>(WorkerTask.class);
}
@Singleton
@Named("workerTaskResultQueue")
public QueueInterface<WorkerTaskResult> workerTaskResult() {
return new MemoryQueue<>(WorkerTaskResult.class);
}
}

View File

@@ -0,0 +1,35 @@
package org.floworc.runner.memory;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.runners.StandAloneRunner;
import org.floworc.core.runners.WorkerTask;
import org.floworc.core.runners.WorkerTaskResult;
import javax.inject.Singleton;
@Slf4j
@Singleton
public class MemoryRunner extends StandAloneRunner {
@Override
public void run() {
super.run();
int processors = Math.max(3, Runtime.getRuntime().availableProcessors());
// @FIXME: Ugly hack to wait that all thread is created and ready to listen
boolean isReady;
do {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
log.error("Can't sleep", e);
}
isReady = ((MemoryQueue<Execution>) this.executionQueue).getSubscribersCount() == processors * 2 &&
((MemoryQueue<WorkerTask>) this.workerTaskQueue).getSubscribersCount() == processors &&
((MemoryQueue<WorkerTaskResult>) this.workerTaskResultQueue).getSubscribersCount() == processors;
}
while (!isReady);
}
}

View File

@@ -1,32 +1,27 @@
package org.floworc.core.runners;
package org.floworc.runner.memory;
import org.junit.jupiter.api.Test;
import io.micronaut.test.annotation.MicronautTest;
import org.floworc.core.Utils;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.models.flows.Flow;
import org.floworc.core.runners.types.StandAloneRunner;
import org.floworc.core.runners.StandAloneRunner;
import org.junit.jupiter.api.Test;
import java.io.File;
import javax.inject.Inject;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Objects;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
class StandAloneRunnerTest {
private final StandAloneRunner runner = new StandAloneRunner(
new File(Objects.requireNonNull(Utils.class.getClassLoader().getResource("flows")).toURI())
);
StandAloneRunnerTest() throws URISyntaxException {
}
@MicronautTest
class MemoryRunnerTest {
@Inject
private StandAloneRunner runner;
@Test
void full() throws IOException, InterruptedException {
Flow flow = Utils.parse("flows/full.yaml");
Execution execution = runner.run(flow);
Execution execution = runner.runOne(flow);
assertThat(execution.getTaskRunList(), hasSize(13));
}
@@ -34,7 +29,7 @@ class StandAloneRunnerTest {
@Test
void errors() throws IOException, InterruptedException {
Flow flow = Utils.parse("flows/errors.yaml");
Execution execution = runner.run(flow);
Execution execution = runner.runOne(flow);
assertThat(execution.getTaskRunList(), hasSize(7));
}

View File

@@ -0,0 +1,4 @@
floworc:
repository:
local:
base-path: ../core/src/test/resources/flows/

View File

@@ -2,3 +2,9 @@ rootProject.name="floworc"
include 'cli'
include 'core'
include 'runner-memory'
include 'runner-kafka'
include 'repository-local'