feat(core): standardize input type file

remove exception for `inputs.file.uri` to `inputs.file` to be consistent 

relate to #78
This commit is contained in:
Ludovic DEHON
2020-03-02 18:29:22 +01:00
committed by GitHub
parent df994c56dc
commit b2912f50dd
11 changed files with 41 additions and 146 deletions

View File

@@ -6,7 +6,6 @@ import com.google.common.base.CaseFormat;
import com.google.common.collect.ImmutableMap;
import io.micronaut.context.ApplicationContext;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import org.kestra.core.metrics.MetricRegistry;
import org.kestra.core.models.executions.AbstractMetricEntry;
import org.kestra.core.models.executions.Execution;
@@ -16,7 +15,6 @@ import org.kestra.core.models.flows.Flow;
import org.kestra.core.models.tasks.ResolvedTask;
import org.kestra.core.serializers.JacksonMapper;
import org.kestra.core.storages.StorageInterface;
import org.kestra.core.storages.StorageObject;
import java.io.*;
import java.net.URI;
@@ -171,32 +169,6 @@ public class RunContext {
return builder.build();
}
@SneakyThrows
private Map<String, Object> resolveObjectStorage(Map<String, Object> variables) {
return variables
.entrySet()
.stream()
.map(r -> {
if (r.getValue() instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) r.getValue();
if (map.containsKey("type") && map.get("type").equals(StorageObject.class.getName())) {
r.setValue(new StorageObject(
this.storageInterface,
URI.create((String) map.get("uri"))
));
} else {
r.setValue(resolveObjectStorage(map));
}
}
return r;
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@SuppressWarnings("unchecked")
public RunContext forWorker(ApplicationContext applicationContext, TaskRun taskRun) {
this.init(applicationContext);
@@ -205,18 +177,6 @@ public class RunContext {
clone.remove("taskrun");
clone.put("taskrun", this.variables(taskRun));
if (variables.containsKey("inputs")) {
Map<String, Object> inputs = resolveObjectStorage((Map<String, Object>) variables.get("inputs"));
clone.remove("inputs");
clone.put("inputs", inputs);
}
if (variables.containsKey("outputs")) {
Map<String, Object> outputs = resolveObjectStorage((Map<String, Object>) variables.get("outputs"));
clone.remove("outputs");
clone.put("outputs", outputs);
}
this.variables = ImmutableMap.copyOf(clone);
return this;
@@ -277,11 +237,11 @@ public class RunContext {
* @return the {@code StorageObject} created
* @throws IOException If the temporary file can't be read
*/
public StorageObject putTempFile(File file) throws IOException {
public URI putTempFile(File file) throws IOException {
URI uri = URI.create(this.storageOutputPrefix.toString());
URI resolve = uri.resolve(uri.getPath() + "/" + file.getName());
StorageObject put = this.storageInterface.put(resolve, new FileInputStream(file));
URI put = this.storageInterface.put(resolve, new FileInputStream(file));
boolean delete = file.delete();
if (!delete) {

View File

@@ -15,13 +15,9 @@ import org.kestra.core.queues.QueueFactoryInterface;
import org.kestra.core.queues.QueueInterface;
import org.kestra.core.repositories.FlowRepositoryInterface;
import org.kestra.core.storages.StorageInterface;
import org.kestra.core.storages.StorageObject;
import org.kestra.core.utils.Await;
import org.reactivestreams.Publisher;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import java.io.File;
import java.net.URI;
import java.time.Duration;
@@ -36,6 +32,9 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
@Singleton
public class RunnerUtils {
@@ -65,13 +64,13 @@ public class RunnerUtils {
throw new RuntimeException("Can't upload");
}
StorageObject from = storageInterface.from(flow, execution, file.getFilename(), tempFile);
URI from = storageInterface.from(flow, execution, file.getFilename(), tempFile);
//noinspection ResultOfMethodCallIgnored
tempFile.delete();
return new AbstractMap.SimpleEntry<>(
file.getFilename(),
from.getUri().toString()
from.toString()
);
})
.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)
@@ -137,7 +136,7 @@ public class RunnerUtils {
if (uri.getScheme() != null && uri.getScheme().equals("kestra")) {
return Optional.of(new AbstractMap.SimpleEntry<String, Object>(
input.getName(),
new StorageObject(this.storageInterface, uri)
uri
));
} else {
return Optional.of(new AbstractMap.SimpleEntry<String, Object>(

View File

@@ -18,7 +18,7 @@ import java.util.Arrays;
public interface StorageInterface {
InputStream get(URI uri) throws FileNotFoundException;
StorageObject put(URI uri, InputStream data) throws IOException;
URI put(URI uri, InputStream data) throws IOException;
default URI uri(Flow flow, Execution execution, String inputName, String file) throws URISyntaxException {
return new URI("/" + String.join(
@@ -35,7 +35,7 @@ public interface StorageInterface {
));
}
default StorageObject from(Flow flow, Execution execution, String input, File file) throws IOException {
default URI from(Flow flow, Execution execution, String input, File file) throws IOException {
try {
return this.put(
this.uri(flow, execution, input, file.getName()),
@@ -46,7 +46,7 @@ public interface StorageInterface {
}
}
default StorageObject from(Flow flow, Execution execution, Input input, File file) throws IOException {
default URI from(Flow flow, Execution execution, Input input, File file) throws IOException {
return this.from(flow, execution, input.getName(), file);
}

View File

@@ -1,48 +0,0 @@
package org.kestra.core.storages;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.io.CharStreams;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
public class StorageObject {
private String type = StorageObject.class.getName();
private URI uri;
private StorageInterface storageInterface;
public StorageObject(StorageInterface storageInterface, URI uri) {
this.storageInterface = storageInterface;
this.uri = uri;
}
public String getType() {
return type;
}
public URI getUri() {
return uri;
}
@JsonIgnore
public String getContent() throws IOException {
InputStreamReader inputStreamReader = new InputStreamReader(storageInterface.get(this.uri));
String content = CharStreams.toString(inputStreamReader);
inputStreamReader.close();;
return content;
}
@JsonIgnore
public InputStream getInputStream() throws FileNotFoundException {
return storageInterface.get(this.uri);
}
@Override
public String toString() {
return this.uri.toString();
}
}

View File

@@ -6,11 +6,11 @@ import org.junit.jupiter.api.Test;
import org.kestra.core.models.executions.Execution;
import org.kestra.core.models.flows.State;
import org.kestra.core.repositories.FlowRepositoryInterface;
import org.kestra.core.storages.StorageObject;
import org.kestra.core.storages.StorageInterface;
import javax.inject.Inject;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
@@ -20,6 +20,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
@@ -38,6 +39,9 @@ public class InputsTest extends AbstractMemoryRunnerTest {
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private StorageInterface storageInterface;
private Map<String, Object> typedInputs(Map<String, String> map) {
return runnerUtils.typedInputs(
flowRepository.findById("org.kestra.tests", "inputs").get(),
@@ -84,16 +88,13 @@ public class InputsTest extends AbstractMemoryRunnerTest {
@Test
void inputFile() throws URISyntaxException, IOException {
Map<String, Object> typeds = typedInputs(inputs);
StorageObject file = (StorageObject) typeds.get("file");
URI file = (URI) typeds.get("file");
assertThat(file.getUri(), is(new URI("kestra:///org/kestra/tests/inputs/executions/test/inputs/file/application.yml")));
assertThat(file.getClass(), is(StorageObject.class));
assertThat(file, is(new URI("kestra:///org/kestra/tests/inputs/executions/test/inputs/file/application.yml")));
InputStream inputStream = storageInterface.get(file);
assertThat(
file.getContent(),
is(CharStreams.toString(new InputStreamReader(new FileInputStream(inputs.get("file")))))
);
assertThat(
CharStreams.toString(new InputStreamReader(file.getInputStream())),
CharStreams.toString(new InputStreamReader(inputStream)),
is(CharStreams.toString(new InputStreamReader(new FileInputStream(inputs.get("file")))))
);
}
@@ -107,13 +108,11 @@ public class InputsTest extends AbstractMemoryRunnerTest {
(flow, execution1) -> runnerUtils.typedInputs(flow, execution1, inputs)
);
assertThat(execution.getTaskRunList(), hasSize(7));
assertThat(execution.getTaskRunList(), hasSize(5));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
Arrays.asList("file-uri", "file").forEach(o ->
assertThat(
(String) execution.findTaskRunsByTaskId(o).get(0).getOutputs().get("value"),
(String) execution.findTaskRunsByTaskId("file").get(0).getOutputs().get("value"),
matchesRegex("kestra:///org/kestra/tests/inputs/executions/.*/inputs/file/application.yml")
)
);
}
}

View File

@@ -86,6 +86,6 @@ class FlowTest extends AbstractMemoryRunnerTest {
}, Duration.ofSeconds(5)
);
assertThat(execution.getTaskRunList(), hasSize(7));
assertThat(execution.getTaskRunList(), hasSize(5));
}
}

View File

@@ -32,12 +32,6 @@ tasks:
- id: instant
type: org.kestra.core.tasks.debugs.Return
format: "{{inputs.instant}}"
- id: file-content
type: org.kestra.core.tasks.debugs.Return
format: "{{inputs.file.content}}"
- id: file
type: org.kestra.core.tasks.debugs.Return
format: "{{inputs.file}}"
- id: file-uri
type: org.kestra.core.tasks.debugs.Return
format: "{{inputs.file.uri}}"

View File

@@ -32,12 +32,6 @@ tasks:
- id: instant
type: org.kestra.core.tasks.debugs.Return
format: "{{inputs.instant}}"
- id: file-content
type: org.kestra.core.tasks.debugs.Return
format: "{{inputs.file.content}}"
- id: file
type: org.kestra.core.tasks.debugs.Return
format: "{{inputs.file}}"
- id: file-uri
type: org.kestra.core.tasks.debugs.Return
format: "{{inputs.file.uri}}"

View File

@@ -1,14 +1,13 @@
package org.kestra.storage.local;
import org.kestra.core.storages.StorageInterface;
import org.kestra.core.storages.StorageObject;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.*;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import javax.inject.Inject;
import javax.inject.Singleton;
@Singleton
@LocalStorageEnabled
@@ -50,7 +49,7 @@ public class LocalStorage implements StorageInterface {
}
@Override
public StorageObject put(URI uri, InputStream data) throws IOException {
public URI put(URI uri, InputStream data) throws IOException {
this.createDirectory(uri);
OutputStream outStream = new FileOutputStream(getPath(uri).toFile());
@@ -64,8 +63,6 @@ public class LocalStorage implements StorageInterface {
outStream.close();
data.close();
URI result = URI.create("kestra://" + uri.getPath());
return new StorageObject(this, result);
return URI.create("kestra://" + uri.getPath());
}
}

View File

@@ -4,9 +4,7 @@ import com.google.common.io.CharStreams;
import io.micronaut.test.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.kestra.core.storages.StorageInterface;
import org.kestra.core.storages.StorageObject;
import javax.inject.Inject;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
@@ -14,6 +12,7 @@ import java.io.InputStreamReader;
import java.net.URI;
import java.net.URL;
import java.util.Objects;
import javax.inject.Inject;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -24,7 +23,7 @@ class LocalStorageTest {
@Inject
StorageInterface storageInterface;
private StorageObject putFile(URL resource, String path) throws Exception {
private URI putFile(URL resource, String path) throws Exception {
return storageInterface.put(
new URI(path),
new FileInputStream(Objects.requireNonNull(resource).getFile())
@@ -55,10 +54,10 @@ class LocalStorageTest {
@Test
void put() throws Exception {
URL resource = LocalStorageTest.class.getClassLoader().getResource("application.yml");
StorageObject put = this.putFile(resource, "/file/storage/put.yml");
URI put = this.putFile(resource, "/file/storage/put.yml");
InputStream get = storageInterface.get(new URI("/file/storage/put.yml"));
assertThat(put.getUri().toString(), is(new URI("kestra:///file/storage/put.yml").toString()));
assertThat(put.toString(), is(new URI("kestra:///file/storage/put.yml").toString()));
assertThat(
CharStreams.toString(new InputStreamReader(get)),
is(CharStreams.toString(new InputStreamReader(new FileInputStream(Objects.requireNonNull(resource).getFile()))))

View File

@@ -31,6 +31,7 @@ import org.kestra.webserver.responses.PagedResults;
import javax.inject.Inject;
import javax.inject.Named;
import java.io.File;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;
@@ -122,8 +123,8 @@ class ExecutionControllerTest extends AbstractMemoryRunnerTest {
assertThat(result.getState().getCurrent(), is(State.Type.CREATED));
assertThat(result.getFlowId(), is("inputs"));
assertThat(result.getInputs().get("float"), is(42.42));
assertThat(((Map<String, String>) result.getInputs().get("file")).get("uri"), startsWith("kestra:///org/kestra/tests/inputs/executions/"));
assertThat(((Map<String, String>) result.getInputs().get("optionalFile")).get("uri"), startsWith("kestra:///org/kestra/tests/inputs/executions/"));
assertThat(result.getInputs().get("file").toString(), startsWith("kestra:///org/kestra/tests/inputs/executions/"));
assertThat(result.getInputs().get("file").toString(), startsWith("kestra:///org/kestra/tests/inputs/executions/"));
}
@Test
@@ -267,7 +268,7 @@ class ExecutionControllerTest extends AbstractMemoryRunnerTest {
assertThat(finishedChildExecution, notNullValue());
assertThat(finishedChildExecution.getParentId(), is(parentExecution.getId()));
assertThat(finishedChildExecution.getTaskRunList().size(), is(7));
assertThat(finishedChildExecution.getTaskRunList().size(), is(5));
finishedChildExecution
.getTaskRunList()