1
0
mirror of synced 2025-12-23 21:03:15 -05:00

[Source-mysql/mssql] : Populate null values (#37111)

This commit is contained in:
Akash Kulkarni
2024-04-16 14:25:27 -07:00
committed by GitHub
parent 70afb9d2c7
commit bae63044cc
14 changed files with 80 additions and 74 deletions

View File

@@ -3,7 +3,7 @@ plugins {
}
airbyteJavaConnector {
cdkVersionRequired = '0.30.4'
cdkVersionRequired = '0.30.5'
features = ['db-sources']
useLocalCdk = false
}

View File

@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.0.12
dockerImageTag: 4.0.13
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql

View File

@@ -71,14 +71,20 @@ public class MssqlSourceOperations extends JdbcSourceOperations {
@Override
public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json)
throws SQLException {
final SQLServerResultSetMetaData metadata = (SQLServerResultSetMetaData) resultSet
.getMetaData();
final String columnName = metadata.getColumnName(colIndex);
final String columnTypeName = metadata.getColumnTypeName(colIndex);
final JDBCType columnType = safeGetJdbcType(metadata.getColumnType(colIndex));
if (columnTypeName.equalsIgnoreCase("time")) {
// Attempt to access the column. this allows us to know if it is null before we do
// type-specific parsing. If the column is null, we will populate the null value and skip attempting
// to
// parse the column value.
resultSet.getObject(colIndex);
if (resultSet.wasNull()) {
json.putNull(columnName);
} else if (columnTypeName.equalsIgnoreCase("time")) {
putTime(json, columnName, resultSet, colIndex);
} else if (columnTypeName.equalsIgnoreCase("geometry")) {
putGeometry(json, columnName, resultSet, colIndex);

View File

@@ -215,7 +215,7 @@ public class CdcMssqlSourceAcceptanceTest extends SourceAcceptanceTest {
ObjectMapper mapper = new ObjectMapper();
assertTrue(cdcFieldsOmitted(recordMessages.get(0).getData()).equals(
mapper.readTree("{\"id\":4, \"name\":\"voyager\"}")));
mapper.readTree("{\"id\":4, \"name\":\"voyager\", \"userid\":null}")));
// when we run incremental sync again there should be no new records. Run a sync with the latest
// state message and assert no records were emitted.
@@ -229,9 +229,8 @@ public class CdcMssqlSourceAcceptanceTest extends SourceAcceptanceTest {
assertFalse(
secondSyncRecords.isEmpty(),
"Expected the second incremental sync to produce records.");
assertTrue(cdcFieldsOmitted(secondSyncRecords.get(0).getData()).equals(
mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}")));
assertEquals(cdcFieldsOmitted(secondSyncRecords.get(0).getData()),
mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}"));
}
private JsonNode cdcFieldsOmitted(final JsonNode node) {

View File

@@ -187,7 +187,7 @@ public class MssqlSourceAcceptanceTest extends SourceAcceptanceTest {
ObjectMapper mapper = new ObjectMapper();
assertTrue(recordMessages.get(0).getData().equals(
mapper.readTree("{\"id\":4, \"name\":\"voyager\"}")));
mapper.readTree("{\"id\":4, \"name\":\"voyager\", \"userid\":null}}")));
// when we run incremental sync again there should be no new records. Run a sync with the latest
// state message and assert no records were emitted.
@@ -203,7 +203,7 @@ public class MssqlSourceAcceptanceTest extends SourceAcceptanceTest {
secondSyncRecords.isEmpty(),
"Expected the second incremental sync to produce records.");
assertTrue(secondSyncRecords.get(0).getData().equals(
mapper.readTree("{\"id\":5, \"name\":\"deep space nine\"}")));
mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}}")));
}