feat(avro): introduce avro tasks

This commit is contained in:
tchiotludo
2019-10-10 13:23:14 +02:00
parent 14e3384be2
commit 372d54cf78
9 changed files with 5268 additions and 0 deletions

View File

@@ -20,5 +20,6 @@ dependencies {
compile project(":storage-gcs")
compile project(":storage-minio")
compile project(":task-avro")
compile project(":task-gcp")
}

View File

@@ -12,4 +12,5 @@ include 'storage-minio'
include 'repository-memory'
include 'task-avro'
include 'task-gcp'

14
task-avro/build.gradle Normal file
View File

@@ -0,0 +1,14 @@
// Build script for the task-avro module: CSV -> Avro conversion task.
buildscript {
ext {
// Apache Avro version, referenced by the dependency declaration below.
avroVersion = "1.9.0"
}
}
// Requires a Java 11 toolchain.
sourceCompatibility = 11
dependencies {
// Core workflow engine (Task, RunContext, RunOutput, storages).
compile project(":core")
// Avro serialization library.
compile group: "org.apache.avro", name: "avro", version: avroVersion
// FastCSV parser used to read the source CSV files.
compile 'de.siegmar:fastcsv:1.0.3'
}

View File

@@ -0,0 +1,119 @@
package org.floworc.task.avro;
import de.siegmar.fastcsv.reader.CsvParser;
import de.siegmar.fastcsv.reader.CsvReader;
import de.siegmar.fastcsv.reader.CsvRow;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.floworc.core.models.tasks.RunnableTask;
import org.floworc.core.models.tasks.Task;
import org.floworc.core.runners.RunContext;
import org.floworc.core.runners.RunOutput;
import org.floworc.core.storages.StorageObject;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.nio.charset.StandardCharsets;
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@Value
@Slf4j
public class CsvToAvro extends Task implements RunnableTask {
    /** Source CSV file to convert. */
    private File source;

    /** Avro schema of the output records; field names must match the CSV columns. */
    private Schema schema;

    /** Whether the first CSV row is a header; null keeps the parser default. */
    private Boolean header;

    /** CSV field separator (e.g. ';'); null keeps the parser default. */
    private Character fieldSeparator;

    /** CSV text delimiter (e.g. '"'); null keeps the parser default. */
    private Character textDelimiter;

    /** Whether empty rows are skipped; null keeps the parser default. */
    private Boolean skipEmptyRows;

    /**
     * Reads the source CSV and writes every row into an Avro data file.
     *
     * <p>Bug fixes vs. the previous version: the pipeline now blocks until all
     * rows are appended (the old async {@code subscribe} let {@code run()} return
     * before writing finished, producing a truncated file), and the parser and
     * writer are closed on every path via try-with-resources (they previously
     * leaked whenever an error occurred).
     *
     * @param runContext the execution context (currently unused)
     * @return null — no output is reported yet; TODO(review): return a RunOutput
     *         referencing the produced file
     * @throws Exception on CSV parse or Avro serialization failure
     */
    @Override
    public RunOutput run(RunContext runContext) throws Exception {
        CsvReader csvReader = this.getReader();

        // TODO(review): "fileName" is a hard-coded placeholder — the output path
        // should come from the run context / storage layer, not the working dir.
        try (
            CsvParser csvParser = csvReader.parse(source, StandardCharsets.UTF_8);
            DataFileWriter<GenericRecord> dataFileWriter =
                new DataFileWriter<>(new GenericDatumWriter<>(schema))
        ) {
            // create() takes ownership of the stream; closing the writer closes it.
            dataFileWriter.create(schema, new BufferedOutputStream(new FileOutputStream("fileName")));

            // blockingForEach keeps run() synchronous and rethrows any upstream
            // error to the caller instead of swallowing it inside a subscriber.
            Flowable
                .create(this.nextRow(csvParser), BackpressureStrategy.BUFFER)
                .observeOn(Schedulers.computation())
                .map(this.convertToAvro())
                .blockingForEach(dataFileWriter::append);
        }

        return null;
    }

    /**
     * Builds the CSV-row to Avro-record mapping function.
     *
     * <p>Values are copied exactly as the parser returns them (strings): no type
     * coercion is done, so non-string schema fields will fail at serialization
     * time. NOTE(review): confirm whether typed conversion is intended here.
     */
    private Function<CsvRow, GenericData.Record> convertToAvro() {
        return row -> {
            GenericData.Record record = new GenericData.Record(schema);
            for (Schema.Field field : schema.getFields()) {
                // getField(name) yields null for a missing column; Avro then
                // rejects non-nullable fields when the record is appended.
                record.put(field.name(), row.getField(field.name()));
            }
            return record;
        };
    }

    /**
     * Emits each parsed CSV row to the downstream subscriber, completing once
     * the parser is exhausted.
     */
    private FlowableOnSubscribe<CsvRow> nextRow(CsvParser csvParser) {
        return emitter -> {
            CsvRow row;
            while ((row = csvParser.nextRow()) != null) {
                emitter.onNext(row);
            }
            emitter.onComplete();
        };
    }

    /**
     * Builds a CsvReader configured from the task's optional settings; any null
     * setting keeps the FastCSV default.
     */
    private CsvReader getReader() {
        CsvReader csvReader = new CsvReader();

        if (this.header != null) {
            csvReader.setContainsHeader(this.header);
        }

        if (this.textDelimiter != null) {
            csvReader.setTextDelimiter(textDelimiter);
        }

        if (this.fieldSeparator != null) {
            csvReader.setFieldSeparator(fieldSeparator);
        }

        if (this.skipEmptyRows != null) {
            csvReader.setSkipEmptyRows(skipEmptyRows);
        }

        return csvReader;
    }
}

View File

@@ -0,0 +1,22 @@
package org.floworc.task.avro;
import org.apache.avro.Schema;
import org.floworc.core.runners.RunContext;
import org.junit.jupiter.api.Test;
import java.io.File;
class CsvToAvroTest {
    /**
     * Smoke test: converts the bundled insurance sample CSV with its matching
     * Avro schema. Passes as long as no exception is thrown — run() currently
     * returns null, so there is no output to assert on yet.
     * TODO(review): assert on the produced Avro file once run() reports it.
     */
    @Test
    void run() throws Exception {
        // Renamed from the copy-pasted "bash"; char literals replace charAt(0).
        CsvToAvro task = new CsvToAvro(
            new File(CsvToAvroTest.class.getClassLoader().getResource("csv/insurance_sample.csv").toURI()),
            new Schema.Parser().parse(new File(CsvToAvroTest.class.getClassLoader().getResource("csv/insurance_sample.avsc").toURI())),
            true,
            ';',
            '"',
            true
        );

        task.run(new RunContext());
    }
}

View File

@@ -0,0 +1,80 @@
{
"type": "record",
"name": "Full",
"namespace": "org.floworc",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "nameNullable",
"type": [
"string",
"null"
]
},
{
"name": "email",
"type": "string"
},
{
"name": "enum",
"type": {
"type": "enum",
"name": "Gender",
"symbols": [
"Female",
"Male"
]
}
},
{
"name": "long",
"type": [
"long",
"null"
]
},
{
"name": "double",
"type": "double"
},
{
"name": "boolean",
"type": "boolean"
},
{
"name": "date",
"type": {
"type": "int",
"logicalType": "date"
}
},
{
"name": "timeMillis",
"type": {
"type": "int",
"logicalType": "time-millis"
}
},
{
"name": "timestampMillis",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "timestampMicros",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
}
]
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,24 @@
{
"name": "insurance",
"type": "record",
"fields": [
{"name":"policyID", "type":"string"},
{"name":"statecode", "type":"string"},
{"name":"county", "type":"string"},
{"name":"eq_site_limit", "type":"string"},
{"name":"hu_site_limit", "type":"string"},
{"name":"fl_site_limit", "type":"string"},
{"name":"fr_site_limit", "type":"string"},
{"name":"tiv_2011", "type":"string"},
{"name":"tiv_2012", "type":"string"},
{"name":"eq_site_deductible", "type":"string"},
{"name":"hu_site_deductible", "type":"string"},
{"name":"fl_site_deductible", "type":"string"},
{"name":"fr_site_deductible", "type":"string"},
{"name":"point_latitude", "type":"string"},
{"name":"point_longitude", "type":"string"},
{"name":"line", "type":"string"},
{"name":"construction", "type":"string"},
{"name":"point_granularity", "type":["null","string"]}
]
}

View File

@@ -0,0 +1,6 @@
"policyID";"statecode";"county";"eq_site_limit";"hu_site_limit";"fl_site_limit";"fr_site_limit";"tiv_2011";"tiv_2012";"eq_site_deductible";"hu_site_deductible";"fl_site_deductible";"fr_site_deductible";"point_latitude";"point_longitude";"line";"construction";"point_granularity"
"119736";"FL";"CLAY COUNTY";"498960";"498960";"498960";"498960";"498960";"792148.9";"0";"9979.2";"0";"0";"30.102261";"-81.711777";"Residential";"Masonry";"1"
"448094";"FL";"CLAY COUNTY";"1322376.3";"1322376.3";"1322376.3";"1322376.3";"1322376.3";"1438163.57";"0";"0";"0";"0";"30.063936";"-81.707664";"Residential";"Masonry";"3"
"206893";"FL";"CLAY COUNTY";"190724.4";"190724.4";"190724.4";"190724.4";"190724.4";"192476.78";"0";"0";"0";"0";"30.089579";"-81.700455";"Residential";"Wood";"1"
"333743";"FL";"CLAY COUNTY";"0";"79520.76";"0";"0";"79520.76";"86854.48";"0";"0";"0";"0";"30.063236";"-81.707703";"Residential";"Wood";"3"
"172534";"FL";"CLAY COUNTY";"0";"254281.5";"0";"254281.5";"254281.5";"246144.49";"0";"0";"0";"0";"30.060614";"-81.702675";"Residential";"Wood";"1"
1 policyID statecode county eq_site_limit hu_site_limit fl_site_limit fr_site_limit tiv_2011 tiv_2012 eq_site_deductible hu_site_deductible fl_site_deductible fr_site_deductible point_latitude point_longitude line construction point_granularity
2 119736 FL CLAY COUNTY 498960 498960 498960 498960 498960 792148.9 0 9979.2 0 0 30.102261 -81.711777 Residential Masonry 1
3 448094 FL CLAY COUNTY 1322376.3 1322376.3 1322376.3 1322376.3 1322376.3 1438163.57 0 0 0 0 30.063936 -81.707664 Residential Masonry 3
4 206893 FL CLAY COUNTY 190724.4 190724.4 190724.4 190724.4 190724.4 192476.78 0 0 0 0 30.089579 -81.700455 Residential Wood 1
5 333743 FL CLAY COUNTY 0 79520.76 0 0 79520.76 86854.48 0 0 0 0 30.063236 -81.707703 Residential Wood 3
6 172534 FL CLAY COUNTY 0 254281.5 0 254281.5 254281.5 246144.49 0 0 0 0 30.060614 -81.702675 Residential Wood 1