feat(queue): introduce Kafka Queue

This commit is contained in:
tchiotludo
2019-10-01 23:05:50 +02:00
parent 83c11ee119
commit b4d026574c
61 changed files with 1377 additions and 215 deletions

View File

@@ -1,9 +1,10 @@
buildscript {
ext {
micronautVersion = "1.2.0"
micronautVersion = "1.2.3"
confluentVersion = "5.2.1"
kafkaVersion = "2.3.0"
avroVersion = "1.9.0"
lombokVersion = "1.18.10"
}
}
@@ -15,6 +16,9 @@ plugins {
id "net.ltgt.apt-idea" version "0.21"
id "com.github.johnrengelman.shadow" version "4.0.2"
id "application"
// test
id 'com.adarshr.test-logger' version '1.7.0'
}
idea {
@@ -32,6 +36,7 @@ sourceCompatibility = 11
dependencies {
compile project(":cli")
testCompile project(":repository-memory")
}
@@ -53,7 +58,10 @@ allprojects {
apply plugin:"java"
apply plugin:"net.ltgt.apt-eclipse"
apply plugin:"net.ltgt.apt-idea"
apply plugin: "io.spring.dependency-management"
apply plugin:"io.spring.dependency-management"
// test
apply plugin:"com.adarshr.test-logger"
dependencyManagement {
imports {
@@ -71,8 +79,8 @@ allprojects {
// utils
runtime "ch.qos.logback:logback-classic:1.2.3"
compile group: 'com.google.guava', name: 'guava', version: '27.1-jre'
compileOnly 'org.projectlombok:lombok:1.18.8'
annotationProcessor "org.projectlombok:lombok:1.18.8"
compileOnly 'org.projectlombok:lombok:' + lombokVersion
annotationProcessor "org.projectlombok:lombok:" + lombokVersion
// micronaut
annotationProcessor "io.micronaut:micronaut-inject-java"
@@ -96,6 +104,22 @@ allprojects {
// floworc
compile group: 'com.devskiller.friendly-id', name: 'friendly-id', version: '1.1.0'
}
// test
test {
useJUnitPlatform()
testLogging {
exceptionFormat = "full"
}
}
testlogger {
theme 'mocha-parallel'
showExceptions true
slowThreshold 2000
showStandardStreams true
}
}
/**********************************************************************************************************************\
@@ -115,16 +139,6 @@ run.jvmArgs(
"-Dcom.sun.management.jmxremote",
'-Dmicronaut.environments=dev,override'
)
/**********************************************************************************************************************\
* Test
**********************************************************************************************************************/
test {
useJUnitPlatform()
testLogging {
exceptionFormat = "full"
}
}
/**********************************************************************************************************************\
* Jar

View File

@@ -9,7 +9,7 @@ dependencies {
// modules
compile project(":core")
compile project(":repository-local")
compile project(":repository-memory")
compile project(":runner-memory")
compile project(":runner-kafka")

View File

@@ -1,8 +1,8 @@
package org.floworc.core;
package org.floworc.cli;
import io.micronaut.configuration.picocli.PicocliRunner;
import org.floworc.core.commands.TestCommand;
import org.floworc.core.commands.WorkerCommand;
import org.floworc.cli.commands.TestCommand;
import org.floworc.cli.commands.WorkerCommand;
import picocli.CommandLine;
import java.util.concurrent.Callable;

View File

@@ -0,0 +1,55 @@
package org.floworc.cli.commands;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.models.flows.Flow;
import org.floworc.core.repositories.FlowRepositoryInterface;
import org.floworc.core.repositories.LocalFlowRepositoryLoader;
import org.floworc.core.runners.RunnerUtils;
import org.floworc.runner.memory.MemoryRunner;
import picocli.CommandLine;
import javax.inject.Inject;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.TimeoutException;
@CommandLine.Command(
name = "test",
description = "test a flow"
)
@Slf4j
public class TestCommand implements Runnable {
@CommandLine.Parameters(description = "the flow file to test")
private Path file;
@Inject
private MemoryRunner runner;
@Inject
private LocalFlowRepositoryLoader repositoryLoader;
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private RunnerUtils runnerUtils;
public void run() {
try {
runner.run();
repositoryLoader.load(file.toFile());
List<Flow> all = flowRepository.findAll();
if (all.size() != 1) {
throw new IllegalArgumentException("Too many flow found, need 1, found " + all.size());
}
runnerUtils.runOne(all.get(0).getId());
runner.close();
} catch (IOException | TimeoutException e) {
throw new IllegalStateException(e);
}
}
}

View File

@@ -1,4 +1,4 @@
package org.floworc.core.commands;
package org.floworc.cli.commands;
import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;
@@ -27,5 +27,7 @@ public class WorkerCommand implements Runnable {
for (int i = 0; i < thread; i++) {
poolExecutor.execute(applicationContext.getBean(Worker.class));
}
log.info("Workers started with {} thread(s)", this.thread);
}
}

View File

@@ -1,35 +0,0 @@
package org.floworc.core.commands;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.models.flows.Flow;
import org.floworc.core.serializers.YamlFlowParser;
import org.floworc.runner.memory.MemoryRunner;
import picocli.CommandLine;
import java.io.File;
import java.io.IOException;
@CommandLine.Command(
name = "test",
description = "test a flow"
)
@Slf4j
public class TestCommand implements Runnable {
@CommandLine.Parameters(description = "the flow file to test")
private File file;
private static final YamlFlowParser yamlFlowParser = new YamlFlowParser();
public void run() {
MemoryRunner runner = new MemoryRunner();
Flow flow = null;
try {
flow = yamlFlowParser.parse(file);
Execution execution = runner.run(flow);
} catch (IOException | InterruptedException e) {
log.error("Failed flow", e);
}
}
}

View File

@@ -1,3 +1,41 @@
micronaut:
application:
name: floworc
floworc:
kafka:
defaults:
topic:
partitions: 64
replication-factor: 1
properties:
compression.type: "lz4"
consumer:
properties:
isolation.level: "read_committed"
auto.offset.reset: "earliest"
enable.auto.commit: "false"
producer:
properties:
acks: "all"
stream:
properties:
processing.guarantee: "exactly_once"
replication.factor: "${floworc.kafka.defaults.topic.replication-factor}"
acks: "all"
topics:
org.floworc.core.models.executions.Execution:
name: "floworc_execution"
properties:
cleanup.policy: "compact"
retention.ms: "-1"
org.floworc.core.runners.WorkerTask:
name: "floworc_workertask"
org.floworc.core.runners.WorkerTaskResult:
name: "floworc_workertaskresult"

View File

@@ -32,16 +32,22 @@
</encoder>
</appender>
<root level="INFO">
<root level="WARN">
<appender-ref ref="STDOUT" />
<appender-ref ref="STDERR" />
</root>
<logger name="org.apache" level="WARN" />
<logger name="io.confluent" level="WARN" />
<logger name="org.floworc" level="INFO" />
<logger name="flow" level="INFO" />
<logger name="org.floworc.runner.kafka.services" level="WARN" />
<!-- The configuration '%s' was supplied but isn't a known config. > https://github.com/apache/kafka/pull/5876 -->
<logger name="org.apache.kafka.clients.producer.ProducerConfig" level="ERROR" />
<logger name="org.apache.kafka.clients.admin.AdminClientConfig" level="ERROR" />
<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="ERROR" />
<!--- Error registering AppInfo mbean -->
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="ERROR" />
</configuration>

View File

@@ -11,9 +11,4 @@ dependencies {
// validations
compile group: 'org.hibernate.validator', name: 'hibernate-validator', version: '6.0.17.Final'
// redis
implementation 'com.github.lettuce-io:lettuce-core:master'
}

View File

@@ -1,6 +1,7 @@
package org.floworc.core.models.executions;
import lombok.*;
import lombok.Builder;
import lombok.Value;
import lombok.experimental.Wither;
import org.floworc.core.models.flows.State;
import org.floworc.core.models.tasks.Task;

View File

@@ -5,6 +5,10 @@ import org.floworc.core.runners.WorkerTask;
import org.floworc.core.runners.WorkerTaskResult;
public interface QueueFactoryInterface {
String EXECUTION_NAMED = "executionQueue";
String WORKERTASK_NAMED = "workerTaskQueue";
String WORKERTASKRESULT_NAMED = "workerTaskResultQueue";
QueueInterface<Execution> execution();
QueueInterface<WorkerTask> workerTask();

View File

@@ -6,7 +6,5 @@ import java.util.function.Consumer;
public interface QueueInterface<T> extends Closeable {
void emit(T message);
void receive(Class consumerGroup, Consumer<T> consumer);
void ack(T message);
Runnable receive(Class consumerGroup, Consumer<T> consumer);
}

View File

@@ -6,7 +6,15 @@ import java.util.List;
import java.util.Optional;
public interface FlowRepositoryInterface {
Optional<Flow> getFlowById(String id);
Optional<Flow> findById(String id);
List<Flow> getFlows();
List<Flow> findAll();
void save(Flow flow);
void insert(Flow flow);
void update(Flow flow);
void delete(Flow flow);
}

View File

@@ -0,0 +1,39 @@
package org.floworc.core.repositories;
import org.floworc.core.models.flows.Flow;
import org.floworc.core.serializers.YamlFlowParser;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
@Singleton
public class LocalFlowRepositoryLoader {
private static final YamlFlowParser yamlFlowParser = new YamlFlowParser();
@Inject
private FlowRepositoryInterface flowRepository;
public void load(URL basePath) throws IOException, URISyntaxException {
this.load(new File(basePath.toURI()));
}
public void load(File basePath) throws IOException {
List<Path> list = Files.walk(basePath.toPath())
.filter(path -> com.google.common.io.Files.getFileExtension(path.toString()).equals("yaml"))
.collect(Collectors.toList());
for (Path file: list) {
Flow parse = yamlFlowParser.parse(file.toFile());
flowRepository.save(parse);
}
}
}

View File

@@ -1,7 +1,4 @@
package org.floworc.core.runners;
import io.micronaut.context.annotation.Prototype;
@Prototype
public interface ExecutionStateInterface extends Runnable {
}

View File

@@ -38,7 +38,7 @@ public class Executor implements Runnable {
}
Flow flow = this.flowRepository
.getFlowById(execution.getFlowId())
.findById(execution.getFlowId())
.orElseThrow(() -> new IllegalArgumentException("Invalid flow id '" + execution.getFlowId() + "'"));
this.executionService.getNexts(execution, flow.getTasks())

View File

@@ -3,6 +3,7 @@ package org.floworc.core.runners;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Prototype;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.queues.QueueFactoryInterface;
import org.floworc.core.queues.QueueInterface;
import org.floworc.core.repositories.FlowRepositoryInterface;
@@ -14,9 +15,9 @@ public class RunnerProcessFactory {
@Prototype
@Inject
public Worker worker(
@Named("executionQueue") QueueInterface<Execution> executionQueue,
@Named("workerTaskQueue") QueueInterface<WorkerTask> workerTaskQueue,
@Named("workerTaskResultQueue") QueueInterface<WorkerTaskResult> workerTaskResultQueue
@Named(QueueFactoryInterface.EXECUTION_NAMED) QueueInterface<Execution> executionQueue,
@Named(QueueFactoryInterface.WORKERTASK_NAMED) QueueInterface<WorkerTask> workerTaskQueue,
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED) QueueInterface<WorkerTaskResult> workerTaskResultQueue
) {
return new Worker(
workerTaskQueue,
@@ -26,9 +27,9 @@ public class RunnerProcessFactory {
@Prototype
public Executor executor(
@Named("executionQueue") QueueInterface<Execution> executionQueue,
@Named("workerTaskQueue") QueueInterface<WorkerTask> workerTaskQueue,
@Named("workerTaskResultQueue") QueueInterface<WorkerTaskResult> workerTaskResultQueue,
@Named(QueueFactoryInterface.EXECUTION_NAMED) QueueInterface<Execution> executionQueue,
@Named(QueueFactoryInterface.WORKERTASK_NAMED) QueueInterface<WorkerTask> workerTaskQueue,
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED) QueueInterface<WorkerTaskResult> workerTaskResultQueue,
FlowRepositoryInterface flowRepository,
ExecutionService executionService
) {
@@ -42,7 +43,7 @@ public class RunnerProcessFactory {
@Prototype
public ExecutionService executionService(
@Named("workerTaskResultQueue") QueueInterface<WorkerTaskResult> workerTaskResultQueue
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED) QueueInterface<WorkerTaskResult> workerTaskResultQueue
) {
return new ExecutionService(workerTaskResultQueue);
}

View File

@@ -0,0 +1,59 @@
package org.floworc.core.runners;
import com.devskiller.friendly_id.FriendlyId;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.models.flows.Flow;
import org.floworc.core.models.flows.State;
import org.floworc.core.queues.QueueFactoryInterface;
import org.floworc.core.queues.QueueInterface;
import org.floworc.core.repositories.FlowRepositoryInterface;
import org.floworc.core.utils.Await;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@Singleton
public class RunnerUtils {
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Inject
private FlowRepositoryInterface flowRepository;
public Execution runOne(String flowId) throws TimeoutException {
return this.runOne(
flowRepository
.findById(flowId)
.orElseThrow(() -> new IllegalArgumentException("Unable to find execution '" + flowId + "'"))
);
}
private Execution runOne(Flow flow) throws TimeoutException {
Execution execution = Execution.builder()
.id(FriendlyId.createFriendlyId())
.flowId(flow.getId())
.state(new State())
.build();
AtomicReference<Execution> receive = new AtomicReference<>();
Runnable cancel = this.executionQueue.receive(StandAloneRunner.class, current -> {
if (current.getId().equals(execution.getId()) && current.getState().isTerninated()) {
receive.set(current);
}
});
this.executionQueue.emit(execution);
Await.until(() -> receive.get() != null, 5 * 1000);
cancel.run();
return receive.get();
}
}

View File

@@ -1,44 +1,48 @@
package org.floworc.core.runners;
import com.devskiller.friendly_id.FriendlyId;
import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.models.flows.Flow;
import org.floworc.core.models.flows.State;
import org.floworc.core.queues.QueueFactoryInterface;
import org.floworc.core.queues.QueueInterface;
import org.floworc.core.repositories.FlowRepositoryInterface;
import javax.inject.Inject;
import javax.inject.Named;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class StandAloneRunner implements RunnerInterface {
public class StandAloneRunner implements RunnerInterface, Closeable {
private ExecutorService poolExecutor = Executors.newCachedThreadPool();
@Inject
@Named("executionQueue")
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Inject
@Named("workerTaskQueue")
@Named(QueueFactoryInterface.WORKERTASK_NAMED)
protected QueueInterface<WorkerTask> workerTaskQueue;
@Inject
@Named("workerTaskResultQueue")
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
protected QueueInterface<WorkerTaskResult> workerTaskResultQueue;
@Inject
private ApplicationContext applicationContext;
@Inject
private FlowRepositoryInterface flowRepository;
private boolean running = false;
@Override
public void run() {
int processors = Math.max(3, Runtime.getRuntime().availableProcessors());
this.running = true;
int processors = Math.max(3, Runtime.getRuntime().availableProcessors());
for (int i = 0; i < processors; i++) {
poolExecutor.execute(applicationContext.getBean(Executor.class));
@@ -48,33 +52,16 @@ public class StandAloneRunner implements RunnerInterface {
}
}
public Execution runOne(Flow flow) throws InterruptedException, IOException {
this.run();
Execution execution = Execution.builder()
.id(FriendlyId.createFriendlyId())
.flowId(flow.getId())
.state(new State())
.build();
AtomicReference<Execution> receive = new AtomicReference<>();
this.executionQueue.receive(StandAloneRunner.class, current -> {
if (current.getState().isTerninated()) {
receive.set(current);
poolExecutor.shutdown();
}
});
this.executionQueue.emit(execution);
poolExecutor.awaitTermination(1, TimeUnit.MINUTES);
public boolean isRunning() {
return this.running;
}
@Override
public void close() throws IOException {
this.poolExecutor.shutdown();
this.executionQueue.close();
this.workerTaskQueue.close();
this.workerTaskResultQueue.close();
return receive.get();
}
}

View File

@@ -31,24 +31,27 @@ public class Worker implements Runnable {
);
if (workerTask.getTask() instanceof RunnableTask) {
State.Type state;
RunnableTask task = (RunnableTask) workerTask.getTask();
try {
task.run();
this.workerTaskResultQueue.emit(
new WorkerTaskResult(workerTask, State.Type.SUCCESS)
);
state = State.Type.SUCCESS;
} catch (Exception e) {
workerTask.logger().error("Failed task", e);
this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask, State.Type.FAILED));
state = State.Type.FAILED;
}
this.workerTaskResultQueue.emit(
new WorkerTaskResult(workerTask, state)
);
workerTask.logger().info(
"[execution: {}] [taskrun: {}] Task {} completed in {}",
"[execution: {}] [taskrun: {}] Task {} with state {} completed in {}",
workerTask.getTaskRun().getExecutionId(),
workerTask.getTaskRun().getId(),
workerTask.getTask().getClass().getSimpleName(),
state,
workerTask.getTaskRun().getState().humanDuration()
);
}

View File

@@ -0,0 +1,29 @@
package org.floworc.core.serializers;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
abstract public class JacksonMapper {
public static ObjectMapper ofJson() {
return JacksonMapper.configure(
new ObjectMapper()
);
}
public static ObjectMapper ofYaml() {
return JacksonMapper.configure(
new ObjectMapper(new YAMLFactory())
);
}
private static ObjectMapper configure(ObjectMapper mapper) {
return mapper
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
.registerModule(new JavaTimeModule())
.registerModule(new ParameterNamesModule());
}
}

View File

@@ -15,9 +15,7 @@ import java.io.IOException;
import java.util.Set;
public class YamlFlowParser {
private static final ObjectMapper mapper = new ObjectMapper(new YAMLFactory())
.registerModule(new Jdk8Module())
.registerModule(new JavaTimeModule());
private static final ObjectMapper mapper = JacksonMapper.ofYaml();
private static final Validator validator = Validation.byDefaultProvider()
.configure()

View File

@@ -0,0 +1,32 @@
package org.floworc.core.utils;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
public class Await {
public static void until(BooleanSupplier condition) {
while (!condition.getAsBoolean()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException("Can't sleep");
}
}
}
public static void until(BooleanSupplier condition, long timeoutMs) throws TimeoutException {
long start = System.currentTimeMillis();
while (!condition.getAsBoolean()) {
if (System.currentTimeMillis() - start > timeoutMs) {
throw new TimeoutException(String.format("Execution failed to terminate within %s ms", timeoutMs));
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException("Can't sleep");
}
}
}
}
}

View File

@@ -1,11 +1,14 @@
package org.floworc.core;
import org.floworc.core.models.flows.Flow;
import org.floworc.core.repositories.LocalFlowRepositoryLoader;
import org.floworc.core.serializers.YamlFlowParser;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Objects;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -20,4 +23,13 @@ abstract public class Utils {
return yamlFlowParser.parse(file);
}
public static void loads(LocalFlowRepositoryLoader repositoryLoader) throws IOException, URISyntaxException {
Utils.loads(repositoryLoader, "flows/valids");
}
public static void loads(LocalFlowRepositoryLoader repositoryLoader, String path) throws IOException, URISyntaxException {
URL url = Objects.requireNonNull(Utils.class.getClassLoader().getResource(path));
repositoryLoader.load(url);
}
}

View File

@@ -15,7 +15,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
class YamlFlowParserTest {
@Test
void parse() throws IOException {
Flow flow = Utils.parse("flows/full.yaml");
Flow flow = Utils.parse("flows/valids/full.yaml");
assertThat(flow.getId(), is("full"));
assertThat(flow.getTasks().size(), is(5));
@@ -31,11 +31,11 @@ class YamlFlowParserTest {
@Test
void validation() throws IOException {
assertThrows(InvalidDefinitionException.class, () -> {
Utils.parse("flows/invalid.yaml");
Utils.parse("flows/invalids/invalid.yaml");
});
try {
Utils.parse("flows/invalid.yaml");
Utils.parse("flows/invalids/invalid.yaml");
} catch (InvalidDefinitionException e) {
assertThat(e.getViolations().size(), is(3));
}

View File

@@ -20,4 +20,15 @@
<logger name="org.floworc" level="INFO" />
<logger name="flow" level="INFO" />
<logger name="org.floworc.runner.kafka.services" level="WARN" />
<!-- The configuration '%s' was supplied but isn't a known config. > https://github.com/apache/kafka/pull/5876 -->
<logger name="org.apache.kafka.clients.producer.ProducerConfig" level="ERROR" />
<logger name="org.apache.kafka.clients.admin.AdminClientConfig" level="ERROR" />
<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="ERROR" />
<!--- Error registering AppInfo mbean -->
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="ERROR" />
</configuration>

View File

@@ -1,45 +0,0 @@
package org.floworc.repository.local;
import io.micronaut.context.annotation.Value;
import org.floworc.core.models.flows.Flow;
import org.floworc.core.repositories.FlowRepositoryInterface;
import org.floworc.core.serializers.YamlFlowParser;
import javax.inject.Singleton;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@Singleton
public class LocalFlowRepository implements FlowRepositoryInterface {
@Value("${floworc.repository.local.base-path}")
private File basePath;
private static final YamlFlowParser yamlFlowParser = new YamlFlowParser();
@Override
public Optional<Flow> getFlowById(String id) {
File file = new File(this.basePath, id + ".yaml");
try {
return Optional.of(yamlFlowParser.parse(file));
} catch (IOException e) {
return Optional.empty();
}
}
@Override
public List<Flow> getFlows() {
try {
return Files.list(this.basePath.toPath())
.map(path -> this.getFlowById(path.toFile().getName()))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,54 @@
package org.floworc.repository.memory;
import org.floworc.core.models.flows.Flow;
import org.floworc.core.repositories.FlowRepositoryInterface;
import javax.inject.Singleton;
import java.util.*;
@Singleton
public class MemoryFlowRepository implements FlowRepositoryInterface {
private Map<String, Flow> flows = new HashMap<>();
@Override
public Optional<Flow> findById(String id) {
return this.flows.containsKey(id) ? Optional.of(this.flows.get(id)) : Optional.empty();
}
@Override
public List<Flow> findAll() {
return new ArrayList<>(this.flows.values());
}
@Override
public void save(Flow flow) {
this.flows.put(flow.getId(), flow);
}
@Override
public void insert(Flow flow) {
if (this.flows.containsKey(flow.getId())) {
throw new IllegalStateException("Flow " + flow.getId() + " already exists");
}
this.flows.put(flow.getId(), flow);
}
@Override
public void update(Flow flow) {
if (!this.flows.containsKey(flow.getId())) {
throw new IllegalStateException("Flow " + flow.getId() + " already exists");
}
this.flows.put(flow.getId(), flow);
}
@Override
public void delete(Flow flow) {
if (!this.flows.containsKey(flow.getId())) {
throw new IllegalStateException("Flow " + flow.getId() + " already exists");
}
this.flows.remove(flow.getId());
}
}

11
runner-kafka/build.gradle Normal file
View File

@@ -0,0 +1,11 @@
sourceCompatibility = 11
dependencies {
compile project(":core")
compile group: "org.apache.kafka", name: "kafka-clients", version: kafkaVersion
compile group: "org.apache.kafka", name: 'kafka-streams', version: kafkaVersion
testCompile project(':core').sourceSets.test.output
testCompile project(':repository-memory').sourceSets.main.output
}

View File

@@ -0,0 +1,66 @@
package org.floworc.runner.kafka;
import io.micronaut.context.annotation.Prototype;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.runners.ExecutionStateInterface;
import org.floworc.core.runners.WorkerTaskResult;
import org.floworc.runner.kafka.serializers.JsonSerde;
import org.floworc.runner.kafka.services.KafkaAdminService;
import org.floworc.runner.kafka.services.KafkaStreamService;
import javax.inject.Inject;
@Slf4j
@KafkaQueueEnabled
@Prototype
public class KafkaExecutionState implements ExecutionStateInterface {
@Inject
KafkaStreamService kafkaStreamService;
@Inject
KafkaAdminService kafkaAdminService;
private Topology topology() {
kafkaAdminService.createIfNotExist(WorkerTaskResult.class);
kafkaAdminService.createIfNotExist(Execution.class);
StreamsBuilder builder = new StreamsBuilder();
builder
.stream(
kafkaAdminService.getTopicName(WorkerTaskResult.class),
Consumed.with(Serdes.String(), JsonSerde.of(WorkerTaskResult.class))
)
.leftJoin(
builder.table(
kafkaAdminService.getTopicName(Execution.class),
Consumed.with(Serdes.String(), JsonSerde.of(Execution.class)),
Materialized.<String, Execution, KeyValueStore<Bytes, byte[]>>as("execution_join")
.withKeySerde(Serdes.String())
.withValueSerde(JsonSerde.of(Execution.class))
),
(workerTaskResult, execution) -> execution.withTaskRun(workerTaskResult.getTaskRun())
)
.to(
kafkaAdminService.getTopicName(Execution.class),
Produced.with(Serdes.String(), JsonSerde.of(Execution.class))
);
return builder.build();
}
@Override
public void run() {
KafkaStreamService.Stream stream = kafkaStreamService.of(KafkaExecutionState.class, this.topology());
stream.start();
}
}

View File

@@ -0,0 +1,127 @@
package org.floworc.runner.kafka;
import com.google.common.base.CaseFormat;
import com.google.common.collect.ImmutableMap;
import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.queues.QueueInterface;
import org.floworc.core.runners.WorkerTask;
import org.floworc.core.runners.WorkerTaskResult;
import org.floworc.runner.kafka.configs.TopicsConfig;
import org.floworc.runner.kafka.serializers.JsonSerde;
import org.floworc.runner.kafka.services.KafkaAdminService;
import org.floworc.runner.kafka.services.KafkaConsumerService;
import org.floworc.runner.kafka.services.KafkaProducerService;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@Slf4j
public class KafkaQueue<T> implements QueueInterface<T> {
private Class<T> cls;
private KafkaConsumerService kafkaConsumerService;
private KafkaProducerService kafkaProducerService;
private TopicsConfig topicsConfig;
private static ExecutorService poolExecutor = Executors.newCachedThreadPool();
public KafkaQueue(Class<T> cls, ApplicationContext applicationContext) {
this.cls = cls;
this.kafkaConsumerService = applicationContext.getBean(KafkaConsumerService.class);
this.kafkaProducerService = applicationContext.getBean(KafkaProducerService.class);
this.topicsConfig = applicationContext
.getBeansOfType(TopicsConfig.class)
.stream()
.filter(r -> r.getCls().equals(this.cls.getName().toLowerCase().replace(".", "-")))
.findFirst()
.orElseThrow();
applicationContext
.getBean(KafkaAdminService.class)
.createIfNotExist(this.cls);
}
private String key(Object object) {
if (this.cls == Execution.class) {
return ((Execution) object).getId();
} else if (this.cls == WorkerTask.class) {
return ((WorkerTask) object).getTaskRun().getExecutionId();
} else if (this.cls == WorkerTaskResult.class) {
return ((WorkerTaskResult) object).getTaskRun().getExecutionId();
} else {
throw new IllegalArgumentException("Unknown type '" + this.cls.getName() + "'");
}
}
@Override
public void emit(T message) {
if (log.isTraceEnabled()) {
log.trace("New message: topic '{}', value {}", topicsConfig.getName(), message);
}
try {
kafkaProducerService
.of(cls, JsonSerde.of(cls))
.send(new ProducerRecord<>(topicsConfig.getName(), this.key(message), message))
.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
@Override
public Runnable receive(Class consumerGroup, Consumer<T> consumer) {
AtomicBoolean running = new AtomicBoolean(true);
Runnable exitCallback = () -> running.set(false);
poolExecutor.execute(() -> {
KafkaConsumer<String, T> kafkaConsumer = kafkaConsumerService.of(
consumerGroup,
JsonSerde.of(this.cls)
);
kafkaConsumer.subscribe(Collections.singleton(topicsConfig.getName()));
while (running.get()) {
ConsumerRecords<String, T> records = kafkaConsumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
consumer.accept(record.value());
kafkaConsumer.commitSync(
ImmutableMap.of(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset())
)
);
});
}
});
return exitCallback;
}
public static String getConsumerGroupName(Class group) {
return "floworc_" +
CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE,
group.getSimpleName().replace("Kafka", "")
);
}
@Override
public void close() throws IOException {
if (!poolExecutor.isShutdown()) {
poolExecutor.shutdown();
}
}
}

View File

@@ -0,0 +1,12 @@
package org.floworc.runner.kafka;
import io.micronaut.context.annotation.Requires;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE})
@Requires(property = "floworc.queue.type", value = "kafka")
public @interface KafkaQueueEnabled {
}

View File

@@ -0,0 +1,38 @@
package org.floworc.runner.kafka;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Factory;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.queues.QueueFactoryInterface;
import org.floworc.core.queues.QueueInterface;
import org.floworc.core.runners.WorkerTask;
import org.floworc.core.runners.WorkerTaskResult;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
@Factory
@KafkaQueueEnabled
public class KafkaQueueFactory implements QueueFactoryInterface {
@Inject
ApplicationContext applicationContext;
@Singleton
@Named(QueueFactoryInterface.EXECUTION_NAMED)
public QueueInterface<Execution> execution() {
return new KafkaQueue<>(Execution.class, applicationContext);
}
@Singleton
@Named(QueueFactoryInterface.WORKERTASK_NAMED)
public QueueInterface<WorkerTask> workerTask() {
return new KafkaQueue<>(WorkerTask.class, applicationContext);
}
@Singleton
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
public QueueInterface<WorkerTaskResult> workerTaskResult() {
return new KafkaQueue<>(WorkerTaskResult.class, applicationContext);
}
}

View File

@@ -0,0 +1,15 @@
package org.floworc.runner.kafka.configs;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.convert.format.MapFormat;
import lombok.Getter;
import java.util.Map;
@ConfigurationProperties("floworc.kafka.client")
@Getter
public class ClientConfig {
@MapFormat(transformation = MapFormat.MapTransformation.FLAT)
Map<String, String> properties;
}

View File

@@ -0,0 +1,15 @@
package org.floworc.runner.kafka.configs;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.convert.format.MapFormat;
import lombok.Getter;
import java.util.Map;
@ConfigurationProperties("floworc.kafka.defaults.consumer")
@Getter
public class ConsumerDefaultsConfig {
@MapFormat(transformation = MapFormat.MapTransformation.FLAT)
Map<String, String> properties;
}

View File

@@ -0,0 +1,15 @@
package org.floworc.runner.kafka.configs;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.convert.format.MapFormat;
import lombok.Getter;
import java.util.Map;
@ConfigurationProperties("floworc.kafka.defaults.producer")
@Getter
public class ProducerDefaultsConfig {
@MapFormat(transformation = MapFormat.MapTransformation.FLAT)
Map<String, String> properties;
}

View File

@@ -0,0 +1,15 @@
package org.floworc.runner.kafka.configs;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.convert.format.MapFormat;
import lombok.Getter;
import java.util.Map;
@ConfigurationProperties("floworc.kafka.defaults.stream")
@Getter
public class StreamDefaultsConfig {
@MapFormat(transformation = MapFormat.MapTransformation.FLAT)
Map<String, String> properties;
}

View File

@@ -0,0 +1,19 @@
package org.floworc.runner.kafka.configs;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.convert.format.MapFormat;
import lombok.Getter;
import java.util.Map;
@ConfigurationProperties("floworc.kafka.defaults.topic")
@Getter
public class TopicDefaultsConfig {
int partitions = 6;
short replicationFactor = 1;
@MapFormat(transformation = MapFormat.MapTransformation.FLAT)
Map<String, String> properties;
}

View File

@@ -0,0 +1,24 @@
package org.floworc.runner.kafka.configs;
import io.micronaut.context.annotation.EachProperty;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.core.convert.format.MapFormat;
import lombok.Getter;
import java.util.Map;
@EachProperty("floworc.kafka.topics")
@Getter
public class TopicsConfig {
String cls;
String name;
@MapFormat(transformation = MapFormat.MapTransformation.FLAT)
Map<String, String> properties;
public TopicsConfig(@Parameter String cls) {
this.cls = cls;
}
}

View File

@@ -0,0 +1,37 @@
package org.floworc.runner.kafka.serializers;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.floworc.core.serializers.JacksonMapper;
import java.io.IOException;
import java.util.Map;
public class JsonDeserializer<T> implements Deserializer<T> {
private static final ObjectMapper mapper = JacksonMapper.ofJson();
private Class<T> cls;
public JsonDeserializer(Class<T> cls) {
super();
this.cls = cls;
}
@Override
public void configure(Map<String, ?> settings, boolean isKey) {
}
@Override
public T deserialize(String topic, byte[] bytes) {
if (null == bytes) {
return null;
}
try {
return mapper.readValue(bytes, this.cls);
} catch (IOException e) {
throw new SerializationException(e);
}
}
}

View File

@@ -0,0 +1,43 @@
package org.floworc.runner.kafka.serializers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
public class JsonSerde<T> implements Serde<T> {
private final Serializer<T> serializer;
private final Deserializer<T> deserializer;
private JsonSerde(Class<T> cls) {
this.deserializer = new JsonDeserializer<>(cls);
this.serializer = new JsonSerializer<>();
}
public static <T> JsonSerde<T> of(Class<T> cls) {
return new JsonSerde<>(cls);
}
@Override
public void configure(Map<String, ?> settings, boolean isKey) {
this.serializer.configure(settings, isKey);
this.deserializer.configure(settings, isKey);
}
@Override
public void close() {
this.deserializer.close();
this.serializer.close();
}
@Override
public Serializer<T> serializer() {
return this.serializer;
}
@Override
public Deserializer<T> deserializer() {
return this.deserializer;
}
}

View File

@@ -0,0 +1,37 @@
package org.floworc.runner.kafka.serializers;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.floworc.core.serializers.JacksonMapper;
import java.util.Map;
public class JsonSerializer<T> extends JacksonMapper implements Serializer<T> {
private static final ObjectMapper mapper = JacksonMapper.ofJson();
public JsonSerializer() {
super();
}
@Override
public void configure(Map<String, ?> settings, boolean isKey) {
}
@Override
public byte[] serialize(String topic, T message) {
if (null == message) {
return null;
}
try {
return mapper.writeValueAsBytes(message);
} catch (JsonProcessingException e) {
throw new SerializationException(e);
}
}
public byte[] serialize(T message) {
return this.serialize("", message);
}
}

View File

@@ -0,0 +1,108 @@
package org.floworc.runner.kafka.services;
import com.google.common.collect.ImmutableMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TopicExistsException;
import org.floworc.runner.kafka.configs.ClientConfig;
import org.floworc.runner.kafka.configs.TopicDefaultsConfig;
import org.floworc.runner.kafka.configs.TopicsConfig;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@Singleton
@Slf4j
public class KafkaAdminService {
@Inject
private ClientConfig clientConfig;
@Inject
private TopicDefaultsConfig topicDefaultsConfig;
@Inject
private List<TopicsConfig> topicsConfig;
public AdminClient of() {
Properties properties = new Properties();
properties.putAll(clientConfig.getProperties());
return AdminClient.create(properties);
}
private TopicsConfig getTopicConfig(Class cls) {
return this.topicsConfig
.stream()
.filter(r -> r.getCls().equals(cls.getName().toLowerCase().replace(".", "-")))
.findFirst()
.orElseThrow();
}
@SuppressWarnings("deprecation")
public boolean createIfNotExist(Class cls) {
TopicsConfig topicConfig = this.getTopicConfig(cls);
AdminClient admin = this.of();
NewTopic newTopic = new NewTopic(
topicConfig.getName(),
topicDefaultsConfig.getPartitions(),
topicDefaultsConfig.getReplicationFactor()
);
Map<String, String> properties = new HashMap<>();
if (topicDefaultsConfig.getProperties() != null) {
properties.putAll(topicDefaultsConfig.getProperties());
}
if (topicConfig.getProperties() != null) {
properties.putAll(topicConfig.getProperties());
}
newTopic.configs(properties);
try {
admin.createTopics(Collections.singletonList(newTopic)).all().get();
log.info("Topic '{}' created", newTopic.name());
return true;
} catch (ExecutionException | InterruptedException e) {
if (e.getCause() instanceof TopicExistsException) {
try {
admin
.alterConfigs(new HashMap<>() {{
put(
new ConfigResource(ConfigResource.Type.TOPIC, newTopic.name()),
new org.apache.kafka.clients.admin.Config(
newTopic.configs()
.entrySet()
.stream()
.map(config -> new ConfigEntry(config.getKey(), config.getValue()))
.collect(Collectors.toList())
)
);
}}).all().get();
log.info("Topic Config '{}' updated", newTopic.name());
} catch (InterruptedException | ExecutionException e1) {
throw new RuntimeException(e);
}
} else {
throw new RuntimeException(e);
}
}
return false;
}
public String getTopicName(Class cls) {
return this.getTopicConfig(cls).getName();
}
}

View File

@@ -0,0 +1,79 @@
package org.floworc.runner.kafka.services;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.floworc.runner.kafka.KafkaQueue;
import org.floworc.runner.kafka.configs.ClientConfig;
import org.floworc.runner.kafka.configs.ConsumerDefaultsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Collection;
import java.util.Properties;
@Singleton
@Slf4j
public class KafkaConsumerService {
@Inject
private ClientConfig clientConfig;
@Inject
private ConsumerDefaultsConfig consumerConfig;
public <V> Consumer<V> of(Class group, Serde<V> serde) {
Properties properties = new Properties();
properties.putAll(clientConfig.getProperties());
if (this.consumerConfig.getProperties() != null) {
properties.putAll(consumerConfig.getProperties());
}
properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, KafkaQueue.getConsumerGroupName(group));
properties.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaQueue.getConsumerGroupName(group));
return new Consumer<>(properties, serde);
}
public static class Consumer<V> extends KafkaConsumer<String, V> {
protected Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);
private Consumer(Properties properties, Serde<V> valueSerde) {
super(properties, new StringDeserializer(), valueSerde.deserializer());
}
@Override
public void subscribe(Collection<String> topics) {
super.subscribe(topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
if (log.isTraceEnabled()) {
partitions.forEach(topicPartition -> logger.trace(
"Revoke partitions for topic {}, partition {}",
topicPartition.topic(),
topicPartition.partition()
));
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
if (log.isTraceEnabled()) {
partitions.forEach(topicPartition -> logger.trace(
"Switching partitions for topic {}, partition {}",
topicPartition.topic(),
topicPartition.partition()
));
}
}
});
}
}
}

View File

@@ -0,0 +1,41 @@
package org.floworc.runner.kafka.services;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringSerializer;
import org.floworc.runner.kafka.KafkaQueue;
import org.floworc.runner.kafka.configs.ClientConfig;
import org.floworc.runner.kafka.configs.ProducerDefaultsConfig;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Properties;
@Singleton
public class KafkaProducerService {
@Inject
private ClientConfig clientConfig;
@Inject
private ProducerDefaultsConfig producerConfig;
public <V> KafkaProducerService.Producer<V> of(Class name, Serde<V> serde) {
Properties properties = new Properties();
properties.putAll(clientConfig.getProperties());
if (producerConfig.getProperties() != null) {
properties.putAll(producerConfig.getProperties());
}
properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, KafkaQueue.getConsumerGroupName(name));
return new KafkaProducerService.Producer<>(properties, serde);
}
public static class Producer<V> extends KafkaProducer<String, V> {
private Producer(Properties properties, Serde<V> valueSerde) {
super(properties, new StringSerializer(), valueSerde.serializer());
}
}
}

View File

@@ -0,0 +1,68 @@
package org.floworc.runner.kafka.services;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsException;
import org.floworc.runner.kafka.KafkaQueue;
import org.floworc.runner.kafka.configs.ClientConfig;
import org.floworc.runner.kafka.configs.StreamDefaultsConfig;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.validation.constraints.NotNull;
import java.time.Duration;
import java.util.Properties;
@Singleton
@Slf4j
public class KafkaStreamService {
@Inject
@NotNull
private ClientConfig clientConfig;
@Inject
private StreamDefaultsConfig streamConfig;
public KafkaStreamService.Stream of(Class group, Topology topology) {
Properties properties = new Properties();
properties.putAll(clientConfig.getProperties());
if (this.streamConfig.getProperties() != null) {
properties.putAll(streamConfig.getProperties());
}
properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, KafkaQueue.getConsumerGroupName(group));
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, KafkaQueue.getConsumerGroupName(group));
return new KafkaStreamService.Stream(topology, properties);
}
public static class Stream extends KafkaStreams {
private Stream(Topology topology, Properties props) {
super(topology, props);
}
@Override
public synchronized void start() throws IllegalStateException, StreamsException {
this.setUncaughtExceptionHandler((thread, e) -> {
log.error("Uncaught exception in Kafka Stream " + thread.getName() + ", closing !", e);
System.exit(1);
});
if (log.isTraceEnabled()) {
this.setStateListener((newState, oldState) -> {
log.trace("Switching stream state from {} to {}", oldState, newState);
});
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
this.close(Duration.ofSeconds(10));
}));
super.start();
}
}
}

View File

@@ -0,0 +1,50 @@
package org.floworc.runner.kafka;
import io.micronaut.test.annotation.MicronautTest;
import org.floworc.core.Utils;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.repositories.LocalFlowRepositoryLoader;
import org.floworc.core.runners.RunnerUtils;
import org.floworc.core.runners.StandAloneRunner;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
@MicronautTest
class KafkaRunnerTest {
@Inject
private StandAloneRunner runner;
@Inject
private RunnerUtils runnerUtils;
@Inject
private LocalFlowRepositoryLoader repositoryLoader;
@BeforeEach
private void init() throws IOException, URISyntaxException {
runner.run();
Utils.loads(repositoryLoader);
}
@Test
void full() throws TimeoutException {
Execution execution = runnerUtils.runOne("full");
assertThat(execution.getTaskRunList(), hasSize(13));
}
@Test
void errors() throws TimeoutException {
Execution execution = runnerUtils.runOne("errors");
assertThat(execution.getTaskRunList(), hasSize(7));
}
}

View File

@@ -0,0 +1,42 @@
floworc:
queue:
type: kafka
kafka:
client:
properties:
bootstrap.servers: "kafka:9092"
defaults:
topic:
properties:
compression.type: "lz4"
consumer:
properties:
isolation.level: "read_committed"
auto.offset.reset: "earliest"
enable.auto.commit: "false"
producer:
properties:
acks: "all"
stream:
properties:
processing.guarantee: "exactly_once"
acks: "all"
state.dir: "/tmp/floworc/kafka/state"
topics:
org-floworc-core-models-executions-execution:
name: "floworc_execution"
properties:
cleanup.policy: "compact"
retention.ms: "-1"
org-floworc-core-runners-workertask:
name: "floworc_workertask"
org-floworc-core-runners-workertaskresult:
name: "floworc_workertaskresult"

View File

@@ -4,5 +4,5 @@ dependencies {
compile project(":core")
testCompile project(':core').sourceSets.test.output
testCompile project(':repository-local').sourceSets.main.output
testCompile project(':repository-memory').sourceSets.main.output
}

View File

@@ -4,6 +4,7 @@ import io.micronaut.context.annotation.Prototype;
import lombok.extern.slf4j.Slf4j;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.models.executions.TaskRun;
import org.floworc.core.queues.QueueFactoryInterface;
import org.floworc.core.queues.QueueInterface;
import org.floworc.core.runners.ExecutionStateInterface;
import org.floworc.core.runners.WorkerTaskResult;
@@ -12,6 +13,8 @@ import javax.inject.Named;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Prototype
@MemoryQueueEnabled
public class MemoryExecutionState implements ExecutionStateInterface {
private final Object lock = new Object();
private final QueueInterface<Execution> executionQueue;
@@ -19,8 +22,8 @@ public class MemoryExecutionState implements ExecutionStateInterface {
private static ConcurrentHashMap<String, Execution> executions = new ConcurrentHashMap<>();
public MemoryExecutionState(
@Named("executionQueue") QueueInterface<Execution> executionQueue,
@Named("workerTaskResultQueue") QueueInterface<WorkerTaskResult> workerTaskResultQueue
@Named(QueueFactoryInterface.EXECUTION_NAMED) QueueInterface<Execution> executionQueue,
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED) QueueInterface<WorkerTaskResult> workerTaskResultQueue
) {
this.executionQueue = executionQueue;
this.workerTaskResultQueue = workerTaskResultQueue;

View File

@@ -5,14 +5,27 @@ import org.floworc.core.queues.QueueInterface;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.function.Consumer;
@Slf4j
public class MemoryQueue<T> implements QueueInterface<T> {
private Class<T> cls;
private static ExecutorService poolExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
// private static ExecutorService poolExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static int threads = Runtime.getRuntime().availableProcessors();
private static ExecutorService poolExecutor = new ThreadPoolExecutor(
threads,
threads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.info("sdfsdf");
}
}
);
private Map<String, List<Consumer<T>>> consumers = new HashMap<>();
public MemoryQueue(Class<T> cls) {
@@ -30,17 +43,23 @@ public class MemoryQueue<T> implements QueueInterface<T> {
poolExecutor.execute(() -> {
consumers.get((new Random()).nextInt(consumers.size())).accept(message);
});
});
}
@Override
public synchronized void receive(Class consumerGroup, Consumer<T> consumer) {
public synchronized Runnable receive(Class consumerGroup, Consumer<T> consumer) {
if (!this.consumers.containsKey(consumerGroup.getName())) {
this.consumers.put(consumerGroup.getName(), new ArrayList<>());
}
this.consumers.get(consumerGroup.getName()).add(consumer);
synchronized (this) {
this.consumers.get(consumerGroup.getName()).add(consumer);
int index = this.consumers.get(consumerGroup.getName()).size() - 1;
return () -> {
this.consumers.get(consumerGroup.getName()).remove(index);
};
}
}
public int getSubscribersCount() {
@@ -51,11 +70,6 @@ public class MemoryQueue<T> implements QueueInterface<T> {
.reduce(0, Integer::sum);
}
@Override
public void ack(T message) {
// no ack needed with local queues
}
@Override
public void close() throws IOException {
if (!poolExecutor.isShutdown()) {

View File

@@ -0,0 +1,14 @@
package org.floworc.runner.memory;
import io.micronaut.context.annotation.DefaultImplementation;
import io.micronaut.context.annotation.Requires;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE})
@Requires(property = "floworc.queue.type", value = "memory")
@DefaultImplementation
public @interface MemoryQueueEnabled {
}

View File

@@ -10,21 +10,22 @@ import javax.inject.Named;
import javax.inject.Singleton;
@Factory
@MemoryQueueEnabled
public class MemoryQueueFactory implements QueueFactoryInterface {
@Singleton
@Named("executionQueue")
@Named(QueueFactoryInterface.EXECUTION_NAMED)
public QueueInterface<Execution> execution() {
return new MemoryQueue<>(Execution.class);
}
@Singleton
@Named("workerTaskQueue")
@Named(QueueFactoryInterface.WORKERTASK_NAMED)
public QueueInterface<WorkerTask> workerTask() {
return new MemoryQueue<>(WorkerTask.class);
}
@Singleton
@Named("workerTaskResultQueue")
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
public QueueInterface<WorkerTaskResult> workerTaskResult() {
return new MemoryQueue<>(WorkerTaskResult.class);
}

View File

@@ -5,6 +5,7 @@ import org.floworc.core.models.executions.Execution;
import org.floworc.core.runners.StandAloneRunner;
import org.floworc.core.runners.WorkerTask;
import org.floworc.core.runners.WorkerTaskResult;
import org.floworc.core.utils.Await;
import javax.inject.Singleton;
@@ -17,19 +18,10 @@ public class MemoryRunner extends StandAloneRunner {
int processors = Math.max(3, Runtime.getRuntime().availableProcessors());
// @FIXME: Ugly hack to wait that all thread is created and ready to listen
boolean isReady;
do {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
log.error("Can't sleep", e);
}
isReady = ((MemoryQueue<Execution>) this.executionQueue).getSubscribersCount() == processors * 2 &&
((MemoryQueue<WorkerTask>) this.workerTaskQueue).getSubscribersCount() == processors &&
((MemoryQueue<WorkerTaskResult>) this.workerTaskResultQueue).getSubscribersCount() == processors;
}
while (!isReady);
// @FIXME: Ugly hack to wait that all threads is created and ready to listen
Await.until(() -> ((MemoryQueue<Execution>) this.executionQueue).getSubscribersCount() == processors * 2 &&
((MemoryQueue<WorkerTask>) this.workerTaskQueue).getSubscribersCount() == processors &&
((MemoryQueue<WorkerTaskResult>) this.workerTaskResultQueue).getSubscribersCount() == processors
);
}
}

View File

@@ -3,12 +3,15 @@ package org.floworc.runner.memory;
import io.micronaut.test.annotation.MicronautTest;
import org.floworc.core.Utils;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.models.flows.Flow;
import org.floworc.core.runners.StandAloneRunner;
import org.floworc.core.repositories.LocalFlowRepositoryLoader;
import org.floworc.core.runners.RunnerUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
@@ -16,22 +19,33 @@ import static org.hamcrest.Matchers.hasSize;
@MicronautTest
class MemoryRunnerTest {
@Inject
private StandAloneRunner runner;
private MemoryRunner runner;
@Inject
private RunnerUtils runnerUtils;
@Inject
private LocalFlowRepositoryLoader repositoryLoader;
@BeforeEach
private void init() throws IOException, URISyntaxException {
if (!runner.isRunning()) {
runner.run();
Utils.loads(repositoryLoader);
}
}
@Test
void full() throws IOException, InterruptedException {
Flow flow = Utils.parse("flows/full.yaml");
Execution execution = runner.runOne(flow);
void full() throws TimeoutException {
Execution execution = runnerUtils.runOne("full");
assertThat(execution.getTaskRunList(), hasSize(13));
}
@Test
void errors() throws IOException, InterruptedException {
Flow flow = Utils.parse("flows/errors.yaml");
Execution execution = runner.runOne(flow);
void errors() throws TimeoutException {
Execution execution = runnerUtils.runOne("errors");
assertThat(execution.getTaskRunList(), hasSize(7));
}
}

View File

@@ -1,4 +1,3 @@
floworc:
repository:
local:
base-path: ../core/src/test/resources/flows/
queue:
type: memory

View File

@@ -6,5 +6,5 @@ include 'core'
include 'runner-memory'
include 'runner-kafka'
include 'repository-local'
include 'repository-memory'