1
0
mirror of synced 2025-12-25 02:09:19 -05:00

[Source-Mssql] Assess and map all SQLServer supported datatypes to the correct JDBC and Airbyte types for Standard and CDC mode (#31531)

Co-authored-by: nguyenaiden <nguyenaiden@users.noreply.github.com>
This commit is contained in:
Duy Nguyen
2023-11-06 16:30:08 -08:00
committed by GitHub
parent 19a29db86f
commit 1e59751bc4
13 changed files with 187 additions and 57 deletions

View File

@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 2.0.4
dockerImageTag: 3.0.0
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
@@ -29,6 +29,9 @@ data:
- language:python
releases:
breakingChanges:
3.0.0:
message: "Remapped columns of types: date, datetime, datetime2, datetimeoffset, smalldatetime, and time from `String` to their appropriate Airbyte types. Customers whose streams have columns with the affected data types must take action with their connections."
upgradeDeadline: "2023-12-07"
2.0.0:
message: "Add default cursor for cdc"
upgradeDeadline: "2023-08-23"

View File

@@ -16,6 +16,7 @@ import com.microsoft.sqlserver.jdbc.Geography;
import com.microsoft.sqlserver.jdbc.Geometry;
import com.microsoft.sqlserver.jdbc.SQLServerResultSetMetaData;
import io.airbyte.cdk.db.jdbc.JdbcSourceOperations;
import io.airbyte.protocol.models.JsonSchemaType;
import java.nio.charset.Charset;
import java.sql.JDBCType;
import java.sql.ResultSet;
@@ -88,6 +89,19 @@ public class MssqlSourceOperations extends JdbcSourceOperations {
|| typeName.equalsIgnoreCase("hierarchyid")) {
return JDBCType.VARCHAR;
}
if (typeName.equalsIgnoreCase("datetime")) {
return JDBCType.TIMESTAMP;
}
if (typeName.equalsIgnoreCase("datetimeoffset")) {
return JDBCType.TIMESTAMP_WITH_TIMEZONE;
}
if (typeName.equalsIgnoreCase("real")) {
return JDBCType.REAL;
}
return JDBCType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt());
} catch (final IllegalArgumentException ex) {
LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.",
@@ -126,4 +140,20 @@ public class MssqlSourceOperations extends JdbcSourceOperations {
node.put(columnName, Geography.deserialize(resultSet.getBytes(index)).toString());
}
/**
 * Maps a JDBC type onto the Airbyte JSON schema type used for that column.
 *
 * <p>Integer, numeric, boolean, binary, and temporal JDBC types each have a dedicated
 * Airbyte type; every JDBC type not listed explicitly is reported as a plain string.
 *
 * @param jdbcType the JDBC type to translate
 * @return the Airbyte schema type corresponding to {@code jdbcType}
 */
@Override
public JsonSchemaType getAirbyteType(final JDBCType jdbcType) {
  final JsonSchemaType airbyteType = switch (jdbcType) {
    // Temporal types keep their date/time semantics instead of degrading to strings.
    case DATE -> JsonSchemaType.STRING_DATE;
    case TIME -> JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE;
    case TIMESTAMP -> JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE;
    case TIMESTAMP_WITH_TIMEZONE -> JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE;

    // Whole numbers.
    case TINYINT,
        SMALLINT,
        INTEGER,
        BIGINT ->
      JsonSchemaType.INTEGER;

    // Fractional and arbitrary-precision numbers.
    case REAL,
        FLOAT,
        DOUBLE,
        DECIMAL,
        NUMERIC ->
      JsonSchemaType.NUMBER;

    // Flags.
    case BIT,
        BOOLEAN ->
      JsonSchemaType.BOOLEAN;

    // Raw bytes are emitted as base64-encoded strings.
    case BINARY,
        VARBINARY,
        LONGVARBINARY,
        BLOB ->
      JsonSchemaType.STRING_BASE_64;

    case NULL -> JsonSchemaType.NULL;

    // Anything else falls back to a string.
    default -> JsonSchemaType.STRING;
  };
  return airbyteType;
}
}

View File

@@ -142,7 +142,7 @@ public abstract class AbstractMssqlSourceDatatypeTest extends AbstractSourceData
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("date")
.airbyteType(JsonSchemaType.STRING)
.airbyteType(JsonSchemaType.STRING_DATE)
.addInsertValues("'0001-01-01'", "'9999-12-31'", "'1999-01-08'", "null")
.addExpectedValues("0001-01-01", "9999-12-31", "1999-01-08", null)
.createTablePatternSql(CREATE_TABLE_SQL)
@@ -151,7 +151,7 @@ public abstract class AbstractMssqlSourceDatatypeTest extends AbstractSourceData
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("smalldatetime")
.airbyteType(JsonSchemaType.STRING)
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("'1900-01-01'", "'2079-06-06'", "null")
.addExpectedValues("1900-01-01T00:00:00.000000", "2079-06-06T00:00:00.000000", null)
.createTablePatternSql(CREATE_TABLE_SQL)
@@ -160,7 +160,7 @@ public abstract class AbstractMssqlSourceDatatypeTest extends AbstractSourceData
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("datetime")
.airbyteType(JsonSchemaType.STRING)
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("'1753-01-01'", "'9999-12-31'", "'9999-12-31T13:00:04'",
"'9999-12-31T13:00:04.123'", "null")
.addExpectedValues("1753-01-01T00:00:00.000000", "9999-12-31T00:00:00.000000", "9999-12-31T13:00:04",
@@ -171,7 +171,7 @@ public abstract class AbstractMssqlSourceDatatypeTest extends AbstractSourceData
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("datetime2")
.airbyteType(JsonSchemaType.STRING)
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("'0001-01-01'", "'9999-12-31'", "'9999-12-31T13:00:04.123456'", "null")
.addExpectedValues("0001-01-01T00:00:00.000000", "9999-12-31T00:00:00.000000", "9999-12-31T13:00:04.123456", null)
.createTablePatternSql(CREATE_TABLE_SQL)
@@ -180,7 +180,7 @@ public abstract class AbstractMssqlSourceDatatypeTest extends AbstractSourceData
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("time")
.airbyteType(JsonSchemaType.STRING)
.airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE)
.addInsertValues("null", "'13:00:01'", "'13:00:04Z'", "'13:00:04.123456Z'")
.addExpectedValues(null, "13:00:01", "13:00:04", "13:00:04.123456")
.createTablePatternSql(CREATE_TABLE_SQL)
@@ -189,7 +189,7 @@ public abstract class AbstractMssqlSourceDatatypeTest extends AbstractSourceData
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("datetimeoffset")
.airbyteType(JsonSchemaType.STRING)
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)
.addInsertValues("'0001-01-10 00:00:00 +01:00'", "'9999-01-10 00:00:00 +01:00'", "null")
.addExpectedValues("0001-01-10 00:00:00.0000000 +01:00",
"9999-01-10 00:00:00.0000000 +01:00", null)

View File

@@ -26,8 +26,7 @@ public class CdcMssqlSourceDatatypeTest extends AbstractMssqlSourceDatatypeTest
@Override
protected Database setupDatabase() throws Exception {
container = new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest")
.acceptLicense();
container = new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest").acceptLicense();
container.addEnv("MSSQL_AGENT_ENABLED", "True"); // need this running for cdc to work
container.start();
@@ -117,4 +116,9 @@ public class CdcMssqlSourceDatatypeTest extends AbstractMssqlSourceDatatypeTest
+ "DEALLOCATE CDC_Cursor");
}
/**
 * Always returns {@code true}.
 *
 * <p>NOTE(review): presumably this opts the CDC datatype test into the catalog checks of the
 * parent test class -- confirm against the base class's use of this hook.
 */
@Override
public boolean testCatalog() {
return true;
}
}

View File

@@ -20,7 +20,6 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.DSLContext;
import org.junit.jupiter.api.AfterAll;
import org.testcontainers.containers.MSSQLServerContainer;
import org.testcontainers.utility.DockerImageName;
public class SslEnabledMssqlSourceAcceptanceTest extends MssqlSourceAcceptanceTest {
@@ -35,10 +34,7 @@ public class SslEnabledMssqlSourceAcceptanceTest extends MssqlSourceAcceptanceTe
@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws SQLException {
if (db == null) {
db = new MSSQLServerContainer<>(DockerImageName
.parse("airbyte/mssql_ssltest:dev")
.asCompatibleSubstituteFor("mcr.microsoft.com/mssql/server"))
.acceptLicense().withUrlParam("trustServerCertificate", "true");
db = new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2022-RTM-CU2-ubuntu-20.04").acceptLicense();
db.start();
}

View File

@@ -84,7 +84,7 @@ public class CdcMssqlSourceTest extends CdcSourceTest {
@BeforeAll
public static void createContainer() {
if (container == null) {
container = new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2022-latest").acceptLicense();
container = new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest").acceptLicense();
container.addEnv("MSSQL_AGENT_ENABLED", "True"); // need this running for cdc to work
container.start();
}

View File

@@ -19,8 +19,15 @@ import io.airbyte.cdk.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.cdk.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.SyncMode;
import java.sql.JDBCType;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.junit.jupiter.api.AfterAll;
@@ -41,7 +48,7 @@ public class MssqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
static void init() {
// In mssql, timestamp is generated automatically, so we need to use
// the datetime2 type instead so that we can set the value manually.
COL_TIMESTAMP_TYPE = "DATETIME";
COL_TIMESTAMP_TYPE = "DATETIME2";
dbContainer = new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest").acceptLicense();
dbContainer.start();
@@ -171,4 +178,34 @@ public class MssqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
assertTrue(status.getMessage().contains("State code: S0001; Error code: 4060;"));
}
/**
 * Builds the catalog containing the three test streams: one with a single-column primary
 * key, one without a primary key, and one with a composite primary key.
 *
 * <p>Every stream supports full-refresh and incremental sync, and each declares
 * {@code COL_UPDATED_AT} as a date-typed string field.
 *
 * @param defaultNamespace namespace assigned to every stream in the catalog
 * @return a catalog holding the three streams, in the order described above
 */
@Override
protected AirbyteCatalog getCatalog(final String defaultNamespace) {
  // Stream keyed on the id column alone.
  final var streamWithPk = CatalogHelpers
      .createAirbyteStream(
          TABLE_NAME,
          defaultNamespace,
          Field.of(COL_ID, JsonSchemaType.INTEGER),
          Field.of(COL_NAME, JsonSchemaType.STRING),
          Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
      .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
      .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID)));

  // Same columns, but no source-defined primary key.
  final var streamWithoutPk = CatalogHelpers
      .createAirbyteStream(
          TABLE_NAME_WITHOUT_PK,
          defaultNamespace,
          Field.of(COL_ID, JsonSchemaType.INTEGER),
          Field.of(COL_NAME, JsonSchemaType.STRING),
          Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
      .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
      .withSourceDefinedPrimaryKey(Collections.emptyList());

  // Stream keyed on the (first name, last name) pair.
  final var streamWithCompositePk = CatalogHelpers
      .createAirbyteStream(
          TABLE_NAME_COMPOSITE_PK,
          defaultNamespace,
          Field.of(COL_FIRST_NAME, JsonSchemaType.STRING),
          Field.of(COL_LAST_NAME, JsonSchemaType.STRING),
          Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
      .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
      .withSourceDefinedPrimaryKey(
          List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)));

  return new AirbyteCatalog()
      .withStreams(List.of(streamWithPk, streamWithoutPk, streamWithCompositePk));
}
}

View File

@@ -46,7 +46,7 @@ class MssqlSourceTest {
DB_NAME,
Field.of("id", JsonSchemaType.INTEGER),
Field.of("name", JsonSchemaType.STRING),
Field.of("born", JsonSchemaType.STRING))
Field.of("born", JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))));