🐛 Source IBM Db2: Connector provides wrong values for a datatype (#7670)
* added DB2 type transformation * added DB2 type transformation * fixed code style * fixed remarks * fixed remarks * fixed remarks * added java doc for certificate generation * bump new version
This commit is contained in:
@@ -2,6 +2,6 @@
|
||||
"sourceDefinitionId": "447e0381-3780-4b46-bb62-00a4e3c8b8e2",
|
||||
"name": "IBM Db2",
|
||||
"dockerRepository": "airbyte/source-db2",
|
||||
"dockerImageTag": "0.1.2",
|
||||
"dockerImageTag": "0.1.3",
|
||||
"documentationUrl": "https://docs.airbyte.io/integrations/sources/db2"
|
||||
}
|
||||
|
||||
@@ -249,7 +249,7 @@
|
||||
- name: IBM Db2
|
||||
sourceDefinitionId: 447e0381-3780-4b46-bb62-00a4e3c8b8e2
|
||||
dockerRepository: airbyte/source-db2
|
||||
dockerImageTag: 0.1.2
|
||||
dockerImageTag: 0.1.3
|
||||
documentationUrl: https://docs.airbyte.io/integrations/sources/db2
|
||||
sourceType: database
|
||||
- name: Instagram
|
||||
|
||||
@@ -2,19 +2,16 @@
|
||||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.integrations.source.db2;
|
||||
package io.airbyte.db.jdbc;
|
||||
|
||||
import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
|
||||
public class Db2JdbcStreamingQueryConfiguration implements
|
||||
JdbcStreamingQueryConfiguration {
|
||||
public class Db2JdbcStreamingQueryConfiguration implements JdbcStreamingQueryConfiguration {
|
||||
|
||||
@Override
|
||||
public void accept(final Connection connection, final PreparedStatement preparedStatement)
|
||||
throws SQLException {
|
||||
public void accept(final Connection connection, final PreparedStatement preparedStatement) throws SQLException {
|
||||
connection.setAutoCommit(false);
|
||||
preparedStatement.setFetchSize(1000);
|
||||
}
|
||||
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
|
||||
|
||||
RUN tar xf ${APPLICATION}.tar --strip-components=1
|
||||
|
||||
LABEL io.airbyte.version=0.1.2
|
||||
LABEL io.airbyte.version=0.1.3
|
||||
LABEL io.airbyte.name=airbyte/source-db2
|
||||
|
||||
@@ -7,6 +7,8 @@ package io.airbyte.integrations.source.db2;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.airbyte.commons.json.Jsons;
|
||||
import io.airbyte.db.jdbc.Db2JdbcStreamingQueryConfiguration;
|
||||
import io.airbyte.db.jdbc.JdbcSourceOperations;
|
||||
import io.airbyte.integrations.base.IntegrationRunner;
|
||||
import io.airbyte.integrations.base.Source;
|
||||
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
|
||||
@@ -24,12 +26,13 @@ public class Db2Source extends AbstractJdbcSource implements Source {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(Db2Source.class);
|
||||
public static final String DRIVER_CLASS = "com.ibm.db2.jcc.DB2Driver";
|
||||
private static Db2SourceOperations operations;
|
||||
|
||||
private static final String KEY_STORE_PASS = RandomStringUtils.randomAlphanumeric(8);
|
||||
private static final String KEY_STORE_FILE_PATH = "clientkeystore.jks";
|
||||
|
||||
public Db2Source() {
|
||||
super(DRIVER_CLASS, new Db2JdbcStreamingQueryConfiguration());
|
||||
super(DRIVER_CLASS, new Db2JdbcStreamingQueryConfiguration(), new Db2SourceOperations());
|
||||
}
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
|
||||
@@ -0,0 +1,73 @@
|
||||
/*
|
||||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.integrations.source.db2;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import io.airbyte.commons.json.Jsons;
|
||||
import io.airbyte.db.jdbc.JdbcSourceOperations;
|
||||
import java.sql.JDBCType;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class Db2SourceOperations extends JdbcSourceOperations {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(Db2SourceOperations.class);
|
||||
private static final List<String> DB2_UNIQUE_NUMBER_TYPES = List.of("DECFLOAT");
|
||||
|
||||
@Override
|
||||
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
|
||||
final int columnCount = queryContext.getMetaData().getColumnCount();
|
||||
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());
|
||||
|
||||
for (int i = 1; i <= columnCount; i++) {
|
||||
setFields(queryContext, i, jsonNode);
|
||||
}
|
||||
|
||||
return jsonNode;
|
||||
}
|
||||
|
||||
/* Helpers */
|
||||
|
||||
private void setFields(ResultSet queryContext, int index, ObjectNode jsonNode) throws SQLException {
|
||||
try {
|
||||
queryContext.getObject(index);
|
||||
if (!queryContext.wasNull()) {
|
||||
setJsonField(queryContext, index, jsonNode);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
if (DB2_UNIQUE_NUMBER_TYPES.contains(queryContext.getMetaData().getColumnTypeName(index))) {
|
||||
db2UniqueTypes(queryContext, index, jsonNode);
|
||||
} else {
|
||||
throw new SQLException(e.getCause());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void db2UniqueTypes(ResultSet resultSet, int index, ObjectNode jsonNode) throws SQLException {
|
||||
String columnType = resultSet.getMetaData().getColumnTypeName(index);
|
||||
String columnName = resultSet.getMetaData().getColumnName(index);
|
||||
if (DB2_UNIQUE_NUMBER_TYPES.contains(columnType)) {
|
||||
putDecfloat(jsonNode, columnName, resultSet, index);
|
||||
}
|
||||
}
|
||||
|
||||
private void putDecfloat(final ObjectNode node,
|
||||
final String columnName,
|
||||
final ResultSet resultSet,
|
||||
final int index) {
|
||||
try {
|
||||
final double value = resultSet.getDouble(index);
|
||||
node.put(columnName, value);
|
||||
} catch (final SQLException e) {
|
||||
node.put(columnName, (Double) null);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -142,13 +142,15 @@ public class Db2SourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
|
||||
"DOUBLE('1.7976931348623157E+308')")
|
||||
.addExpectedValues(null, "-1.7976931348623157E308", "-2.2250738585072014E-308", "2.2250738585072014E-308", "1.7976931348623157E308")
|
||||
.build());
|
||||
|
||||
// DECFLOAT type tests
|
||||
addDataTypeTestData(
|
||||
TestDataHolder.builder()
|
||||
.createTablePatternSql(CREATE_TABLE_SQL)
|
||||
.sourceType("DECFLOAT")
|
||||
.airbyteType(JsonSchemaPrimitive.NUMBER)
|
||||
.fullSourceDataType("DECFLOAT(16)")
|
||||
.addInsertValues("null", "0", "DECFLOAT(10E+307, 16)", "DECFLOAT(10E-307, 16)")
|
||||
.addInsertValues("null", "0", "1.0E308", "1.0E-306")
|
||||
.addExpectedValues(null, "0", "1E+308", "1E-306")
|
||||
.build());
|
||||
addDataTypeTestData(
|
||||
@@ -160,16 +162,14 @@ public class Db2SourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
|
||||
.addInsertValues("null", "0", "DECFLOAT(10E+307, 34)", "DECFLOAT(10E-307, 34)")
|
||||
.addExpectedValues(null, "0", "1E+308", "1E-306")
|
||||
.build());
|
||||
|
||||
// TODO "SNaN", "NaN", "Infinity" - fail with an exception in Db2 Driver during conversion to a
|
||||
// BigDecimal.
|
||||
// Could be fixed by mapping DECFLOAT to Double or String according to:
|
||||
// https://www.ibm.com/docs/en/db2-for-zos/12?topic=dttmddtija-retrieval-special-values-from-decfloat-columns-in-java-applications
|
||||
/*
|
||||
* addDataTypeTestData( TestDataHolder.builder() .createTablePatternSql(CREATE_TABLE_SQL)
|
||||
* .sourceType("DECFLOAT") .airbyteType(JsonSchemaPrimitive.NUMBER) .addInsertValues("SNaN", "NaN",
|
||||
* "Infinity") .addExpectedValues() .build());
|
||||
*/
|
||||
addDataTypeTestData(
|
||||
TestDataHolder.builder()
|
||||
.createTablePatternSql(CREATE_TABLE_SQL)
|
||||
.sourceType("DECFLOAT")
|
||||
.airbyteType(JsonSchemaPrimitive.NUMBER)
|
||||
.addInsertValues("SNaN", "NaN", "Infinity", "-Infinity")
|
||||
.addExpectedValues("NaN", "NaN", "Infinity", "-Infinity")
|
||||
.build());
|
||||
|
||||
// Boolean values
|
||||
addDataTypeTestData(
|
||||
|
||||
@@ -62,6 +62,7 @@ You can also enter your own password for the keystore, but if you don't, the pas
|
||||
|
||||
| Version | Date | Pull Request | Subject |
|
||||
| :--- | :--- | :--- | :--- |
|
||||
| 0.1.3 | 2021-11-05 | [7670](https://github.com/airbytehq/airbyte/pull/7670) | Updated unique DB2 types transformation |
|
||||
| 0.1.2 | 2021-10-25 | [7355](https://github.com/airbytehq/airbyte/pull/7355) | Added ssl support |
|
||||
| 0.1.1 | 2021-08-13 | [4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator |
|
||||
| 0.1.0 | 2021-06-22 | [4197](https://github.com/airbytehq/airbyte/pull/4197) | New Source: IBM DB2 |
|
||||
|
||||
Reference in New Issue
Block a user