1
0
mirror of synced 2025-12-25 02:09:19 -05:00

🎉 Source Postgres: support all Postgres 14 types (#8726)

* Add skeleton to support all postgres types

* Consolidate type tests

* Fix corner cases

* Bump postgres version

* Add tests for time and timetz

* Format code

* Revert date to timestamp

* Update comment

* Fix unit tests

* 🐛 Jdbc sources: switch from "string" to "array" schema type for columns with JDBCType.ARRAY (#8749)

* support array for jdbc sources

* fixed PR comments, added test cases

* added more elements for test case

* Fix test case

* add array test case for JdbcSourceOperations

Co-authored-by: Liren Tu <tuliren.git@outlook.com>

* Revert changes to support special number values

Postgres source cannot handle these special values yet
See https://github.com/airbytehq/airbyte/issues/8902

* Revert infinity and nan assertion in unit tests

This reverts commit 3bee7d19ea.

* Update documentation

* Bump postgres source version in seed

Co-authored-by: Yurii Bidiuk <35812734+yurii-bidiuk@users.noreply.github.com>
This commit is contained in:
LiRen Tu
2021-12-19 23:59:43 -08:00
committed by GitHub
parent 6ee29b5b82
commit ff4b83bb1f
15 changed files with 582 additions and 377 deletions

View File

@@ -530,7 +530,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.3.17
dockerImageTag: 0.4.0
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database

View File

@@ -5281,7 +5281,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:0.3.17"
- dockerImage: "airbyte/source-postgres:0.4.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/postgres"
connectionSpecification:

View File

@@ -5,6 +5,8 @@
package io.airbyte.db.jdbc;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.DataTypeUtils;
@@ -49,6 +51,15 @@ public abstract class AbstractJdbcCompatibleSourceOperations<Datatype> implement
return jsonNode;
}
/**
 * Reads the SQL ARRAY stored in column {@code index} of the current row and writes it to
 * {@code node} under {@code columnName} as a JSON array. Every element is emitted in its string
 * form, regardless of the array's element type.
 *
 * @param node JSON object being populated for the current row
 * @param columnName key under which the JSON array is stored
 * @param resultSet cursor positioned on the row to read
 * @param index column index of the ARRAY column in {@code resultSet}
 * @throws SQLException if the driver fails while reading the array
 */
protected void putArray(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  // Reuse the node's own factory instead of constructing a new ObjectMapper for every array cell.
  final ArrayNode arrayNode = node.arrayNode();
  // The array is exposed as its own ResultSet, which is a separate JDBC resource; close it
  // deterministically rather than leaving it open until the statement is closed.
  try (final ResultSet arrayResultSet = resultSet.getArray(index).getResultSet()) {
    while (arrayResultSet.next()) {
      // Per java.sql.Array#getResultSet: column 1 is the element index, column 2 is the value.
      arrayNode.add(arrayResultSet.getString(2));
    }
  }
  node.set(columnName, arrayNode);
}
/**
 * Copies the boolean value of column {@code index} into {@code node} under {@code columnName}.
 *
 * @throws SQLException if the driver cannot read the column as a boolean
 */
protected void putBoolean(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  final boolean value = resultSet.getBoolean(index);
  node.put(columnName, value);
}

View File

@@ -27,7 +27,7 @@ public class JdbcSourceOperations extends AbstractJdbcCompatibleSourceOperations
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceOperations.class);
private JDBCType safeGetJdbcType(final int columnTypeInt) {
protected JDBCType safeGetJdbcType(final int columnTypeInt) {
try {
return JDBCType.valueOf(columnTypeInt);
} catch (final Exception e) {
@@ -55,6 +55,7 @@ public class JdbcSourceOperations extends AbstractJdbcCompatibleSourceOperations
case TIME -> putTime(json, columnName, resultSet, colIndex);
case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex);
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex);
case ARRAY -> putArray(json, columnName, resultSet, colIndex);
default -> putDefault(json, columnName, resultSet, colIndex);
}
}
@@ -115,6 +116,7 @@ public class JdbcSourceOperations extends AbstractJdbcCompatibleSourceOperations
case TIME -> JsonSchemaPrimitive.STRING;
case TIMESTAMP -> JsonSchemaPrimitive.STRING;
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> JsonSchemaPrimitive.STRING_BINARY;
case ARRAY -> JsonSchemaPrimitive.ARRAY;
// since column types aren't necessarily meaningful to Airbyte, liberally convert all unrecognised
// types to String
default -> JsonSchemaPrimitive.STRING;

View File

@@ -7,6 +7,8 @@ package io.airbyte.db.jdbc;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
@@ -115,7 +117,7 @@ public class TestJdbcUtils {
try (final Connection connection = dataSource.getConnection()) {
createTableWithAllTypes(connection);
insertRecordOfEachType(connection);
assertExpectedOutputValues(connection);
assertExpectedOutputValues(connection, jsonFieldExpectedValues());
assertExpectedOutputTypes(connection);
}
}
@@ -148,7 +150,7 @@ public class TestJdbcUtils {
ps.execute();
assertExpectedOutputValues(connection);
assertExpectedOutputValues(connection, expectedValues());
assertExpectedOutputTypes(connection);
}
}
@@ -172,7 +174,9 @@ public class TestJdbcUtils {
+ "date DATE,"
+ "time TIME,"
+ "timestamp TIMESTAMP,"
+ "binary1 bytea"
+ "binary1 bytea,"
+ "text_array _text,"
+ "int_array int[]"
+ ");");
}
@@ -194,7 +198,9 @@ public class TestJdbcUtils {
+ "date,"
+ "time,"
+ "timestamp,"
+ "binary1"
+ "binary1,"
+ "text_array,"
+ "int_array"
+ ") VALUES("
+ "1::bit(1),"
+ "true,"
@@ -211,36 +217,18 @@ public class TestJdbcUtils {
+ "'2020-11-01',"
+ "'05:00',"
+ "'2001-09-29 03:00',"
+ "decode('61616161', 'hex')"
+ "decode('61616161', 'hex'),"
+ "'{one,two,three}',"
+ "'{1,2,3}'"
+ ");");
}
private static void assertExpectedOutputValues(final Connection connection) throws SQLException {
private static void assertExpectedOutputValues(final Connection connection, final ObjectNode expected) throws SQLException {
final ResultSet resultSet = connection.createStatement().executeQuery("SELECT * FROM data;");
resultSet.next();
final JsonNode actual = sourceOperations.rowToJson(resultSet);
final ObjectNode expected = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());
expected.put("bit", true);
expected.put("boolean", true);
expected.put("smallint", (short) 1);
expected.put("int", 1);
expected.put("bigint", (long) 1);
expected.put("float", (double) 1.0);
expected.put("double", (double) 1.0);
expected.put("real", (float) 1.0);
expected.put("numeric", new BigDecimal(1));
expected.put("decimal", new BigDecimal(1));
expected.put("char", "a");
expected.put("varchar", "a");
// todo (cgardens) we should parse this to a date string
expected.put("date", "2020-11-01T00:00:00Z");
// todo (cgardens) we should parse this to a time string
expected.put("time", "1970-01-01T05:00:00Z");
expected.put("timestamp", "2001-09-29T03:00:00Z");
expected.put("binary1", "aaaa".getBytes(Charsets.UTF_8));
// field-wise comparison to make debugging easier.
MoreStreams.toStream(expected.fields()).forEach(e -> assertEquals(e.getValue(), actual.get(e.getKey()), "key: " + e.getKey()));
assertEquals(expected, actual);
@@ -273,9 +261,51 @@ public class TestJdbcUtils {
.put("time", JsonSchemaPrimitive.STRING)
.put("timestamp", JsonSchemaPrimitive.STRING)
.put("binary1", JsonSchemaPrimitive.STRING_BINARY)
.put("text_array", JsonSchemaPrimitive.ARRAY)
.put("int_array", JsonSchemaPrimitive.ARRAY)
.build();
assertEquals(actual, expected);
}
/**
 * Builds the expected row for the JSON-field test: the scalar expectations from
 * {@link #expectedValues()} plus the two array columns.
 */
private ObjectNode jsonFieldExpectedValues() {
  final ObjectNode expectedRow = expectedValues();

  final ArrayNode textElements = new ObjectMapper().createArrayNode();
  for (final String element : new String[] {"one", "two", "three"}) {
    textElements.add(element);
  }
  expectedRow.set("text_array", textElements);

  // The int[] column is expected as strings as well, not as JSON numbers.
  final ArrayNode intElements = new ObjectMapper().createArrayNode();
  for (final String element : new String[] {"1", "2", "3"}) {
    intElements.add(element);
  }
  expectedRow.set("int_array", intElements);

  return expectedRow;
}
/**
 * Builds the expected JSON row for the scalar (non-array) columns of the test table.
 */
private ObjectNode expectedValues() {
  final ObjectNode row = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());
  row.put("bit", true)
      .put("boolean", true)
      .put("smallint", (short) 1)
      .put("int", 1)
      .put("bigint", 1L)
      .put("float", 1.0d)
      .put("double", 1.0d)
      .put("real", 1.0f)
      .put("numeric", new BigDecimal(1))
      .put("decimal", new BigDecimal(1))
      .put("char", "a")
      .put("varchar", "a")
      // todo (cgardens) we should parse this to a date string
      .put("date", "2020-11-01T00:00:00Z")
      // todo (cgardens) we should parse this to a time string
      .put("time", "1970-01-01T05:00:00Z")
      .put("timestamp", "2001-09-29T03:00:00Z")
      .put("binary1", "aaaa".getBytes(Charsets.UTF_8));
  return row;
}
}

View File

@@ -209,7 +209,8 @@ public class TestDataHolder {
}
public String getNameWithTestPrefix() {
return nameSpace + "_" + testNumber + "_" + sourceType;
// source type may include space (e.g. "character varying")
return nameSpace + "_" + testNumber + "_" + sourceType.replaceAll("\\s", "_");
}
public String getCreateSqlQuery() {

View File

@@ -194,9 +194,10 @@ public class JsonToAvroSchemaConverter {
}
case ARRAY -> {
final JsonNode items = fieldDefinition.get("items");
Preconditions.checkNotNull(items, "Array field %s misses the items property.", fieldName);
if (items.isObject()) {
if (items == null) {
LOGGER.warn("Source connector provided schema for ARRAY with missed \"items\", will assume that it's a String type");
fieldSchema = Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA));
} else if (items.isObject()) {
fieldSchema = Schema.createArray(getNullableFieldTypes(String.format("%s.items", fieldName), items));
} else if (items.isArray()) {
final List<Schema> arrayElementTypes = getSchemasFromTypes(fieldName, (ArrayNode) items);

View File

@@ -774,5 +774,48 @@
"time_field": 44581541000,
"_airbyte_additional_properties": null
}
},
{
"schemaName": "array_without_items_in_schema",
"namespace": "namespace16",
"appendAirbyteFields": false,
"jsonSchema": {
"type": "object",
"properties": {
"identifier": {
"type": "array"
}
}
},
"jsonObject": {
"identifier": ["151", 152, true, { "id": 153 }]
},
"avroSchema": {
"type": "record",
"name": "array_without_items_in_schema",
"namespace": "namespace16",
"fields": [
{
"name": "identifier",
"type": [
"null",
{
"type": "array",
"items": ["null", "string"]
}
],
"default": null
},
{
"name": "_airbyte_additional_properties",
"type": ["null", { "type": "map", "values": "string" }],
"default": null
}
]
},
"avroObject": {
"identifier": ["151", "152", "true", "{\"id\":153}"],
"_airbyte_additional_properties": null
}
}
]

View File

@@ -199,5 +199,18 @@
{ "type": "long", "logicalType": "time-micros" },
"string"
]
},
{
"fieldName": "array_field_without_items_type",
"jsonFieldSchema": {
"type": "array"
},
"avroFieldType": [
"null",
{
"type": "array",
"items": ["null", "string"]
}
]
}
]

View File

@@ -83,9 +83,9 @@ public class CockroachDbSourceDatatypeTest extends AbstractSourceDatabaseTypeTes
TestDataHolder.builder()
.sourceType("array")
.fullSourceDataType("STRING[]")
.airbyteType(JsonSchemaPrimitive.STRING)
.airbyteType(JsonSchemaPrimitive.ARRAY)
.addInsertValues("ARRAY['sky', 'road', 'car']", "null")
.addExpectedValues("{sky,road,car}", null)
.addExpectedValues("[\"sky\",\"road\",\"car\"]", null)
.build());
addDataTypeTestData(
@@ -152,7 +152,7 @@ public class CockroachDbSourceDatatypeTest extends AbstractSourceDatabaseTypeTes
.fullSourceDataType("bytea[]")
.airbyteType(JsonSchemaPrimitive.OBJECT)
.addInsertValues("ARRAY['☃'::bytes, 'ї'::bytes]")
.addExpectedValues("{\"\\\\xe29883\",\"\\\\xd197\"}")
.addExpectedValues("[\"\\\\xe29883\",\"\\\\xd197\"]")
.build());
addDataTypeTestData(
@@ -352,9 +352,18 @@ public class CockroachDbSourceDatatypeTest extends AbstractSourceDatabaseTypeTes
TestDataHolder.builder()
.sourceType("text")
.fullSourceDataType("text[]")
.airbyteType(JsonSchemaPrimitive.STRING)
.airbyteType(JsonSchemaPrimitive.ARRAY)
.addInsertValues("'{10000, 10000, 10000, 10000}'", "null")
.addExpectedValues("{10000,10000,10000,10000}", null)
.addExpectedValues("[\"10000\",\"10000\",\"10000\",\"10000\"]", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("int")
.fullSourceDataType("int[]")
.airbyteType(JsonSchemaPrimitive.ARRAY)
.addInsertValues("'{10000, 10000, 10000, 10000}'", "null")
.addExpectedValues("[\"10000\",\"10000\",\"10000\",\"10000\"]", null)
.build());
}

View File

@@ -4,7 +4,8 @@ WORKDIR /airbyte
ENV APPLICATION source-postgres
ADD build/distributions/${APPLICATION}*.tar /airbyte
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.3.17
LABEL io.airbyte.version=0.4.0
LABEL io.airbyte.name=airbyte/source-postgres

View File

@@ -4,19 +4,33 @@
package io.airbyte.integrations.source.postgres;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_SCHEMA_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_TABLE_NAME;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.math.BigDecimal;
import java.sql.JDBCType;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Collections;
import org.postgresql.jdbc.PgResultSetMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PostgresSourceOperations extends JdbcSourceOperations {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSourceOperations.class);
@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
// the first call communicates with the database. after that the result is cached.
@@ -25,16 +39,23 @@ public class PostgresSourceOperations extends JdbcSourceOperations {
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());
for (int i = 1; i <= columnCount; i++) {
final String columnType = metadata.getColumnTypeName(i);
// attempt to access the column. this allows us to know if it is null before we do type-specific
// parsing. if it is null, we can move on. while awkward, this seems to be the agreed upon way of
// checking for null values with jdbc.
if (metadata.getColumnTypeName(i).equalsIgnoreCase("money")) {
if (columnType.equalsIgnoreCase("money")) {
// when a column is of type MONEY, getObject will throw exception
// this is a bug that will not be fixed:
// https://github.com/pgjdbc/pgjdbc/issues/425
// https://github.com/pgjdbc/pgjdbc/issues/1835
queryContext.getString(i);
} else if (columnType.equalsIgnoreCase("bit")) {
// getObject will fail as it tries to parse the value as boolean
queryContext.getString(i);
} else if (columnType.equalsIgnoreCase("numeric") || columnType.equalsIgnoreCase("decimal")) {
// getObject will fail when the value is 'infinity'
queryContext.getDouble(i);
} else {
queryContext.getObject(i);
}
@@ -49,6 +70,92 @@ public class PostgresSourceOperations extends JdbcSourceOperations {
return jsonNode;
}
/**
 * Writes the value of column {@code colIndex} of the current row into {@code json}, choosing the
 * conversion from the Postgres type name first and falling back to the JDBC type.
 *
 * @param resultSet cursor positioned on the row to read
 * @param colIndex index of the column to convert
 * @param json JSON object being populated for the current row
 * @throws SQLException if the driver fails while reading metadata or the value
 */
@Override
public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
  final PgResultSetMetaData pgMetadata = (PgResultSetMetaData) resultSet.getMetaData();
  final String name = pgMetadata.getColumnName(colIndex);
  final String pgTypeName = pgMetadata.getColumnTypeName(colIndex);
  final JDBCType jdbcType = safeGetJdbcType(pgMetadata.getColumnType(colIndex));

  // Type names that need Postgres-specific handling take precedence over the JDBC type.
  if (pgTypeName.equalsIgnoreCase("bool") || pgTypeName.equalsIgnoreCase("boolean")) {
    putBoolean(json, name, resultSet, colIndex);
    return;
  }

  final boolean emitAsRawString = pgTypeName.equalsIgnoreCase("bytea")
      || pgTypeName.equalsIgnoreCase("time")
      || pgTypeName.equalsIgnoreCase("timetz");
  if (emitAsRawString) {
    putString(json, name, resultSet, colIndex);
    return;
  }

  // https://www.postgresql.org/docs/14/datatype.html
  switch (jdbcType) {
    case BOOLEAN -> putBoolean(json, name, resultSet, colIndex);
    case TINYINT, SMALLINT -> putShortInt(json, name, resultSet, colIndex);
    case INTEGER -> putInteger(json, name, resultSet, colIndex);
    case BIGINT -> putBigInt(json, name, resultSet, colIndex);
    case FLOAT, DOUBLE -> putDouble(json, name, resultSet, colIndex);
    case REAL -> putFloat(json, name, resultSet, colIndex);
    case NUMERIC, DECIMAL -> putBigDecimal(json, name, resultSet, colIndex);
    // BIT is a bit string in Postgres, e.g. '0100'
    case BIT, CHAR, VARCHAR, LONGVARCHAR -> putString(json, name, resultSet, colIndex);
    case DATE -> putDate(json, name, resultSet, colIndex);
    case TIME -> putTime(json, name, resultSet, colIndex);
    case TIMESTAMP -> putTimestamp(json, name, resultSet, colIndex);
    case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, name, resultSet, colIndex);
    case ARRAY -> putArray(json, name, resultSet, colIndex);
    default -> putDefault(json, name, resultSet, colIndex);
  }
}
/**
 * Resolves the JDBC type of a discovered column described by {@code field}.
 *
 * @param field column description carrying the internal type-name and type-code properties
 * @return the JDBC type to use; {@link JDBCType#VARCHAR} when the type code is not a known JDBC type
 */
@Override
public JDBCType getFieldType(final JsonNode field) {
  try {
    final String pgTypeName = field.get(INTERNAL_COLUMN_TYPE_NAME).asText();

    // Postgres boolean is mapped to JDBCType.BIT, but should be BOOLEAN
    final boolean isBooleanColumn = pgTypeName.equalsIgnoreCase("bool") || pgTypeName.equalsIgnoreCase("boolean");
    if (isBooleanColumn) {
      return JDBCType.BOOLEAN;
    }

    // BYTEA is variable length binary string with hex output format by default (e.g. "\x6b707a").
    // It should not be converted to base64 binary string. So it is represented as JDBC VARCHAR.
    // https://www.postgresql.org/docs/14/datatype-binary.html
    if (pgTypeName.equalsIgnoreCase("bytea")) {
      return JDBCType.VARCHAR;
    }

    final int jdbcTypeCode = field.get(INTERNAL_COLUMN_TYPE).asInt();
    return JDBCType.valueOf(jdbcTypeCode);
  } catch (final IllegalArgumentException ex) {
    final String message = String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.",
        field.get(INTERNAL_COLUMN_NAME),
        field.get(INTERNAL_SCHEMA_NAME),
        field.get(INTERNAL_TABLE_NAME),
        field.get(INTERNAL_COLUMN_TYPE));
    LOGGER.warn(message);
    return JDBCType.VARCHAR;
  }
}
/**
 * Maps a JDBC type to the JSON schema primitive advertised for the column. Any type not listed
 * explicitly is advertised as a string.
 */
@Override
public JsonSchemaPrimitive getJsonType(final JDBCType jdbcType) {
  switch (jdbcType) {
    case BOOLEAN:
      return JsonSchemaPrimitive.BOOLEAN;
    case TINYINT:
    case SMALLINT:
    case INTEGER:
    case BIGINT:
    case FLOAT:
    case DOUBLE:
    case REAL:
    case NUMERIC:
    case DECIMAL:
      return JsonSchemaPrimitive.NUMBER;
    case BLOB:
    case BINARY:
    case VARBINARY:
    case LONGVARBINARY:
      return JsonSchemaPrimitive.STRING_BINARY;
    case ARRAY:
      return JsonSchemaPrimitive.ARRAY;
    default:
      return JsonSchemaPrimitive.STRING;
  }
}
/**
 * Writes a Postgres boolean column into {@code node}. The value is read as text and treated as
 * true only when it equals "t" (case-insensitive); a SQL NULL is written as a JSON null instead
 * of failing with a NullPointerException.
 *
 * @param node JSON object being populated for the current row
 * @param columnName key under which the value is stored
 * @param resultSet cursor positioned on the row to read
 * @param index index of the boolean column
 * @throws SQLException if the driver fails while reading the column
 */
@Override
protected void putBoolean(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  final String rawValue = resultSet.getString(index);
  if (rawValue == null) {
    node.put(columnName, (Boolean) null);
    return;
  }
  node.put(columnName, rawValue.equalsIgnoreCase("t"));
}
/**
 * Writes a numeric/decimal column into {@code node}. When the value cannot be read as a
 * {@link BigDecimal}, a JSON null is written instead.
 */
protected void putBigDecimal(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) {
  final BigDecimal parsed = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index));
  if (parsed == null) {
    // Special values (Infinity, -Infinity, and NaN) default to null for now.
    // https://github.com/airbytehq/airbyte/issues/8902
    node.put(columnName, (BigDecimal) null);
    return;
  }
  node.put(columnName, parsed);
}
@Override
protected void putDouble(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
if (resultSet.getMetaData().getColumnTypeName(index).equals("money")) {

View File

@@ -430,7 +430,7 @@ public class CdcPostgresSourceDatatypeTest extends AbstractSourceDatabaseTypeTes
TestDataHolder.builder()
.sourceType("text")
.fullSourceDataType("text[]")
.airbyteType(JsonSchemaPrimitive.STRING)
.airbyteType(JsonSchemaPrimitive.ARRAY)
.addInsertValues("'{10000, 10000, 10000, 10000}'", "null")
.addExpectedValues("[\"10000\",\"10000\",\"10000\",\"10000\"]", null)
.build());

View File

@@ -14,21 +14,21 @@ import io.airbyte.integrations.standardtest.source.TestDataHolder;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.sql.SQLException;
import java.util.Set;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PostgreSQLContainer;
public class PostresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
public class PostgresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
private PostgreSQLContainer<?> container;
private JsonNode config;
private static final Logger LOGGER = LoggerFactory
.getLogger(PostresSourceDatatypeTest.class);
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSourceDatatypeTest.class);
@Override
protected Database setupDatabase() throws SQLException {
container = new PostgreSQLContainer<>("postgres:13-alpine");
container = new PostgreSQLContainer<>("postgres:14-alpine");
container.start();
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "Standard")
@@ -42,7 +42,6 @@ public class PostresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
.put("ssl", false)
.put("replication_method", replicationMethod)
.build());
LOGGER.warn("PPP:config:" + config);
final Database database = Databases.createDatabase(
config.get("username").asText(),
@@ -59,13 +58,14 @@ public class PostresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
ctx.execute("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy');");
ctx.execute("CREATE TYPE inventory_item AS (name text, supplier_id integer, price numeric);");
// In one of the test case, we have some money values with currency symbol. Postgres can only
// understand
// those money values if the symbol corresponds to the monetary locale setting. For example, if the
// locale
// is 'en_GB', '£100' is valid, but '$100' is not. So setting the monetary locate is necessary here
// to
// make sure the unit test can pass, no matter what the locale the runner VM has.
// understand those money values if the symbol corresponds to the monetary locale setting. For
// example,
// if the locale is 'en_GB', '£100' is valid, but '$100' is not. So setting the monetary locale is
// necessary here to make sure the unit test can pass, no matter what the locale the runner VM has.
ctx.execute("SET lc_monetary TO 'en_US.utf8';");
// Set up a fixed timezone here so that timetz and timestamptz always have the same time zone
// wherever the tests are running on.
ctx.execute("SET TIMEZONE TO 'America/Los_Angeles'");
return null;
});
@@ -92,6 +92,13 @@ public class PostresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
container.close();
}
// Always returns true.
// NOTE(review): presumably this opts the test class into the base class's catalog check, i.e.
// verifying the discovered Airbyte type of each test column — confirm against
// AbstractSourceDatabaseTypeTest.
@Override
public boolean testCatalog() {
return true;
}
// Test cases are sorted alphabetically based on the source type
// See https://www.postgresql.org/docs/14/datatype.html
@Override
protected void initTests() {
addDataTypeTestData(
@@ -112,105 +119,101 @@ public class PostresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("serial")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("1", "2147483647", "0", "-2147483647")
.addExpectedValues("1", "2147483647", "0", "-2147483647")
.sourceType("bit")
.fullSourceDataType("BIT(1)")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("B'0'")
.addExpectedValues("0")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("smallserial")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("1", "32767", "0", "-32767")
.addExpectedValues("1", "32767", "0", "-32767")
.sourceType("bit")
.fullSourceDataType("BIT(3)")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("B'101'")
.addExpectedValues("101")
.build());
// BUG https://github.com/airbytehq/airbyte/issues/3932
// BIT type is currently parsed as a Boolean which is incorrect
// addDataTypeTestData(
// TestDataHolder.builder()
// .sourceType("bit")
// .fullSourceDataType("BIT(3)")
// .airbyteType(JsonSchemaPrimitive.NUMBER)
// .addInsertValues("B'101'")
// //.addExpectedValues("101")
// - .build());
// Cover both spellings of the variable-length bit string type. The loop variable must drive the
// column type; otherwise "varbit" is never exercised and the same case is registered twice.
for (final String type : Set.of("bit varying", "varbit")) {
  addDataTypeTestData(
      TestDataHolder.builder()
          .sourceType(type)
          .fullSourceDataType(type + "(5)")
          .airbyteType(JsonSchemaPrimitive.STRING)
          .addInsertValues("B'101'", "null")
          .addExpectedValues("101", null)
          .build());
}
for (final String type : Set.of("boolean", "bool")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType(type)
.airbyteType(JsonSchemaPrimitive.BOOLEAN)
.addInsertValues("true", "'yes'", "'1'", "false", "'no'", "'0'", "null")
.addExpectedValues("true", "true", "true", "false", "false", "false", null)
.build());
}
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("bit_varying")
.fullSourceDataType("BIT VARYING(5)")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("B'101'", "null")
.addExpectedValues("101", null)
.sourceType("box")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null")
.addExpectedValues("(15,18),(3,7)", "(0,0),(0,0)", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("boolean")
.airbyteType(JsonSchemaPrimitive.BOOLEAN)
.addInsertValues("true", "'yes'", "'1'", "false", "'no'", "'0'", "null")
.addExpectedValues("true", "true", "true", "false", "false", "false", null)
.build());
// BUG: The represented value is encoded by Base64
// https://github.com/airbytehq/airbyte/issues/7905
// bytea stores variable length binary string
// https://www.postgresql.org/docs/14/datatype-binary.html
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("bytea")
.airbyteType(JsonSchemaPrimitive.OBJECT)
.addInsertValues("decode('1234', 'hex')")
.addExpectedValues("EjQ=")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("character")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'a'", "'*'", "null")
.addExpectedValues("a", "*", null)
.addInsertValues("null", "decode('1234', 'hex')", "'1234'", "'abcd'", "'\\xabcd'")
.addExpectedValues(null, "\\x1234", "\\x31323334", "\\x61626364", "\\xabcd")
.build());
for (final String type : Set.of("character", "char")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType(type)
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'a'", "'*'", "null")
.addExpectedValues("a", "*", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType(type)
.fullSourceDataType(type + "(8)")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'{asb123}'", "'{asb12}'")
.addExpectedValues("{asb123}", "{asb12} ")
.build());
}
for (final String type : Set.of("varchar", "text")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType(type)
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'a'", "'abc'", "'Миші йдуть на південь, не питай чому;'", "'櫻花分店'",
"''", "null", "'\\xF0\\x9F\\x9A\\x80'")
.addExpectedValues("a", "abc", "Миші йдуть на південь, не питай чому;", "櫻花分店", "",
null, "\\xF0\\x9F\\x9A\\x80")
.build());
}
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("character")
.fullSourceDataType("character(8)")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'{asb123}'", "'{asb12}'")
.addExpectedValues("{asb123}", "{asb12} ")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("character_varying")
.fullSourceDataType("character varying(8)")
.sourceType("varchar")
.fullSourceDataType("character varying(10)")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'{asb123}'", "'{asb12}'")
.addExpectedValues("{asb123}", "{asb12}")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("varchar")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'a'", "'abc'", "'Миші йдуть на південь, не питай чому;'", "'櫻花分店'",
"''", "null", "'\\xF0\\x9F\\x9A\\x80'")
.addExpectedValues("a", "abc", "Миші йдуть на південь, не питай чому;", "櫻花分店", "",
null, "\\xF0\\x9F\\x9A\\x80")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("varchar")
.fullSourceDataType("character(12)")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'a'", "'abc'", "'Миші йдуть;'", "'櫻花分店'",
"''", "null")
.addExpectedValues("a ", "abc ", "Миші йдуть; ", "櫻花分店 ",
" ", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("cidr")
@@ -221,48 +224,37 @@ public class PostresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
"128.1.0.0/16", "2001:4f8:3:ba::/64")
.build());
// JdbcUtils-> DATE_FORMAT is set as ""yyyy-MM-dd'T'HH:mm:ss'Z'"" so it doesnt suppose to handle BC
// dates
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("circle")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'(5,7),10'", "'(0,0),0'", "'(-10,-4),10'", "null")
.addExpectedValues("<(5,7),10>", "<(0,0),0>", "<(-10,-4),10>", null)
.build());
// DataTypeUtils#DATE_FORMAT is set as "yyyy-MM-dd'T'HH:mm:ss'Z'", so currently the Postgres source
// returns a date value as a datetime. It cannot handle BC dates (e.g. 199-10-10 BC).
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("date")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'1999-01-08'", "null") // "'199-10-10 BC'"
.addExpectedValues("1999-01-08T00:00:00Z", null) // , "199-10-10 BC")
.addInsertValues("'1999-01-08'", "null")
.addExpectedValues("1999-01-08T00:00:00Z", null)
.build());
// Values "'-Infinity'", "'Infinity'", "'Nan'" will not be parsed due to:
// JdbcUtils -> setJsonField contains:
// case FLOAT, DOUBLE -> o.put(columnName, nullIfInvalid(() -> r.getDouble(i), Double::isFinite));
// https://github.com/airbytehq/airbyte/issues/7871
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("float8")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("'123'", "'1234567890.1234567'", "null")
.addExpectedValues("123.0", "1.2345678901234567E9", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("double_precision")
.fullSourceDataType("double precision")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("'123'", "'1234567890.1234567'", "null")
.addExpectedValues("123.0", "1.2345678901234567E9", null)
.build());
// Values "'-Infinity'", "'Infinity'", "'Nan'" will not be parsed due to:
// JdbcUtils -> setJsonField contains:
// case FLOAT, DOUBLE -> o.put(columnName, nullIfInvalid(() -> r.getDouble(i), Double::isFinite));
// https://github.com/airbytehq/airbyte/issues/7871
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("float")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("'123'", "'1234567890.1234567'", "null")
.addExpectedValues("123.0", "1.2345678901234567E9", null)
.build());
for (final String type : Set.of("double precision", "float", "float8")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType(type)
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues(
"null", "'123'", "'1234567890.1234567'",
// Postgres source does not support these special values yet
// https://github.com/airbytehq/airbyte/issues/8902
"'infinity'", "'-infinity'", "'nan'")
.addExpectedValues(null, "123.0", "1.2345678901234567E9", null, null, null)
.build());
}
addDataTypeTestData(
TestDataHolder.builder()
@@ -272,21 +264,15 @@ public class PostresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
.addExpectedValues("198.24.10.0/24", "198.24.10.0", "198.10.0.0/8", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("int")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("null", "-2147483648", "2147483647")
.addExpectedValues(null, "-2147483648", "2147483647")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("integer")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("null", "-2147483648", "2147483647")
.addExpectedValues(null, "-2147483648", "2147483647")
.build());
for (final String type : Set.of("integer", "int", "int4")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType(type)
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("null", "1001", "-2147483648", "2147483647")
.addExpectedValues(null, "1001", "-2147483648", "2147483647")
.build());
}
addDataTypeTestData(
TestDataHolder.builder()
@@ -312,6 +298,22 @@ public class PostresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
.addExpectedValues(null, "[1, 2, 3]")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("line")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'{4,5,6}'", "'{0,1,0}'", "null")
.addExpectedValues("{4,5,6}", "{0,1,0}", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("lseg")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null")
.addExpectedValues("[(3,7),(15,18)]", "[(0,0),(0,0)]", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("macaddr")
@@ -350,24 +352,26 @@ public class PostresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
Double.toString(-92233720368547758.08), Double.toString(92233720368547758.07))
.build());
// The numeric type in Postgres may contain the 'NaN' value, but in JdbcUtils -> rowToJson
// it is mapped as shown below, so parsing fails:
// case NUMERIC, DECIMAL -> o.put(columnName, nullIfInvalid(() -> r.getBigDecimal(i)));
// https://github.com/airbytehq/airbyte/issues/7871
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("numeric")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("'99999'", "999999999999.9999999999", "10000000000000000000000000000000000000", "null")
.addExpectedValues("99999", "1.0E12", "1.0E37", null)
.build());
for (final String type : Set.of("numeric", "decimal")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType(type)
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues(
"'123'", "null", "'1234567890.1234567'",
// Postgres source does not support these special values yet
// https://github.com/airbytehq/airbyte/issues/8902
"'infinity'", "'-infinity'", "'nan'")
.addExpectedValues("123", null, "1.2345678901234567E9", null, null, null)
.build());
}
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("real")
.sourceType("path")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'123'", "'1234567890.1234567'", "null")
.addExpectedValues("123.0", "1.23456794E9", null)
.addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null")
.addExpectedValues("((3,7),(15,18))", "((0,0),(0,0))", null)
.build());
addDataTypeTestData(
@@ -378,71 +382,122 @@ public class PostresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
.addExpectedValues("7/A25801C8", "0/0", null)
.build());
// The numeric type in Postgres may contain the 'NaN' value, but in JdbcUtils -> rowToJson
// it is mapped as shown below, so parsing fails:
// case NUMERIC, DECIMAL -> o.put(columnName, nullIfInvalid(() -> r.getBigDecimal(i)));
// https://github.com/airbytehq/airbyte/issues/7871
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("decimal")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("99999", "5.1", "0", "null")
.addExpectedValues("99999", "5.1", "0", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("smallint")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("null", "-32768", "32767")
.addExpectedValues(null, "-32768", "32767")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("text")
.sourceType("point")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'a'", "'abc'", "'Миші йдуть;'", "'櫻花分店'",
"''", "null", "'\\xF0\\x9F\\x9A\\x80'")
.addExpectedValues("a", "abc", "Миші йдуть;", "櫻花分店", "", null, "\\xF0\\x9F\\x9A\\x80")
.build());
// JdbcUtils-> DATE_FORMAT is set as ""yyyy-MM-dd'T'HH:mm:ss'Z'"" for both Date and Time types.
// So Time only (04:05:06) would be represented like "1970-01-01T04:05:06Z" which is incorrect
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("time")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("null")
.addNullExpectedValue()
.build());
// JdbcUtils-> DATE_FORMAT is set as ""yyyy-MM-dd'T'HH:mm:ss'Z'"" for both Date and Time types.
// So Time only (04:05:06) would be represented like "1970-01-01T04:05:06Z" which is incorrect
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("timetz")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("null")
.addNullExpectedValue()
.addInsertValues("'(3,7)'", "'(0,0)'", "'(999999999999999999999999,0)'", "null")
.addExpectedValues("(3,7)", "(0,0)", "(1e+24,0)", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("timestamp")
.sourceType("polygon")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("TIMESTAMP '2004-10-19 10:23:54'", "null")
.addExpectedValues("2004-10-19T10:23:54Z", null)
.addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'",
"'((0,0),(999999999999999999999999,0))'", "null")
.addExpectedValues("((3,7),(15,18))", "((0,0),(0,0))", "((0,0),(1e+24,0))", null)
.build());
// May be run locally, but correct the timezone according to your location
// addDataTypeTestData(
// TestDataHolder.builder()
// .sourceType("timestamptz")
// .airbyteType(JsonSchemaPrimitive.STRING)
// .addInsertValues("TIMESTAMP '2004-10-19 10:23:54+02'", "null")
// .addExpectedValues("2004-10-19T07:23:54Z", null)
// .build());
for (final String type : Set.of("real", "float4")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType(type)
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("null", "3.4145")
.addExpectedValues(null, "3.4145")
.build());
}
for (final String type : Set.of("smallint", "int2")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType(type)
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("null", "-32768", "32767")
.addExpectedValues(null, "-32768", "32767")
.build());
}
for (final String type : Set.of("smallserial", "serial2")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType(type)
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("1", "32767", "0", "-32767")
.addExpectedValues("1", "32767", "0", "-32767")
.build());
}
for (final String type : Set.of("serial", "serial4")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType(type)
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("1", "2147483647", "0", "-2147483647")
.addExpectedValues("1", "2147483647", "0", "-2147483647")
.build());
}
// time without time zone
for (final String fullSourceType : Set.of("time", "time without time zone")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("time")
.fullSourceDataType(fullSourceType)
.airbyteType(JsonSchemaPrimitive.STRING)
// time column will ignore time zone
.addInsertValues("null", "'13:00:01'", "'13:00:02+8'", "'13:00:03-8'", "'13:00:04Z'", "'13:00:05Z+8'", "'13:00:06Z-8'")
.addExpectedValues(null, "13:00:01", "13:00:02", "13:00:03", "13:00:04", "13:00:05", "13:00:06")
.build());
}
// time with time zone
for (final String fullSourceType : Set.of("timetz", "time with time zone")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("timetz")
.fullSourceDataType(fullSourceType)
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("null", "'13:00:01'", "'13:00:02+8'", "'13:00:03-8'", "'13:00:04Z'", "'13:00:05Z+8'", "'13:00:06Z-8'")
// A time value without time zone will use the time zone set on the database, which is Z-8,
// so 13:00:01 is returned as 13:00:01-08.
.addExpectedValues(null, "13:00:01-08", "13:00:02+08", "13:00:03-08", "13:00:04+00", "13:00:05-08", "13:00:06+08")
.build());
}
// timestamp without time zone
for (final String fullSourceType : Set.of("timestamp", "timestamp without time zone")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("timestamp")
.fullSourceDataType(fullSourceType)
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("TIMESTAMP '2004-10-19 10:23:54'", "null")
.addExpectedValues("2004-10-19T10:23:54Z", null)
.build());
}
// timestamp with time zone
for (final String fullSourceType : Set.of("timestamptz", "timestamp with time zone")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("timestamptz")
.fullSourceDataType(fullSourceType)
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("TIMESTAMP '2004-10-19 10:23:54-08'", "null")
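// Postgres ignores the zone offset in a TIMESTAMP (without time zone) literal, so the value is
// interpreted in the database session time zone rather than at the written "-08" offset.
// NOTE(review): the expected 17:23:54Z implies a UTC-7 offset on this date (presumably daylight
// saving time in the test database's zone) - confirm against the database setup.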
.addExpectedValues("2004-10-19T17:23:54Z", null)
.build());
}
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("tsquery")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("null", "'fat & (rat | cat)'::tsquery", "'fat:ab & cat'::tsquery")
.addExpectedValues(null, "'fat' & ( 'rat' | 'cat' )", "'fat':AB & 'cat'")
.build());
addDataTypeTestData(
TestDataHolder.builder()
@@ -452,14 +507,6 @@ public class PostresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
.addExpectedValues("'brown':3 'dog':9 'fox':4 'jump':5 'lazi':8 'quick':2")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("tsquery")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("to_tsquery('fat & rat')", "to_tsquery('Fat:ab & Cats')", "null")
.addExpectedValues("'fat' & 'rat'", "'fat':AB & 'cat'", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("uuid")
@@ -478,7 +525,7 @@ public class PostresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
.addExpectedValues("<book><title>Manual</title><chapter>...</chapter></book>", null, "")
.build());
// preconditions for this test are set at the time of database creation (setupDatabase method)
// enum type
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("mood")
@@ -487,23 +534,7 @@ public class PostresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
.addExpectedValues("happy", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("text")
.fullSourceDataType("text[]")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'{10000, 10000, 10000, 10000}'", "null")
.addExpectedValues("{10000,10000,10000,10000}", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("inventory_item")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("ROW('fuzzy dice', 42, 1.99)", "null")
.addExpectedValues("(\"fuzzy dice\",42,1.99)", null)
.build());
// range
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("tsrange")
@@ -512,61 +543,23 @@ public class PostresSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
.addExpectedValues("(\"2010-01-01 14:30:00\",\"2010-01-01 15:30:00\")", null)
.build());
// array
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("box")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null")
.addExpectedValues("(15,18),(3,7)", "(0,0),(0,0)", null)
.sourceType("text")
.fullSourceDataType("text[]")
.airbyteType(JsonSchemaPrimitive.ARRAY)
.addInsertValues("'{10001, 10002, 10003, 10004}'", "null")
.addExpectedValues("[\"10001\",\"10002\",\"10003\",\"10004\"]", null)
.build());
// composite type
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("circle")
.sourceType("inventory_item")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'(5,7),10'", "'(0,0),0'", "'(-10,-4),10'", "null")
.addExpectedValues("<(5,7),10>", "<(0,0),0>", "<(-10,-4),10>", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("line")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'{4,5,6}'", "'{0,1,0}'", "null")
.addExpectedValues("{4,5,6}", "{0,1,0}", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("lseg")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null")
.addExpectedValues("[(3,7),(15,18)]", "[(0,0),(0,0)]", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("path")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'", "null")
.addExpectedValues("((3,7),(15,18))", "((0,0),(0,0))", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("point")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("'(3,7)'", "'(0,0)'", "'(999999999999999999999999,0)'", "null")
.addExpectedValues("(3,7)", "(0,0)", "(1e+24,0)", null)
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("polygon")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'((3,7),(15,18))'", "'((0,0),(0,0))'",
"'((0,0),(999999999999999999999999,0))'", "null")
.addExpectedValues("((3,7),(15,18))", "((0,0),(0,0))", "((0,0),(1e+24,0))", null)
.addInsertValues("ROW('fuzzy dice', 42, 1.99)", "null")
.addExpectedValues("(\"fuzzy dice\",42,1.99)", null)
.build());
}

View File

@@ -203,67 +203,61 @@ This produces the private key in pem format, and the public key remains in the s
## Data type mapping
Postgres data types are mapped to the following data types when synchronizing data. You can check the test values examples [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostresSourceComprehensiveTest.java). If you can't find the data type you are looking for or have any problems feel free to add a new test!
Postgres data types, as listed in the Postgres [documentation](https://www.postgresql.org/docs/14/datatype.html), are mapped to the following data types when synchronizing data. You can check example test values [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java). If you can't find the data type you are looking for or have any problems, feel free to add a new test!
| Postgres Type | Resulting Type | Notes |
| :--- | :--- | :--- |
| `bigint` | number | |
| `bigserial` | number | |
| `bit` | boolean | bit is mapped to boolean type and `read` is failed for n-bit values |
| `blob` | boolean | |
| `boolean` | boolean | |
| `box` | string | |
| `bytea` | object | parsed value is encoded by Base64 [#7905](https://github.com/airbytehq/airbyte/issues/7905)|
| `character` | string | |
| `character varying` | string | |
| `cidr` | string | |
| `circle` | string | |
| `citext` | string | |
| `date` | string | |
| `double precision` | string | Values `-Infinity`, `Infinity`, `Nan` will not be parsed correctly. Parsed values for all of them are null [#7871](https://github.com/airbytehq/airbyte/issues/7871) |
| `enum` | number | |
| `float` | number | Values `-Infinity`, `Infinity`, `Nan` will not be parsed correctly. Parsed values for all of them are null [#7871](https://github.com/airbytehq/airbyte/issues/7871) |
| `float8` | number | Values `-Infinity`, `Infinity`, `Nan` will not be parsed correctly. Parsed values for all of them are null [#7871](https://github.com/airbytehq/airbyte/issues/7871) |
| `hstore` | object | may be de-nested depending on the destination you are syncing into |
| `inet` | string | |
| `int` | number | |
| `interval` | string | |
| `inventory_item` | string | |
| `json` | string | |
| `jsonb` | string | |
| `line` | string | |
| `lseg` | string | |
| `macaddr` | string | |
| `macaddr8` | string | |
| `money` | string | When running logical replication (CDC), `money` values larger than 999999999999999 (15 nines) or smaller than -999999999999999 (15 nines) are transmitted as null; When running default mode `money` value fail when amount is > 1000 [#7870](https://github.com/airbytehq/airbyte/issues/7870) |
| `mood` | string | |
| `numeric` | number | |
| `path` | string | |
| `point` | number | |
| `polygon` | number | |
| `real` | number | Values `-Infinity`, `Infinity`, `Nan` will not be parsed correctly. Parsed values for all of them are null [#7871](https://github.com/airbytehq/airbyte/issues/7871) |
| `serial` | number | |
| `smallint` | number | |
| `smallserial` | number | |
| `text` | string | |
| `text[]` | string | |
| `time` | string | |
| `timez` | string | |
| `time with timezone` | string | may be written as a native date type depending on the destination |
| `time without timezone` | string | may be written as a native date type depending on the destination |
| `timestamp with timezone` | string | may be written as a native date type depending on the destination |
| `timestamp without timezone` | string | may be written as a native date type depending on the destination |
| `tsrange` | string | |
| `tsvector` | string | |
| `tsquery` | string | is not supported with CDC node. Parsed value is null [#7911](https://github.com/airbytehq/airbyte/issues/7911) |
| `uuid` | string | |
| `varchar` | string | |
| `xml` | string | |
| Postgres Type | Resulting Type | Notes |
|:--------------------------------------|:---------------|:------------------------------------------------------------------------------------------------------------|
| `bigint` | number | |
| `bigserial`, `serial8` | number | |
| `bit` | string | Fixed-length bit string (e.g. "0100"). |
| `bit varying`, `varbit` | string | Variable-length bit string (e.g. "0100"). |
| `boolean`, `bool` | boolean | |
| `box` | string | |
| `bytea` | string | Variable length binary string with hex output format prefixed with "\x" (e.g. "\x6b707a"). |
| `character`, `char` | string | |
| `character varying`, `varchar` | string | |
| `cidr` | string | |
| `circle` | string | |
| `date` | string | Parsed as ISO8601 date time at midnight. Does not support B.C. dates. Issue: [#8903](https://github.com/airbytehq/airbyte/issues/8903). |
| `double precision`, `float`, `float8` | number | `Infinity`, `-Infinity`, and `NaN` are not supported and converted to `null`. Issue: [#8902](https://github.com/airbytehq/airbyte/issues/8902). |
| `inet` | string | |
| `integer`, `int`, `int4` | number | |
| `interval` | string | |
| `json` | string | |
| `jsonb` | string | |
| `line` | string | |
| `lseg` | string | |
| `macaddr` | string | |
| `macaddr8` | string | |
| `money` | number | |
| `numeric`, `decimal` | number | `Infinity`, `-Infinity`, and `NaN` are not supported and converted to `null`. Issue: [#8902](https://github.com/airbytehq/airbyte/issues/8902). |
| `path` | string | |
| `pg_lsn` | string | |
| `point` | string | |
| `polygon` | string | |
| `real`, `float4` | number | |
| `smallint`, `int2` | number | |
| `smallserial`, `serial2` | number | |
| `serial`, `serial4` | number | |
| `text` | string | |
| `time` | string | |
| `timetz` | string | |
| `timestamp` | string | |
| `timestamptz` | string | |
| `tsquery` | string | Not supported with CDC mode. Parsed value is null. Issue: [#7911](https://github.com/airbytehq/airbyte/issues/7911). |
| `tsvector` | string | |
| `uuid` | string | |
| `xml` | string | |
| `enum` | string | |
| `tsrange` | string | |
| array | array | E.g. `["10001","10002","10003","10004"]`. |
| composite type | string | |
## Changelog
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------|
| 0.4.0 | 2021-12-13 | [8726](https://github.com/airbytehq/airbyte/pull/8726) | Support all Postgres types |
| 0.3.17 | 2021-12-01 | [8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key |
| 0.3.16 | 2021-11-28 | [7995](https://github.com/airbytehq/airbyte/pull/7995) | Fixed money type with amount > 1000 |
| 0.3.15 | 2021-11-26 | [8266](https://github.com/airbytehq/airbyte/pull/8266) | Fixed the case, when Views are not listed during schema discovery |