feat(storage): initial commit of storage

This commit is contained in:
tchiotludo
2019-10-09 22:48:13 +02:00
parent aa8574ca90
commit bcc5798d7f
47 changed files with 829 additions and 83 deletions

1
.gitignore vendored
View File

@@ -13,5 +13,6 @@ out/
.classpath
.attach*
docker-compose.override.yml
.env
cli/src/main/resources/application-override.yml

View File

@@ -1,7 +1,7 @@
buildscript {
ext {
micronautVersion = "1.2.3"
confluentVersion = "5.2.1"
micronautVersion = "1.2.4"
confluentVersion = "5.3.1"
kafkaVersion = "2.3.0"
avroVersion = "1.9.0"
lombokVersion = "1.18.10"
@@ -10,7 +10,6 @@ buildscript {
plugins {
// micronaut
id "io.spring.dependency-management" version "1.0.6.RELEASE"
id "java"
id "net.ltgt.apt-eclipse" version "0.21"
id "net.ltgt.apt-idea" version "0.21"
@@ -36,7 +35,7 @@ sourceCompatibility = 11
dependencies {
compile project(":cli")
testCompile project(":repository-memory")
testCompile project(":cli")
}
@@ -58,17 +57,10 @@ allprojects {
apply plugin:"java"
apply plugin:"net.ltgt.apt-eclipse"
apply plugin:"net.ltgt.apt-idea"
apply plugin:"io.spring.dependency-management"
// test
apply plugin:"com.adarshr.test-logger"
dependencyManagement {
imports {
mavenBom "io.micronaut:micronaut-bom:" + micronautVersion
}
}
configurations {
// for dependencies that are needed for development only
developmentOnly
@@ -77,29 +69,33 @@ allprojects {
// dependencies
dependencies {
// utils
annotationProcessor "org.projectlombok:lombok:" + lombokVersion
runtime "ch.qos.logback:logback-classic:1.2.3"
compile group: 'com.google.guava', name: 'guava', version: '27.1-jre'
compileOnly 'org.projectlombok:lombok:' + lombokVersion
annotationProcessor "org.projectlombok:lombok:" + lombokVersion
// micronaut
annotationProcessor "io.micronaut:micronaut-inject-java"
annotationProcessor "io.micronaut:micronaut-validation"
compile "io.micronaut:micronaut-runtime"
compile "io.micronaut:micronaut-inject"
compile "io.micronaut:micronaut-validation"
compile "info.picocli:picocli"
compile "io.micronaut.configuration:micronaut-picocli"
compile 'io.micronaut:micronaut-views'
compile 'com.github.jknack:handlebars:4.1.2'
// micronaut
annotationProcessor platform("io.micronaut:micronaut-bom:$micronautVersion")
annotationProcessor "io.micronaut:micronaut-inject-java"
annotationProcessor "io.micronaut:micronaut-validation"
implementation platform("io.micronaut:micronaut-bom:$micronautVersion")
implementation "io.micronaut:micronaut-inject"
implementation "io.micronaut:micronaut-validation"
implementation "io.micronaut:micronaut-runtime"
compile 'io.micronaut:micronaut-views'
// test
testCompile "org.junit.jupiter:junit-jupiter-api"
testRuntime "org.junit.jupiter:junit-jupiter-engine"
testRuntime "org.junit.jupiter:junit-jupiter-params"
testAnnotationProcessor platform("io.micronaut:micronaut-bom:$micronautVersion")
testAnnotationProcessor "io.micronaut:micronaut-inject-java"
testCompile "io.micronaut.test:micronaut-test-junit5"
testCompile "io.micronaut:micronaut-inject-java"
testImplementation platform("io.micronaut:micronaut-bom:$micronautVersion")
testImplementation "org.junit.jupiter:junit-jupiter-api"
testImplementation "io.micronaut.test:micronaut-test-junit5"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine"
testRuntime "org.junit.jupiter:junit-jupiter-params"
// hamcrest
testImplementation 'org.hamcrest:hamcrest:2.1'
testImplementation 'org.hamcrest:hamcrest-library:2.1'
testCompile group: 'org.exparity', name: 'hamcrest-date', version: '2.0.7'

View File

@@ -2,6 +2,8 @@ sourceCompatibility = 11
dependencies {
// micronaut
compile "info.picocli:picocli"
compile "io.micronaut.configuration:micronaut-picocli"
compile "io.micronaut:micronaut-management"
compile "io.micronaut:micronaut-http-client"
compile "io.micronaut:micronaut-http-server-netty"
@@ -13,4 +15,8 @@ dependencies {
compile project(":runner-memory")
compile project(":runner-kafka")
compile project(":storage-local")
compile project(":storage-gcs")
compile project(":storage-minio")
}

View File

@@ -66,7 +66,9 @@ public class TestCommand implements Runnable {
runnerUtils.runOne(
all.get(0).getId(),
runnerUtils.typedInputs(all.get(0).getId(), inputs)
(flow, execution) -> {
return runnerUtils.typedInputs(flow, execution, inputs);
}
);
runner.close();
} catch (MissingRequiredInput e) {

View File

@@ -18,4 +18,5 @@ dependencies {
// test
testCompile project(':repository-memory').sourceSets.main.output
testCompile project(':runner-memory').sourceSets.main.output
testCompile project(':storage-local').sourceSets.main.output
}

View File

@@ -1,6 +1,10 @@
package org.floworc.core.exceptions;
public class MissingRequiredInput extends IllegalArgumentException {
public MissingRequiredInput(String message, Throwable e) {
super(message, e);
}
public MissingRequiredInput(String message) {
super(message);
}

View File

@@ -22,6 +22,7 @@ public class Execution {
@Wither
private List<TaskRun> taskRunList;
@Wither
private Map<String, Object> inputs;
@NotNull
@@ -37,6 +38,19 @@ public class Execution {
);
}
public TaskRun findTaskRunByTaskId(String id) {
Optional<TaskRun> find = this.taskRunList
.stream()
.filter(taskRun -> taskRun.getTaskId().equals(id))
.findFirst();
if (find.isEmpty()) {
throw new IllegalArgumentException("Can't find taskrun with task id '" + id + "' on execution '" + this.id + "'");
}
return find.get();
}
public TaskRun findTaskRunById(String id) {
Optional<TaskRun> find = this.taskRunList
.stream()
@@ -44,7 +58,7 @@ public class Execution {
.findFirst();
if (find.isEmpty()) {
throw new IllegalArgumentException("Can't find taskrun with id '" + id + "' on execution '" + this.id + "'");
throw new IllegalArgumentException("Can't find taskrun with taskrun id '" + id + "' on execution '" + this.id + "'");
}
return find.get();

View File

@@ -18,5 +18,6 @@ public class Input {
INT,
FLOAT,
DATETIME,
FILE
}
}

View File

@@ -92,18 +92,17 @@ public class Executor implements Runnable {
this.executionQueue.emit(newExecution);
// submit TaskRun
nexts.forEach(taskRun -> {
Task task = flow.findTaskById(taskRun.getTaskId());
for (TaskRun taskRun: nexts) {
Task task = flow.findTaskById(taskRun.getTaskId());
this.workerTaskQueue.emit(
WorkerTask.builder()
.runContext(new RunContext(execution, taskRun, task))
.taskRun(taskRun)
.task(task)
.build()
);
}
);
this.workerTaskQueue.emit(
WorkerTask.builder()
.runContext(new RunContext(execution, taskRun, task))
.taskRun(taskRun)
.task(task)
.build()
);
}
}
private void onEnd(Flow flow, Execution execution) {

View File

@@ -23,9 +23,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
@NoArgsConstructor
@@ -134,7 +134,7 @@ public class RunContext {
}
public static class ContextAppender extends AppenderBase<ILoggingEvent> {
private final List<ILoggingEvent> events = new ArrayList<>();
private final ConcurrentLinkedQueue<ILoggingEvent> events = new ConcurrentLinkedQueue<>();
@Override
public void start() {

View File

@@ -9,11 +9,13 @@ import org.floworc.core.models.flows.State;
import org.floworc.core.queues.QueueFactoryInterface;
import org.floworc.core.queues.QueueInterface;
import org.floworc.core.repositories.FlowRepositoryInterface;
import org.floworc.core.storages.StorageInterface;
import org.floworc.core.utils.Await;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.util.AbstractMap;
@@ -21,6 +23,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -34,15 +37,14 @@ public class RunnerUtils {
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private StorageInterface storageInterface;
public Execution runOne(String flowId) throws TimeoutException {
return this.runOne(flowId, null, null);
}
public Map<String, Object> typedInputs(String flowId, Map<String, String> in) {
Flow flow = flowRepository
.findById(flowId)
.orElseThrow(() -> new IllegalArgumentException("Unable to find flow '" + flowId + "'"));
public Map<String, Object> typedInputs(Flow flow, Execution execution, Map<String, String> in) {
return flow
.getInputs()
.stream()
@@ -63,21 +65,35 @@ public class RunnerUtils {
input.getName(),
current
));
case INT:
return Optional.of(new AbstractMap.SimpleEntry<String, Object>(
input.getName(),
Integer.valueOf(current)
));
case FLOAT:
return Optional.of(new AbstractMap.SimpleEntry<String, Object>(
input.getName(),
Float.valueOf(current)
));
case DATETIME:
return Optional.of(new AbstractMap.SimpleEntry<String, Object>(
input.getName(),
Instant.parse(current)
));
case FILE:
try {
return Optional.of(new AbstractMap.SimpleEntry<String, Object>(
input.getName(),
storageInterface.from(flow, execution, input, new File(current))
));
} catch (Exception e) {
throw new MissingRequiredInput("Invalid input for '" + input.getName() + "'", e);
}
default:
throw new MissingRequiredInput("Invalid input type '" + input.getType() + "' for '" + input.getName() + "'");
}
@@ -87,11 +103,11 @@ public class RunnerUtils {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
public Execution runOne(String flowId, Map<String, Object> inputs) throws TimeoutException {
public Execution runOne(String flowId, BiFunction<Flow, Execution, Map<String, Object>> inputs) throws TimeoutException {
return this.runOne(flowId, inputs, null);
}
public Execution runOne(String flowId, Map<String, Object> inputs, Duration duration) throws TimeoutException {
public Execution runOne(String flowId, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration) throws TimeoutException {
return this.runOne(
flowRepository
.findById(flowId)
@@ -101,29 +117,34 @@ public class RunnerUtils {
);
}
private Execution runOne(Flow flow, Map<String, Object> inputs, Duration duration) throws TimeoutException {
private Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration) throws TimeoutException {
if (duration == null) {
duration = Duration.ofSeconds(5);
}
Execution execution = Execution.builder()
.id(FriendlyId.createFriendlyId())
.inputs(inputs)
.flowId(flow.getId())
.state(new State())
.build();
if (inputs != null) {
execution = execution.withInputs(inputs.apply(flow, execution));
}
final String executionId = execution.getId();
AtomicReference<Execution> receive = new AtomicReference<>();
Runnable cancel = this.executionQueue.receive(StandAloneRunner.class, current -> {
if (current.getId().equals(execution.getId()) && current.getState().isTerninated()) {
if (current.getId().equals(executionId) && current.getState().isTerninated()) {
receive.set(current);
}
});
this.executionQueue.emit(execution);
Await.until(() -> receive.get() != null, duration);
Await.until(() -> receive.get() != null, null, duration);
cancel.run();

View File

@@ -0,0 +1,37 @@
package org.floworc.core.storages;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.models.flows.Flow;
import org.floworc.core.models.flows.Input;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
public interface StorageInterface {
InputStream get(URI uri) throws FileNotFoundException;
StorageObject put(URI uri, InputStream data) throws IOException;
default StorageObject from(Flow flow, Execution execution, Input input, File file) throws IOException {
try {
URI uri = new URI(String.join(
"/",
Arrays.asList(
flow.getNamespace().replace(".", "/"),
flow.getId(),
"executions",
execution.getId(),
"inputs",
input.getName(),
file.getName()
)
));
return this.put(uri, new FileInputStream(file));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,37 @@
package org.floworc.core.storages;
import com.google.common.io.CharStreams;
import lombok.AllArgsConstructor;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
@AllArgsConstructor
public class StorageObject {
private StorageInterface storageInterface;
private URI uri;
public URI getUri() {
return uri;
}
public String getContent() throws IOException {
InputStreamReader inputStreamReader = new InputStreamReader(storageInterface.get(this.uri));
String content = CharStreams.toString(inputStreamReader);
inputStreamReader.close();;
return content;
}
public InputStream getInputStream() throws FileNotFoundException {
return storageInterface.get(this.uri);
}
@Override
public String toString() {
return this.uri.toString();
}
}

View File

@@ -5,24 +5,38 @@ import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
public class Await {
private static final Duration defaultSleep = Duration.ofMillis(100);
public static void until(BooleanSupplier condition) {
Await.until(condition, null);
}
public static void until(BooleanSupplier condition, Duration sleep) {
if (sleep == null) {
sleep = defaultSleep;
}
while (!condition.getAsBoolean()) {
try {
Thread.sleep(100);
Thread.sleep(sleep.toMillis());
} catch (InterruptedException e) {
throw new RuntimeException("Can't sleep");
}
}
}
public static void until(BooleanSupplier condition, Duration duration) throws TimeoutException {
public static void until(BooleanSupplier condition, Duration sleep, Duration timeout) throws TimeoutException {
if (sleep == null) {
sleep = defaultSleep;
}
long start = System.currentTimeMillis();
while (!condition.getAsBoolean()) {
if (System.currentTimeMillis() - start > duration.toMillis()) {
throw new TimeoutException(String.format("Execution failed to terminate within %s", duration));
if (System.currentTimeMillis() - start > timeout.toMillis()) {
throw new TimeoutException(String.format("Execution failed to terminate within %s", timeout));
} else {
try {
Thread.sleep(100);
Thread.sleep(sleep.toMillis());
} catch (InterruptedException e) {
throw new RuntimeException("Can't sleep");
}

View File

@@ -1,53 +1,116 @@
package org.floworc.core.runners;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CharStreams;
import org.floworc.core.AbstractMemoryRunnerTest;
import org.floworc.core.models.executions.Execution;
import org.floworc.core.models.flows.State;
import org.floworc.core.repositories.FlowRepositoryInterface;
import org.floworc.core.storages.StorageObject;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertThrows;
@SuppressWarnings("OptionalGetWithoutIsPresent")
class InputsTest extends AbstractMemoryRunnerTest {
private Map<String, String> inputs = ImmutableMap.of(
"string", "myString",
"int", "42",
"float", "42.42",
"instant", "2019-10-06T18:27:49Z"
"instant", "2019-10-06T18:27:49Z",
"file", Objects.requireNonNull(InputsTest.class.getClassLoader().getResource("application.yml")).getPath()
);
@Inject
private FlowRepositoryInterface flowRepository;
private Map<String, Object> typedInputs(Map<String, String> map) {
return runnerUtils.typedInputs(
flowRepository.findById("inputs").get(),
Execution.builder()
.id("test")
.build(),
map
);
}
@Test
void missingRequired() {
assertThrows(IllegalArgumentException.class, () -> {
runnerUtils.typedInputs("inputs", new HashMap<>());
typedInputs(new HashMap<>());
});
}
@Test
void inputString() {
Map<String, Object> typeds = runnerUtils.typedInputs("inputs", this.inputs);
Map<String, Object> typeds = typedInputs(this.inputs);
assertThat(typeds.get("string"), is("myString"));
}
@Test
void inputInt() {
Map<String, Object> typeds = runnerUtils.typedInputs("inputs", this.inputs);
Map<String, Object> typeds = typedInputs(this.inputs);
assertThat(typeds.get("int"), is(42));
}
@Test
void inputFloat() {
Map<String, Object> typeds = runnerUtils.typedInputs("inputs", this.inputs);
Map<String, Object> typeds = typedInputs(this.inputs);
assertThat(typeds.get("float"), is(42.42F));
}
@Test
void inputInstant() {
Map<String, Object> typeds = runnerUtils.typedInputs("inputs", this.inputs);
Map<String, Object> typeds = typedInputs(this.inputs);
assertThat(typeds.get("instant"), is(Instant.parse("2019-10-06T18:27:49Z")));
}
@Test
void inputFile() throws URISyntaxException, IOException {
Map<String, Object> typeds = typedInputs(this.inputs);
StorageObject file = (StorageObject) typeds.get("file");
assertThat(file.getUri(), is(new URI("org/floworc/tests/inputs/executions/test/inputs/file/application.yml")));
assertThat(file.getClass(), is(StorageObject.class));
assertThat(
file.getContent(),
is(CharStreams.toString(new InputStreamReader(new FileInputStream(this.inputs.get("file")))))
);
assertThat(
CharStreams.toString(new InputStreamReader(file.getInputStream())),
is(CharStreams.toString(new InputStreamReader(new FileInputStream(this.inputs.get("file")))))
);
}
@Test
void inputFlow() throws TimeoutException {
Execution execution = runnerUtils.runOne(
"inputs",
(flow, execution1) -> runnerUtils.typedInputs(flow, execution1, this.inputs)
);
assertThat(execution.getTaskRunList(), hasSize(7));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
Arrays.asList("file-uri", "file").forEach(o ->
assertThat(
(String) execution.findTaskRunByTaskId(o).getOutputs().get("return"),
matchesRegex("org/floworc/tests/inputs/executions/.*/inputs/file/application.yml")
)
);
}
}

View File

@@ -21,12 +21,12 @@ class YamlFlowParserTest {
assertThat(flow.getId(), is("full"));
assertThat(flow.getTasks().size(), is(5));
// third with all optionnals
Task optionnals = flow.getTasks().get(2);
assertThat(optionnals.getTimeout(), is(1000));
assertThat(optionnals.getRetry().getInterval().getSeconds(), is(900L));
assertThat(optionnals.getRetry().getType(), is(RetryIntervalType.CONSTANT));
assertThat(optionnals.getRetry().getLimit(), is(5));
// third with all optionals
Task optionals = flow.getTasks().get(2);
assertThat(optionals.getTimeout(), is(1000));
assertThat(optionals.getRetry().getInterval().getSeconds(), is(900L));
assertThat(optionals.getRetry().getType(), is(RetryIntervalType.CONSTANT));
assertThat(optionals.getRetry().getLimit(), is(5));
}
@Test

View File

@@ -0,0 +1,64 @@
package org.floworc.core.storages;
import com.google.common.io.CharStreams;
import io.micronaut.test.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URL;
import java.util.Objects;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
@MicronautTest
abstract public class AbstractLocalStorageTest {
@Inject
StorageInterface storageInterface;
private StorageObject putFile(URL resource, String Path) throws Exception {
return storageInterface.put(
new URI(Path),
new FileInputStream(Objects.requireNonNull(resource).getFile())
);
}
@Test
void get() throws Exception {
URL resource = AbstractLocalStorageTest.class.getClassLoader().getResource("application.yml");
this.putFile(resource, "file/storage/get.yml");
InputStream get = storageInterface.get(new URI("file/storage/get.yml"));
assertThat(
CharStreams.toString(new InputStreamReader(get)),
is(CharStreams.toString(new InputStreamReader(new FileInputStream(Objects.requireNonNull(resource).getFile()))))
);
}
@Test
void missing() {
assertThrows(FileNotFoundException.class, () -> {
storageInterface.get(new URI("file/storage/missing.yml"));
});
}
@Test
void put() throws Exception {
URL resource = AbstractLocalStorageTest.class.getClassLoader().getResource("application.yml");
StorageObject put = this.putFile(resource, "file/storage/put.yml");
InputStream get = storageInterface.get(new URI("file/storage/put.yml"));
// assertThat(put.getUri(), is(new StorageObject(new URI("file/storage/get.yml"))));
assertThat(
CharStreams.toString(new InputStreamReader(get)),
is(CharStreams.toString(new InputStreamReader(new FileInputStream(Objects.requireNonNull(resource).getFile()))))
);
}
}

View File

@@ -1,3 +1,7 @@
floworc:
queue:
type: memory
storage:
type: local
local:
base-path: /tmp/unittest

View File

@@ -6,6 +6,7 @@ namespace: org.floworc.tests
# expression: 42 4 1 * *
# backfill:
# start: 2018-01-01
# depend-on-past: false
#
tasks:
@@ -20,7 +21,7 @@ tasks:
- id: 3rd
type: org.floworc.core.tasks.debugs.Echo
format: third all optionnal args {{ outputs.2nd.return }}
format: third all optional args {{ outputs.2nd.return }}
timeout: 1000
retry:
limit: 5

View File

@@ -4,15 +4,17 @@ namespace: org.floworc.tests
inputs:
- name: string
type: STRING
- name: optionnal
- name: optional
type: STRING
required: true
required: false
- name: int
type: INT
- name: float
type: FLOAT
- name: instant
type: DATETIME
- name: file
type: FILE
tasks:
- id: string
@@ -27,3 +29,12 @@ tasks:
- id: instant
type: org.floworc.core.tasks.debugs.Return
format: "{{inputs.instant}}"
- id: file-content
type: org.floworc.core.tasks.debugs.Return
format: "{{inputs.file.content}}"
- id: file
type: org.floworc.core.tasks.debugs.Return
format: "{{inputs.file}}"
- id: file-uri
type: org.floworc.core.tasks.debugs.Return
format: "{{inputs.file.uri}}"

18
docker-compose-dev.yml Normal file
View File

@@ -0,0 +1,18 @@
version: "3.6"
volumes:
minio-data:
driver: local
services:
minio:
image: minio/minio
volumes:
- minio-data:/data
command: server /data
environment:
MINIO_ACCESS_KEY: AKIAIOSFODNN7EXAMPLE
MINIO_SECRET_KEY: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
ports:
- 127.8.9.13:9000:9000

View File

@@ -8,4 +8,5 @@ dependencies {
testCompile project(':core').sourceSets.test.output
testCompile project(':repository-memory').sourceSets.main.output
testCompile project(':storage-local').sourceSets.main.output
}

View File

@@ -1,6 +1,10 @@
floworc:
queue:
type: kafka
storage:
type: local
local:
base-path: /tmp/unittest
kafka:
client:
@@ -26,7 +30,7 @@ floworc:
properties:
processing.guarantee: "exactly_once"
acks: "all"
state.dir: "/tmp/floworc/kafka/state"
state.dir: "/tmp/"
topics:
org-floworc-core-models-executions-execution:

View File

@@ -5,4 +5,5 @@ dependencies {
testCompile project(':core').sourceSets.test.output
testCompile project(':repository-memory').sourceSets.main.output
testCompile project(':storage-local').sourceSets.main.output
}

View File

@@ -8,15 +8,16 @@ import org.floworc.core.queues.QueueFactoryInterface;
import org.floworc.core.queues.QueueInterface;
import org.floworc.core.runners.ExecutionStateInterface;
import org.floworc.core.runners.WorkerTaskResult;
import org.floworc.core.utils.Await;
import javax.inject.Named;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Prototype
@MemoryQueueEnabled
public class MemoryExecutionState implements ExecutionStateInterface {
private final Object lock = new Object();
private final QueueInterface<Execution> executionQueue;
private final QueueInterface<WorkerTaskResult> workerTaskResultQueue;
private static ConcurrentHashMap<String, Execution> executions = new ConcurrentHashMap<>();
@@ -32,7 +33,7 @@ public class MemoryExecutionState implements ExecutionStateInterface {
@Override
public void run() {
this.executionQueue.receive(MemoryExecutionState.class, execution -> {
synchronized (lock) {
synchronized (this) {
if (execution.getState().isTerninated()) {
executions.remove(execution.getId());
} else {
@@ -42,17 +43,25 @@ public class MemoryExecutionState implements ExecutionStateInterface {
});
this.workerTaskResultQueue.receive(MemoryExecutionState.class, message -> {
synchronized (lock) {
synchronized (this) {
TaskRun taskRun = message.getTaskRun();
if (!executions.containsKey(taskRun.getExecutionId())) {
throw new RuntimeException("Unable to find execution '" + taskRun.getExecutionId() + "' on ExecutionState");
}
Execution execution = executions.get(taskRun.getExecutionId());
Execution newExecution = execution.withTaskRun(taskRun);
this.executionQueue.emit(newExecution);
// @FIXME: ugly hack, some time execution is not updated when the WorkerTaskResult is coming. We must sleep to wait that execution is updated.
Await.until(() -> {
try {
Execution execution = executions.get(taskRun.getExecutionId());
Execution newExecution = execution.withTaskRun(taskRun);
this.executionQueue.emit(newExecution);
return true;
} catch (IllegalArgumentException e) {
log.warn("Execution is not updated yet, sleeping !", e);
return false;
}
}, Duration.ofMillis(10));
}
});
}

View File

@@ -1,3 +1,7 @@
floworc:
queue:
type: memory
storage:
type: local
local:
base-path: /tmp/unittest

View File

@@ -6,5 +6,9 @@ include 'core'
include 'runner-memory'
include 'runner-kafka'
include 'storage-local'
include 'storage-gcs'
include 'storage-minio'
include 'repository-memory'

8
storage-gcs/build.gradle Normal file
View File

@@ -0,0 +1,8 @@
sourceCompatibility = 11
dependencies {
compile project(":core")
compile 'com.google.cloud:google-cloud-storage:1.96.0'
testCompile project(':core').sourceSets.test.output
}

View File

@@ -0,0 +1,16 @@
package org.floworc.storage.gcs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import io.micronaut.context.annotation.Factory;
import javax.inject.Singleton;
@Factory
@GcsStorageEnabled
public class GcsClientFactory {
@Singleton
public Storage of(GcsConfig config) {
return StorageOptions.getDefaultInstance().getService();
}
}

View File

@@ -0,0 +1,13 @@
package org.floworc.storage.gcs;
import io.micronaut.context.annotation.ConfigurationProperties;
import lombok.Getter;
import javax.inject.Singleton;
@Singleton
@Getter
@ConfigurationProperties("floworc.storage.gcs")
public class GcsConfig {
String bucket;
}

View File

@@ -0,0 +1,69 @@
package org.floworc.storage.gcs;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import org.floworc.core.storages.StorageInterface;
import org.floworc.core.storages.StorageObject;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
@Singleton
@GcsStorageEnabled
public class GcsStorage implements StorageInterface {
@Inject
GcsClientFactory factory;
@Inject
GcsConfig config;
private Storage client() {
return factory.of(config);
}
private BlobId blob(URI uri) {
return BlobId.of(this.config.getBucket(), uri.toString());
}
@Override
public InputStream get(URI uri) throws FileNotFoundException {
Blob blob = this.client().get(this.blob(uri));
if (blob == null || !blob.exists()) {
throw new FileNotFoundException(uri.toString() + " (File not found)");
}
ReadableByteChannel reader = blob.reader();
return Channels.newInputStream(reader);
}
@Override
public StorageObject put(URI uri, InputStream data) throws IOException {
BlobInfo blobInfo = BlobInfo
.newBuilder(this.blob(uri))
.build();
try (WriteChannel writer = this.client().writer(blobInfo)) {
byte[] buffer = new byte[10_240];
int limit;
while ((limit = data.read(buffer)) >= 0) {
writer.write(ByteBuffer.wrap(buffer, 0, limit));
}
}
data.close();
return new StorageObject(this, uri);
}
}

View File

@@ -0,0 +1,12 @@
package org.floworc.storage.gcs;
import io.micronaut.context.annotation.Requires;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE})
@Requires(property = "floworc.storage.type", value = "gcs")
public @interface GcsStorageEnabled {
}

View File

@@ -0,0 +1,7 @@
package org.floworc.storage.gcs;
import org.floworc.core.storages.AbstractLocalStorageTest;
class GcsStorageTest extends AbstractLocalStorageTest {
}

View File

@@ -0,0 +1,6 @@
floworc:
storage:
type: gcs
gcs:
bucket: "floworc-unit-test"

View File

@@ -0,0 +1,7 @@
sourceCompatibility = 11
dependencies {
compile project(":core")
testCompile project(':core').sourceSets.test.output
}

View File

@@ -0,0 +1,25 @@
package org.floworc.storage.local;
import com.google.common.base.CharMatcher;
import io.micronaut.context.annotation.ConfigurationProperties;
import lombok.Getter;
import javax.inject.Singleton;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
@Singleton
@Getter
@ConfigurationProperties("floworc.storage.local")
public class LocalConfig {
String basePath;
public URI getBasePath() {
try {
return new URI("file://" + CharMatcher.anyOf(File.separator).trimTrailingFrom(this.basePath));
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
}

View File

@@ -0,0 +1,77 @@
package org.floworc.storage.local;
import com.google.common.io.Files;
import org.floworc.core.storages.StorageInterface;
import org.floworc.core.storages.StorageObject;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
@Singleton
@LocalStorageEnabled
public class LocalStorage implements StorageInterface {
LocalConfig config;
@Inject
public LocalStorage(LocalConfig config) {
this.config = config;
this.createDirectory(null);
}
private URI getUri(URI uri) {
try {
return new URI(this.config.getBasePath() + File.separator + uri.toString());
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
@SuppressWarnings("ResultOfMethodCallIgnored")
private void createDirectory(URI append) {
File file;
if (append != null) {
try {
URI uri = new URI(config.getBasePath().toString() + File.separator + append.toString());
Path path = Paths.get(uri);
file = path.getParent().toFile();
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
} else {
file = new File(config.getBasePath());
}
if (!file.exists()) {
file.mkdirs();
}
}
@Override
public InputStream get(URI uri) throws FileNotFoundException {
return new FileInputStream(new File(getUri(uri)));
}
@Override
public StorageObject put(URI uri, InputStream data) throws IOException {
this.createDirectory(uri);
OutputStream outStream = new FileOutputStream(getUri(uri).getPath());
byte[] buffer = new byte[8 * 1024];
int bytesRead;
while ((bytesRead = data.read(buffer)) != -1) {
outStream.write(buffer, 0, bytesRead);
}
outStream.close();
data.close();
return new StorageObject(this, uri);
}
}

View File

@@ -0,0 +1,12 @@
package org.floworc.storage.local;
import io.micronaut.context.annotation.Requires;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE})
@Requires(property = "floworc.storage.type", value = "local")
public @interface LocalStorageEnabled {
}

View File

@@ -0,0 +1,7 @@
package org.floworc.storage.local;
import org.floworc.core.storages.AbstractLocalStorageTest;
class LocalStorageTest extends AbstractLocalStorageTest {
}

View File

@@ -0,0 +1,5 @@
floworc:
storage:
type: local
local:
base-path: /tmp/unittest

View File

@@ -0,0 +1,8 @@
sourceCompatibility = 11
dependencies {
compile project(":core")
compile 'io.minio:minio:6.0.11'
testCompile project(':core').sourceSets.test.output
}

View File

@@ -0,0 +1,29 @@
package org.floworc.storage.minio;
import io.micronaut.context.annotation.Factory;
import io.minio.MinioClient;
import javax.inject.Singleton;
@Factory
@MinioStorageEnabled
public class MinioClientFactory {
@Singleton
public MinioClient of(MinioConfig config) {
MinioClient client;
try {
client = new MinioClient(
config.getEndpoint(),
config.getPort(),
config.getAccessKey(),
config.getSecretKey(),
config.isSecure()
);
} catch (Exception e) {
throw new RuntimeException(e);
}
return client;
}
}

View File

@@ -0,0 +1,25 @@
package org.floworc.storage.minio;
import io.micronaut.context.annotation.ConfigurationProperties;
import lombok.Getter;
import javax.inject.Singleton;
@Singleton
@Getter
@ConfigurationProperties("floworc.storage.minio")
public class MinioConfig {
String endpoint;
int port;
String accessKey;
String secretKey;
String region;
boolean secure;
String bucket;
}

View File

@@ -0,0 +1,57 @@
package org.floworc.storage.minio;
import io.minio.MinioClient;
import org.floworc.core.storages.StorageInterface;
import org.floworc.core.storages.StorageObject;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.HashMap;
@Singleton
@MinioStorageEnabled
public class MinioStorage implements StorageInterface {
@Inject
MinioClientFactory factory;
@Inject
MinioConfig config;
private MinioClient client() {
return factory.of(config);
}
@Override
public InputStream get(URI uri) throws FileNotFoundException {
try {
return client().getObject(this.config.getBucket(), uri.toString());
} catch (Throwable e) {
throw new FileNotFoundException(uri.toString() + " (" + e.getMessage() + ")");
}
}
@Override
public StorageObject put(URI uri, InputStream data) throws IOException {
try {
client().putObject(
this.config.getBucket(),
uri.toString(),
data,
null,
new HashMap<>(),
null,
null
);
data.close();
} catch (Throwable e) {
throw new IOException(e);
}
return new StorageObject(this, uri);
}
}

View File

@@ -0,0 +1,12 @@
package org.floworc.storage.minio;
import io.micronaut.context.annotation.Requires;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE})
@Requires(property = "floworc.storage.type", value = "minio")
public @interface MinioStorageEnabled {
}

View File

@@ -0,0 +1,24 @@
package org.floworc.storage.minio;
import io.minio.MinioClient;
import org.floworc.core.storages.AbstractLocalStorageTest;
import org.junit.jupiter.api.BeforeEach;
import javax.inject.Inject;
class MinioStorageTest extends AbstractLocalStorageTest {
@Inject
MinioClientFactory clientFactory;
@Inject
MinioConfig config;
@BeforeEach
void init() throws Exception {
MinioClient client = clientFactory.of(this.config);
if (!client.bucketExists(config.getBucket())) {
client.makeBucket(config.getBucket());
}
}
}

View File

@@ -0,0 +1,10 @@
floworc:
storage:
type: minio
minio:
endpoint: 127.8.9.13
port: 9000
accessKey: AKIAIOSFODNN7EXAMPLE
secretKey: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
bucket: "unittest" #"${random.shortuuid}"