mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
feat(task-serdes): introduce serdes tasks
Tasks package that will handle file conversion. - new tasks: CsvReader & CsvWriter - new tasks: JsonReader & JsonWriter - Refactor the CsvToAvro task into AvroWriter and handle conversion from all types depending on the schema. - Remove the task-avro module in favor of the more general task-serdes module.
This commit is contained in:
@@ -24,6 +24,6 @@ dependencies {
|
||||
compile project(":webserver")
|
||||
|
||||
// tasks
|
||||
compile project(":task-avro")
|
||||
compile project(":task-gcp")
|
||||
compile project(":task-serdes")
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
|
||||
package org.floworc.core.serializers;
|
||||
|
||||
import io.reactivex.FlowableOnSubscribe;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
|
||||
abstract public class ObjectsSerde {
|
||||
public static void write(ObjectOutputStream output, Object row) throws IOException {
|
||||
output.writeObject(row);
|
||||
output.reset();
|
||||
}
|
||||
|
||||
public static FlowableOnSubscribe<Object> reader(ObjectInputStream input) {
|
||||
return s -> {
|
||||
try {
|
||||
Object row;
|
||||
while ((row = input.readObject()) != null) {
|
||||
s.onNext(row);
|
||||
}
|
||||
} catch (EOFException e) {
|
||||
s.onComplete();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -13,7 +13,7 @@ include 'storage-minio'
|
||||
include 'repository-elasticsearch'
|
||||
include 'repository-memory'
|
||||
|
||||
include 'task-avro'
|
||||
include 'task-serdes'
|
||||
include 'task-gcp'
|
||||
|
||||
include 'webserver'
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,410 @@
|
||||
package org.floworc.task.serdes.avro;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import org.apache.avro.Conversions;
|
||||
import org.apache.avro.LogicalTypes;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.data.TimeConversions;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
import org.apache.avro.util.Utf8;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.math.MathContext;
|
||||
import java.math.RoundingMode;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.*;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
|
||||
@Builder
|
||||
public class AvroConverter {
|
||||
@Builder.Default
|
||||
private List<String> trueValues = Arrays.asList("t", "true", "enabled", "1", "on", "yes");
|
||||
|
||||
@Builder.Default
|
||||
private List<String> falseValues = Arrays.asList("f", "false", "disabled", "0", "off", "no", "");
|
||||
|
||||
@Builder.Default
|
||||
private List<String> nullValues = Arrays.asList(
|
||||
"",
|
||||
"#N/A",
|
||||
"#N/A N/A",
|
||||
"#NA",
|
||||
"-1.#IND",
|
||||
"-1.#QNAN",
|
||||
"-NaN",
|
||||
"1.#IND",
|
||||
"1.#QNAN",
|
||||
"NA",
|
||||
"n/a",
|
||||
"nan",
|
||||
"null"
|
||||
);
|
||||
|
||||
@Builder.Default
|
||||
private String dateFormat = "yyyy-MM-dd[XXX]";
|
||||
|
||||
@Builder.Default
|
||||
private String timeFormat = "HH:mm[:ss][.SSSSSS][XXX]";
|
||||
|
||||
@Builder.Default
|
||||
private String datetimeFormat = "yyyy-MM-dd'T'HH:mm[:ss][.SSSSSS][XXX]";
|
||||
|
||||
public static GenericData genericData() {
|
||||
GenericData genericData = new GenericData();
|
||||
genericData.addLogicalTypeConversion(new Conversions.UUIDConversion());
|
||||
genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
|
||||
genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
|
||||
genericData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
|
||||
genericData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
|
||||
genericData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
|
||||
genericData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
|
||||
|
||||
return genericData;
|
||||
}
|
||||
|
||||
public GenericData.Record fromMap(Schema schema, Map<String, Object> data) throws IllegalRowConvertion {
|
||||
GenericData.Record record = new GenericData.Record(schema);
|
||||
|
||||
for (Schema.Field field : schema.getFields()) {
|
||||
try {
|
||||
record.put(field.name(), convert(field.schema(), data.get(field.name())));
|
||||
} catch (IllegalCellConversion e) {
|
||||
throw new IllegalRowConvertion(data, e);
|
||||
}
|
||||
}
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
public GenericData.Record fromArray(Schema schema, List<String> data) throws IllegalRowConvertion {
|
||||
HashMap<String, Object> map = new HashMap<>();
|
||||
int index = 0;
|
||||
for (Schema.Field field : schema.getFields()) {
|
||||
map.put(field.name(), data.get(index));
|
||||
index++;
|
||||
}
|
||||
|
||||
return this.fromMap(schema, map);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Object convert(Schema schema, Object data) throws IllegalCellConversion {
|
||||
try {
|
||||
if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("decimal")) { // logical
|
||||
return this.logicalDecimal(schema, data);
|
||||
} else if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("uuid")) {
|
||||
return this.logicalUuid(data);
|
||||
} else if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("date")) {
|
||||
return this.logicalDate(data);
|
||||
} else if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("time-millis")) {
|
||||
return this.logicalTimeMillis(data);
|
||||
} else if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("time-micros")) {
|
||||
return this.logicalTimeMicros(data);
|
||||
} else if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("timestamp-millis")) {
|
||||
return this.logicalTimestampMillis(data);
|
||||
} else if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("timestamp-micros")) {
|
||||
return this.logicalTimestampMicros(data);
|
||||
} else if (schema.getType() == Schema.Type.RECORD) { // complex
|
||||
return fromMap(schema, (Map<String, Object>) data);
|
||||
} else if (schema.getType() == Schema.Type.ARRAY) {
|
||||
return this.complexArray(schema, data);
|
||||
} else if (schema.getType() == Schema.Type.MAP) {
|
||||
return this.complexMap(schema, data);
|
||||
} else if (schema.getType() == Schema.Type.UNION) {
|
||||
return this.complexUnion(schema, data);
|
||||
} else if (schema.getType() == Schema.Type.FIXED) {
|
||||
return this.complexFixed(schema, data);
|
||||
} else if (schema.getType() == Schema.Type.ENUM) {
|
||||
return this.complexEnum(schema, data);
|
||||
} else if (schema.getType() == Schema.Type.NULL) { // primitive
|
||||
return this.primitiveNull(data);
|
||||
} else if (schema.getType() == Schema.Type.INT) {
|
||||
return this.primitiveInt(data);
|
||||
} else if (schema.getType() == Schema.Type.FLOAT) {
|
||||
return this.primitiveFloat(data);
|
||||
} else if (schema.getType() == Schema.Type.DOUBLE) {
|
||||
return this.primitiveDouble(data);
|
||||
} else if (schema.getType() == Schema.Type.LONG) {
|
||||
return this.primitiveLong(data);
|
||||
} else if (schema.getType() == Schema.Type.BOOLEAN) {
|
||||
return this.primitiveBool(data);
|
||||
} else if (schema.getType() == Schema.Type.STRING) {
|
||||
return this.primitiveString(data);
|
||||
} else if (schema.getType() == Schema.Type.BYTES) {
|
||||
return this.primitiveBytes(data);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Invalid schema \"" + schema.getType() + "\"");
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
throw new IllegalCellConversion(schema, data, e);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("UnpredictableBigDecimalConstructorCall")
|
||||
private BigDecimal logicalDecimal(Schema schema, Object data) {
|
||||
int scale = ((LogicalTypes.Decimal) schema.getLogicalType()).getScale();
|
||||
int precision = ((LogicalTypes.Decimal) schema.getLogicalType()).getPrecision();
|
||||
double multiply = Math.pow(10D, precision - scale * 1D);
|
||||
|
||||
BigDecimal value;
|
||||
|
||||
if (data instanceof String) {
|
||||
value = new BigDecimal(((String) data));
|
||||
} else if (data instanceof Long) {
|
||||
value = BigDecimal.valueOf((long) ((long) data * multiply), scale);
|
||||
} else if (data instanceof Integer) {
|
||||
value = BigDecimal.valueOf((int) ((int) data * multiply), scale);
|
||||
} else if (data instanceof Double) {
|
||||
value = new BigDecimal((double) data, new MathContext(precision));
|
||||
} else if (data instanceof Float) {
|
||||
value = new BigDecimal((float) data, new MathContext(precision));
|
||||
} else {
|
||||
value = (BigDecimal) data;
|
||||
}
|
||||
|
||||
value = value.setScale(scale, RoundingMode.HALF_EVEN);
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
private UUID logicalUuid(Object data) {
|
||||
if (data instanceof String) {
|
||||
return UUID.fromString((String) data);
|
||||
} else {
|
||||
return (UUID) data;
|
||||
}
|
||||
}
|
||||
|
||||
private LocalDate logicalDate(Object data) {
|
||||
if (data instanceof String) {
|
||||
return LocalDate.parse((String) data, DateTimeFormatter.ofPattern(this.dateFormat));
|
||||
} else {
|
||||
return (LocalDate) data;
|
||||
}
|
||||
}
|
||||
|
||||
private LocalTime logicalTimeMillis(Object data) {
|
||||
if (data instanceof String) {
|
||||
return LocalTime.parse((String) data, DateTimeFormatter.ofPattern(this.timeFormat));
|
||||
} else {
|
||||
return (LocalTime) data;
|
||||
}
|
||||
}
|
||||
|
||||
private LocalTime logicalTimeMicros(Object data) {
|
||||
if (data instanceof String) {
|
||||
return LocalTime.parse((String) data, DateTimeFormatter.ofPattern(this.timeFormat));
|
||||
} else {
|
||||
return (LocalTime) data;
|
||||
}
|
||||
}
|
||||
|
||||
private Instant logicalTimestampMillis(Object data) {
|
||||
if (data instanceof String) {
|
||||
return LocalDateTime.parse((String) data, DateTimeFormatter.ofPattern(this.datetimeFormat)).toInstant(ZoneOffset.UTC);
|
||||
} else {
|
||||
return (Instant) data;
|
||||
}
|
||||
}
|
||||
|
||||
private Instant logicalTimestampMicros(Object data) {
|
||||
if (data instanceof String) {
|
||||
return LocalDateTime.parse((String) data, DateTimeFormatter.ofPattern(this.datetimeFormat)).toInstant(ZoneOffset.UTC);
|
||||
} else {
|
||||
return (Instant) data;
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private List<Object> complexArray(Schema schema, Object data) throws IllegalCellConversion {
|
||||
Schema elementType = schema.getElementType();
|
||||
|
||||
Collection<Object> list = (Collection<Object>) data;
|
||||
List<Object> result = new ArrayList<>();
|
||||
|
||||
for (Object current : list) {
|
||||
result.add(this.convert(elementType, current));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private Object complexUnion(Schema schema, Object data) {
|
||||
for (Schema current : schema.getTypes()) {
|
||||
try {
|
||||
return this.convert(current, data);
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("Invalid data for schema \"" + schema.getType() + "\"");
|
||||
}
|
||||
|
||||
private GenericData.Fixed complexFixed(Schema schema, Object data) {
|
||||
ByteBuffer value = this.primitiveBytes(data);
|
||||
int fixedSize = schema.getFixedSize();
|
||||
|
||||
value.position(0);
|
||||
int size = value.remaining();
|
||||
|
||||
if (size != fixedSize) {
|
||||
throw new IllegalArgumentException("Invalid length for fixed size, found " + size + ", expexted " + fixedSize);
|
||||
}
|
||||
|
||||
return new GenericData.Fixed(schema, value.array());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Map<Utf8, Object> complexMap(Schema schema, Object data) throws IllegalCellConversion {
|
||||
Schema valueType = schema.getValueType();
|
||||
|
||||
Map<Object, Object> list = (Map<Object, Object>) data;
|
||||
Map<Utf8, Object> result = new HashMap<>();
|
||||
|
||||
for (Map.Entry<Object, Object> current : list.entrySet()) {
|
||||
result.put(
|
||||
new Utf8(this.primitiveString(current.getKey()).getBytes()),
|
||||
this.convert(valueType, current.getValue())
|
||||
);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private GenericData.EnumSymbol complexEnum(Schema schema, Object data) {
|
||||
String value = this.primitiveString(data);
|
||||
List<String> symbols = schema.getEnumSymbols();
|
||||
|
||||
if (!symbols.contains(value)) {
|
||||
throw new IllegalArgumentException("Invalid enum value, found " + value + ", expexted " + symbols);
|
||||
}
|
||||
|
||||
return new GenericData.EnumSymbol(schema, value);
|
||||
}
|
||||
|
||||
private Integer primitiveNull(Object data) {
|
||||
if (data instanceof String && this.contains(this.nullValues, (String) data)) {
|
||||
return null;
|
||||
} else if (data == null) {
|
||||
return null;
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unknown type for null values, found " + data.getClass().getName());
|
||||
}
|
||||
}
|
||||
|
||||
private Integer primitiveInt(Object data) {
|
||||
if (data instanceof String) {
|
||||
return Integer.valueOf((String) data);
|
||||
} else {
|
||||
return (int) data;
|
||||
}
|
||||
}
|
||||
|
||||
private Long primitiveLong(Object data) {
|
||||
if (data instanceof String) {
|
||||
return Long.valueOf((String) data);
|
||||
} else if (data instanceof Integer) {
|
||||
return (long) ((int) data);
|
||||
} else {
|
||||
return (long) data;
|
||||
}
|
||||
}
|
||||
|
||||
private Float primitiveFloat(Object data) {
|
||||
if (data instanceof String) {
|
||||
return Float.valueOf((String) data);
|
||||
} else if (data instanceof Integer) {
|
||||
return (float) ((int) data);
|
||||
} else if (data instanceof Double) {
|
||||
return (float) ((double) data);
|
||||
} else {
|
||||
return (float) data;
|
||||
}
|
||||
}
|
||||
|
||||
private Double primitiveDouble(Object data) {
|
||||
if (data instanceof String) {
|
||||
return Double.valueOf((String) data);
|
||||
} else if (data instanceof Integer) {
|
||||
return (double) ((int) data);
|
||||
} else if (data instanceof Float) {
|
||||
return (double) ((float) data);
|
||||
} else {
|
||||
return (double) data;
|
||||
}
|
||||
}
|
||||
|
||||
public Boolean primitiveBool(Object data) {
|
||||
if (data instanceof String && this.contains(this.trueValues, (String) data)) {
|
||||
return true;
|
||||
} else if (data instanceof String && this.contains(this.falseValues, (String) data)) {
|
||||
return false;
|
||||
} else if (data instanceof Integer && (int) data == 1) {
|
||||
return true;
|
||||
} else if (data instanceof Integer && (int) data == 0) {
|
||||
return false;
|
||||
} else {
|
||||
return (boolean) data;
|
||||
}
|
||||
}
|
||||
|
||||
public String primitiveString(Object data) {
|
||||
return String.valueOf(data);
|
||||
}
|
||||
|
||||
public ByteBuffer primitiveBytes(Object data) {
|
||||
return ByteBuffer.wrap(this.primitiveString(data).getBytes());
|
||||
}
|
||||
|
||||
private boolean contains(List<String> list, String data) {
|
||||
return list.stream().anyMatch(s -> s.equalsIgnoreCase(data));
|
||||
}
|
||||
|
||||
@Getter
|
||||
public static class IllegalRowConvertion extends Exception {
|
||||
private static ObjectMapper mapper = new ObjectMapper();
|
||||
private Object data;
|
||||
|
||||
public IllegalRowConvertion(Map<String, Object> data, Throwable e) {
|
||||
super(e);
|
||||
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
try {
|
||||
return super.toString() + " on line with data [" + mapper.writeValueAsString(data) + "]";
|
||||
} catch (JsonProcessingException e) {
|
||||
return super.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Getter
|
||||
public static class IllegalCellConversion extends Exception {
|
||||
private static ObjectMapper mapper = new ObjectMapper();
|
||||
private Object data;
|
||||
private Schema schema;
|
||||
|
||||
public IllegalCellConversion(Schema schema, Object data, Throwable e) {
|
||||
super(e);
|
||||
|
||||
this.schema = schema;
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
try {
|
||||
return super.toString() + " on cols with data [" + mapper.writeValueAsString(data) + "] and schema [" + schema.toString() + "]";
|
||||
} catch (JsonProcessingException e) {
|
||||
return super.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,167 @@
|
||||
package org.floworc.task.serdes.avro;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.reactivex.BackpressureStrategy;
|
||||
import io.reactivex.Flowable;
|
||||
import io.reactivex.Single;
|
||||
import io.reactivex.functions.Function;
|
||||
import io.reactivex.schedulers.Schedulers;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.file.DataFileWriter;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
import org.apache.avro.generic.GenericDatumWriter;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.io.DatumWriter;
|
||||
import org.floworc.core.models.tasks.RunnableTask;
|
||||
import org.floworc.core.models.tasks.Task;
|
||||
import org.floworc.core.runners.RunContext;
|
||||
import org.floworc.core.runners.RunOutput;
|
||||
import org.floworc.core.serializers.ObjectsSerde;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.net.URI;
|
||||
import java.util.AbstractMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public class AvroWriter extends Task implements RunnableTask {
|
||||
@NotNull
|
||||
private String from;
|
||||
|
||||
@NotNull
|
||||
private String schema;
|
||||
|
||||
private List<String> trueValues;
|
||||
|
||||
private List<String> falseValues;
|
||||
|
||||
private List<String> nullValues;
|
||||
|
||||
private String dateFormat;
|
||||
|
||||
private String timeFormat;
|
||||
|
||||
private String datetimeFormat;
|
||||
|
||||
@Override
|
||||
public RunOutput run(RunContext runContext) throws Exception {
|
||||
Logger logger = runContext.logger(this.getClass());
|
||||
|
||||
// temp file
|
||||
File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".avro");
|
||||
BufferedOutputStream output = new BufferedOutputStream(new FileOutputStream(tempFile));
|
||||
|
||||
// avro writer
|
||||
Schema.Parser parser = new Schema.Parser();
|
||||
Schema schema = parser.parse(this.schema);
|
||||
|
||||
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema, AvroConverter.genericData());
|
||||
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
|
||||
dataFileWriter.create(schema, output);
|
||||
|
||||
// reader
|
||||
URI from = new URI(runContext.render(this.from));
|
||||
ObjectInputStream inputStream = new ObjectInputStream(runContext.uriToInputStream(from));
|
||||
|
||||
// convert
|
||||
Flowable<GenericData.Record> flowable = Flowable
|
||||
.create(ObjectsSerde.reader(inputStream), BackpressureStrategy.BUFFER)
|
||||
.observeOn(Schedulers.computation())
|
||||
.map(this.convertToAvro(schema))
|
||||
.observeOn(Schedulers.io())
|
||||
.doOnNext(datum -> {
|
||||
try {
|
||||
dataFileWriter.append(datum);
|
||||
} catch (Throwable e) {
|
||||
throw new AvroConverter.IllegalRowConvertion(
|
||||
datum.getSchema()
|
||||
.getFields()
|
||||
.stream()
|
||||
.map(field -> new AbstractMap.SimpleEntry<>(field.name(), datum.get(field.name())))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
|
||||
e
|
||||
);
|
||||
}
|
||||
})
|
||||
.doOnComplete(() -> {
|
||||
dataFileWriter.close();
|
||||
inputStream.close();
|
||||
output.close();
|
||||
});
|
||||
|
||||
// metrics & finalize
|
||||
Single<Long> count = flowable.count();
|
||||
Long lineCount = count.blockingGet();
|
||||
|
||||
return RunOutput.builder()
|
||||
.outputs(ImmutableMap.of("uri", runContext.putFile(tempFile).getUri()))
|
||||
.build();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Function<Object, GenericData.Record> convertToAvro(Schema schema) {
|
||||
AvroConverter converter = this.converter();
|
||||
|
||||
return row -> {
|
||||
GenericData.Record record = new GenericData.Record(schema);
|
||||
|
||||
if (row instanceof List) {
|
||||
List<String> casted = (List<String>) row;
|
||||
|
||||
return converter.fromArray(schema, casted);
|
||||
} else if (row instanceof Map) {
|
||||
Map<String, Object> casted = (Map<String, Object>) row;
|
||||
|
||||
return converter.fromMap(schema, casted);
|
||||
}
|
||||
|
||||
return record;
|
||||
};
|
||||
}
|
||||
|
||||
private AvroConverter converter() {
|
||||
AvroConverter.AvroConverterBuilder builder = AvroConverter.builder();
|
||||
|
||||
if (this.trueValues != null) {
|
||||
builder.trueValues(this.trueValues);
|
||||
}
|
||||
|
||||
if (this.falseValues != null) {
|
||||
builder.falseValues(this.falseValues);
|
||||
}
|
||||
|
||||
if (this.nullValues != null) {
|
||||
builder.nullValues(this.nullValues);
|
||||
}
|
||||
|
||||
if (this.dateFormat != null) {
|
||||
builder.dateFormat(this.dateFormat);
|
||||
}
|
||||
|
||||
if (this.timeFormat != null) {
|
||||
builder.timeFormat(this.timeFormat);
|
||||
}
|
||||
|
||||
if (this.datetimeFormat != null) {
|
||||
builder.datetimeFormat(this.datetimeFormat);
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
@@ -1,47 +1,38 @@
|
||||
package org.floworc.task.avro;
|
||||
package org.floworc.task.serdes.csv;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import de.siegmar.fastcsv.reader.CsvParser;
|
||||
import de.siegmar.fastcsv.reader.CsvReader;
|
||||
import de.siegmar.fastcsv.reader.CsvRow;
|
||||
import io.reactivex.BackpressureStrategy;
|
||||
import io.reactivex.Flowable;
|
||||
import io.reactivex.FlowableOnSubscribe;
|
||||
import io.reactivex.Single;
|
||||
import io.reactivex.functions.Function;
|
||||
import io.reactivex.schedulers.Schedulers;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.file.DataFileWriter;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
import org.apache.avro.generic.GenericDatumWriter;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.io.DatumWriter;
|
||||
import org.floworc.core.models.tasks.RunnableTask;
|
||||
import org.floworc.core.models.tasks.Task;
|
||||
import org.floworc.core.runners.RunContext;
|
||||
import org.floworc.core.runners.RunOutput;
|
||||
import org.floworc.core.serializers.ObjectsSerde;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public class CsvToAvro extends Task implements RunnableTask {
|
||||
public class CsvReader extends Task implements RunnableTask {
|
||||
@NotNull
|
||||
private String from;
|
||||
|
||||
@NotNull
|
||||
private String schema;
|
||||
|
||||
@Builder.Default
|
||||
private Boolean header = true;
|
||||
|
||||
@@ -58,30 +49,27 @@ public class CsvToAvro extends Task implements RunnableTask {
|
||||
public RunOutput run(RunContext runContext) throws Exception {
|
||||
// reader
|
||||
URI from = new URI(runContext.render(this.from));
|
||||
CsvReader csvReader = this.csvReader();
|
||||
de.siegmar.fastcsv.reader.CsvReader csvReader = this.csvReader();
|
||||
CsvParser csvParser = csvReader.parse(new InputStreamReader(runContext.uriToInputStream(from)));
|
||||
|
||||
// temp file
|
||||
File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".avro");
|
||||
BufferedOutputStream output = new BufferedOutputStream(new FileOutputStream(tempFile));
|
||||
|
||||
// avro writer
|
||||
Schema.Parser parser = new Schema.Parser();
|
||||
Schema schema = parser.parse(this.schema);
|
||||
|
||||
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
|
||||
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
|
||||
dataFileWriter.create(schema, output);
|
||||
File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".javas");
|
||||
ObjectOutputStream output = new ObjectOutputStream(new FileOutputStream(tempFile));
|
||||
|
||||
// convert
|
||||
Flowable<GenericData.Record> flowable = Flowable
|
||||
Flowable<Object> flowable = Flowable
|
||||
.create(this.nextRow(csvParser), BackpressureStrategy.BUFFER)
|
||||
.observeOn(Schedulers.computation())
|
||||
.map(this.convertToAvro(schema))
|
||||
.map(r -> {
|
||||
if (header) {
|
||||
return r.getFieldMap();
|
||||
} else {
|
||||
return r.getFields();
|
||||
}
|
||||
})
|
||||
.observeOn(Schedulers.io())
|
||||
.doOnNext(dataFileWriter::append)
|
||||
.doOnNext(row -> ObjectsSerde.write(output, row))
|
||||
.doOnComplete(() -> {
|
||||
dataFileWriter.close();
|
||||
output.close();
|
||||
csvParser.close();
|
||||
});
|
||||
|
||||
@@ -94,18 +82,6 @@ public class CsvToAvro extends Task implements RunnableTask {
|
||||
.build();
|
||||
}
|
||||
|
||||
private Function<CsvRow, GenericData.Record> convertToAvro(Schema schema) {
|
||||
return row -> {
|
||||
GenericData.Record record = new GenericData.Record(schema);
|
||||
|
||||
for (Schema.Field field : schema.getFields()) {
|
||||
record.put(field.name(), row.getField(field.name()));
|
||||
}
|
||||
|
||||
return record;
|
||||
};
|
||||
}
|
||||
|
||||
private FlowableOnSubscribe<CsvRow> nextRow(CsvParser csvParser) {
|
||||
return s -> {
|
||||
CsvRow row;
|
||||
@@ -117,8 +93,8 @@ public class CsvToAvro extends Task implements RunnableTask {
|
||||
};
|
||||
}
|
||||
|
||||
private CsvReader csvReader() {
|
||||
CsvReader csvReader = new CsvReader();
|
||||
private de.siegmar.fastcsv.reader.CsvReader csvReader() {
|
||||
de.siegmar.fastcsv.reader.CsvReader csvReader = new de.siegmar.fastcsv.reader.CsvReader();
|
||||
|
||||
if (this.header != null) {
|
||||
csvReader.setContainsHeader(this.header);
|
||||
@@ -0,0 +1,129 @@
|
||||
package org.floworc.task.serdes.csv;
|
||||
|
||||
import com.github.jknack.handlebars.internal.lang3.ArrayUtils;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import de.siegmar.fastcsv.writer.CsvAppender;
|
||||
import io.reactivex.BackpressureStrategy;
|
||||
import io.reactivex.Flowable;
|
||||
import io.reactivex.Single;
|
||||
import io.reactivex.functions.Consumer;
|
||||
import io.reactivex.schedulers.Schedulers;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.floworc.core.models.tasks.RunnableTask;
|
||||
import org.floworc.core.models.tasks.Task;
|
||||
import org.floworc.core.runners.RunContext;
|
||||
import org.floworc.core.runners.RunOutput;
|
||||
import org.floworc.core.serializers.ObjectsSerde;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.io.File;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public class CsvWriter extends Task implements RunnableTask {
|
||||
@NotNull
|
||||
private String from;
|
||||
|
||||
@Builder.Default
|
||||
private Boolean header = true;
|
||||
|
||||
@Builder.Default
|
||||
private Character fieldSeparator = ",".charAt(0);
|
||||
|
||||
@Builder.Default
|
||||
private Character textDelimiter = "\"".charAt(0);
|
||||
|
||||
@Builder.Default
|
||||
private Character[] lineDelimiter = ArrayUtils.toObject("\n".toCharArray());
|
||||
|
||||
@Builder.Default
|
||||
private Boolean alwaysDelimitText = false;
|
||||
|
||||
@Override
|
||||
public RunOutput run(RunContext runContext) throws Exception {
|
||||
// temp file
|
||||
File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".csv");
|
||||
|
||||
// writer
|
||||
de.siegmar.fastcsv.writer.CsvWriter csvWriter = this.csvWriter();
|
||||
CsvAppender csvAppender = csvWriter.append(tempFile, StandardCharsets.UTF_8);
|
||||
|
||||
// reader
|
||||
URI from = new URI(runContext.render(this.from));
|
||||
ObjectInputStream inputStream = new ObjectInputStream(runContext.uriToInputStream(from));
|
||||
|
||||
Flowable<Object> flowable = Flowable
|
||||
.create(ObjectsSerde.<String, String>reader(inputStream), BackpressureStrategy.BUFFER)
|
||||
.observeOn(Schedulers.io())
|
||||
.doOnNext(new Consumer<>() {
|
||||
private boolean first = false;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void accept(Object row) throws Exception {
|
||||
if (row instanceof List) {
|
||||
List<String> casted = (List<String>) row;
|
||||
|
||||
if (header) {
|
||||
throw new IllegalArgumentException("Invalid data of type List with header");
|
||||
}
|
||||
|
||||
for (final String value : casted) {
|
||||
csvAppender.appendField(value);
|
||||
}
|
||||
} else if (row instanceof Map) {
|
||||
Map<String, String> casted = (Map<String, String>) row;
|
||||
|
||||
if (!first) {
|
||||
this.first = true;
|
||||
if (header) {
|
||||
for (final String value : casted.keySet()) {
|
||||
csvAppender.appendField(value);
|
||||
}
|
||||
csvAppender.endLine();
|
||||
}
|
||||
}
|
||||
|
||||
for (final String value : casted.values()) {
|
||||
csvAppender.appendField(value);
|
||||
}
|
||||
}
|
||||
|
||||
csvAppender.endLine();
|
||||
}
|
||||
})
|
||||
.doOnComplete(() -> {
|
||||
csvAppender.close();
|
||||
inputStream.close();
|
||||
});
|
||||
|
||||
|
||||
// metrics & finalize
|
||||
Single<Long> count = flowable.count();
|
||||
Long lineCount = count.blockingGet();
|
||||
|
||||
return RunOutput.builder()
|
||||
.outputs(ImmutableMap.of("uri", runContext.putFile(tempFile).getUri()))
|
||||
.build();
|
||||
}
|
||||
|
||||
private de.siegmar.fastcsv.writer.CsvWriter csvWriter() {
|
||||
de.siegmar.fastcsv.writer.CsvWriter csvWriter = new de.siegmar.fastcsv.writer.CsvWriter();
|
||||
|
||||
csvWriter.setTextDelimiter(this.textDelimiter);
|
||||
csvWriter.setFieldSeparator(this.fieldSeparator);
|
||||
csvWriter.setLineDelimiter(ArrayUtils.toPrimitive(this.lineDelimiter));
|
||||
csvWriter.setAlwaysDelimitText(this.alwaysDelimitText);
|
||||
|
||||
return csvWriter;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,75 @@
|
||||
package org.floworc.task.serdes.json;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.reactivex.BackpressureStrategy;
|
||||
import io.reactivex.Flowable;
|
||||
import io.reactivex.FlowableOnSubscribe;
|
||||
import io.reactivex.Single;
|
||||
import io.reactivex.schedulers.Schedulers;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.floworc.core.models.tasks.RunnableTask;
|
||||
import org.floworc.core.models.tasks.Task;
|
||||
import org.floworc.core.runners.RunContext;
|
||||
import org.floworc.core.runners.RunOutput;
|
||||
import org.floworc.core.serializers.ObjectsSerde;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.io.*;
|
||||
import java.net.URI;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public class JsonReader extends Task implements RunnableTask {
|
||||
@NotNull
|
||||
private String from;
|
||||
|
||||
@Override
|
||||
public RunOutput run(RunContext runContext) throws Exception {
|
||||
// reader
|
||||
URI from = new URI(runContext.render(this.from));
|
||||
BufferedReader input = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)));
|
||||
|
||||
// temp file
|
||||
File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".javas");
|
||||
ObjectOutputStream output = new ObjectOutputStream(new FileOutputStream(tempFile));
|
||||
|
||||
// convert
|
||||
Flowable<Object> flowable = Flowable
|
||||
.create(this.nextRow(input), BackpressureStrategy.BUFFER)
|
||||
.observeOn(Schedulers.io())
|
||||
.doOnNext(row -> ObjectsSerde.write(output, row))
|
||||
.doOnComplete(() -> {
|
||||
output.close();
|
||||
input.close();
|
||||
});
|
||||
|
||||
// metrics & finalize
|
||||
Single<Long> count = flowable.count();
|
||||
Long lineCount = count.blockingGet();
|
||||
|
||||
return RunOutput.builder()
|
||||
.outputs(ImmutableMap.of("uri", runContext.putFile(tempFile).getUri()))
|
||||
.build();
|
||||
}
|
||||
|
||||
private FlowableOnSubscribe<Object> nextRow(BufferedReader inputStream) {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
return s -> {
|
||||
String line;
|
||||
while ((line = inputStream.readLine()) != null) {
|
||||
s.onNext(mapper.readValue(line, Object.class));
|
||||
}
|
||||
|
||||
s.onComplete();
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
package org.floworc.task.serdes.json;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.github.jknack.handlebars.internal.lang3.ArrayUtils;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.reactivex.BackpressureStrategy;
|
||||
import io.reactivex.Flowable;
|
||||
import io.reactivex.Single;
|
||||
import io.reactivex.schedulers.Schedulers;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.floworc.core.models.tasks.RunnableTask;
|
||||
import org.floworc.core.models.tasks.Task;
|
||||
import org.floworc.core.runners.RunContext;
|
||||
import org.floworc.core.runners.RunOutput;
|
||||
import org.floworc.core.serializers.ObjectsSerde;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.net.URI;
|
||||
|
||||
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class JsonWriter extends Task implements RunnableTask {
    // URI (templated) of the intermediate java-serialized file to convert to jsonl.
    @NotNull
    private String from;

    // NOTE(review): the five options below look copy-pasted from CsvWriter; none of
    // them is read anywhere in this class. Kept to avoid breaking builder-based
    // callers — TODO confirm no caller relies on them, then remove.
    @Builder.Default
    private Boolean header = true;

    @Builder.Default
    private Character fieldSeparator = ",".charAt(0);

    @Builder.Default
    private Character textDelimiter = "\"".charAt(0);

    @Builder.Default
    private Character[] lineDelimiter = ArrayUtils.toObject("\n".toCharArray());

    @Builder.Default
    private Boolean alwaysDelimitText = false;

    /**
     * Reads the java-serialized rows referenced by {@code from} and writes them as
     * one JSON document per line (jsonl) into a temporary file pushed to storage.
     */
    @Override
    public RunOutput run(RunContext runContext) throws Exception {
        // temp file
        File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".jsonl");

        // writer
        BufferedWriter outfile = new BufferedWriter(new FileWriter(tempFile));
        ObjectMapper mapper = new ObjectMapper();

        // reader
        URI from = new URI(runContext.render(this.from));
        ObjectInputStream inputStream = new ObjectInputStream(runContext.uriToInputStream(from));

        // the explicit <String, String> type witness is superfluous: ObjectsSerde.reader
        // is not a generic method (it returns FlowableOnSubscribe<Object>)
        Flowable<Object> flowable = Flowable
            .create(ObjectsSerde.<String, String>reader(inputStream), BackpressureStrategy.BUFFER)
            .observeOn(Schedulers.io())
            .doOnNext(o -> outfile.write(mapper.writeValueAsString(o) + "\n"))
            .doOnComplete(() -> {
                // NOTE(review): doOnComplete does not run when the pipeline errors,
                // so outfile/inputStream leak on failure — consider doFinally
                outfile.close();
                inputStream.close();
            });


        // metrics & finalize; blockingGet() subscribes and drives the whole pipeline
        Single<Long> count = flowable.count();
        Long lineCount = count.blockingGet();

        return RunOutput.builder()
            .outputs(ImmutableMap.of("uri", runContext.putFile(tempFile).getUri()))
            .build();
    }
}
|
||||
@@ -0,0 +1,44 @@
|
||||
package org.floworc.task.serdes;
|
||||
|
||||
import com.devskiller.friendly_id.FriendlyId;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.io.Files;
|
||||
import org.floworc.core.storages.StorageInterface;
|
||||
import org.floworc.core.storages.StorageObject;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Objects;
|
||||
|
||||
public class SerdesUtils {
|
||||
@Inject
|
||||
StorageInterface storageInterface;
|
||||
|
||||
public static String readResource(String file) throws URISyntaxException, IOException {
|
||||
return Files.asCharSource(
|
||||
SerdesUtils.resourceToFile(file),
|
||||
Charsets.UTF_8
|
||||
).read();
|
||||
}
|
||||
|
||||
public static File resourceToFile(String file) throws URISyntaxException {
|
||||
return new File(Objects.requireNonNull(SerdesUtils.class.getClassLoader()
|
||||
.getResource(file))
|
||||
.toURI());
|
||||
}
|
||||
|
||||
public StorageObject resourceToStorageObject(String file) throws URISyntaxException, IOException {
|
||||
return this.resourceToStorageObject(SerdesUtils.resourceToFile(file));
|
||||
}
|
||||
|
||||
public StorageObject resourceToStorageObject(File file) throws URISyntaxException, IOException {
|
||||
return storageInterface.put(
|
||||
new URI("/" + FriendlyId.createFriendlyId()),
|
||||
new FileInputStream(file)
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,165 @@
|
||||
package org.floworc.task.serdes.avro;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.test.annotation.MicronautTest;
|
||||
import lombok.SneakyThrows;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.SchemaBuilder;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
import org.apache.avro.generic.GenericDatumReader;
|
||||
import org.apache.avro.generic.GenericDatumWriter;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.io.BinaryDecoder;
|
||||
import org.apache.avro.io.BinaryEncoder;
|
||||
import org.apache.avro.io.DecoderFactory;
|
||||
import org.apache.avro.io.EncoderFactory;
|
||||
import org.floworc.core.runners.RunContext;
|
||||
import org.floworc.core.runners.RunOutput;
|
||||
import org.floworc.core.storages.StorageInterface;
|
||||
import org.floworc.core.storages.StorageObject;
|
||||
import org.floworc.task.serdes.SerdesUtils;
|
||||
import org.floworc.task.serdes.csv.CsvReader;
|
||||
import org.floworc.task.serdes.json.JsonReader;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@MicronautTest
public class AvroConverterTest {
    @Inject
    ApplicationContext applicationContext;

    @Inject
    StorageInterface storageInterface;

    @Inject
    SerdesUtils serdesUtils;

    // End-to-end: CSV -> intermediate java-serialized rows -> Avro; the produced
    // file must contain the same number of records as the committed fixture.
    @Test
    void fullCsv() throws Exception {
        String read = SerdesUtils.readResource("csv/full.avsc");

        File sourceFile = SerdesUtils.resourceToFile("csv/full.csv");
        StorageObject csv = this.serdesUtils.resourceToStorageObject(sourceFile);

        CsvReader reader = CsvReader.builder()
            .from(csv.getUri().toString())
            .fieldSeparator(",".charAt(0))
            .header(true)
            .build();
        RunOutput readerRunOutput = reader.run(new RunContext(this.applicationContext, ImmutableMap.of()));

        AvroWriter task = AvroWriter.builder()
            .from(readerRunOutput.getOutputs().get("uri").toString())
            .schema(read)
            .dateFormat("yyyy/MM/dd")
            .timeFormat("H:mm")
            .build();

        RunOutput avroRunOutput = task.run(new RunContext(this.applicationContext, ImmutableMap.of()));

        // compares record counts, not bytes — see AvroWriterTest.avroSize
        assertThat(
            AvroWriterTest.avroSize(this.storageInterface.get((URI) avroRunOutput.getOutputs().get("uri"))),
            is(AvroWriterTest.avroSize(
                new FileInputStream(new File(Objects.requireNonNull(AvroWriterTest.class.getClassLoader()
                    .getResource("csv/full.avro"))
                    .toURI())))
            )
        );
    }

    // Same pipeline but sourced from jsonl; must yield the same Avro fixture size.
    @Test
    void fullJson() throws Exception {
        String read = SerdesUtils.readResource("csv/full.avsc");

        File sourceFile = SerdesUtils.resourceToFile("csv/full.jsonl");
        StorageObject csv = this.serdesUtils.resourceToStorageObject(sourceFile);

        JsonReader reader = JsonReader.builder()
            .from(csv.getUri().toString())
            .build();
        RunOutput readerRunOutput = reader.run(new RunContext(this.applicationContext, ImmutableMap.of()));

        AvroWriter task = AvroWriter.builder()
            .from(readerRunOutput.getOutputs().get("uri").toString())
            .schema(read)
            .dateFormat("yyyy/MM/dd")
            .timeFormat("H:mm")
            .build();

        RunOutput avroRunOutput = task.run(new RunContext(this.applicationContext, ImmutableMap.of()));

        assertThat(
            AvroWriterTest.avroSize(this.storageInterface.get((URI) avroRunOutput.getOutputs().get("uri"))),
            is(AvroWriterTest.avroSize(
                new FileInputStream(new File(Objects.requireNonNull(AvroWriterTest.class.getClassLoader()
                    .getResource("csv/full.avro"))
                    .toURI())))
            )
        );
    }

    /**
     * Shared helpers used by the per-type converter tests (Complex*Test, Logical*Test,
     * Primitive*Test): build a single-field record schema, convert one value through
     * AvroConverter and round-trip it through a binary encode/decode.
     */
    public static class Utils {
        // Converts v into a one-field record, round-trips it through Avro binary
        // serialization and asserts the deserialized field equals `expected`.
        public static void oneField(Object v, Object expected, Schema type) throws AvroConverter.IllegalRowConvertion {
            AvroConverter avroConverter = AvroConverter.builder().build();
            Schema schema = oneFieldSchema(type);

            HashMap<String, Object> map = new HashMap<>();
            map.put("fieldName", v);

            GenericData.Record record = avroConverter.fromMap(schema, map);
            GenericRecord serialized = Utils.test(schema, record);

            assertThat(record, is(serialized));
            assertThat(serialized.get("fieldName"), is(expected));
        }

        // Asserts that converting v to the given field type is rejected.
        public static void oneFieldFailed(Object v, Schema type) {
            AvroConverter avroConverter = AvroConverter.builder().build();
            Schema schema = oneFieldSchema(type);

            assertThrows(AvroConverter.IllegalRowConvertion.class, () -> avroConverter.fromMap(schema, ImmutableMap.of("fieldName", v)));
        }

        // Record schema with a single required field "fieldName" of the given type.
        public static Schema oneFieldSchema(Schema type) {
            return schema(a -> a.name("fieldName").type(type).noDefault());
        }

        // Builds a record schema, letting the caller assemble the fields.
        public static Schema schema(Consumer<SchemaBuilder.FieldAssembler<Schema>> consumer) {
            SchemaBuilder.FieldAssembler<Schema> b = SchemaBuilder.record("rGenericDatumWriterecordName")
                .fields();

            consumer.accept(b);

            return b.endRecord();
        }

        // Binary encode + decode round-trip using AvroConverter's GenericData,
        // returning the deserialized record.
        @SneakyThrows
        public static GenericRecord test(Schema schema, GenericData.Record record) {
            GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema, AvroConverter.genericData());
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(record, encoder);
            encoder.flush();

            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema, schema, AvroConverter.genericData());
            ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
            return reader.read(null, decoder);
        }
    }

}
|
||||
@@ -1,4 +1,4 @@
|
||||
package org.floworc.task.avro;
|
||||
package org.floworc.task.serdes.avro;
|
||||
|
||||
import com.devskiller.friendly_id.FriendlyId;
|
||||
import com.google.common.base.Charsets;
|
||||
@@ -29,7 +29,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
@MicronautTest
|
||||
class CsvToAvroTest {
|
||||
class AvroWriterTest {
|
||||
@Inject
|
||||
StorageInterface storageInterface;
|
||||
|
||||
@@ -37,38 +37,46 @@ class CsvToAvroTest {
|
||||
ApplicationContext applicationContext;
|
||||
|
||||
@Test
|
||||
void run() throws Exception {
|
||||
void map() throws Exception {
|
||||
test("csv/insurance_sample.javas");
|
||||
}
|
||||
|
||||
@Test
|
||||
void array() throws Exception {
|
||||
test("csv/insurance_sample_array.javas");
|
||||
}
|
||||
|
||||
void test(String file) throws Exception {
|
||||
StorageObject source = storageInterface.put(
|
||||
new URI("/" + FriendlyId.createFriendlyId()),
|
||||
new FileInputStream(new File(Objects.requireNonNull(CsvToAvroTest.class.getClassLoader()
|
||||
.getResource("csv/insurance_sample.csv"))
|
||||
new FileInputStream(new File(Objects.requireNonNull(AvroWriterTest.class.getClassLoader()
|
||||
.getResource(file))
|
||||
.toURI()))
|
||||
);
|
||||
|
||||
CsvToAvro task = CsvToAvro.builder()
|
||||
AvroWriter task = AvroWriter.builder()
|
||||
.from(source.getUri().toString())
|
||||
.schema(
|
||||
Files.asCharSource(
|
||||
new File(Objects.requireNonNull(CsvToAvroTest.class.getClassLoader().getResource("csv/insurance_sample.avsc")).toURI()),
|
||||
new File(Objects.requireNonNull(AvroWriterTest.class.getClassLoader().getResource("csv/insurance_sample.avsc")).toURI()),
|
||||
Charsets.UTF_8
|
||||
).read()
|
||||
)
|
||||
.fieldSeparator(";".charAt(0))
|
||||
.build();
|
||||
|
||||
RunOutput run = task.run(new RunContext(this.applicationContext, ImmutableMap.of()));
|
||||
|
||||
assertThat(
|
||||
this.avroSize(this.storageInterface.get((URI) run.getOutputs().get("uri"))),
|
||||
is(this.avroSize(
|
||||
new FileInputStream(new File(Objects.requireNonNull(CsvToAvroTest.class.getClassLoader()
|
||||
AvroWriterTest.avroSize(this.storageInterface.get((URI) run.getOutputs().get("uri"))),
|
||||
is(AvroWriterTest.avroSize(
|
||||
new FileInputStream(new File(Objects.requireNonNull(AvroWriterTest.class.getClassLoader()
|
||||
.getResource("csv/insurance_sample.avro"))
|
||||
.toURI())))
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
private int avroSize(InputStream inputStream) throws IOException {
|
||||
public static int avroSize(InputStream inputStream) throws IOException {
|
||||
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
|
||||
DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(inputStream, datumReader);
|
||||
AtomicInteger i = new AtomicInteger();
|
||||
@@ -0,0 +1,41 @@
|
||||
package org.floworc.task.serdes.avro.converter;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.SchemaBuilder;
|
||||
import org.floworc.task.serdes.avro.AvroConverterTest;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class ComplexArrayTest {
|
||||
static Stream<Arguments> source() {
|
||||
return Stream.of(
|
||||
Arguments.of(Arrays.asList("42.2", 42.2D), Arrays.asList(42.2F, 42.2F), Schema.create(Schema.Type.FLOAT)),
|
||||
Arguments.of(Arrays.asList("null", "true", true, false, null), Arrays.asList(null, true, true, false, null), Schema.createUnion(Schema.create(Schema.Type.BOOLEAN), Schema.create(Schema.Type.NULL)))
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("source")
|
||||
void convert(Object v, List<Object> expected, Schema type) throws Exception {
|
||||
AvroConverterTest.Utils.oneField(v, expected, SchemaBuilder.array().items(type));
|
||||
}
|
||||
|
||||
static Stream<Arguments> failedSource() {
|
||||
return Stream.of(
|
||||
Arguments.of(Arrays.asList("a", 42.2), Schema.create(Schema.Type.FLOAT)),
|
||||
Arguments.of(Arrays.asList("null", "a"), Schema.createUnion(Schema.create(Schema.Type.BOOLEAN), Schema.create(Schema.Type.NULL)))
|
||||
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("failedSource")
|
||||
void failed(Object v, Schema type) {
|
||||
AvroConverterTest.Utils.oneFieldFailed(v, SchemaBuilder.array().items(type));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
package org.floworc.task.serdes.avro.converter;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.SchemaBuilder;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
import org.floworc.task.serdes.avro.AvroConverterTest;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class ComplexEnumTest {
|
||||
static Stream<Arguments> source() {
|
||||
return Stream.of(
|
||||
Arguments.of("a", "a", Arrays.asList("a", "b", "c")),
|
||||
Arguments.of("ž", "ž", Arrays.asList("a", "ž", "c")),
|
||||
Arguments.of("ࠀ", "ࠀ", Arrays.asList("a", "b", "ࠀ"))
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("source")
|
||||
void convert(Object v, String expected, List<String> values) throws Exception {
|
||||
Schema schema = SchemaBuilder.enumeration("enumeration").symbols(values.toArray(String[]::new));
|
||||
AvroConverterTest.Utils.oneField(v, new GenericData.EnumSymbol(schema, expected), schema);
|
||||
}
|
||||
|
||||
static Stream<Arguments> failedSource() {
|
||||
return Stream.of(
|
||||
Arguments.of("", Arrays.asList("a", "b", "c")),
|
||||
Arguments.of("ࠀ", Arrays.asList("a", "b", "c"))
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("failedSource")
|
||||
void failed(Object v, List<String> values) {
|
||||
AvroConverterTest.Utils.oneFieldFailed(v, SchemaBuilder.enumeration("enumeration").symbols(values.toArray(String[]::new)));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
package org.floworc.task.serdes.avro.converter;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.SchemaBuilder;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
import org.floworc.task.serdes.avro.AvroConverterTest;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class ComplexFixedTest {
|
||||
static Stream<Arguments> source() {
|
||||
return Stream.of(
|
||||
Arguments.of("a", "a", 1),
|
||||
Arguments.of("ž", "ž", 2),
|
||||
Arguments.of("ࠀ", "ࠀ", 3),
|
||||
Arguments.of("\uD83D\uDCA9", "\uD83D\uDCA9", 4)
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("source")
|
||||
void convert(Object v, String expected, int length) throws Exception {
|
||||
Schema schema = SchemaBuilder.fixed("fixed").size(length);
|
||||
|
||||
AvroConverterTest.Utils.oneField(v, new GenericData.Fixed(schema, expected.getBytes()), schema);
|
||||
}
|
||||
|
||||
static Stream<Arguments> failedSource() {
|
||||
return Stream.of(
|
||||
Arguments.of("a", 3),
|
||||
Arguments.of("", 1),
|
||||
Arguments.of("ࠀ", 2)
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("failedSource")
|
||||
void failed(Object v, int length) {
|
||||
AvroConverterTest.Utils.oneFieldFailed(v, SchemaBuilder.fixed("fixed").size(length));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
package org.floworc.task.serdes.avro.converter;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.SchemaBuilder;
|
||||
import org.apache.avro.util.Utf8;
|
||||
import org.floworc.task.serdes.avro.AvroConverterTest;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
 * Conversion tests for avro {@code map} schemas.
 */
public class ComplexMapTest {
    // The expected maps use org.apache.avro.util.Utf8 keys because that is what the
    // binary round-trip produces.
    // NOTE(review): "a".getBytes() uses the platform default charset; harmless here
    // since the keys are plain ASCII, but an explicit UTF-8 charset would be safer.
    static Stream<Arguments> source() {
        return Stream.of(
            Arguments.of(
                ImmutableMap.of("a", 42.2D, "b", "42", "c", 42.2D),
                ImmutableMap.of(new Utf8("a".getBytes()), 42.2F, new Utf8("b".getBytes()), 42F, new Utf8("c".getBytes()), 42.2F),
                Schema.create(Schema.Type.FLOAT))
        );
    }

    @ParameterizedTest
    @MethodSource("source")
    void convert(Object v, Map<Utf8, Object> expected, Schema type) throws Exception {
        AvroConverterTest.Utils.oneField(v, expected, SchemaBuilder.map().values(type));
    }

    // Maps with at least one value that cannot be coerced to the value schema.
    static Stream<Arguments> failedSource() {
        return Stream.of(
            Arguments.of(ImmutableMap.of("a", 42.2D, "b", "a"), Schema.create(Schema.Type.FLOAT)),
            Arguments.of(ImmutableMap.of("a", "null", "b", "a"), Schema.createUnion(Schema.create(Schema.Type.BOOLEAN), Schema.create(Schema.Type.NULL)))
        );
    }

    @ParameterizedTest
    @MethodSource("failedSource")
    void failed(Object v, Schema type) {
        AvroConverterTest.Utils.oneFieldFailed(v, SchemaBuilder.map().values(type));
    }
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package org.floworc.task.serdes.avro.converter;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.floworc.task.serdes.avro.AvroConverterTest;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class ComplexUnionTest {
|
||||
static Stream<Arguments> source() {
|
||||
return Stream.of(
|
||||
Arguments.of("null", Arrays.asList(Schema.Type.NULL, Schema.Type.BOOLEAN), null),
|
||||
Arguments.of("null", Arrays.asList(Schema.Type.BOOLEAN, Schema.Type.NULL), null),
|
||||
Arguments.of("1", Arrays.asList(Schema.Type.INT, Schema.Type.NULL), 1)
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("source")
|
||||
static void convert(Object v, List<Schema.Type> schemas, Object expected) throws Exception {
|
||||
AvroConverterTest.Utils.oneField(v, expected, Schema.createUnion(schemas
|
||||
.stream()
|
||||
.map(Schema::create)
|
||||
.collect(Collectors.toList())
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
package org.floworc.task.serdes.avro.converter;
|
||||
|
||||
import org.apache.avro.LogicalTypes;
|
||||
import org.apache.avro.Schema;
|
||||
import org.floworc.task.serdes.avro.AvroConverterTest;
|
||||
import org.junit.jupiter.api.Nested;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
|
||||
|
||||
@TestInstance(PER_CLASS)
|
||||
@Nested
|
||||
public
|
||||
class LogicalDateTest {
|
||||
private Schema schema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
|
||||
|
||||
Stream<Arguments> source() {
|
||||
return Stream.of(
|
||||
Arguments.of("2019-12-26", LocalDate.parse("2019-12-26", DateTimeFormatter.ISO_DATE)),
|
||||
Arguments.of("2011-12-03+01:00", LocalDate.parse("2011-12-03+01:00", DateTimeFormatter.ISO_DATE))
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("source")
|
||||
void convert(CharSequence v, LocalDate expected) throws Exception {
|
||||
AvroConverterTest.Utils.oneField(v, expected, schema);
|
||||
}
|
||||
|
||||
Stream<Arguments> failedSource() {
|
||||
return Stream.of(
|
||||
Arguments.of("12-26-2019"),
|
||||
Arguments.of("2019-12+0100")
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("failedSource")
|
||||
void failed(Object v) {
|
||||
AvroConverterTest.Utils.oneFieldFailed(v, schema);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
package org.floworc.task.serdes.avro.converter;
|
||||
|
||||
import org.apache.avro.LogicalTypes;
|
||||
import org.apache.avro.Schema;
|
||||
import org.floworc.task.serdes.avro.AvroConverterTest;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
|
||||
/**
 * Conversion tests for the avro {@code timestamp-millis}/{@code timestamp-micros}
 * logical types.
 */
public class LogicalDateTimeTest {
    // Expected instants: LocalDateTime.parse drops any offset present in the
    // *expected* string, and the local time is then pinned to UTC.
    // NOTE(review): this makes the "+01:00" inputs expect the same instant as the
    // offset-less ones (12:13 UTC, not 11:13 UTC) — confirm the converter is really
    // meant to ignore the input offset.
    static Stream<Arguments> source() {
        return Stream.of(
            Arguments.of("2019-12-26T12:13", LocalDateTime.parse("2019-12-26T12:13+01:00", DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC)),
            Arguments.of("2019-12-26T12:13:11", LocalDateTime.parse("2019-12-26T12:13:11+01:00", DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC)),
            Arguments.of("2019-12-26T12:13:11.123000", LocalDateTime.parse("2019-12-26T12:13:11.123000", DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC)),
            Arguments.of("2019-12-26T12:13:11+01:00", LocalDateTime.parse("2019-12-26T12:13:11+01:00", DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC)),
            Arguments.of("2019-12-26T12:13:11.123000+01:00", LocalDateTime.parse("2019-12-26T12:13:11.123000+01:00", DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC))
        );
    }

    // Each value must convert identically through both timestamp precisions.
    @ParameterizedTest
    @MethodSource("source")
    void convert(CharSequence v, Instant expected) throws Exception {
        AvroConverterTest.Utils.oneField(v, expected, LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)));
        AvroConverterTest.Utils.oneField(v, expected, LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)));
    }

    // Strings that are not ISO date-times and must be rejected.
    static Stream<Arguments> failedSource() {
        return Stream.of(
            Arguments.of("12:26:2019")
        );
    }

    @ParameterizedTest
    @MethodSource("failedSource")
    void failed(Object v) {
        AvroConverterTest.Utils.oneFieldFailed(v, LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)));
    }
}
|
||||
@@ -0,0 +1,38 @@
|
||||
package org.floworc.task.serdes.avro.converter;
|
||||
|
||||
import org.apache.avro.LogicalTypes;
|
||||
import org.apache.avro.Schema;
|
||||
import org.floworc.task.serdes.avro.AvroConverterTest;
|
||||
import org.junit.jupiter.api.Nested;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
|
||||
|
||||
public class LogicalDecimalTest {
|
||||
static Stream<Arguments> source() {
|
||||
return Stream.of(
|
||||
Arguments.of("12.82", new BigDecimal("12.82"), 4, 2),
|
||||
Arguments.of("12.8", new BigDecimal("12.80"), 4, 2),
|
||||
Arguments.of(12.8F, new BigDecimal("12.80"), 4, 2),
|
||||
Arguments.of("12.828282", new BigDecimal("12.828282"), 8, 6),
|
||||
Arguments.of(12L, new BigDecimal("12.00"), 4, 2),
|
||||
Arguments.of(12, new BigDecimal("12.00"), 4, 2),
|
||||
Arguments.of(12.8444D, new BigDecimal("12.84"), 4, 2),
|
||||
Arguments.of(12.8444F, new BigDecimal("12.84"), 4, 2),
|
||||
Arguments.of("2019", new BigDecimal("2019"), 4, 0)
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("source")
|
||||
void convert(Object v, BigDecimal expected, Integer precision, Integer scale) throws Exception {
|
||||
Schema schema = LogicalTypes.decimal(precision, scale).addToSchema(Schema.create(Schema.Type.BYTES));
|
||||
AvroConverterTest.Utils.oneField(v, expected, schema);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
package org.floworc.task.serdes.avro.converter;
|
||||
|
||||
import org.apache.avro.LogicalTypes;
|
||||
import org.apache.avro.Schema;
|
||||
import org.floworc.task.serdes.avro.AvroConverterTest;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.time.LocalTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class LogicalTimeTest {
|
||||
static Stream<Arguments> source() {
|
||||
return Stream.of(
|
||||
Arguments.of("12:13", LocalTime.parse("12:13+01:00", DateTimeFormatter.ISO_TIME)),
|
||||
Arguments.of("12:13:11", LocalTime.parse("12:13:11+01:00", DateTimeFormatter.ISO_TIME)),
|
||||
Arguments.of("12:13:11.123000", LocalTime.parse("12:13:11.123000", DateTimeFormatter.ISO_TIME)),
|
||||
Arguments.of("12:13:11+01:00", LocalTime.parse("12:13:11+01:00", DateTimeFormatter.ISO_TIME)),
|
||||
Arguments.of("12:13:11.123000+01:00", LocalTime.parse("12:13:11.123000+01:00", DateTimeFormatter.ISO_TIME))
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("source")
|
||||
void convert(CharSequence v, LocalTime expected) throws Exception {
|
||||
AvroConverterTest.Utils.oneField(v, expected, LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG)));
|
||||
AvroConverterTest.Utils.oneField(v, expected, LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT)));
|
||||
}
|
||||
|
||||
static Stream<Arguments> failedSource() {
|
||||
return Stream.of(
|
||||
Arguments.of("12:26:2019"),
|
||||
Arguments.of("12+0100")
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("failedSource")
|
||||
void failed(Object v) {
|
||||
AvroConverterTest.Utils.oneFieldFailed(v, LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG)));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package org.floworc.task.serdes.avro.converter;
|
||||
|
||||
import org.apache.avro.LogicalTypes;
|
||||
import org.apache.avro.Schema;
|
||||
import org.floworc.task.serdes.avro.AvroConverterTest;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class LogicalUuidTest {
|
||||
private Schema schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING));
|
||||
|
||||
static Stream<Arguments> source() {
|
||||
return Stream.of(
|
||||
Arguments.of("123e4567-e89b-12d3-a456-556642440000", UUID.fromString("123e4567-e89b-12d3-a456-556642440000"))
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("source")
|
||||
void convert(CharSequence v, UUID expected) throws Exception {
|
||||
AvroConverterTest.Utils.oneField(v, expected, schema);
|
||||
}
|
||||
|
||||
static Stream<Arguments> failedSource() {
|
||||
return Stream.of(
|
||||
Arguments.of("123e4567"),
|
||||
Arguments.of("123e4567e89b12d3a456556642440000")
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("failedSource")
|
||||
void failed(Object v) {
|
||||
AvroConverterTest.Utils.oneFieldFailed(v, schema);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package org.floworc.task.serdes.avro.converter;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.floworc.task.serdes.avro.AvroConverterTest;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class PrimitiveBoolTest {
|
||||
static Stream<Arguments> source() {
|
||||
return Stream.of(
|
||||
Arguments.of("true", true),
|
||||
Arguments.of("True", true),
|
||||
Arguments.of("1", true),
|
||||
Arguments.of(1, true),
|
||||
Arguments.of(true, true),
|
||||
Arguments.of("False", false),
|
||||
Arguments.of("0", false),
|
||||
Arguments.of(0, false),
|
||||
Arguments.of("", false),
|
||||
Arguments.of(false, false)
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("source")
|
||||
void convert(Object v, boolean expected) throws Exception {
|
||||
AvroConverterTest.Utils.oneField(v, expected, Schema.create(Schema.Type.BOOLEAN));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
package org.floworc.task.serdes.avro.converter;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.floworc.task.serdes.avro.AvroConverterTest;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class PrimitiveFloatTest {
|
||||
static Stream<Arguments> source() {
|
||||
return Stream.of(
|
||||
Arguments.of(-42F, -42F),
|
||||
Arguments.of("-42", -42F),
|
||||
Arguments.of(-42, -42F),
|
||||
Arguments.of(-42D, -42F),
|
||||
Arguments.of(42F, 42F),
|
||||
Arguments.of("42", 42F),
|
||||
Arguments.of(42, 42F),
|
||||
Arguments.of(42D, 42F)
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("source")
|
||||
void convert(Object v, float expected) throws Exception {
|
||||
AvroConverterTest.Utils.oneField(v, expected, Schema.create(Schema.Type.FLOAT));
|
||||
}
|
||||
|
||||
static Stream<Arguments> failedSource() {
|
||||
return Stream.of(
|
||||
Arguments.of("a"),
|
||||
Arguments.of(9223372036854775807L)
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("failedSource")
|
||||
void failed(Object v) {
|
||||
AvroConverterTest.Utils.oneFieldFailed(v, Schema.create(Schema.Type.FLOAT));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
package org.floworc.task.serdes.avro.converter;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.floworc.task.serdes.avro.AvroConverterTest;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class PrimitiveIntTest {
|
||||
static Stream<Arguments> source() {
|
||||
return Stream.of(
|
||||
Arguments.of(-42, -42),
|
||||
Arguments.of("-42", -42),
|
||||
Arguments.of(42, 42),
|
||||
Arguments.of("42", 42)
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("source")
|
||||
void convert(Object v, int expected) throws Exception {
|
||||
AvroConverterTest.Utils.oneField(v, expected, Schema.create(Schema.Type.INT));
|
||||
}
|
||||
|
||||
static Stream<Arguments> failedSource() {
|
||||
return Stream.of(
|
||||
Arguments.of("-42.2"),
|
||||
Arguments.of(42.2D),
|
||||
Arguments.of(42.2F),
|
||||
Arguments.of("a"),
|
||||
Arguments.of(9223372036854775807L)
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("failedSource")
|
||||
void failed(Object v) {
|
||||
AvroConverterTest.Utils.oneFieldFailed(v, Schema.create(Schema.Type.INT));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
package org.floworc.task.serdes.avro.converter;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.floworc.task.serdes.avro.AvroConverterTest;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
|
||||
class PrimitiveLongTest {
|
||||
static Stream<Arguments> source() {
|
||||
return Stream.of(
|
||||
Arguments.of(-42, -42L),
|
||||
Arguments.of("-42", -42L),
|
||||
Arguments.of(42, 42L),
|
||||
Arguments.of("42", 42L),
|
||||
Arguments.of(9223372036854775807L, 9223372036854775807L)
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("source")
|
||||
void convert(Object v, long expected) throws Exception {
|
||||
AvroConverterTest.Utils.oneField(v, expected, Schema.create(Schema.Type.LONG));
|
||||
}
|
||||
|
||||
static Stream<Arguments> failedSource() {
|
||||
return Stream.of(
|
||||
Arguments.of("-42.2"),
|
||||
Arguments.of(42.2D),
|
||||
Arguments.of(42.2F),
|
||||
Arguments.of("a"),
|
||||
Arguments.of("9223372036854775808")
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("failedSource")
|
||||
void failed(Object v) {
|
||||
AvroConverterTest.Utils.oneFieldFailed(v, Schema.create(Schema.Type.LONG));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
package org.floworc.task.serdes.avro.converter;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.floworc.task.serdes.avro.AvroConverterTest;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class PrimitiveNullTest {
|
||||
static Stream<Arguments> source() {
|
||||
return Stream.of(
|
||||
Arguments.of("NULL", null),
|
||||
Arguments.of("n/a", null),
|
||||
Arguments.of("N/A", null),
|
||||
Arguments.of("", null)
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("source")
|
||||
void convert(Object v, Object expected) throws Exception {
|
||||
AvroConverterTest.Utils.oneField(v, expected, Schema.create(Schema.Type.NULL));
|
||||
}
|
||||
|
||||
static Stream<Arguments> failedSource() {
|
||||
return Stream.of(
|
||||
Arguments.of(-42),
|
||||
Arguments.of(9223372036854775807L),
|
||||
Arguments.of("a"),
|
||||
Arguments.of("42"),
|
||||
Arguments.of(42.2D),
|
||||
Arguments.of(42.2F)
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("failedSource")
|
||||
void failed(Object v) {
|
||||
AvroConverterTest.Utils.oneFieldFailed(v, Schema.create(Schema.Type.NULL));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
package org.floworc.task.serdes.avro.converter;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.util.Utf8;
|
||||
import org.floworc.task.serdes.avro.AvroConverterTest;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class PrimitiveStringBytesTest {
|
||||
static Stream<Arguments> source() {
|
||||
return Stream.of(
|
||||
Arguments.of("a", "a"),
|
||||
Arguments.of("true", "true"),
|
||||
Arguments.of(null, "null"),
|
||||
Arguments.of(1, "1"),
|
||||
Arguments.of(42D, "42.0"),
|
||||
Arguments.of(42F, "42.0"),
|
||||
Arguments.of(42L, "42"),
|
||||
Arguments.of(42.0D, "42.0"),
|
||||
Arguments.of("", "")
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("source")
|
||||
void convert(Object v, String expected) throws Exception {
|
||||
AvroConverterTest.Utils.oneField(v, new Utf8(expected.getBytes()), Schema.create(Schema.Type.STRING));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("source")
|
||||
static void convertBytes(Object v, String expected) throws Exception {
|
||||
AvroConverterTest.Utils.oneField(v, ByteBuffer.wrap(new Utf8(expected.getBytes()).getBytes()), Schema.create(Schema.Type.BYTES));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,70 @@
|
||||
package org.floworc.task.serdes.csv;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.io.CharStreams;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.test.annotation.MicronautTest;
|
||||
import org.floworc.core.runners.RunContext;
|
||||
import org.floworc.core.runners.RunOutput;
|
||||
import org.floworc.core.storages.StorageInterface;
|
||||
import org.floworc.core.storages.StorageObject;
|
||||
import org.floworc.task.serdes.SerdesUtils;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.URI;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@MicronautTest
|
||||
class CsvReaderWriterTest {
|
||||
@Inject
|
||||
ApplicationContext applicationContext;
|
||||
|
||||
@Inject
|
||||
StorageInterface storageInterface;
|
||||
|
||||
@Inject
|
||||
SerdesUtils serdesUtils;
|
||||
|
||||
private void test(String file, boolean header) throws Exception {
|
||||
File sourceFile = SerdesUtils.resourceToFile(file);
|
||||
StorageObject source = this.serdesUtils.resourceToStorageObject(sourceFile);
|
||||
|
||||
CsvReader reader = CsvReader.builder()
|
||||
.from(source.getUri().toString())
|
||||
.fieldSeparator(";".charAt(0))
|
||||
.header(header)
|
||||
.build();
|
||||
RunOutput readerRunOutput = reader.run(new RunContext(this.applicationContext, ImmutableMap.of()));
|
||||
|
||||
CsvWriter writer = CsvWriter.builder()
|
||||
.from(readerRunOutput.getOutputs().get("uri").toString())
|
||||
.fieldSeparator(";".charAt(0))
|
||||
.alwaysDelimitText(true)
|
||||
.header(header)
|
||||
.build();
|
||||
RunOutput writerRunOutput = writer.run(new RunContext(this.applicationContext, ImmutableMap.of()));
|
||||
|
||||
assertThat(
|
||||
CharStreams.toString(new InputStreamReader(storageInterface.get((URI) writerRunOutput.getOutputs().get("uri")))),
|
||||
is(CharStreams.toString(new InputStreamReader(new FileInputStream(sourceFile))))
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void header() throws Exception {
|
||||
this.test("csv/insurance_sample.csv", true);
|
||||
}
|
||||
|
||||
@Test
|
||||
void noHeader() throws Exception {
|
||||
this.test("csv/insurance_sample_no_header.csv", false);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
package org.floworc.task.serdes.json;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.io.CharStreams;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.test.annotation.MicronautTest;
|
||||
import org.floworc.core.runners.RunContext;
|
||||
import org.floworc.core.runners.RunOutput;
|
||||
import org.floworc.core.storages.StorageInterface;
|
||||
import org.floworc.core.storages.StorageObject;
|
||||
import org.floworc.task.serdes.SerdesUtils;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.URI;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
@MicronautTest
|
||||
class JsonReaderWriterTest {
|
||||
@Inject
|
||||
ApplicationContext applicationContext;
|
||||
|
||||
@Inject
|
||||
StorageInterface storageInterface;
|
||||
|
||||
@Inject
|
||||
SerdesUtils serdesUtils;
|
||||
|
||||
@Test
|
||||
void run() throws Exception {
|
||||
File sourceFile = SerdesUtils.resourceToFile("csv/full.jsonl");
|
||||
StorageObject source = this.serdesUtils.resourceToStorageObject(sourceFile);
|
||||
|
||||
JsonReader reader = JsonReader.builder()
|
||||
.from(source.getUri().toString())
|
||||
.build();
|
||||
RunOutput readerRunOutput = reader.run(new RunContext(this.applicationContext, ImmutableMap.of()));
|
||||
|
||||
JsonWriter writer = JsonWriter.builder()
|
||||
.from(readerRunOutput.getOutputs().get("uri").toString())
|
||||
.build();
|
||||
RunOutput writerRunOutput = writer.run(new RunContext(this.applicationContext, ImmutableMap.of()));
|
||||
|
||||
assertThat(
|
||||
CharStreams.toString(new InputStreamReader(storageInterface.get((URI) writerRunOutput.getOutputs().get("uri")))),
|
||||
is(CharStreams.toString(new InputStreamReader(new FileInputStream(sourceFile))))
|
||||
);
|
||||
}
|
||||
}
|
||||
BIN
task-serdes/src/test/resources/csv/full.avro
Normal file
BIN
task-serdes/src/test/resources/csv/full.avro
Normal file
Binary file not shown.
5001
task-serdes/src/test/resources/csv/full.csv
Normal file
5001
task-serdes/src/test/resources/csv/full.csv
Normal file
File diff suppressed because it is too large
Load Diff
BIN
task-serdes/src/test/resources/csv/full.javas
Normal file
BIN
task-serdes/src/test/resources/csv/full.javas
Normal file
Binary file not shown.
5000
task-serdes/src/test/resources/csv/full.jsonl
Normal file
5000
task-serdes/src/test/resources/csv/full.jsonl
Normal file
File diff suppressed because it is too large
Load Diff
BIN
task-serdes/src/test/resources/csv/insurance_sample.javas
Normal file
BIN
task-serdes/src/test/resources/csv/insurance_sample.javas
Normal file
Binary file not shown.
BIN
task-serdes/src/test/resources/csv/insurance_sample_array.javas
Normal file
BIN
task-serdes/src/test/resources/csv/insurance_sample_array.javas
Normal file
Binary file not shown.
@@ -0,0 +1,5 @@
|
||||
"119736";"FL";"CLAY COUNTY";"498960";"498960";"498960";"498960";"498960";"792148.9";"0";"9979.2";"0";"0";"30.102261";"-81.711777";"Residential";"Masonry";"1"
|
||||
"448094";"FL";"CLAY COUNTY";"1322376.3";"1322376.3";"1322376.3";"1322376.3";"1322376.3";"1438163.57";"0";"0";"0";"0";"30.063936";"-81.707664";"Residential";"Masonry";"3"
|
||||
"206893";"FL";"CLAY COUNTY";"190724.4";"190724.4";"190724.4";"190724.4";"190724.4";"192476.78";"0";"0";"0";"0";"30.089579";"-81.700455";"Residential";"Wood";"1"
|
||||
"333743";"FL";"CLAY COUNTY";"0";"79520.76";"0";"0";"79520.76";"86854.48";"0";"0";"0";"0";"30.063236";"-81.707703";"Residential";"Wood";"3"
|
||||
"172534";"FL";"CLAY COUNTY";"0";"254281.5";"0";"254281.5";"254281.5";"246144.49";"0";"0";"0";"0";"30.060614";"-81.702675";"Residential";"Wood";"1"
|
||||
|
Reference in New Issue
Block a user