[Destination MSSQL] RC2 (#52704)
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
@@ -134,16 +134,19 @@ abstract class BasicPerformanceTest(
|
||||
|
||||
@Test
|
||||
open fun testInsertRecords() {
|
||||
testInsertRecords(null)
|
||||
testInsertRecords(validation = null)
|
||||
}
|
||||
|
||||
protected fun testInsertRecords(validation: ValidationFunction?) {
|
||||
protected fun testInsertRecords(
|
||||
recordsToInsert: Long? = null,
|
||||
validation: ValidationFunction?,
|
||||
) {
|
||||
runSync(
|
||||
testScenario =
|
||||
SingleStreamInsert(
|
||||
idColumn = idColumn,
|
||||
columns = twoStringColumns,
|
||||
recordsToInsert = defaultRecordsToInsert,
|
||||
recordsToInsert = recordsToInsert ?: defaultRecordsToInsert,
|
||||
randomizedNamespace = randomizedNamespace,
|
||||
streamName = testInfo.testMethod.get().name,
|
||||
),
|
||||
|
||||
@@ -16,18 +16,13 @@ data:
|
||||
type: GSM
|
||||
connectorType: destination
|
||||
definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
|
||||
dockerImageTag: 0.1.3
|
||||
dockerImageTag: 0.1.4
|
||||
dockerRepository: airbyte/destination-mssql-v2
|
||||
documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql-v2
|
||||
githubIssueLabel: destination-mssql-v2
|
||||
icon: icon.svg
|
||||
license: ELv2
|
||||
name: MSSQL V2 Destination
|
||||
registryOverrides:
|
||||
cloud:
|
||||
enabled: false
|
||||
oss:
|
||||
enabled: false
|
||||
releaseStage: alpha
|
||||
supportLevel: community
|
||||
supportsRefreshes: true
|
||||
|
||||
@@ -20,6 +20,7 @@ class MSSQLStreamLoader(
|
||||
override val stream: DestinationStream,
|
||||
private val sqlBuilder: MSSQLQueryBuilder,
|
||||
) : StreamLoader {
|
||||
private val recordCommitBatchSize = 1000
|
||||
|
||||
override suspend fun start() {
|
||||
ensureTableExists(dataSource)
|
||||
@@ -38,16 +39,22 @@ class MSSQLStreamLoader(
|
||||
endOfStream: Boolean
|
||||
): Batch {
|
||||
dataSource.connection.use { connection ->
|
||||
connection.autoCommit = false
|
||||
sqlBuilder.getFinalTableInsertColumnHeader().executeUpdate(connection) { statement ->
|
||||
records.forEach { record ->
|
||||
sqlBuilder.populateStatement(statement, record, sqlBuilder.finalTableSchema)
|
||||
// Stage every record on the prepared statement. Each time a full batch of
// recordCommitBatchSize rows has accumulated, send it to the server and commit, so the
// pending batch stays bounded.
records.withIndex().forEach { (index, record) ->
    sqlBuilder.populateStatement(statement, record, sqlBuilder.finalTableSchema)
    statement.addBatch()
    // index is zero-based: test (index + 1) so the first flush happens once a full batch
    // is staged, not immediately after the very first record.
    if ((index + 1) % recordCommitBatchSize == 0) {
        statement.executeBatch()
        connection.commit()
    }
}
|
||||
statement.executeLargeBatch()
|
||||
statement.executeBatch()
|
||||
}
|
||||
if (sqlBuilder.hasCdc) {
|
||||
sqlBuilder.deleteCdc(connection)
|
||||
}
|
||||
connection.commit()
|
||||
}
|
||||
return SimpleBatch(Batch.State.COMPLETE)
|
||||
}
|
||||
|
||||
@@ -10,11 +10,9 @@ import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper
|
||||
import io.airbyte.cdk.load.data.ArrayValue
|
||||
import io.airbyte.cdk.load.data.BooleanValue
|
||||
import io.airbyte.cdk.load.data.DateValue
|
||||
import io.airbyte.cdk.load.data.FieldType
|
||||
import io.airbyte.cdk.load.data.IntegerValue
|
||||
import io.airbyte.cdk.load.data.NullValue
|
||||
import io.airbyte.cdk.load.data.NumberValue
|
||||
import io.airbyte.cdk.load.data.ObjectType
|
||||
import io.airbyte.cdk.load.data.ObjectValue
|
||||
import io.airbyte.cdk.load.data.StringValue
|
||||
import io.airbyte.cdk.load.data.TimeWithTimezoneValue
|
||||
@@ -48,8 +46,7 @@ class AirbyteValueToStatement {
|
||||
field: MSSQLQueryBuilder.NamedField
|
||||
) {
|
||||
if (value != null && value !is NullValue && field.type.type is UnionType) {
|
||||
val objectValue = createUnionObject(field.type.type as UnionType, value)
|
||||
setAsJsonString(idx, objectValue)
|
||||
setAsJsonString(idx, value)
|
||||
} else {
|
||||
when (value) {
|
||||
is ArrayValue -> setAsJsonString(idx, value)
|
||||
@@ -164,60 +161,5 @@ class AirbyteValueToStatement {
|
||||
) {
|
||||
setTimestamp(idx, Timestamp.valueOf(value.value))
|
||||
}
|
||||
|
||||
private fun createSimpleUnionObject(value: AirbyteValue): ObjectValue {
|
||||
val unionType = value.toTypeName()
|
||||
return ObjectValue.from(mapOf("type" to StringValue(unionType), unionType to value))
|
||||
}
|
||||
|
||||
private fun createUnionObject(type: UnionType, value: AirbyteValue): AirbyteValue =
|
||||
if (type.options.all { it is ObjectType }) {
|
||||
val model =
|
||||
mutableMapOf<String, MutableSet<FieldType>>().withDefault { mutableSetOf() }
|
||||
|
||||
type.options.map {
|
||||
(it as ObjectType).properties.entries.forEach { objectEntry ->
|
||||
if (model.containsKey(objectEntry.key)) {
|
||||
model[objectEntry.key]!! += objectEntry.value
|
||||
} else {
|
||||
model[objectEntry.key] = mutableSetOf(objectEntry.value)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (model.values.all { it.size == 1 }) {
|
||||
value
|
||||
} else {
|
||||
val valuesWithConflicts =
|
||||
(value as ObjectValue)
|
||||
.values
|
||||
.entries
|
||||
.map { pair ->
|
||||
if (model[pair.key]?.let { it.size > 1 } == true)
|
||||
Pair(pair.key, createSimpleUnionObject(pair.value))
|
||||
else Pair(pair.key, pair.value)
|
||||
}
|
||||
.toMap()
|
||||
ObjectValue.from(valuesWithConflicts)
|
||||
}
|
||||
} else {
|
||||
createSimpleUnionObject(value)
|
||||
}
|
||||
|
||||
private fun AirbyteValue.toTypeName(): String =
|
||||
when (this) {
|
||||
is ArrayValue -> "array"
|
||||
is BooleanValue -> "boolean"
|
||||
is DateValue -> "string"
|
||||
is IntegerValue -> "integer"
|
||||
NullValue -> "null"
|
||||
is NumberValue -> "number"
|
||||
is ObjectValue -> "object"
|
||||
is StringValue -> "string"
|
||||
is TimeWithTimezoneValue -> "string"
|
||||
is TimeWithoutTimezoneValue -> "string"
|
||||
is TimestampWithTimezoneValue -> "string"
|
||||
is TimestampWithoutTimezoneValue -> "string"
|
||||
is UnknownValue -> "oneOf"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ package io.airbyte.integrations.destination.mssql.v2.convert
|
||||
import io.airbyte.cdk.load.data.AirbyteValue
|
||||
import io.airbyte.cdk.load.data.ArrayType
|
||||
import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema
|
||||
import io.airbyte.cdk.load.data.ArrayValue
|
||||
import io.airbyte.cdk.load.data.BooleanType
|
||||
import io.airbyte.cdk.load.data.BooleanValue
|
||||
import io.airbyte.cdk.load.data.DateType
|
||||
@@ -20,7 +19,6 @@ import io.airbyte.cdk.load.data.NumberValue
|
||||
import io.airbyte.cdk.load.data.ObjectType
|
||||
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
|
||||
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
|
||||
import io.airbyte.cdk.load.data.ObjectValue
|
||||
import io.airbyte.cdk.load.data.StringType
|
||||
import io.airbyte.cdk.load.data.StringValue
|
||||
import io.airbyte.cdk.load.data.TimeTypeWithTimezone
|
||||
@@ -35,7 +33,6 @@ import io.airbyte.cdk.load.data.UnionType
|
||||
import io.airbyte.cdk.load.data.UnknownType
|
||||
import io.airbyte.integrations.destination.mssql.v2.MSSQLQueryBuilder
|
||||
import io.airbyte.integrations.destination.mssql.v2.MSSQLQueryBuilder.NamedValue
|
||||
import io.airbyte.protocol.models.Jsons
|
||||
import java.sql.ResultSet
|
||||
import java.sql.Timestamp
|
||||
import java.time.LocalTime
|
||||
@@ -47,28 +44,23 @@ class ResultSetToAirbyteValue {
|
||||
fun ResultSet.getAirbyteNamedValue(field: MSSQLQueryBuilder.NamedField): NamedValue =
|
||||
when (field.type.type) {
|
||||
is StringType -> getStringValue(field.name)
|
||||
is ArrayType -> getArrayValue(field.name)
|
||||
ArrayTypeWithoutSchema -> getArrayValue(field.name)
|
||||
is ArrayType -> getStringValue(field.name)
|
||||
ArrayTypeWithoutSchema -> getStringValue(field.name)
|
||||
BooleanType -> getBooleanValue(field.name)
|
||||
DateType -> getDateValue(field.name)
|
||||
IntegerType -> getIntegerValue(field.name)
|
||||
NumberType -> getNumberValue(field.name)
|
||||
is ObjectType -> getObjectValue(field.name)
|
||||
ObjectTypeWithEmptySchema -> getObjectValue(field.name)
|
||||
ObjectTypeWithoutSchema -> getObjectValue(field.name)
|
||||
is ObjectType -> getStringValue(field.name)
|
||||
ObjectTypeWithEmptySchema -> getStringValue(field.name)
|
||||
ObjectTypeWithoutSchema -> getStringValue(field.name)
|
||||
TimeTypeWithTimezone -> getTimeWithTimezoneValue(field.name)
|
||||
TimeTypeWithoutTimezone -> getTimeWithoutTimezoneValue(field.name)
|
||||
TimestampTypeWithTimezone -> getTimestampWithTimezoneValue(field.name)
|
||||
TimestampTypeWithoutTimezone -> getTimestampWithoutTimezoneValue(field.name)
|
||||
is UnionType -> getObjectValue(field.name)
|
||||
is UnionType -> getStringValue(field.name)
|
||||
is UnknownType -> getStringValue(field.name)
|
||||
}
|
||||
|
||||
private fun ResultSet.getArrayValue(name: String): NamedValue =
|
||||
getNullable(name, this::getString)
|
||||
?.let { ArrayValue.from(deserialize<List<Any?>>(it)) }
|
||||
.toNamedValue(name)
|
||||
|
||||
private fun ResultSet.getBooleanValue(name: String): NamedValue =
|
||||
getNullable(name, this::getBoolean)?.let { BooleanValue(it) }.toNamedValue(name)
|
||||
|
||||
@@ -83,11 +75,6 @@ class ResultSetToAirbyteValue {
|
||||
?.let { NumberValue(it.toBigDecimal()) }
|
||||
.toNamedValue(name)
|
||||
|
||||
private fun ResultSet.getObjectValue(name: String): NamedValue =
|
||||
getNullable(name, this::getString)
|
||||
?.let { ObjectValue.from(deserialize<Map<String, Any?>>(it)) }
|
||||
.toNamedValue(name)
|
||||
|
||||
private fun ResultSet.getStringValue(name: String): NamedValue =
|
||||
getNullable(name, this::getString)?.let { StringValue(it) }.toNamedValue(name)
|
||||
|
||||
@@ -111,9 +98,6 @@ class ResultSetToAirbyteValue {
|
||||
return if (wasNull()) null else value
|
||||
}
|
||||
|
||||
private inline fun <reified T> deserialize(value: String): T =
|
||||
Jsons.deserialize(value, T::class.java)
|
||||
|
||||
internal fun String.toTimeWithTimezone(): TimeWithTimezoneValue =
|
||||
TimeWithTimezoneValue(
|
||||
OffsetDateTime.parse(
|
||||
|
||||
@@ -0,0 +1,84 @@
|
||||
/*
|
||||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.integrations.destination.mssql.v2
|
||||
|
||||
import io.airbyte.cdk.command.ConfigurationSpecification
|
||||
import io.airbyte.cdk.load.command.DestinationStream
|
||||
import io.airbyte.cdk.load.write.BasicPerformanceTest
|
||||
import io.airbyte.cdk.load.write.DataValidator
|
||||
import io.airbyte.integrations.destination.mssql.v2.config.DataSourceFactory
|
||||
import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfiguration
|
||||
import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfigurationFactory
|
||||
import io.airbyte.integrations.destination.mssql.v2.config.MSSQLSpecification
|
||||
import java.nio.file.Files
|
||||
import org.junit.jupiter.api.Assertions.assertEquals
|
||||
import org.junit.jupiter.api.BeforeAll
|
||||
import org.junit.jupiter.api.Test
|
||||
|
||||
/**
 * [DataValidator] used by the MSSQL performance tests to report how many rows a stream's table
 * holds.
 */
class MSSQLDataValidator : DataValidator {
    /**
     * Runs the [COUNT_FROM] query for the table that [MSSQLQueryBuilder] derives from [stream].
     *
     * @param spec connector specification; must be an [MSSQLSpecification], otherwise the cast
     * throws [ClassCastException].
     * @param stream stream whose output schema and table name are substituted into the query.
     * @return the first column of the first result row as a `Long`, or `null` when the query
     * returns no rows.
     */
    override fun count(spec: ConfigurationSpecification, stream: DestinationStream): Long? {
        val config = getConfiguration(spec = spec as MSSQLSpecification, stream = stream)
        val sqlBuilder = MSSQLQueryBuilder(config, stream)
        val dataSource = DataSourceFactory().dataSource(config)

        return dataSource.connection.use { connection ->
            COUNT_FROM.toQuery(sqlBuilder.outputSchema, sqlBuilder.tableName).executeQuery(
                connection
            ) { rs ->
                // Only the first row is ever consulted; an empty result maps to null.
                if (rs.next()) rs.getLong(1) else null
            }
        }
    }

    /**
     * Builds the [MSSQLConfiguration] used to reach the test database.
     *
     * The host and (when available) port reported by [MSSQLContainerHelper] override the values
     * in [spec]. When [stream] carries a namespace, that namespace overrides the schema as well.
     */
    private fun getConfiguration(
        spec: MSSQLSpecification,
        stream: DestinationStream,
    ): MSSQLConfiguration {
        val configOverrides =
            mutableMapOf("host" to MSSQLContainerHelper.getHost()).apply {
                MSSQLContainerHelper.getPort()?.let { port -> put("port", port.toString()) }
                stream.descriptor.namespace?.let { schema -> put("schema", schema) }
            }
        return MSSQLConfigurationFactory().makeWithOverrides(spec = spec, overrides = configOverrides)
    }
}
|
||||
|
||||
/**
 * Runs the [BasicPerformanceTest] scenarios against the MSSQL destination.
 *
 * The connector configuration is read from `check/valid.json`, and [MSSQLContainerHelper] is
 * started once before any test in this class runs.
 */
class MSSQLPerformanceTest :
    BasicPerformanceTest(
        configContents = Files.readString(MSSQLTestConfigUtil.getConfigPath("check/valid.json")),
        configSpecClass = MSSQLSpecification::class.java,
        configUpdater = MSSQLConfigUpdater(),
        dataValidator = MSSQLDataValidator(),
        defaultRecordsToInsert = 10_000,
    ) {
    /** Inserts 100,000 records instead of the class default; the validation callback is a no-op. */
    @Test
    override fun testInsertRecords() {
        testInsertRecords(recordsToInsert = 100_000) {}
    }

    /** Runs the dedup scenario and checks each stream's record count against its expected count. */
    @Test
    override fun testInsertRecordsWithDedup() {
        testInsertRecordsWithDedup { perfSummary ->
            // forEach, not map: the assertions run for their side effect only.
            perfSummary.forEach { streamSummary ->
                assertEquals(streamSummary.expectedRecordCount, streamSummary.recordCount)
            }
        }
    }

    companion object {
        /** Starts the shared MSSQL test container helper once for the whole class. */
        @JvmStatic
        @BeforeAll
        fun beforeAll() {
            MSSQLContainerHelper.start()
        }
    }
}
|
||||
@@ -41,16 +41,16 @@ abstract class MSSQLWriterTest(
|
||||
destinationCleaner = dataCleaner,
|
||||
isStreamSchemaRetroactive = true,
|
||||
supportsDedup = true,
|
||||
stringifySchemalessObjects = false,
|
||||
stringifySchemalessObjects = true,
|
||||
preserveUndeclaredFields = false,
|
||||
commitDataIncrementally = true,
|
||||
allTypesBehavior = StronglyTyped(integerCanBeLarge = false),
|
||||
nullEqualsUnset = true,
|
||||
supportFileTransfer = false,
|
||||
configUpdater = MSSQLConfigUpdater(),
|
||||
schematizedArrayBehavior = SchematizedNestedValueBehavior.STRONGLY_TYPE,
|
||||
schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
|
||||
unionBehavior = UnionBehavior.PROMOTE_TO_OBJECT,
|
||||
schematizedArrayBehavior = SchematizedNestedValueBehavior.STRINGIFY,
|
||||
schematizedObjectBehavior = SchematizedNestedValueBehavior.STRINGIFY,
|
||||
unionBehavior = UnionBehavior.STRINGIFY,
|
||||
nullUnknownTypes = false,
|
||||
)
|
||||
|
||||
|
||||
@@ -11,11 +11,12 @@ This connector is in early access, and SHOULD NOT be used for production workloa
|
||||
<details>
|
||||
<summary>Expand to review</summary>
|
||||
|
||||
| Version | Date | Pull Request | Subject |
|
||||
|:--------|:-----------| :--------------------------------------------------------- |:---------------|
|
||||
| 0.1.3 | 2025-01-24 | [52096](https://github.com/airbytehq/airbyte/pull/52096) | Release candidate |
|
||||
| 0.1.2 | 2025-01-10 | [51508](https://github.com/airbytehq/airbyte/pull/51508) | Use a non root base image |
|
||||
| 0.1.1 | 2024-12-18 | [49870](https://github.com/airbytehq/airbyte/pull/49870) | Use a base image: airbyte/java-connector-base:1.0.0 |
|
||||
| 0.1.0 | 2024-12-16 | [\#49460](https://github.com/airbytehq/airbyte/pull/49460) | Initial commit |
|
||||
| Version | Date | Pull Request | Subject |
|
||||
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------|
|
||||
| 0.1.4 | 2025-02-04 | [52704](https://github.com/airbytehq/airbyte/pull/52704) | RC2: Performance improvement |
|
||||
| 0.1.3 | 2025-01-24 | [52096](https://github.com/airbytehq/airbyte/pull/52096) | Release candidate |
|
||||
| 0.1.2 | 2025-01-10 | [51508](https://github.com/airbytehq/airbyte/pull/51508) | Use a non root base image |
|
||||
| 0.1.1 | 2024-12-18 | [49870](https://github.com/airbytehq/airbyte/pull/49870) | Use a base image: airbyte/java-connector-base:1.0.0 |
|
||||
| 0.1.0 | 2024-12-16 | [\#49460](https://github.com/airbytehq/airbyte/pull/49460) | Initial commit |
|
||||
|
||||
</details>
|
||||
|
||||
Reference in New Issue
Block a user