[Destination MSSQL] v2 rc8 (#54186)
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
@@ -1597,6 +1597,148 @@ abstract class BasicFunctionalityIntegrationTest(
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
open fun testDedupWithStringKey() {
|
||||
assumeTrue(supportsDedup)
|
||||
fun makeStream(syncId: Long) =
|
||||
DestinationStream(
|
||||
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
|
||||
importType =
|
||||
Dedupe(
|
||||
primaryKey = listOf(listOf("id1"), listOf("id2")),
|
||||
cursor = listOf("updated_at"),
|
||||
),
|
||||
schema =
|
||||
ObjectType(
|
||||
properties =
|
||||
linkedMapOf(
|
||||
"id1" to stringType,
|
||||
"id2" to intType,
|
||||
"updated_at" to timestamptzType,
|
||||
"name" to stringType,
|
||||
"_ab_cdc_deleted_at" to timestamptzType,
|
||||
)
|
||||
),
|
||||
generationId = 42,
|
||||
minimumGenerationId = 0,
|
||||
syncId = syncId,
|
||||
)
|
||||
fun makeRecord(data: String, extractedAt: Long) =
|
||||
InputRecord(
|
||||
randomizedNamespace,
|
||||
"test_stream",
|
||||
data,
|
||||
emittedAtMs = extractedAt,
|
||||
)
|
||||
|
||||
val sync1Stream = makeStream(syncId = 42)
|
||||
runSync(
|
||||
updatedConfig,
|
||||
sync1Stream,
|
||||
listOf(
|
||||
// emitted_at:1000 is equal to 1970-01-01 00:00:01Z.
|
||||
// This obviously makes no sense in relation to updated_at being in the year 2000,
|
||||
// but that's OK because (from destinations POV) updated_at has no relation to
|
||||
// extractedAt.
|
||||
makeRecord(
|
||||
"""{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "name": "Alice1", "_ab_cdc_deleted_at": null}""",
|
||||
extractedAt = 1000,
|
||||
),
|
||||
// Emit a second record for id=(1,200) with a different updated_at.
|
||||
makeRecord(
|
||||
"""{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "name": "Alice2", "_ab_cdc_deleted_at": null}""",
|
||||
extractedAt = 1000,
|
||||
),
|
||||
// Emit a record with no _ab_cdc_deleted_at field. CDC sources typically emit an
|
||||
// explicit null, but we should handle both cases.
|
||||
makeRecord(
|
||||
"""{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob1"}""",
|
||||
extractedAt = 1000,
|
||||
),
|
||||
),
|
||||
)
|
||||
dumpAndDiffRecords(
|
||||
parsedConfig,
|
||||
listOf(
|
||||
// Alice has only the newer record, and Bob also exists
|
||||
OutputRecord(
|
||||
extractedAt = 1000,
|
||||
generationId = 42,
|
||||
data =
|
||||
mapOf(
|
||||
"id1" to "9cf974de-52cf-4194-9f3d-7efa76ba4d84",
|
||||
"id2" to 200,
|
||||
"updated_at" to TimestampWithTimezoneValue("2000-01-01T00:01:00Z"),
|
||||
"name" to "Alice2",
|
||||
"_ab_cdc_deleted_at" to null
|
||||
),
|
||||
airbyteMeta = OutputRecord.Meta(syncId = 42),
|
||||
),
|
||||
OutputRecord(
|
||||
extractedAt = 1000,
|
||||
generationId = 42,
|
||||
data =
|
||||
mapOf(
|
||||
"id1" to "9cf974de-52cf-4194-9f3d-7efa76ba4d84",
|
||||
"id2" to 201,
|
||||
"updated_at" to TimestampWithTimezoneValue("2000-01-01T00:02:00Z"),
|
||||
"name" to "Bob1"
|
||||
),
|
||||
airbyteMeta = OutputRecord.Meta(syncId = 42),
|
||||
),
|
||||
),
|
||||
sync1Stream,
|
||||
primaryKey = listOf(listOf("id1"), listOf("id2")),
|
||||
cursor = listOf("updated_at"),
|
||||
)
|
||||
|
||||
val sync2Stream = makeStream(syncId = 43)
|
||||
runSync(
|
||||
updatedConfig,
|
||||
sync2Stream,
|
||||
listOf(
|
||||
// Update both Alice and Bob
|
||||
makeRecord(
|
||||
"""{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "name": "Alice3", "_ab_cdc_deleted_at": null}""",
|
||||
extractedAt = 2000,
|
||||
),
|
||||
makeRecord(
|
||||
"""{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "name": "Bob2"}""",
|
||||
extractedAt = 2000,
|
||||
),
|
||||
// And delete Bob. Again, T+D doesn't check the actual _value_ of deleted_at (i.e.
|
||||
// the fact that it's in the past is irrelevant). It only cares whether deleted_at
|
||||
// is non-null. So the destination should delete Bob.
|
||||
makeRecord(
|
||||
"""{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}""",
|
||||
extractedAt = 2000,
|
||||
),
|
||||
),
|
||||
)
|
||||
dumpAndDiffRecords(
|
||||
parsedConfig,
|
||||
listOf(
|
||||
// Alice still exists (and has been updated to the latest version), but Bob is gone
|
||||
OutputRecord(
|
||||
extractedAt = 2000,
|
||||
generationId = 42,
|
||||
data =
|
||||
mapOf(
|
||||
"id1" to "9cf974de-52cf-4194-9f3d-7efa76ba4d84",
|
||||
"id2" to 200,
|
||||
"updated_at" to TimestampWithTimezoneValue("2000-01-02T00:00:00Z"),
|
||||
"name" to "Alice3",
|
||||
"_ab_cdc_deleted_at" to null
|
||||
),
|
||||
airbyteMeta = OutputRecord.Meta(syncId = 43),
|
||||
)
|
||||
),
|
||||
sync2Stream,
|
||||
primaryKey = listOf(listOf("id1"), listOf("id2")),
|
||||
cursor = listOf("updated_at"),
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Change the cursor column in the second sync to a column that doesn't exist in the first sync.
|
||||
* Verify that we overwrite everything correctly.
|
||||
|
||||
@@ -16,7 +16,7 @@ data:
|
||||
type: GSM
|
||||
connectorType: destination
|
||||
definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
|
||||
dockerImageTag: 0.1.9
|
||||
dockerImageTag: 0.1.10
|
||||
dockerRepository: airbyte/destination-mssql-v2
|
||||
documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql-v2
|
||||
githubIssueLabel: destination-mssql-v2
|
||||
|
||||
@@ -23,12 +23,11 @@ import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID
|
||||
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_META
|
||||
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_RAW_ID
|
||||
import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfiguration
|
||||
import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteTypeToSqlType
|
||||
import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteTypeToMssqlType
|
||||
import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteValueToStatement.Companion.setAsNullValue
|
||||
import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteValueToStatement.Companion.setValue
|
||||
import io.airbyte.integrations.destination.mssql.v2.convert.MssqlType
|
||||
import io.airbyte.integrations.destination.mssql.v2.convert.ResultSetToAirbyteValue.Companion.getAirbyteNamedValue
|
||||
import io.airbyte.integrations.destination.mssql.v2.convert.SqlTypeToMssqlType
|
||||
import io.airbyte.protocol.models.Jsons
|
||||
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
|
||||
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
|
||||
@@ -230,9 +229,9 @@ class MSSQLQueryBuilder(
|
||||
Append -> emptyList()
|
||||
Overwrite -> emptyList()
|
||||
}
|
||||
private val indexedColumns: Set<String> = uniquenessKey.toSet()
|
||||
|
||||
private val toSqlType = AirbyteTypeToSqlType()
|
||||
private val toMssqlType = SqlTypeToMssqlType()
|
||||
private val toMssqlType = AirbyteTypeToMssqlType()
|
||||
|
||||
val finalTableSchema: List<NamedField> =
|
||||
airbyteFinalTableFields + extractFinalTableSchema(stream.schema)
|
||||
@@ -251,9 +250,7 @@ class MSSQLQueryBuilder(
|
||||
}
|
||||
|
||||
private fun getSchema(): List<NamedSqlField> =
|
||||
finalTableSchema.map {
|
||||
NamedSqlField(it.name, toMssqlType.convert(toSqlType.convert(it.type.type)))
|
||||
}
|
||||
finalTableSchema.map { NamedSqlField(it.name, toMssqlType.convert(it.type.type)) }
|
||||
|
||||
fun updateSchema(connection: Connection) {
|
||||
val existingSchema = getExistingSchema(connection)
|
||||
@@ -486,7 +483,12 @@ class MSSQLQueryBuilder(
|
||||
separator: String = DEFAULT_SEPARATOR
|
||||
): String {
|
||||
return schema.joinToString(separator = separator) {
|
||||
"[${it.name}] ${toMssqlType.convert(toSqlType.convert(it.type.type)).sqlString} NULL"
|
||||
val mssqlType =
|
||||
toMssqlType.convert(
|
||||
it.type.type,
|
||||
isIndexed = indexedColumns.contains(it.name),
|
||||
)
|
||||
"[${it.name}] ${mssqlType.sqlString} NULL"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.integrations.destination.mssql.v2.convert
|
||||
|
||||
import io.airbyte.cdk.load.data.AirbyteType
|
||||
import io.airbyte.cdk.load.data.ArrayType
|
||||
import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema
|
||||
import io.airbyte.cdk.load.data.BooleanType
|
||||
import io.airbyte.cdk.load.data.DateType
|
||||
import io.airbyte.cdk.load.data.IntegerType
|
||||
import io.airbyte.cdk.load.data.NumberType
|
||||
import io.airbyte.cdk.load.data.ObjectType
|
||||
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
|
||||
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
|
||||
import io.airbyte.cdk.load.data.StringType
|
||||
import io.airbyte.cdk.load.data.TimeTypeWithTimezone
|
||||
import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone
|
||||
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
|
||||
import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
|
||||
import io.airbyte.cdk.load.data.UnionType
|
||||
import io.airbyte.cdk.load.data.UnknownType
|
||||
import java.sql.Types
|
||||
|
||||
enum class MssqlType(val sqlType: Int, val sqlStringOverride: String? = null) {
|
||||
TEXT(Types.LONGVARCHAR),
|
||||
BIT(Types.BOOLEAN),
|
||||
DATE(Types.DATE),
|
||||
BIGINT(Types.BIGINT),
|
||||
DECIMAL(Types.DECIMAL, sqlStringOverride = "DECIMAL(18, 8)"),
|
||||
VARCHAR(Types.VARCHAR, sqlStringOverride = "VARCHAR(MAX)"),
|
||||
VARCHAR_INDEX(Types.VARCHAR, sqlStringOverride = "VARCHAR(200)"),
|
||||
DATETIMEOFFSET(Types.TIMESTAMP_WITH_TIMEZONE),
|
||||
TIME(Types.TIME),
|
||||
DATETIME(Types.TIMESTAMP);
|
||||
|
||||
val sqlString: String = sqlStringOverride ?: name
|
||||
}
|
||||
|
||||
class AirbyteTypeToMssqlType {
|
||||
fun convert(airbyteSchema: AirbyteType, isIndexed: Boolean = false): MssqlType {
|
||||
return when (airbyteSchema) {
|
||||
is ObjectType -> MssqlType.TEXT
|
||||
is ArrayType -> MssqlType.TEXT
|
||||
is ArrayTypeWithoutSchema -> MssqlType.TEXT
|
||||
is BooleanType -> MssqlType.BIT
|
||||
is DateType -> MssqlType.DATE
|
||||
is IntegerType -> MssqlType.BIGINT
|
||||
is NumberType -> MssqlType.DECIMAL
|
||||
is ObjectTypeWithEmptySchema -> MssqlType.TEXT
|
||||
is ObjectTypeWithoutSchema -> MssqlType.TEXT
|
||||
is StringType -> if (isIndexed) MssqlType.VARCHAR_INDEX else MssqlType.VARCHAR
|
||||
is TimeTypeWithTimezone -> MssqlType.DATETIMEOFFSET
|
||||
is TimeTypeWithoutTimezone -> MssqlType.TIME
|
||||
is TimestampTypeWithTimezone -> MssqlType.DATETIMEOFFSET
|
||||
is TimestampTypeWithoutTimezone -> MssqlType.DATETIME
|
||||
is UnionType -> MssqlType.TEXT
|
||||
is UnknownType -> MssqlType.TEXT
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,84 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.integrations.destination.mssql.v2.convert
|
||||
|
||||
import io.airbyte.cdk.load.data.AirbyteType
|
||||
import io.airbyte.cdk.load.data.ArrayType
|
||||
import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema
|
||||
import io.airbyte.cdk.load.data.BooleanType
|
||||
import io.airbyte.cdk.load.data.DateType
|
||||
import io.airbyte.cdk.load.data.IntegerType
|
||||
import io.airbyte.cdk.load.data.NumberType
|
||||
import io.airbyte.cdk.load.data.ObjectType
|
||||
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
|
||||
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
|
||||
import io.airbyte.cdk.load.data.StringType
|
||||
import io.airbyte.cdk.load.data.TimeTypeWithTimezone
|
||||
import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone
|
||||
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
|
||||
import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
|
||||
import io.airbyte.cdk.load.data.UnionType
|
||||
import io.airbyte.cdk.load.data.UnknownType
|
||||
import io.airbyte.integrations.destination.mssql.v2.model.SqlColumn
|
||||
import io.airbyte.integrations.destination.mssql.v2.model.SqlTable
|
||||
import java.sql.Types
|
||||
|
||||
/** CDK pipeline [AirbyteType] to SQL [Types] converter. */
|
||||
class AirbyteTypeToSqlType {
|
||||
|
||||
/**
|
||||
* Converts an [AirbyteType] to the associated SQL [Types] value.
|
||||
*
|
||||
* @param airbyteSchema The stream's Airbyte schema, represented as an [AirbyteType]
|
||||
* @return The associated SQL [Types] value.
|
||||
* @throws IllegalArgumentException if the [AirbyteType] is not supported.
|
||||
*/
|
||||
fun convert(airbyteSchema: AirbyteType): Int {
|
||||
return when (airbyteSchema) {
|
||||
is ObjectType -> Types.LONGVARCHAR
|
||||
is ArrayType -> Types.LONGVARCHAR
|
||||
is ArrayTypeWithoutSchema -> Types.LONGVARCHAR
|
||||
is BooleanType -> Types.BOOLEAN
|
||||
is DateType -> Types.DATE
|
||||
is IntegerType -> Types.BIGINT
|
||||
is NumberType -> Types.DECIMAL
|
||||
is ObjectTypeWithEmptySchema -> Types.LONGVARCHAR
|
||||
is ObjectTypeWithoutSchema -> Types.LONGVARCHAR
|
||||
is StringType -> Types.VARCHAR
|
||||
is TimeTypeWithTimezone -> Types.TIME_WITH_TIMEZONE
|
||||
is TimeTypeWithoutTimezone -> Types.TIME
|
||||
is TimestampTypeWithTimezone -> Types.TIMESTAMP_WITH_TIMEZONE
|
||||
is TimestampTypeWithoutTimezone -> Types.TIMESTAMP
|
||||
is UnionType -> Types.LONGVARCHAR
|
||||
is UnknownType -> Types.LONGVARCHAR
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension function that converts an [ObjectType] into a [SqlTable] that can be used to define a
|
||||
* SQL table.
|
||||
*
|
||||
* @param primaryKeys The list of configured primary key properties that should be treated as
|
||||
* primary keys in the generated [SqlTable]
|
||||
* @return The [SqlTable] that represents the table to be mapped to the stream represented by the
|
||||
* [ObjectType].
|
||||
*/
|
||||
fun ObjectType.toSqlTable(primaryKeys: List<List<String>>): SqlTable {
|
||||
val identifierFieldNames = primaryKeys.flatten().toSet()
|
||||
val sqlTypeConverter = AirbyteTypeToSqlType()
|
||||
val columns =
|
||||
this.properties.entries.map { (name, field) ->
|
||||
val isPrimaryKey = identifierFieldNames.contains(name)
|
||||
val isNullable = !isPrimaryKey && field.nullable
|
||||
SqlColumn(
|
||||
name = name,
|
||||
type = sqlTypeConverter.convert(field.type),
|
||||
isPrimaryKey = isPrimaryKey,
|
||||
isNullable = isNullable
|
||||
)
|
||||
}
|
||||
return SqlTable(columns = columns)
|
||||
}
|
||||
@@ -19,9 +19,6 @@ import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
|
||||
import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue
|
||||
import io.airbyte.cdk.load.data.UnknownValue
|
||||
import io.airbyte.cdk.load.util.serializeToJsonBytes
|
||||
import io.airbyte.integrations.destination.mssql.v2.model.SqlTable
|
||||
import io.airbyte.integrations.destination.mssql.v2.model.SqlTableRow
|
||||
import io.airbyte.integrations.destination.mssql.v2.model.SqlTableRowValue
|
||||
import java.sql.Date
|
||||
import java.sql.Time
|
||||
import java.sql.Timestamp
|
||||
@@ -60,33 +57,3 @@ class AirbyteValueToSqlValue {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension function that converts an [ObjectValue] into a row of SQL values.
|
||||
*
|
||||
* @param sqlTable The [SqlTable] that contains data type information for each column. This is used
|
||||
* to filter the [ObjectValue]'s values to only those that exist in the table.
|
||||
* @return A [SqlTableRow] that contains values converted to their SQL data type equivalents from
|
||||
* the provided [ObjectValue].
|
||||
*/
|
||||
fun ObjectValue.toSqlValue(sqlTable: SqlTable): SqlTableRow {
|
||||
val converter = AirbyteValueToSqlValue()
|
||||
return SqlTableRow(
|
||||
values =
|
||||
this.values
|
||||
.filter { (name, _) -> sqlTable.columns.find { it.name == name } != null }
|
||||
.map { (name, value) ->
|
||||
val dataType = sqlTable.columns.find { it.name == name }!!.type
|
||||
val converted =
|
||||
when (value) {
|
||||
is ObjectValue ->
|
||||
(converter.convert(value) as LinkedHashMap<*, *>)
|
||||
.serializeToJsonBytes()
|
||||
is ArrayValue ->
|
||||
(converter.convert(value) as List<*>).serializeToJsonBytes()
|
||||
else -> converter.convert(value)
|
||||
}
|
||||
SqlTableRowValue(name = name, value = converted, type = dataType)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ import java.sql.Types
|
||||
|
||||
class AirbyteValueToStatement {
|
||||
companion object {
|
||||
private val toSqlType = AirbyteTypeToSqlType()
|
||||
private val toSqlType = AirbyteTypeToMssqlType()
|
||||
private val toSqlValue = AirbyteValueToSqlValue()
|
||||
private val valueCoercingMapper =
|
||||
AirbyteValueDeepCoercingMapper(
|
||||
@@ -67,7 +67,7 @@ class AirbyteValueToStatement {
|
||||
|
||||
fun PreparedStatement.setAsNullValue(idx: Int, type: AirbyteType) {
|
||||
val sqlType = toSqlType.convert(type)
|
||||
setNull(idx, sqlType)
|
||||
setNull(idx, sqlType.sqlType)
|
||||
}
|
||||
|
||||
private fun PreparedStatement.setAsBooleanValue(idx: Int, value: BooleanValue) {
|
||||
@@ -95,7 +95,7 @@ class AirbyteValueToStatement {
|
||||
value: StringValue,
|
||||
type: AirbyteType
|
||||
) {
|
||||
val sqlType = toSqlType.convert(type)
|
||||
val sqlType = toSqlType.convert(type).sqlType
|
||||
if (sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR) {
|
||||
setString(idx, value.value)
|
||||
} else {
|
||||
|
||||
@@ -1,37 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.integrations.destination.mssql.v2.convert
|
||||
|
||||
import java.sql.Types
|
||||
|
||||
enum class MssqlType(val sqlType: Int, val sqlStringOverride: String? = null) {
|
||||
TEXT(Types.LONGVARCHAR),
|
||||
BIT(Types.BOOLEAN),
|
||||
DATE(Types.DATE),
|
||||
BIGINT(Types.BIGINT),
|
||||
DECIMAL(Types.DECIMAL, sqlStringOverride = "DECIMAL(18, 8)"),
|
||||
VARCHAR(Types.VARCHAR, sqlStringOverride = "VARCHAR(MAX)"),
|
||||
DATETIMEOFFSET(Types.TIMESTAMP_WITH_TIMEZONE),
|
||||
TIME(Types.TIME),
|
||||
DATETIME(Types.TIMESTAMP);
|
||||
|
||||
val sqlString: String = sqlStringOverride ?: name
|
||||
|
||||
companion object {
|
||||
val fromSqlType: Map<Int, MssqlType> =
|
||||
entries
|
||||
.associateByTo(mutableMapOf()) { it.sqlType }
|
||||
// Manually adding an extra mapping because we since represent both
|
||||
// sqlTypes TIMESTAMP_WITH_TIMEZONE and TIME_WITH_TIMEZONE as DATETIMEOFFSET
|
||||
// the auto generated reverse map is missing the nuance.
|
||||
.apply { this[Types.TIME_WITH_TIMEZONE] = DATETIMEOFFSET }
|
||||
.toMap()
|
||||
}
|
||||
}
|
||||
|
||||
class SqlTypeToMssqlType {
|
||||
fun convert(type: Int): MssqlType =
|
||||
MssqlType.fromSqlType.get(type) ?: throw IllegalArgumentException("type $type not found")
|
||||
}
|
||||
@@ -1,45 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.integrations.destination.mssql.v2.model
|
||||
|
||||
import java.sql.Types
|
||||
|
||||
/**
|
||||
* Representation of a colum in a SQL table.
|
||||
*
|
||||
* @param name The name of the column
|
||||
* @param type The data type of the column (see [Types] for values).
|
||||
* @param isPrimaryKey Whether the column represents a primary key.
|
||||
* @param isNullable Whether the column's value supports null values.
|
||||
*/
|
||||
data class SqlColumn(
|
||||
val name: String,
|
||||
val type: Int,
|
||||
val isPrimaryKey: Boolean = false,
|
||||
val isNullable: Boolean = false
|
||||
)
|
||||
|
||||
/**
|
||||
* Representation of a SQL table.
|
||||
*
|
||||
* @param columns The list of columns in the table.
|
||||
*/
|
||||
data class SqlTable(val columns: List<SqlColumn>)
|
||||
|
||||
/**
|
||||
* Representation of a value in a SQL row/column cell.
|
||||
*
|
||||
* @param name The name of the column.
|
||||
* @param value The value of the row/column cell.
|
||||
* @param type The SQL type of the row/column cell (see [Types] for values).
|
||||
*/
|
||||
data class SqlTableRowValue(val name: String, val value: Any?, val type: Int)
|
||||
|
||||
/**
|
||||
* Representation of a row of values in a SQL table.
|
||||
*
|
||||
* @param values A list of values stored in the row.
|
||||
*/
|
||||
data class SqlTableRow(val values: List<SqlTableRowValue>)
|
||||
@@ -23,14 +23,12 @@ import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
|
||||
import io.airbyte.cdk.load.data.UnionType
|
||||
import io.airbyte.cdk.load.data.UnknownType
|
||||
import io.mockk.mockk
|
||||
import java.sql.Types
|
||||
import org.junit.jupiter.api.Assertions.assertEquals
|
||||
import org.junit.jupiter.api.Assertions.assertNotNull
|
||||
import org.junit.jupiter.api.Test
|
||||
|
||||
class AirbyteTypeToSqlTypeTest {
|
||||
class AirbyteTypeToMsqlTypeTest {
|
||||
|
||||
private val converter = AirbyteTypeToSqlType()
|
||||
private val converter = AirbyteTypeToMssqlType()
|
||||
|
||||
@Test
|
||||
fun testConvertObjectType() {
|
||||
@@ -42,136 +40,118 @@ class AirbyteTypeToSqlTypeTest {
|
||||
),
|
||||
)
|
||||
val result = converter.convert(objectType)
|
||||
assertEquals(Types.LONGVARCHAR, result)
|
||||
assertEquals(MssqlType.TEXT, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvertArrayType() {
|
||||
val arrayType = ArrayType(FieldType(IntegerType, false))
|
||||
val result = converter.convert(arrayType)
|
||||
assertEquals(Types.LONGVARCHAR, result)
|
||||
assertEquals(MssqlType.TEXT, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvertArrayTypeWithoutSchema() {
|
||||
val arrayType = ArrayTypeWithoutSchema
|
||||
val result = converter.convert(arrayType)
|
||||
assertEquals(Types.LONGVARCHAR, result)
|
||||
assertEquals(MssqlType.TEXT, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvertBooleanType() {
|
||||
val booleanType = BooleanType
|
||||
val result = converter.convert(booleanType)
|
||||
assertEquals(Types.BOOLEAN, result)
|
||||
assertEquals(MssqlType.BIT, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvertDateType() {
|
||||
val dateType = DateType
|
||||
val result = converter.convert(dateType)
|
||||
assertEquals(Types.DATE, result)
|
||||
assertEquals(MssqlType.DATE, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvertIntegerType() {
|
||||
val integerType = IntegerType
|
||||
val result = converter.convert(integerType)
|
||||
assertEquals(Types.BIGINT, result)
|
||||
assertEquals(MssqlType.BIGINT, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvertNumberType() {
|
||||
val numberType = NumberType
|
||||
val result = converter.convert(numberType)
|
||||
assertEquals(Types.DECIMAL, result)
|
||||
assertEquals(MssqlType.DECIMAL, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvertObjectTypeWithEmptySchema() {
|
||||
val objectType = ObjectTypeWithEmptySchema
|
||||
val result = converter.convert(objectType)
|
||||
assertEquals(Types.LONGVARCHAR, result)
|
||||
assertEquals(MssqlType.TEXT, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvertObjectTypeWithoutSchema() {
|
||||
val objectType = ObjectTypeWithoutSchema
|
||||
val result = converter.convert(objectType)
|
||||
assertEquals(Types.LONGVARCHAR, result)
|
||||
assertEquals(MssqlType.TEXT, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvertStringType() {
|
||||
val stringType = StringType
|
||||
val result = converter.convert(stringType)
|
||||
assertEquals(Types.VARCHAR, result)
|
||||
assertEquals(MssqlType.VARCHAR, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvertIndexedStringType() {
|
||||
val stringType = StringType
|
||||
val result = converter.convert(stringType, isIndexed = true)
|
||||
assertEquals(MssqlType.VARCHAR_INDEX, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvertTimeTypeWithTimezone() {
|
||||
val timeType = TimeTypeWithTimezone
|
||||
val result = converter.convert(timeType)
|
||||
assertEquals(Types.TIME_WITH_TIMEZONE, result)
|
||||
assertEquals(MssqlType.DATETIMEOFFSET, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvertTimeTypeWithoutTimezone() {
|
||||
val timeType = TimeTypeWithoutTimezone
|
||||
val result = converter.convert(timeType)
|
||||
assertEquals(Types.TIME, result)
|
||||
assertEquals(MssqlType.TIME, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvertTimestampTypeWithTimezone() {
|
||||
val timestampType = TimestampTypeWithTimezone
|
||||
val result = converter.convert(timestampType)
|
||||
assertEquals(Types.TIMESTAMP_WITH_TIMEZONE, result)
|
||||
assertEquals(MssqlType.DATETIMEOFFSET, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvertTimestampTypeWithoutTimezone() {
|
||||
val timestampType = TimestampTypeWithoutTimezone
|
||||
val result = converter.convert(timestampType)
|
||||
assertEquals(Types.TIMESTAMP, result)
|
||||
assertEquals(MssqlType.DATETIME, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvertUnionType() {
|
||||
val unionType = UnionType(setOf(StringType, NumberType))
|
||||
val result = converter.convert(unionType)
|
||||
assertEquals(Types.LONGVARCHAR, result)
|
||||
assertEquals(MssqlType.TEXT, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvertUnknownType() {
|
||||
val unknownType = UnknownType(mockk<JsonNode>())
|
||||
val result = converter.convert(unknownType)
|
||||
assertEquals(Types.LONGVARCHAR, result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testToSqlTable() {
|
||||
val primaryKey = "id"
|
||||
val nullableColumn = "email"
|
||||
val objectType =
|
||||
ObjectType(
|
||||
linkedMapOf(
|
||||
primaryKey to FieldType(IntegerType, false),
|
||||
"age" to FieldType(IntegerType, false),
|
||||
nullableColumn to FieldType(StringType, true),
|
||||
),
|
||||
)
|
||||
val primaryKeys = listOf(listOf(primaryKey))
|
||||
val table = objectType.toSqlTable(primaryKeys = primaryKeys)
|
||||
|
||||
assertEquals(objectType.properties.size, table.columns.size)
|
||||
objectType.properties.forEach { (name, type) ->
|
||||
val column = table.columns.find { it.name == name }
|
||||
assertNotNull(column)
|
||||
assertEquals(converter.convert(type.type), column?.type)
|
||||
assertEquals(primaryKey == name, column?.isPrimaryKey)
|
||||
assertEquals(nullableColumn == name, column?.isNullable)
|
||||
}
|
||||
assertEquals(MssqlType.TEXT, result)
|
||||
}
|
||||
}
|
||||
@@ -16,14 +16,11 @@ import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
|
||||
import io.airbyte.cdk.load.data.UnknownValue
|
||||
import io.airbyte.cdk.load.util.Jsons
|
||||
import io.airbyte.cdk.load.util.serializeToJsonBytes
|
||||
import io.airbyte.integrations.destination.mssql.v2.model.SqlColumn
|
||||
import io.airbyte.integrations.destination.mssql.v2.model.SqlTable
|
||||
import java.math.BigDecimal
|
||||
import java.math.BigInteger
|
||||
import java.sql.Date
|
||||
import java.sql.Time
|
||||
import java.sql.Timestamp
|
||||
import java.sql.Types
|
||||
import java.time.ZoneOffset
|
||||
import org.junit.jupiter.api.Assertions.assertArrayEquals
|
||||
import org.junit.jupiter.api.Assertions.assertEquals
|
||||
@@ -122,128 +119,6 @@ internal class AirbyteValueToSqlValueTest {
|
||||
assertArrayEquals(Jsons.writeValueAsBytes(unknownValue.value), result as ByteArray)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testToSqlValue() {
|
||||
val sqlTable =
|
||||
SqlTable(
|
||||
listOf(
|
||||
SqlColumn(
|
||||
name = "id",
|
||||
type = Types.INTEGER,
|
||||
isPrimaryKey = true,
|
||||
isNullable = false
|
||||
),
|
||||
SqlColumn(
|
||||
name = "name",
|
||||
type = Types.VARCHAR,
|
||||
isPrimaryKey = false,
|
||||
isNullable = true
|
||||
),
|
||||
SqlColumn(
|
||||
name = "meta",
|
||||
type = Types.BLOB,
|
||||
isPrimaryKey = false,
|
||||
isNullable = false
|
||||
),
|
||||
SqlColumn(
|
||||
name = "items",
|
||||
type = Types.BLOB,
|
||||
isPrimaryKey = false,
|
||||
isNullable = false
|
||||
)
|
||||
)
|
||||
)
|
||||
val objectValue =
|
||||
ObjectValue(
|
||||
linkedMapOf(
|
||||
"id" to IntegerValue(123L),
|
||||
"name" to StringValue("John Doe"),
|
||||
"meta" to
|
||||
ObjectValue(
|
||||
linkedMapOf(
|
||||
"sync_id" to IntegerValue(123L),
|
||||
"changes" to
|
||||
ObjectValue(
|
||||
linkedMapOf(
|
||||
"change" to StringValue("insert"),
|
||||
"reason" to StringValue("reason"),
|
||||
)
|
||||
)
|
||||
)
|
||||
),
|
||||
"items" to ArrayValue(listOf(StringValue("item1"), StringValue("item2")))
|
||||
)
|
||||
)
|
||||
|
||||
val sqlValue = objectValue.toSqlValue(sqlTable)
|
||||
|
||||
assertEquals(sqlTable.columns.size, sqlValue.values.size)
|
||||
assertEquals(
|
||||
BigInteger::class.java,
|
||||
sqlValue.values.find { it.name == "id" }?.value?.javaClass
|
||||
)
|
||||
assertEquals(123.toBigInteger(), sqlValue.values.find { it.name == "id" }?.value)
|
||||
assertEquals(
|
||||
String::class.java,
|
||||
sqlValue.values.find { it.name == "name" }?.value?.javaClass
|
||||
)
|
||||
assertEquals("John Doe", sqlValue.values.find { it.name == "name" }?.value)
|
||||
assertEquals(
|
||||
ByteArray::class.java,
|
||||
sqlValue.values.find { it.name == "meta" }?.value?.javaClass
|
||||
)
|
||||
assertArrayEquals(
|
||||
mapOf(
|
||||
"sync_id" to 123.toBigInteger(),
|
||||
"changes" to
|
||||
mapOf(
|
||||
"change" to "insert",
|
||||
"reason" to "reason",
|
||||
)
|
||||
)
|
||||
.serializeToJsonBytes(),
|
||||
sqlValue.values.find { it.name == "meta" }?.value as ByteArray
|
||||
)
|
||||
assertEquals(
|
||||
ByteArray::class.java,
|
||||
sqlValue.values.find { it.name == "items" }?.value?.javaClass
|
||||
)
|
||||
assertArrayEquals(
|
||||
listOf("item1", "item2").serializeToJsonBytes(),
|
||||
sqlValue.values.find { it.name == "items" }?.value as ByteArray
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testToSqlValueIgnoresFieldsNotInTable() {
|
||||
val sqlTable =
|
||||
SqlTable(
|
||||
listOf(
|
||||
SqlColumn(
|
||||
name = "id",
|
||||
type = Types.INTEGER,
|
||||
isPrimaryKey = true,
|
||||
isNullable = false
|
||||
),
|
||||
)
|
||||
)
|
||||
val objectValue =
|
||||
ObjectValue(
|
||||
linkedMapOf(
|
||||
"id" to IntegerValue(123L),
|
||||
"name" to StringValue("Should be ignored"),
|
||||
)
|
||||
)
|
||||
|
||||
val sqlValue = objectValue.toSqlValue(sqlTable)
|
||||
assertEquals(sqlTable.columns.size, sqlValue.values.size)
|
||||
assertEquals(
|
||||
BigInteger::class.java,
|
||||
sqlValue.values.find { it.name == "id" }?.value?.javaClass
|
||||
)
|
||||
assertEquals(123.toBigInteger(), sqlValue.values.find { it.name == "id" }?.value)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testObjectMapToJsonBytes() {
|
||||
val objectValue =
|
||||
|
||||
@@ -13,6 +13,7 @@ This connector is in early access, and SHOULD NOT be used for production workloa
|
||||
|
||||
| Version | Date | Pull Request | Subject |
|
||||
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------|
|
||||
| 0.1.10 | 2025-02-20 | [54186](https://github.com/airbytehq/airbyte/pull/54186) | RC8: Fix String support. |
|
||||
| 0.1.9 | 2025-02-11 | [53364](https://github.com/airbytehq/airbyte/pull/53364) | RC7: Revert deletion change. |
|
||||
| 0.1.8 | 2025-02-11 | [53364](https://github.com/airbytehq/airbyte/pull/53364) | RC6: Break up deletes into loop to reduce locking. |
|
||||
| 0.1.7 | 2025-02-07 | [53236](https://github.com/airbytehq/airbyte/pull/53236) | RC5: Use rowlock hint. |
|
||||
|
||||
Reference in New Issue
Block a user