1
0
mirror of synced 2025-12-25 02:09:19 -05:00

S3 and GCS destinations: Updating processing data types for Avro/Parquet formats (#13483)

* S3 destination: Updating processing data types for Avro/Parquet formats

* S3 destination: handle comparing data types

* S3 destination: clean code

* S3 destination: clean code

* S3 destination: handle case with unexpected json schema type

* S3 destination: clean code

* S3 destination: Extract the same logic for Avro/Parquet formats to separate parent class

* S3 destination: clean code

* S3 destination: clean code

* GCS destination: Update data types processing for Avro/Parquet formats

* GCS destination: clean redundant code

* S3 destination: handle case with numbers inside array

* S3 destination: clean code

* S3 destination: add unit test

* S3 destination: update unit test cases with number types.

* S3 destination: update unit tests.

* S3 destination: bump version for s3 and gcs

* auto-bump connector version

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
oneshcheret
2022-06-14 14:15:58 +03:00
committed by GitHub
parent ccd053d790
commit 8e54f4fd6e
21 changed files with 689 additions and 39 deletions

View File

@@ -100,7 +100,7 @@
- name: Google Cloud Storage (GCS)
destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
dockerRepository: airbyte/destination-gcs
dockerImageTag: 0.2.6
dockerImageTag: 0.2.7
documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
icon: googlecloudstorage.svg
resourceRequirements:
@@ -244,7 +244,7 @@
- name: S3
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.3.6
dockerImageTag: 0.3.7
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
icon: s3.svg
resourceRequirements:

View File

@@ -1486,7 +1486,7 @@
- "overwrite"
- "append"
supportsNamespaces: true
- dockerImage: "airbyte/destination-gcs:0.2.6"
- dockerImage: "airbyte/destination-gcs:0.2.7"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/gcs"
connectionSpecification:
@@ -3895,7 +3895,7 @@
supported_destination_sync_modes:
- "append"
- "overwrite"
- dockerImage: "airbyte/destination-s3:0.3.6"
- dockerImage: "airbyte/destination-s3:0.3.7"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3"
connectionSpecification:

View File

@@ -0,0 +1,26 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.standardtest.destination;
import java.util.stream.Stream;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
/**
 * Supplies (catalog, messages) resource-file pairs for the parameterized
 * number-data-type destination acceptance tests: one scalar-number case and
 * one array-of-numbers case.
 */
public class NumberDataTypeTestArgumentProvider implements ArgumentsProvider {

  public static final String NUMBER_DATA_TYPE_TEST_CATALOG = "number_data_type_test_catalog.json";
  public static final String NUMBER_DATA_TYPE_TEST_MESSAGES = "number_data_type_test_messages.txt";
  public static final String NUMBER_DATA_TYPE_ARRAY_TEST_CATALOG = "number_data_type_array_test_catalog.json";
  public static final String NUMBER_DATA_TYPE_ARRAY_TEST_MESSAGES = "number_data_type_array_test_messages.txt";

  @Override
  public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
    // Each entry pairs a catalog resource with its matching messages resource.
    final Arguments scalarNumberCase = Arguments.of(NUMBER_DATA_TYPE_TEST_CATALOG, NUMBER_DATA_TYPE_TEST_MESSAGES);
    final Arguments arrayNumberCase = Arguments.of(NUMBER_DATA_TYPE_ARRAY_TEST_CATALOG, NUMBER_DATA_TYPE_ARRAY_TEST_MESSAGES);
    return Stream.of(scalarNumberCase, arrayNumberCase);
  }

}

View File

@@ -0,0 +1,38 @@
{
"streams": [
{
"name": "array_test_1",
"json_schema": {
"properties": {
"array_number": {
"type": ["array"],
"items": {
"type": "number"
}
},
"array_float": {
"type": ["array"],
"items": {
"type": "number",
"airbyte_type": "float"
}
},
"array_integer": {
"type": ["array"],
"items": {
"type": "number",
"airbyte_type": "integer"
}
},
"array_big_integer": {
"type": ["array"],
"items": {
"type": "number",
"airbyte_type": "big_integer"
}
}
}
}
}
]
}

View File

@@ -0,0 +1,2 @@
{"type": "RECORD", "record": {"stream": "array_test_1", "emitted_at": 1602637589100, "data": { "array_number" : [-12345.678, 100000000000000000.1234],"array_float" : [-12345.678, 0, 1000000000000000000000000000000000000000000000000000.1234], "array_integer" : [42, 0, 12345], "array_big_integer" : [0, 1141241234124123141241234124] }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}

View File

@@ -0,0 +1,47 @@
{
"streams": [
{
"name": "int_test",
"json_schema": {
"properties": {
"data": {
"type": "number",
"airbyte_type": "integer"
}
}
}
},
{
"name": "big_integer_test",
"json_schema": {
"properties": {
"data": {
"type": "number",
"airbyte_type": "big_integer"
}
}
}
},
{
"name": "float_test",
"json_schema": {
"properties": {
"data": {
"type": "number",
"airbyte_type": "float"
}
}
}
},
{
"name": "default_number_test",
"json_schema": {
"properties": {
"data": {
"type": "number"
}
}
}
}
]
}

View File

@@ -0,0 +1,13 @@
{"type": "RECORD", "record": {"stream": "int_test", "emitted_at": 1602637589100, "data": { "data" : 42 }}}
{"type": "RECORD", "record": {"stream": "int_test", "emitted_at": 1602637589200, "data": { "data" : 0 }}}
{"type": "RECORD", "record": {"stream": "int_test", "emitted_at": 1602637589300, "data": { "data" : -12345 }}}
{"type": "RECORD", "record": {"stream": "big_integer_test", "emitted_at": 1602637589100, "data": { "data" : 1231123412412314 }}}
{"type": "RECORD", "record": {"stream": "big_integer_test", "emitted_at": 1602637589200, "data": { "data" : 0 }}}
{"type": "RECORD", "record": {"stream": "big_integer_test", "emitted_at": 1602637589300, "data": { "data" : -1234 }}}
{"type": "RECORD", "record": {"stream": "float_test", "emitted_at": 1602637589100, "data": { "data" : 56.78 }}}
{"type": "RECORD", "record": {"stream": "float_test", "emitted_at": 1602637589200, "data": { "data" : 0 }}}
{"type": "RECORD", "record": {"stream": "float_test", "emitted_at": 1602637589300, "data": { "data" : -12345.678 }}}
{"type": "RECORD", "record": {"stream": "default_number_test", "emitted_at": 1602637589100, "data": { "data" : 10000000000000000000000.1234 }}}
{"type": "RECORD", "record": {"stream": "default_number_test", "emitted_at": 1602637589200, "data": { "data" : 0 }}}
{"type": "RECORD", "record": {"stream": "default_number_test", "emitted_at": 1602637589300, "data": { "data" : -12345.678 }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}

View File

@@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs
COPY --from=build /airbyte /airbyte
LABEL io.airbyte.version=0.2.6
LABEL io.airbyte.version=0.2.7
LABEL io.airbyte.name=airbyte/destination-gcs

View File

@@ -14,15 +14,19 @@ import io.airbyte.integrations.destination.s3.avro.AvroConstants;
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
public class GcsAvroDestinationAcceptanceTest extends GcsDestinationAcceptanceTest {
public class GcsAvroDestinationAcceptanceTest extends GcsAvroParquetDestinationAcceptanceTest {
protected GcsAvroDestinationAcceptanceTest() {
super(S3Format.AVRO);
@@ -71,4 +75,25 @@ public class GcsAvroDestinationAcceptanceTest extends GcsDestinationAcceptanceTe
return jsonRecords;
}
@Override
// Downloads every object the sync wrote for the stream (listed via the base
// class helper getAllSyncedObjects), decodes each as an Avro object-container
// file, and aggregates the non-null Avro types per field via getTypes.
// Note: putAll means a field's type set from a later file replaces the one
// collected from an earlier file.
protected Map<String, Set<Type>> retrieveDataTypesFromPersistedFiles(final String streamName, final String namespace) throws Exception {
final List<S3ObjectSummary> objectSummaries = getAllSyncedObjects(streamName, namespace);
Map<String, Set<Type>> resultDataTypes = new HashMap<>();
for (final S3ObjectSummary objectSummary : objectSummaries) {
final S3Object object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey());
// The whole object is buffered in memory before decoding; acceptable for test-sized files.
try (final DataFileReader<Record> dataFileReader = new DataFileReader<>(
new SeekableByteArrayInput(object.getObjectContent().readAllBytes()),
new GenericDatumReader<>())) {
while (dataFileReader.hasNext()) {
final GenericData.Record record = dataFileReader.next();
Map<String, Set<Type>> actualDataTypes = getTypes(record);
resultDataTypes.putAll(actualDataTypes);
}
}
}
return resultDataTypes;
}
}

View File

@@ -0,0 +1,146 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.gcs;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.avro.JsonSchemaType;
import io.airbyte.integrations.standardtest.destination.NumberDataTypeTestArgumentProvider;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData.Record;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
/**
 * Shared acceptance-test logic for the GCS Avro and Parquet destination formats.
 *
 * Runs the number-data-type test: sync a catalog of number fields (plain,
 * integer, big_integer, float — supplied by NumberDataTypeTestArgumentProvider)
 * and compare the Avro types actually persisted with the types expected from
 * each stream's JSON schema.
 */
public abstract class GcsAvroParquetDestinationAcceptanceTest extends GcsDestinationAcceptanceTest {

// s3Format selects the output format under test (Avro or Parquet subclass passes it in).
protected GcsAvroParquetDestinationAcceptanceTest(S3Format s3Format) {
super(s3Format);
}

/**
 * Syncs one (catalog, messages) resource pair and asserts that, for every stream,
 * the per-field Avro types found in the persisted files equal the types derived
 * from the stream's JSON schema.
 *
 * @param catalogFileName classpath resource holding the AirbyteCatalog to sync
 * @param messagesFileName classpath resource holding the newline-delimited messages
 */
@ParameterizedTest
@ArgumentsSource(NumberDataTypeTestArgumentProvider.class)
public void testNumberDataType(String catalogFileName, String messagesFileName) throws Exception {
final AirbyteCatalog catalog = readCatalogFromFile(catalogFileName);
final List<AirbyteMessage> messages = readMessagesFromFile(messagesFileName);
final JsonNode config = getConfig();
final String defaultSchema = getDefaultSchema(config);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false);
for (final AirbyteStream stream : catalog.getStreams()) {
final String streamName = stream.getName();
// Streams without an explicit namespace fall back to the connector's default schema.
final String schema = stream.getNamespace() != null ? stream.getNamespace() : defaultSchema;
Map<String, Set<Type>> actualSchemaTypes = retrieveDataTypesFromPersistedFiles(streamName, schema);
Map<String, Set<Type>> expectedSchemaTypes = retrieveExpectedDataTypes(stream);
assertEquals(expectedSchemaTypes, actualSchemaTypes);
}
}

// Maps every property name in the stream's JSON schema to the set of Avro types
// its values are expected to be persisted as.
private Map<String, Set<Type>> retrieveExpectedDataTypes(AirbyteStream stream) {
Iterable<String> iterableNames = () -> stream.getJsonSchema().get("properties").fieldNames();
Map<String, JsonNode> nameToNode = StreamSupport.stream(iterableNames.spliterator(), false)
.collect(Collectors.toMap(
Function.identity(),
name -> getJsonNode(stream, name)));
return nameToNode
.entrySet()
.stream()
.collect(Collectors.toMap(
Entry::getKey,
entry -> getExpectedSchemaType(entry.getValue())));
}

// Returns the JSON node describing a field's scalar type.
// NOTE(review): coupled to the test catalogs — a single-property schema is assumed
// to hold its field under the key "data", and a multi-property schema is assumed
// to contain only array fields whose element type sits under "items". Confirm
// before adding catalogs with a different shape.
private JsonNode getJsonNode(AirbyteStream stream, String name) {
JsonNode properties = stream.getJsonSchema().get("properties");
if (properties.size() == 1) {
return properties.get("data");
}
return properties.get(name).get("items");
}

// Resolves the expected Avro type(s) for one field definition by matching its
// "type" and optional "airbyte_type" against the JsonSchemaType enum constants.
private Set<Type> getExpectedSchemaType(JsonNode fieldDefinition) {
final JsonNode typeProperty = fieldDefinition.get("type");
final JsonNode airbyteTypeProperty = fieldDefinition.get("airbyte_type");
final String airbyteTypePropertyText = airbyteTypeProperty == null ? null : airbyteTypeProperty.asText();
return Arrays.stream(JsonSchemaType.values())
.filter(
value -> value.getJsonSchemaType().equals(typeProperty.asText()) && compareAirbyteTypes(airbyteTypePropertyText, value))
.map(JsonSchemaType::getAvroType)
.collect(Collectors.toSet());
}

// Airbyte types match when both are absent, or when their texts are equal.
private boolean compareAirbyteTypes(String airbyteTypePropertyText, JsonSchemaType value) {
if (airbyteTypePropertyText == null) {
return value.getJsonSchemaAirbyteType() == null;
}
return airbyteTypePropertyText.equals(value.getJsonSchemaAirbyteType());
}

private AirbyteCatalog readCatalogFromFile(final String catalogFilename) throws IOException {
return Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class);
}

// Each line of the resource is one serialized AirbyteMessage.
private List<AirbyteMessage> readMessagesFromFile(final String messagesFilename) throws IOException {
return MoreResources.readResource(messagesFilename).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
}

/**
 * Reads back every file the sync wrote for the given stream and returns, per
 * field, the set of non-null Avro types observed. Implemented by the Avro and
 * Parquet subclasses, which differ only in how the persisted files are decoded.
 */
protected abstract Map<String, Set<Type>> retrieveDataTypesFromPersistedFiles(final String streamName, final String namespace) throws Exception;

// Extracts the non-null Avro types of every user field in a record's schema,
// skipping Airbyte bookkeeping columns (names starting with "_airbyte").
protected Map<String, Set<Type>> getTypes(Record record) {
List<Field> fieldList = record
.getSchema()
.getFields()
.stream()
.filter(field -> !field.name().startsWith("_airbyte"))
.toList();
if (fieldList.size() == 1) {
// Scalar case: each field schema is a union; keep its non-null member types.
return fieldList
.stream()
.collect(
Collectors.toMap(
Field::name,
field -> field.schema().getTypes().stream().map(Schema::getType).filter(type -> !type.equals(Type.NULL))
.collect(Collectors.toSet())));
} else {
// Array case: unwrap each non-null union member to its array element type, then
// keep the element union's non-null member types.
// NOTE(review): the size==1 branch mirrors the test catalogs (one scalar field
// vs. several array fields) — confirm before reusing with other schemas.
return fieldList
.stream()
.collect(
Collectors.toMap(
Field::name,
field -> field.schema().getTypes()
.stream().filter(type -> !type.getType().equals(Type.NULL))
.flatMap(type -> type.getElementType().getTypes().stream()).map(Schema::getType).filter(type -> !type.equals(Type.NULL))
.collect(Collectors.toSet())));
}
}
}

View File

@@ -13,20 +13,25 @@ import io.airbyte.integrations.destination.gcs.parquet.GcsParquetWriter;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.avro.AvroConstants;
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter;
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
public class GcsParquetDestinationAcceptanceTest extends GcsDestinationAcceptanceTest {
public class GcsParquetDestinationAcceptanceTest extends GcsAvroParquetDestinationAcceptanceTest {
protected GcsParquetDestinationAcceptanceTest() {
super(S3Format.PARQUET);
@@ -78,4 +83,30 @@ public class GcsParquetDestinationAcceptanceTest extends GcsDestinationAcceptanc
return jsonRecords;
}
@Override
// Lists every object the sync wrote for the stream, reads each one back as a
// Parquet file through its s3a:// path (Hadoop config built from the connector
// config), and aggregates the non-null Avro types per field via getTypes.
// Note: putAll means a field's type set from a later file replaces the one
// collected from an earlier file.
protected Map<String, Set<Type>> retrieveDataTypesFromPersistedFiles(final String streamName, final String namespace) throws Exception {
final List<S3ObjectSummary> objectSummaries = getAllSyncedObjects(streamName, namespace);
final Map<String, Set<Type>> resultDataTypes = new HashMap<>();
for (final S3ObjectSummary objectSummary : objectSummaries) {
final S3Object object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey());
// The reader fetches via the s3a path; the downloaded object is only used for bucket/key.
final URI uri = new URI(String.format("s3a://%s/%s", object.getBucketName(), object.getKey()));
final var path = new org.apache.hadoop.fs.Path(uri);
final Configuration hadoopConfig = S3ParquetWriter.getHadoopConfig(config);
try (final ParquetReader<Record> parquetReader = ParquetReader.<GenericData.Record>builder(new AvroReadSupport<>(), path)
.withConf(hadoopConfig)
.build()) {
GenericData.Record record;
while ((record = parquetReader.read()) != null) {
Map<String, Set<Type>> actualDataTypes = getTypes(record);
resultDataTypes.putAll(actualDataTypes);
}
}
}
return resultDataTypes;
}
}

View File

@@ -16,5 +16,5 @@ ENV APPLICATION destination-s3
COPY --from=build /airbyte /airbyte
LABEL io.airbyte.version=0.3.6
LABEL io.airbyte.version=0.3.7
LABEL io.airbyte.name=airbyte/destination-s3

View File

@@ -4,6 +4,10 @@
package io.airbyte.integrations.destination.s3.avro;
import java.util.Arrays;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
/**
@@ -11,32 +15,60 @@ import org.apache.avro.Schema;
*/
public enum JsonSchemaType {
STRING("string", true, Schema.Type.STRING),
NUMBER("number", true, Schema.Type.DOUBLE),
INTEGER("integer", true, Schema.Type.INT),
BOOLEAN("boolean", true, Schema.Type.BOOLEAN),
NULL("null", true, Schema.Type.NULL),
OBJECT("object", false, Schema.Type.RECORD),
ARRAY("array", false, Schema.Type.ARRAY),
COMBINED("combined", false, Schema.Type.UNION);
STRING("string", true, null, Schema.Type.STRING),
NUMBER_INT("number", true, "integer", Schema.Type.INT),
NUMBER_LONG("number", true, "big_integer", Schema.Type.LONG),
NUMBER_FLOAT("number", true, "float", Schema.Type.FLOAT),
NUMBER("number", true, null, Schema.Type.DOUBLE),
INTEGER("integer", true, null, Schema.Type.INT),
BOOLEAN("boolean", true, null, Schema.Type.BOOLEAN),
NULL("null", true, null, Schema.Type.NULL),
OBJECT("object", false, null, Schema.Type.RECORD),
ARRAY("array", false, null, Schema.Type.ARRAY),
COMBINED("combined", false, null, Schema.Type.UNION);
private final String jsonSchemaType;
private final boolean isPrimitive;
private final Schema.Type avroType;
private final String jsonSchemaAirbyteType;
JsonSchemaType(final String jsonSchemaType, final boolean isPrimitive, final Schema.Type avroType) {
JsonSchemaType(final String jsonSchemaType, final boolean isPrimitive, final String jsonSchemaAirbyteType, final Schema.Type avroType) {
this.jsonSchemaType = jsonSchemaType;
this.jsonSchemaAirbyteType = jsonSchemaAirbyteType;
this.isPrimitive = isPrimitive;
this.avroType = avroType;
}
public static JsonSchemaType fromJsonSchemaType(final String value) {
for (final JsonSchemaType type : values()) {
if (value.equals(type.jsonSchemaType)) {
return type;
}
public static JsonSchemaType fromJsonSchemaType(final String jsonSchemaType) {
return fromJsonSchemaType(jsonSchemaType, null);
}
public static JsonSchemaType fromJsonSchemaType(final @Nonnull String jsonSchemaType, final @Nullable String jsonSchemaAirbyteType) {
List<JsonSchemaType> matchSchemaType = null;
// Match by Type + airbyteType
if (jsonSchemaAirbyteType != null) {
matchSchemaType = Arrays.stream(values())
.filter(type -> jsonSchemaType.equals(type.jsonSchemaType))
.filter(type -> jsonSchemaAirbyteType.equals(type.jsonSchemaAirbyteType))
.toList();
}
// Fall back to matching by type alone when matching by type + airbyteType produced no result
if (matchSchemaType == null || matchSchemaType.isEmpty()) {
matchSchemaType =
Arrays.stream(values()).filter(format -> jsonSchemaType.equals(format.jsonSchemaType) && format.jsonSchemaAirbyteType == null).toList();
}
if (matchSchemaType.isEmpty()) {
throw new IllegalArgumentException(
String.format("Unexpected jsonSchemaType - %s and jsonSchemaAirbyteType - %s", jsonSchemaType, jsonSchemaAirbyteType));
} else if (matchSchemaType.size() > 1) {
throw new RuntimeException(
String.format("Match with more than one json type! Matched types : %s, Inputs jsonSchemaType : %s, jsonSchemaAirbyteType : %s",
matchSchemaType, jsonSchemaType, jsonSchemaAirbyteType));
} else {
return matchSchemaType.get(0);
}
throw new IllegalArgumentException("Unexpected json schema type: " + value);
}
public String getJsonSchemaType() {
@@ -56,4 +88,8 @@ public enum JsonSchemaType {
return jsonSchemaType;
}
public String getJsonSchemaAirbyteType() {
return jsonSchemaAirbyteType;
}
}

View File

@@ -36,6 +36,8 @@ import tech.allegro.schema.json2avro.converter.AdditionalPropertyField;
*/
public class JsonToAvroSchemaConverter {
private static final String TYPE = "type";
private static final String AIRBYTE_TYPE = "airbyte_type";
private static final Schema UUID_SCHEMA = LogicalTypes.uuid()
.addToSchema(Schema.create(Schema.Type.STRING));
private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
@@ -60,7 +62,9 @@ public class JsonToAvroSchemaConverter {
return Collections.singletonList(JsonSchemaType.COMBINED);
}
final JsonNode typeProperty = fieldDefinition.get("type");
final JsonNode typeProperty = fieldDefinition.get(TYPE);
final JsonNode airbyteTypeProperty = fieldDefinition.get(AIRBYTE_TYPE);
final String airbyteType = airbyteTypeProperty == null ? null : airbyteTypeProperty.asText();
if (typeProperty == null || typeProperty.isNull()) {
LOGGER.warn("Field \"{}\" has no type specification. It will default to string", fieldName);
return Collections.singletonList(JsonSchemaType.STRING);
@@ -73,7 +77,7 @@ public class JsonToAvroSchemaConverter {
}
if (typeProperty.isTextual()) {
return Collections.singletonList(JsonSchemaType.fromJsonSchemaType(typeProperty.asText()));
return Collections.singletonList(JsonSchemaType.fromJsonSchemaType(typeProperty.asText(), airbyteType));
}
LOGGER.warn("Field \"{}\" has unexpected type {}. It will default to string.", fieldName, typeProperty);
@@ -214,7 +218,7 @@ public class JsonToAvroSchemaConverter {
final Schema fieldSchema;
switch (fieldType) {
case NUMBER, INTEGER, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType());
case INTEGER, NUMBER, NUMBER_INT, NUMBER_LONG, NUMBER_FLOAT, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType());
case STRING -> {
if (fieldDefinition.has("format")) {
final String format = fieldDefinition.get("format").asText();

View File

@@ -13,16 +13,19 @@ import io.airbyte.integrations.destination.s3.avro.AvroConstants;
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
public class S3AvroDestinationAcceptanceTest extends S3DestinationAcceptanceTest {
public class S3AvroDestinationAcceptanceTest extends S3AvroParquetDestinationAcceptanceTest {
protected S3AvroDestinationAcceptanceTest() {
super(S3Format.AVRO);
@@ -73,4 +76,25 @@ public class S3AvroDestinationAcceptanceTest extends S3DestinationAcceptanceTest
return new S3AvroParquetTestDataComparator();
}
@Override
// Downloads every object the sync wrote for the stream (listed via the base
// class helper getAllSyncedObjects), decodes each as an Avro object-container
// file, and aggregates the non-null Avro types per field via getTypes.
// Note: putAll means a field's type set from a later file replaces the one
// collected from an earlier file.
protected Map<String, Set<Type>> retrieveDataTypesFromPersistedFiles(final String streamName, final String namespace) throws Exception {
final List<S3ObjectSummary> objectSummaries = getAllSyncedObjects(streamName, namespace);
Map<String, Set<Type>> resultDataTypes = new HashMap<>();
for (final S3ObjectSummary objectSummary : objectSummaries) {
final S3Object object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey());
// The whole object is buffered in memory before decoding; acceptable for test-sized files.
try (final DataFileReader<Record> dataFileReader = new DataFileReader<>(
new SeekableByteArrayInput(object.getObjectContent().readAllBytes()),
new GenericDatumReader<>())) {
while (dataFileReader.hasNext()) {
final GenericData.Record record = dataFileReader.next();
Map<String, Set<Type>> actualDataTypes = getTypes(record);
resultDataTypes.putAll(actualDataTypes);
}
}
}
return resultDataTypes;
}
}

View File

@@ -0,0 +1,145 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.s3;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.destination.s3.avro.JsonSchemaType;
import io.airbyte.integrations.standardtest.destination.NumberDataTypeTestArgumentProvider;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData.Record;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
/**
 * Shared acceptance-test logic for the S3 Avro and Parquet destination formats.
 *
 * Runs the number-data-type test: sync a catalog of number fields (plain,
 * integer, big_integer, float — supplied by NumberDataTypeTestArgumentProvider)
 * and compare the Avro types actually persisted with the types expected from
 * each stream's JSON schema.
 */
public abstract class S3AvroParquetDestinationAcceptanceTest extends S3DestinationAcceptanceTest {

// s3Format selects the output format under test (Avro or Parquet subclass passes it in).
protected S3AvroParquetDestinationAcceptanceTest(S3Format s3Format) {
super(s3Format);
}

/**
 * Syncs one (catalog, messages) resource pair and asserts that, for every stream,
 * the per-field Avro types found in the persisted files equal the types derived
 * from the stream's JSON schema.
 *
 * @param catalogFileName classpath resource holding the AirbyteCatalog to sync
 * @param messagesFileName classpath resource holding the newline-delimited messages
 */
@ParameterizedTest
@ArgumentsSource(NumberDataTypeTestArgumentProvider.class)
public void testNumberDataType(String catalogFileName, String messagesFileName) throws Exception {
final AirbyteCatalog catalog = readCatalogFromFile(catalogFileName);
final List<AirbyteMessage> messages = readMessagesFromFile(messagesFileName);
final JsonNode config = getConfig();
final String defaultSchema = getDefaultSchema(config);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false);
for (final AirbyteStream stream : catalog.getStreams()) {
final String streamName = stream.getName();
// Streams without an explicit namespace fall back to the connector's default schema.
final String schema = stream.getNamespace() != null ? stream.getNamespace() : defaultSchema;
Map<String, Set<Type>> actualSchemaTypes = retrieveDataTypesFromPersistedFiles(streamName, schema);
Map<String, Set<Type>> expectedSchemaTypes = retrieveExpectedDataTypes(stream);
assertEquals(expectedSchemaTypes, actualSchemaTypes);
}
}

// Maps every property name in the stream's JSON schema to the set of Avro types
// its values are expected to be persisted as.
private Map<String, Set<Type>> retrieveExpectedDataTypes(AirbyteStream stream) {
Iterable<String> iterableNames = () -> stream.getJsonSchema().get("properties").fieldNames();
Map<String, JsonNode> nameToNode = StreamSupport.stream(iterableNames.spliterator(), false)
.collect(Collectors.toMap(
Function.identity(),
name -> getJsonNode(stream, name)));
return nameToNode
.entrySet()
.stream()
.collect(Collectors.toMap(
Entry::getKey,
entry -> getExpectedSchemaType(entry.getValue())));
}

// Returns the JSON node describing a field's scalar type.
// NOTE(review): coupled to the test catalogs — a single-property schema is assumed
// to hold its field under the key "data", and a multi-property schema is assumed
// to contain only array fields whose element type sits under "items". Confirm
// before adding catalogs with a different shape.
private JsonNode getJsonNode(AirbyteStream stream, String name) {
JsonNode properties = stream.getJsonSchema().get("properties");
if (properties.size() == 1) {
return properties.get("data");
}
return properties.get(name).get("items");
}

// Resolves the expected Avro type(s) for one field definition by matching its
// "type" and optional "airbyte_type" against the JsonSchemaType enum constants.
private Set<Type> getExpectedSchemaType(JsonNode fieldDefinition) {
final JsonNode typeProperty = fieldDefinition.get("type");
final JsonNode airbyteTypeProperty = fieldDefinition.get("airbyte_type");
final String airbyteTypePropertyText = airbyteTypeProperty == null ? null : airbyteTypeProperty.asText();
return Arrays.stream(JsonSchemaType.values())
.filter(
value -> value.getJsonSchemaType().equals(typeProperty.asText()) && compareAirbyteTypes(airbyteTypePropertyText, value))
.map(JsonSchemaType::getAvroType)
.collect(Collectors.toSet());
}

// Airbyte types match when both are absent, or when their texts are equal.
private boolean compareAirbyteTypes(String airbyteTypePropertyText, JsonSchemaType value) {
if (airbyteTypePropertyText == null) {
return value.getJsonSchemaAirbyteType() == null;
}
return airbyteTypePropertyText.equals(value.getJsonSchemaAirbyteType());
}

private AirbyteCatalog readCatalogFromFile(final String catalogFilename) throws IOException {
return Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class);
}

// Each line of the resource is one serialized AirbyteMessage.
private List<AirbyteMessage> readMessagesFromFile(final String messagesFilename) throws IOException {
return MoreResources.readResource(messagesFilename).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
}

/**
 * Reads back every file the sync wrote for the given stream and returns, per
 * field, the set of non-null Avro types observed. Implemented by the Avro and
 * Parquet subclasses, which differ only in how the persisted files are decoded.
 */
protected abstract Map<String, Set<Type>> retrieveDataTypesFromPersistedFiles(final String streamName, final String namespace) throws Exception;

// Extracts the non-null Avro types of every user field in a record's schema,
// skipping Airbyte bookkeeping columns (names starting with "_airbyte").
protected Map<String, Set<Type>> getTypes(Record record) {
List<Field> fieldList = record
.getSchema()
.getFields()
.stream()
.filter(field -> !field.name().startsWith("_airbyte"))
.toList();
if (fieldList.size() == 1) {
// Scalar case: each field schema is a union; keep its non-null member types.
return fieldList
.stream()
.collect(
Collectors.toMap(
Field::name,
field -> field.schema().getTypes().stream().map(Schema::getType).filter(type -> !type.equals(Type.NULL))
.collect(Collectors.toSet())));
} else {
// Array case: unwrap each non-null union member to its array element type, then
// keep the element union's non-null member types.
// NOTE(review): the size==1 branch mirrors the test catalogs (one scalar field
// vs. several array fields) — confirm before reusing with other schemas.
return fieldList
.stream()
.collect(
Collectors.toMap(
Field::name,
field -> field.schema().getTypes()
.stream().filter(type -> !type.getType().equals(Type.NULL))
.flatMap(type -> type.getElementType().getTypes().stream()).map(Schema::getType).filter(type -> !type.equals(Type.NULL))
.collect(Collectors.toSet())));
}
}
}

View File

@@ -17,15 +17,19 @@ import io.airbyte.integrations.standardtest.destination.comparator.TestDataCompa
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
public class S3ParquetDestinationAcceptanceTest extends S3DestinationAcceptanceTest {
public class S3ParquetDestinationAcceptanceTest extends S3AvroParquetDestinationAcceptanceTest {
protected S3ParquetDestinationAcceptanceTest() {
super(S3Format.PARQUET);
@@ -77,4 +81,30 @@ public class S3ParquetDestinationAcceptanceTest extends S3DestinationAcceptanceT
return new S3AvroParquetTestDataComparator();
}
@Override
// Lists every object the sync wrote for the stream, reads each one back as a
// Parquet file through its s3a:// path (Hadoop config built from the connector
// config), and aggregates the non-null Avro types per field via getTypes.
// Note: putAll means a field's type set from a later file replaces the one
// collected from an earlier file.
protected Map<String, Set<Type>> retrieveDataTypesFromPersistedFiles(final String streamName, final String namespace) throws Exception {
final List<S3ObjectSummary> objectSummaries = getAllSyncedObjects(streamName, namespace);
final Map<String, Set<Type>> resultDataTypes = new HashMap<>();
for (final S3ObjectSummary objectSummary : objectSummaries) {
final S3Object object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey());
// The reader fetches via the s3a path; the downloaded object is only used for bucket/key.
final URI uri = new URI(String.format("s3a://%s/%s", object.getBucketName(), object.getKey()));
final var path = new org.apache.hadoop.fs.Path(uri);
final Configuration hadoopConfig = S3ParquetWriter.getHadoopConfig(config);
try (final ParquetReader<Record> parquetReader = ParquetReader.<GenericData.Record>builder(new AvroReadSupport<>(), path)
.withConf(hadoopConfig)
.build()) {
GenericData.Record record;
while ((record = parquetReader.read()) != null) {
Map<String, Set<Type>> actualDataTypes = getTypes(record);
resultDataTypes.putAll(actualDataTypes);
}
}
}
return resultDataTypes;
}
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.s3.avro;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.stream.Stream;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
/**
 * Verifies that {@code JsonSchemaType.fromJsonSchemaType(type, airbyteType)} maps each supported
 * combination of a JSON schema "type" and an optional "airbyte_type" hint to the expected enum
 * constant. Only "number" is refined by the hint (int/big_integer/float); every other type maps
 * one-to-one regardless of the hint.
 */
public class JsonSchemaTypeTest {

  @ParameterizedTest
  @ArgumentsSource(JsonSchemaTypeProvider.class)
  public void testFromJsonSchemaType(String type, String airbyteType, JsonSchemaType expectedJsonSchemaType) {
    final JsonSchemaType resolved = JsonSchemaType.fromJsonSchemaType(type, airbyteType);
    assertEquals(expectedJsonSchemaType, resolved);
  }

  /** Supplies (jsonType, airbyteType, expected) triples covering every supported mapping. */
  public static class JsonSchemaTypeProvider implements ArgumentsProvider {

    @Override
    public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception {
      return Stream.of(
          // "number" cases: the airbyte_type hint selects the concrete numeric type.
          numberCase("integer", JsonSchemaType.NUMBER_INT),
          numberCase("big_integer", JsonSchemaType.NUMBER_LONG),
          numberCase("float", JsonSchemaType.NUMBER_FLOAT),
          numberCase(null, JsonSchemaType.NUMBER),
          // Non-number cases: the hint is absent and the type maps directly.
          directCase("string", JsonSchemaType.STRING),
          directCase("integer", JsonSchemaType.INTEGER),
          directCase("boolean", JsonSchemaType.BOOLEAN),
          directCase("null", JsonSchemaType.NULL),
          directCase("object", JsonSchemaType.OBJECT),
          directCase("array", JsonSchemaType.ARRAY),
          directCase("combined", JsonSchemaType.COMBINED));
    }

    // A "number" type whose resolution may be refined by the airbyte_type hint.
    private static Arguments numberCase(String airbyteType, JsonSchemaType expected) {
      return Arguments.of("number", airbyteType, expected);
    }

    // A type that resolves without any airbyte_type hint.
    private static Arguments directCase(String type, JsonSchemaType expected) {
      return Arguments.of(type, null, expected);
    }
  }
}

View File

@@ -9,16 +9,26 @@
{
"fieldName": "integer_field",
"jsonFieldSchema": {
"type": "integer"
"type": "number",
"airbyte_type": "integer"
},
"avroFieldType": ["null", "int"]
},
{
"fieldName": "number_field",
"fieldName": "big_integer_field",
"jsonFieldSchema": {
"type": "number"
"type": "number",
"airbyte_type": "big_integer"
},
"avroFieldType": ["null", "double"]
"avroFieldType": ["null", "long"]
},
{
"fieldName": "float_field",
"jsonFieldSchema": {
"type": "number",
"airbyte_type": "float"
},
"avroFieldType": ["null", "float"]
},
{
"fieldName": "null_field",
@@ -60,6 +70,10 @@
},
{
"type": "number"
},
{
"type": "number",
"airbyte_type": "big_integer"
}
]
},
@@ -67,7 +81,7 @@
"null",
{
"type": "array",
"items": ["null", "string", "double"]
"items": ["null", "string", "double", "long"]
}
]
},
@@ -79,6 +93,10 @@
"id": {
"type": "integer"
},
"long_id": {
"type": "number",
"airbyte_type": "big_integer"
},
"node_id": {
"type": ["null", "string"]
}
@@ -95,6 +113,11 @@
"type": ["null", "int"],
"default": null
},
{
"name": "long_id",
"type": ["null", "long"],
"default": null
},
{
"name": "node_id",
"type": ["null", "string"],
@@ -146,23 +169,35 @@
{
"fieldName": "any_of_field",
"jsonFieldSchema": {
"anyOf": [{ "type": "string" }, { "type": "integer" }]
"anyOf": [
{ "type": "string" },
{ "type": "integer" },
{ "type": "number" }
]
},
"avroFieldType": ["null", "string", "int"]
"avroFieldType": ["null", "string", "int", "double"]
},
{
"fieldName": "all_of_field",
"jsonFieldSchema": {
"allOf": [{ "type": "string" }, { "type": "integer" }]
"allOf": [
{ "type": "string" },
{ "type": "integer" },
{ "type": "number", "airbyte_type": "float" }
]
},
"avroFieldType": ["null", "string", "int"]
"avroFieldType": ["null", "string", "int", "float"]
},
{
"fieldName": "one_of_field",
"jsonFieldSchema": {
"oneOf": [{ "type": "string" }, { "type": "integer" }]
"oneOf": [
{ "type": "string" },
{ "type": "integer" },
{ "type": "number", "airbyte_type": "big_integer" }
]
},
"avroFieldType": ["null", "string", "int"]
"avroFieldType": ["null", "string", "int", "long"]
},
{
"fieldName": "logical_type_date_time",

View File

@@ -235,6 +235,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A
| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.2.7 | 2022-06-14 | [\#13483](https://github.com/airbytehq/airbyte/pull/13483) | Added support for int, long, float data types to Avro/Parquet formats. |
| 0.2.6 | 2022-05-17 | [\#12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance |
| 0.2.5 | 2022-05-04 | [\#12578](https://github.com/airbytehq/airbyte/pull/12578) | In JSON to Avro conversion, log JSON field values that do not follow Avro schema for debugging. |
| 0.2.4 | 2022-04-22 | [\#12167](https://github.com/airbytehq/airbyte/pull/12167) | Add gzip compression option for CSV and JSONL formats. |

View File

@@ -315,6 +315,7 @@ In order for everything to work correctly, it is also necessary that the user wh
| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- |:---------------------------------------------------------------------------------------------------------------------------|
| 0.3.7 | 2022-06-14 | [\#13483](https://github.com/airbytehq/airbyte/pull/13483) | Added support for int, long, float data types to Avro/Parquet formats. |
| 0.3.6 | 2022-05-19 | [\#13043](https://github.com/airbytehq/airbyte/pull/13043) | Destination S3: Remove configurable part size. |
| 0.3.5 | 2022-05-12 | [\#12797](https://github.com/airbytehq/airbyte/pull/12797) | Update spec to replace markdown. |
| 0.3.4 | 2022-05-04 | [\#12578](https://github.com/airbytehq/airbyte/pull/12578) | In JSON to Avro conversion, log JSON field values that do not follow Avro schema for debugging. |