
Destination MSSQL: use new typing interface (#55849)

Co-authored-by: Francis Genet <francis.genet@airbyte.io>
Edward Gao, 2025-03-24 11:50:14 -07:00 (committed by GitHub)
parent 2401c51e6f
commit 2b2991383d
36 changed files with 579 additions and 855 deletions

View File

@@ -11,7 +11,7 @@ import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.state.StreamProcessingFailed
import io.airbyte.cdk.load.test.util.OutputRecord
@@ -38,7 +38,7 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
override val groupId: String? = null
}
data class LocalBatch(val records: List<DestinationRecordAirbyteValue>) : MockBatch() {
data class LocalBatch(val records: List<DestinationRecordRaw>) : MockBatch() {
override val state = Batch.State.STAGED
}
data class LocalFileBatch(val file: DestinationFile) : MockBatch() {
@@ -72,7 +72,7 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
}
override suspend fun processRecords(
records: Iterator<DestinationRecordAirbyteValue>,
records: Iterator<DestinationRecordRaw>,
totalSizeBytes: Long,
endOfStream: Boolean
): Batch {
@@ -84,16 +84,17 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
is LocalBatch -> {
log.info { "Persisting ${batch.records.size} records for ${stream.descriptor}" }
batch.records.forEach {
val recordAirbyteValue = it.asDestinationRecordAirbyteValue()
val filename = getFilename(it.stream.descriptor, staging = true)
val record =
OutputRecord(
UUID.randomUUID(),
Instant.ofEpochMilli(it.emittedAtMs),
Instant.ofEpochMilli(recordAirbyteValue.emittedAtMs),
Instant.ofEpochMilli(System.currentTimeMillis()),
stream.generationId,
it.data as ObjectValue,
recordAirbyteValue.data as ObjectValue,
OutputRecord.Meta(
changes = it.meta?.changes ?: listOf(),
changes = recordAirbyteValue.meta?.changes ?: listOf(),
syncId = stream.syncId
),
)
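
To make the migration concrete, here is a minimal sketch (a hypothetical helper, not part of this commit) of the pattern this hunk adopts: batches now carry DestinationRecordRaw, and the typed view is marshaled only at persist time via asDestinationRecordAirbyteValue().

import io.airbyte.cdk.load.message.DestinationRecordRaw

// Illustrative only: persist() is a made-up helper, not CDK API.
fun persist(records: List<DestinationRecordRaw>) {
    records.forEach { raw ->
        // Marshal lazily; emittedAtMs / data / meta live on the marshaled view.
        val typed = raw.asDestinationRecordAirbyteValue()
        println("emittedAtMs=${typed.emittedAtMs} data=${typed.data}")
    }
}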

View File

@@ -159,16 +159,17 @@ private class ObjectValueSerializer : JsonSerializer<ObjectValue>() {
/**
* Represents an "enriched" (/augmented) Airbyte value with additional metadata.
*
* @property value The actual [AirbyteValue]
* @property abValue The actual [AirbyteValue]
* @property type The type ([AirbyteType]) of the [AirbyteValue]
* @property changes List of [Meta.Change]s that have been applied to this value
* @property name Field name
*/
class EnrichedAirbyteValue(
var value: AirbyteValue,
var abValue: AirbyteValue,
val type: AirbyteType,
val name: String,
val changes: MutableList<Meta.Change> = mutableListOf()
val changes: MutableList<Meta.Change> = mutableListOf(),
val airbyteMetaField: Meta.AirbyteMetaFields?,
) {
init {
require(name.isNotBlank()) { "Field name cannot be blank" }
@@ -182,7 +183,7 @@ class EnrichedAirbyteValue(
fun nullify(reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR) {
val nullChange = Meta.Change(field = name, change = Change.NULLED, reason = reason)
value = NullValue
abValue = NullValue
changes.add(nullChange)
}
@@ -199,7 +200,7 @@ class EnrichedAirbyteValue(
val truncateChange = Meta.Change(field = name, change = Change.TRUNCATED, reason = reason)
// Replace the value with the truncated version and record the truncation change
value = newValue
abValue = newValue
changes.add(truncateChange)
}
}
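
A small usage sketch of the renamed abValue field and the new airbyteMetaField constructor parameter, assuming the package layout implied by the imports elsewhere in this commit (Reason is the protocol-models enum used by the tests further down):

import io.airbyte.cdk.load.data.EnrichedAirbyteValue // package assumed
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason

fun demo() {
    val field =
        EnrichedAirbyteValue(
            abValue = StringValue("some oversized payload"),
            type = StringType,
            name = "notes",
            airbyteMetaField = null, // a user column, not an _airbyte_* field
        )
    // truncate() swaps abValue in place and records a TRUNCATED change.
    field.truncate(Reason.DESTINATION_FIELD_SIZE_LIMITATION, StringValue("some..."))
    // nullify() replaces abValue with NullValue and records a NULLED change.
    field.nullify()
    check(field.abValue == NullValue && field.changes.size == 2)
}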

View File

@@ -23,6 +23,7 @@ import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import io.airbyte.cdk.load.data.json.toAirbyteValue
import io.airbyte.cdk.load.data.toAirbyteValues
@@ -225,9 +226,37 @@ data class EnrichedDestinationRecordAirbyteValue(
val declaredFields: Map<String, EnrichedAirbyteValue>,
val undeclaredFields: Map<String, JsonNode>,
val emittedAtMs: Long,
/**
* The airbyte_meta field as received by the destination connector. Note that this field is NOT
* updated by [EnrichedAirbyteValue.nullify] / [EnrichedAirbyteValue.truncate].
*
* If you want an up-to-date view of airbyte_meta, including any changes that were done to the
* values within the destination connector, you should use [airbyteMeta].
*/
val meta: Meta?,
val serializedSizeBytes: Long = 0L,
) {
val airbyteMeta: EnrichedAirbyteValue
get() =
EnrichedAirbyteValue(
ObjectValue(
linkedMapOf(
"sync_id" to IntegerValue(stream.syncId),
"changes" to
ArrayValue(
(meta?.changes?.toAirbyteValues()
?: emptyList()) +
declaredFields
.map { it.value.changes.toAirbyteValues() }
.flatten(),
),
),
),
Meta.AirbyteMetaFields.META.type,
name = Meta.COLUMN_NAME_AB_META,
airbyteMetaField = Meta.AirbyteMetaFields.META,
)
val airbyteMetaFields: Map<String, EnrichedAirbyteValue>
get() =
mapOf(
@@ -236,36 +265,22 @@ data class EnrichedDestinationRecordAirbyteValue(
StringValue(UUID.randomUUID().toString()),
Meta.AirbyteMetaFields.RAW_ID.type,
name = Meta.COLUMN_NAME_AB_RAW_ID,
airbyteMetaField = Meta.AirbyteMetaFields.RAW_ID,
),
Meta.COLUMN_NAME_AB_EXTRACTED_AT to
EnrichedAirbyteValue(
IntegerValue(emittedAtMs),
Meta.AirbyteMetaFields.EXTRACTED_AT.type,
name = Meta.COLUMN_NAME_AB_EXTRACTED_AT,
airbyteMetaField = Meta.AirbyteMetaFields.EXTRACTED_AT,
),
Meta.COLUMN_NAME_AB_META to
EnrichedAirbyteValue(
ObjectValue(
linkedMapOf(
"sync_id" to IntegerValue(stream.syncId),
"changes" to
ArrayValue(
(meta?.changes?.toAirbyteValues()
?: emptyList()) +
declaredFields
.map { it.value.changes.toAirbyteValues() }
.flatten()
)
)
),
Meta.AirbyteMetaFields.META.type,
name = Meta.COLUMN_NAME_AB_META,
),
Meta.COLUMN_NAME_AB_META to airbyteMeta,
Meta.COLUMN_NAME_AB_GENERATION_ID to
EnrichedAirbyteValue(
IntegerValue(stream.generationId),
Meta.AirbyteMetaFields.GENERATION_ID.type,
name = Meta.COLUMN_NAME_AB_GENERATION_ID,
airbyteMetaField = Meta.AirbyteMetaFields.GENERATION_ID,
),
)
@@ -296,6 +311,13 @@ data class DestinationRecordRaw(
)
}
/**
* Convert this record to an EnrichedRecord. Crucially, after this conversion, all entries in
* [EnrichedDestinationRecordAirbyteValue.allTypedFields] are guaranteed to have
* [EnrichedAirbyteValue.abValue] either be [NullValue], or match [EnrichedAirbyteValue.type]
* (e.g. if `type` is [TimestampTypeWithTimezone], then `abValue` is either `NullValue`, or
* [TimestampWithTimezoneValue]).
*/
fun asEnrichedDestinationRecordAirbyteValue(): EnrichedDestinationRecordAirbyteValue {
val rawJson = asRawJson()
@@ -322,12 +344,13 @@ data class DestinationRecordRaw(
val enrichedValue =
EnrichedAirbyteValue(
value = NullValue,
abValue = NullValue,
type = fieldType,
name = fieldName,
airbyteMetaField = null,
)
AirbyteValueCoercer.coerce(fieldValue.toAirbyteValue(), fieldType)?.let {
enrichedValue.value = it
enrichedValue.abValue = it
}
?: enrichedValue.nullify(Reason.DESTINATION_SERIALIZATION_ERROR)
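
The coercion guarantee documented above lets downstream code cast by declared type without re-checking, as in this hypothetical consumer (the value types are the ones imported in this file):

// Safe by the post-conversion guarantee: a field declared as
// TimestampTypeWithTimezone holds either NullValue or TimestampWithTimezoneValue.
fun renderTimestampTz(field: EnrichedAirbyteValue): String? =
    when (val v = field.abValue) {
        NullValue -> null
        is TimestampWithTimezoneValue -> v.value.toString()
        else -> error("unreachable after asEnrichedDestinationRecordAirbyteValue()")
    }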

View File

@@ -10,7 +10,7 @@ import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.message.DestinationRecordStreamComplete
import io.airbyte.cdk.load.message.DestinationRecordStreamIncomplete
import io.airbyte.cdk.load.message.DestinationStreamAffinedMessage
@@ -75,7 +75,7 @@ class DefaultProcessRecordsTask(
file.localFile.inputStream().use {
val records =
if (file.isEmpty) {
emptyList<DestinationRecordAirbyteValue>().listIterator()
emptyList<DestinationRecordRaw>().listIterator()
} else {
it.toRecordIterator()
}
@@ -97,7 +97,7 @@ class DefaultProcessRecordsTask(
accumulators.forEach { (streamDescriptor, acc) ->
val finalBatch =
acc.processRecords(
emptyList<DestinationRecordAirbyteValue>().listIterator(),
emptyList<DestinationRecordRaw>().listIterator(),
0,
true
)
@@ -122,7 +122,7 @@ class DefaultProcessRecordsTask(
}
}
private fun InputStream.toRecordIterator(): Iterator<DestinationRecordAirbyteValue> {
private fun InputStream.toRecordIterator(): Iterator<DestinationRecordRaw> {
return lineSequence()
.map {
when (val message = deserializer.deserialize(it)) {
@@ -136,7 +136,7 @@ class DefaultProcessRecordsTask(
.takeWhile {
it !is DestinationRecordStreamComplete && it !is DestinationRecordStreamIncomplete
}
.map { (it as DestinationRecord).asRecordMarshaledToAirbyteValue() }
.map { (it as DestinationRecord).asDestinationRecordRaw() }
.iterator()
}
}

View File

@@ -8,7 +8,7 @@ import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.message.MultiProducerChannel
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.state.StreamProcessingFailed
@@ -61,7 +61,7 @@ interface StreamLoader : BatchAccumulator, FileBatchAccumulator {
interface BatchAccumulator {
suspend fun processRecords(
records: Iterator<DestinationRecordAirbyteValue>,
records: Iterator<DestinationRecordRaw>,
totalSizeBytes: Long,
endOfStream: Boolean = false
): Batch =
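
For reference, a hypothetical accumulator written against the updated signature; SimpleBatch and Batch come from the imports above, and the COMPLETE state is an assumption of this sketch:

class CountingAccumulator : BatchAccumulator {
    override suspend fun processRecords(
        records: Iterator<DestinationRecordRaw>,
        totalSizeBytes: Long,
        endOfStream: Boolean
    ): Batch {
        // A real implementation would stage or persist here; this sketch only counts.
        var count = 0
        records.forEach { _ -> count++ }
        return SimpleBatch(Batch.State.COMPLETE) // state assumed for illustration
    }
}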

View File

@@ -17,10 +17,10 @@ class EnrichedAirbyteValueTest {
val type = StringType
val name = "testField"
val enriched = EnrichedAirbyteValue(initialValue, type, name)
val enriched = EnrichedAirbyteValue(initialValue, type, name, airbyteMetaField = null)
enriched.nullify(Reason.DESTINATION_SERIALIZATION_ERROR)
assertEquals(NullValue, enriched.value)
assertEquals(NullValue, enriched.abValue)
assertEquals(1, enriched.changes.size)
val change = enriched.changes[0]
@@ -35,10 +35,10 @@ class EnrichedAirbyteValueTest {
val type = IntegerType
val name = "testField"
val enriched = EnrichedAirbyteValue(initialValue, type, name)
val enriched = EnrichedAirbyteValue(initialValue, type, name, airbyteMetaField = null)
enriched.nullify()
assertEquals(NullValue, enriched.value)
assertEquals(NullValue, enriched.abValue)
assertEquals(1, enriched.changes.size)
val change = enriched.changes[0]
@@ -53,10 +53,10 @@ class EnrichedAirbyteValueTest {
val type = BooleanType
val name = "testField"
val enriched = EnrichedAirbyteValue(initialValue, type, name)
val enriched = EnrichedAirbyteValue(initialValue, type, name, airbyteMetaField = null)
enriched.nullify(Reason.DESTINATION_FIELD_SIZE_LIMITATION)
assertEquals(NullValue, enriched.value)
assertEquals(NullValue, enriched.abValue)
assertEquals(1, enriched.changes.size)
val change = enriched.changes[0]
@@ -72,10 +72,10 @@ class EnrichedAirbyteValueTest {
val name = "testField"
val truncatedValue = StringValue("This is a very...")
val enriched = EnrichedAirbyteValue(initialValue, type, name)
val enriched = EnrichedAirbyteValue(initialValue, type, name, airbyteMetaField = null)
enriched.truncate(Reason.DESTINATION_RECORD_SIZE_LIMITATION, truncatedValue)
assertEquals(truncatedValue, enriched.value)
assertEquals(truncatedValue, enriched.abValue)
assertEquals(1, enriched.changes.size)
val change = enriched.changes[0]
@@ -91,10 +91,10 @@ class EnrichedAirbyteValueTest {
val name = "testField"
val truncatedValue = StringValue("This is a very...")
val enriched = EnrichedAirbyteValue(initialValue, type, name)
val enriched = EnrichedAirbyteValue(initialValue, type, name, airbyteMetaField = null)
enriched.truncate(newValue = truncatedValue)
assertEquals(truncatedValue, enriched.value)
assertEquals(truncatedValue, enriched.abValue)
assertEquals(1, enriched.changes.size)
val change = enriched.changes[0]
@@ -110,10 +110,10 @@ class EnrichedAirbyteValueTest {
val name = "testField"
val truncatedValue = StringValue("This is a very...")
val enriched = EnrichedAirbyteValue(initialValue, type, name)
val enriched = EnrichedAirbyteValue(initialValue, type, name, airbyteMetaField = null)
enriched.truncate(Reason.DESTINATION_SERIALIZATION_ERROR, truncatedValue)
assertEquals(truncatedValue, enriched.value)
assertEquals(truncatedValue, enriched.abValue)
assertEquals(1, enriched.changes.size)
val change = enriched.changes[0]
@@ -128,7 +128,7 @@ class EnrichedAirbyteValueTest {
val type = StringType
val name = "testField"
val enriched = EnrichedAirbyteValue(initialValue, type, name)
val enriched = EnrichedAirbyteValue(initialValue, type, name, airbyteMetaField = null)
// First change - truncate
val truncatedValue = StringValue("Init...")
@@ -138,7 +138,7 @@ class EnrichedAirbyteValueTest {
enriched.nullify(Reason.DESTINATION_SERIALIZATION_ERROR)
// Verify final state
assertEquals(NullValue, enriched.value)
assertEquals(NullValue, enriched.abValue)
assertEquals(2, enriched.changes.size)
// First change

View File

@@ -51,27 +51,27 @@ class EnrichedDestinationRecordAirbyteValueTest {
// Check the types of meta fields
val rawIdField = metaFields[Meta.COLUMN_NAME_AB_RAW_ID]!!
assertTrue(rawIdField.value is StringValue)
assertTrue(rawIdField.abValue is StringValue)
assertEquals(Meta.AirbyteMetaFields.RAW_ID.type, rawIdField.type)
val extractedAtField = metaFields[Meta.COLUMN_NAME_AB_EXTRACTED_AT]!!
assertTrue(extractedAtField.value is IntegerValue)
assertEquals(emittedAtMs, (extractedAtField.value as IntegerValue).value.toLong())
assertTrue(extractedAtField.abValue is IntegerValue)
assertEquals(emittedAtMs, (extractedAtField.abValue as IntegerValue).value.toLong())
assertEquals(Meta.AirbyteMetaFields.EXTRACTED_AT.type, extractedAtField.type)
val metaField = metaFields[Meta.COLUMN_NAME_AB_META]!!
assertTrue(metaField.value is ObjectValue)
val metaObj = metaField.value as ObjectValue
assertTrue(metaField.abValue is ObjectValue)
val metaObj = metaField.abValue as ObjectValue
assertEquals(2, metaObj.values.size)
assertEquals(IntegerValue(destinationStream.syncId), metaObj.values["sync_id"])
assertTrue(metaObj.values["changes"] is ArrayValue)
assertEquals(Meta.AirbyteMetaFields.META.type, metaField.type)
val generationIdField = metaFields[Meta.COLUMN_NAME_AB_GENERATION_ID]!!
assertTrue(generationIdField.value is IntegerValue)
assertTrue(generationIdField.abValue is IntegerValue)
assertEquals(
destinationStream.generationId,
(generationIdField.value as IntegerValue).value.toLong()
(generationIdField.abValue as IntegerValue).value.toLong()
)
assertEquals(Meta.AirbyteMetaFields.GENERATION_ID.type, generationIdField.type)
}
@@ -80,8 +80,20 @@ class EnrichedDestinationRecordAirbyteValueTest {
fun `test allTypedFields property`() {
val declaredFields =
mapOf(
"field1" to EnrichedAirbyteValue(StringValue("value1"), StringType, "field1"),
"field2" to EnrichedAirbyteValue(IntegerValue(42), IntegerType, "field2")
"field1" to
EnrichedAirbyteValue(
StringValue("value1"),
StringType,
"field1",
airbyteMetaField = null
),
"field2" to
EnrichedAirbyteValue(
IntegerValue(42),
IntegerType,
"field2",
airbyteMetaField = null
)
)
val record =
@@ -106,7 +118,7 @@ class EnrichedDestinationRecordAirbyteValueTest {
assertEquals(value.type, allFields[key]?.type)
// Don't compare value directly for RAW_ID as it generates a random UUID
if (key != Meta.COLUMN_NAME_AB_RAW_ID) {
assertEquals(value.value::class, allFields[key]?.value!!::class)
assertEquals(value.abValue::class, allFields[key]?.abValue!!::class)
}
}
}
@@ -114,10 +126,22 @@ class EnrichedDestinationRecordAirbyteValueTest {
@Test
fun `test proper collection of changes in meta field`() {
// Create fields with changes
val field1 = EnrichedAirbyteValue(StringValue("value1"), StringType, "field1")
val field1 =
EnrichedAirbyteValue(
StringValue("value1"),
StringType,
"field1",
airbyteMetaField = null
)
field1.truncate(Reason.DESTINATION_RECORD_SIZE_LIMITATION, StringValue("val"))
val field2 = EnrichedAirbyteValue(IntegerValue(1000000), IntegerType, "field2")
val field2 =
EnrichedAirbyteValue(
IntegerValue(1000000),
IntegerType,
"field2",
airbyteMetaField = null
)
field2.nullify(Reason.DESTINATION_FIELD_SIZE_LIMITATION)
val declaredFields = mapOf("field1" to field1, "field2" to field2)
@@ -146,7 +170,7 @@ class EnrichedDestinationRecordAirbyteValueTest {
// Get the changes array from the meta field
val metaField = record.airbyteMetaFields[Meta.COLUMN_NAME_AB_META]!!
val metaObj = metaField.value as ObjectValue
val metaObj = metaField.abValue as ObjectValue
val changesArray = metaObj.values["changes"] as ArrayValue
// Should contain 3 changes total: 1 from meta and 2 from declared fields
@@ -207,9 +231,9 @@ class EnrichedDestinationRecordAirbyteValueTest {
)
val rawId1 =
(record1.airbyteMetaFields[Meta.COLUMN_NAME_AB_RAW_ID]!!.value as StringValue).value
(record1.airbyteMetaFields[Meta.COLUMN_NAME_AB_RAW_ID]!!.abValue as StringValue).value
val rawId2 =
(record2.airbyteMetaFields[Meta.COLUMN_NAME_AB_RAW_ID]!!.value as StringValue).value
(record2.airbyteMetaFields[Meta.COLUMN_NAME_AB_RAW_ID]!!.abValue as StringValue).value
// Validate UUID format
val uuidRegex = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"

View File

@@ -135,14 +135,14 @@ class DestinationRecordRawTest {
// Check coerced string field
val stringField = enrichedRecord.declaredFields["string_field"]
assertNotNull(stringField)
assertTrue(stringField?.value is StringValue)
assertEquals("42", (stringField?.value as StringValue).value)
assertTrue(stringField?.abValue is StringValue)
assertEquals("42", (stringField?.abValue as StringValue).value)
// Check coerced integer field
val integerField = enrichedRecord.declaredFields["integer_field"]
assertNotNull(integerField)
assertTrue(integerField?.value is IntegerValue)
assertEquals(BigInteger.valueOf(123), (integerField?.value as IntegerValue).value)
assertTrue(integerField?.abValue is IntegerValue)
assertEquals(BigInteger.valueOf(123), (integerField?.abValue as IntegerValue).value)
// Check coerced boolean field - might be nullified or coerced depending on implementation
val booleanField = enrichedRecord.declaredFields["boolean_field"]
@@ -151,7 +151,7 @@ class DestinationRecordRawTest {
// Check coerced number field
val numberField = enrichedRecord.declaredFields["number_field"]
assertNotNull(numberField)
assertTrue(numberField?.value is NumberValue)
assertTrue(numberField?.abValue is NumberValue)
}
@Test
@@ -189,12 +189,12 @@ class DestinationRecordRawTest {
// Check valid field is preserved
val stringField = enrichedRecord.declaredFields["string_field"]
assertNotNull(stringField)
assertTrue(stringField?.value is StringValue)
assertTrue(stringField?.abValue is StringValue)
// Check invalid integer is nullified with change recorded
val integerField = enrichedRecord.declaredFields["integer_field"]
assertNotNull(integerField)
assertEquals(NullValue, integerField?.value)
assertEquals(NullValue, integerField?.abValue)
assertTrue(integerField?.changes?.isNotEmpty() ?: false)
assertEquals(
AirbyteRecordMessageMetaChange.Change.NULLED,
@@ -204,7 +204,7 @@ class DestinationRecordRawTest {
// Check invalid array is nullified with change recorded
val arrayField = enrichedRecord.declaredFields["array_field"]
assertNotNull(arrayField)
assertEquals(NullValue, arrayField?.value)
assertEquals(NullValue, arrayField?.abValue)
assertTrue(arrayField?.changes?.isNotEmpty() ?: false)
}
@@ -393,13 +393,13 @@ class DestinationRecordRawTest {
// Check nested object
val nestedObject = enrichedRecord.declaredFields["nested_object"]
assertNotNull(nestedObject)
assertTrue(nestedObject?.value is ObjectValue)
assertTrue(nestedObject?.abValue is ObjectValue)
// Check array of objects
val arrayOfObjects = enrichedRecord.declaredFields["array_of_objects"]
assertNotNull(arrayOfObjects)
assertTrue(arrayOfObjects?.value is ArrayValue)
assertEquals(2, (arrayOfObjects?.value as ArrayValue).values.size)
assertTrue(arrayOfObjects?.abValue is ArrayValue)
assertEquals(2, (arrayOfObjects?.abValue as ArrayValue).values.size)
}
@Test

View File

@@ -12,7 +12,7 @@ import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.message.MessageQueue
import io.airbyte.cdk.load.message.MultiProducerChannel
import io.airbyte.cdk.load.message.ProtocolMessageDeserializer
@@ -96,7 +96,7 @@ class ProcessRecordsTaskTest {
class MockBatch(
override val groupId: String?,
override val state: Batch.State,
recordIterator: Iterator<DestinationRecordAirbyteValue>
recordIterator: Iterator<DestinationRecordRaw>
) : Batch {
val records = recordIterator.asSequence().toList()
}
@@ -162,7 +162,11 @@ class ProcessRecordsTaskTest {
it.batch is MockBatch &&
(it.batch as MockBatch)
.records
.map { record -> (record.data as IntegerValue).value.toString() }
.map { record ->
(record.asDestinationRecordAirbyteValue().data as IntegerValue)
.value
.toString()
}
.toSet() == serializedRecords.toSet()
}

View File

@@ -302,7 +302,7 @@ abstract class IntegrationTest(
fun updateConfig(config: String): String = configUpdater.update(config)
companion object {
val randomizedNamespaceRegex = Regex("test(\\d{8})[A-Za-z]{4}")
val randomizedNamespaceRegex = Regex("test(\\d{8})[A-Za-z]{4}.*")
val randomizedNamespaceDateFormatter: DateTimeFormatter =
DateTimeFormatter.ofPattern("yyyyMMdd")
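
The widened regex (note the trailing .*) also matches namespaces that carry extra characters after the four random letters, presumably to tolerate destination-specific suffixes:

val randomizedNamespaceRegex = Regex("test(\\d{8})[A-Za-z]{4}.*")

fun demo() {
    check(randomizedNamespaceRegex.matches("test20250324abcd"))       // matched before and after
    check(randomizedNamespaceRegex.matches("test20250324abcdSuffix")) // only matches with ".*"
}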

View File

@@ -1990,13 +1990,38 @@ abstract class BasicFunctionalityIntegrationTest(
}
""".trimIndent()
),
// Another record that verifies numeric behavior.
// -99999999999999999999999999999999 is out of range for int64.
makeRecord(
"""
{
"id": 6,
"integer": -99999999999999999999999999999999
}
""".trimIndent(),
),
// A record with truncated timestamps (i.e. the seconds value is 0).
// Some destinations have specific formatting requirements, and it's easy
// to mess these values up.
makeRecord(
"""
{
"id": 7,
"timestamp_with_timezone": "2023-01-23T11:34:00-01:00",
"timestamp_without_timezone": "2023-01-23T12:34:00",
"time_with_timezone": "11:34:00-01:00",
"time_without_timezone": "12:34:00"
}
""".trimIndent()
),
),
)
val nestedFloat: BigDecimal
val topLevelFloat: BigDecimal
val bigInt: BigInteger?
val positiveBigInt: BigInteger?
val bigIntChanges: List<Change>
val negativeBigInt: BigInteger?
val badValuesData: Map<String, Any?>
val badValuesChanges: MutableList<Change>
when (allTypesBehavior) {
@@ -2013,12 +2038,13 @@ abstract class BasicFunctionalityIntegrationTest(
} else {
BigDecimal("50000.0000000000000001")
}
bigInt =
if (allTypesBehavior.integerCanBeLarge) {
BigInteger("99999999999999999999999999999999")
} else {
null
}
if (allTypesBehavior.integerCanBeLarge) {
positiveBigInt = BigInteger("99999999999999999999999999999999")
negativeBigInt = BigInteger("-99999999999999999999999999999999")
} else {
positiveBigInt = null
negativeBigInt = null
}
bigIntChanges =
if (allTypesBehavior.integerCanBeLarge) {
emptyList()
@@ -2073,7 +2099,8 @@ abstract class BasicFunctionalityIntegrationTest(
Untyped -> {
nestedFloat = BigDecimal("50000.0000000000000001")
topLevelFloat = BigDecimal("50000.0000000000000001")
bigInt = BigInteger("99999999999999999999999999999999")
positiveBigInt = BigInteger("99999999999999999999999999999999")
negativeBigInt = BigInteger("-99999999999999999999999999999999")
bigIntChanges = emptyList()
badValuesData =
// note that the values have different types than what's declared in the schema
@@ -2147,7 +2174,7 @@ abstract class BasicFunctionalityIntegrationTest(
"id" to 4,
"struct" to schematizedObject(linkedMapOf("foo" to nestedFloat)),
"number" to topLevelFloat,
"integer" to bigInt,
"integer" to positiveBigInt,
),
airbyteMeta = OutputRecord.Meta(syncId = 42, changes = bigIntChanges),
),
@@ -2157,6 +2184,31 @@ abstract class BasicFunctionalityIntegrationTest(
data = badValuesData,
airbyteMeta = OutputRecord.Meta(syncId = 42, changes = badValuesChanges),
),
OutputRecord(
extractedAt = 100,
generationId = 42,
data =
mapOf(
"id" to 6,
"integer" to negativeBigInt,
),
airbyteMeta = OutputRecord.Meta(syncId = 42, changes = bigIntChanges),
),
OutputRecord(
extractedAt = 100,
generationId = 42,
data =
mapOf(
"id" to 7,
"timestamp_with_timezone" to
OffsetDateTime.parse("2023-01-23T11:34:00-01:00"),
"timestamp_without_timezone" to
LocalDateTime.parse("2023-01-23T12:34:00"),
"time_with_timezone" to OffsetTime.parse("11:34:00-01:00"),
"time_without_timezone" to LocalTime.parse("12:34:00"),
),
airbyteMeta = OutputRecord.Meta(syncId = 42),
),
),
stream,
primaryKey = listOf(listOf("id")),
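
The new record with id 6 above exercises the negative end of the int64 range; its magnitude (about 10^32) is far outside BIGINT, which is why destinations without large-integer support are expected to nullify it. A quick check of the arithmetic:

import java.math.BigInteger

fun demo() {
    val v = BigInteger("-99999999999999999999999999999999")
    // Below Long.MIN_VALUE (-9223372036854775808), so it cannot fit an int64.
    check(v < BigInteger.valueOf(Long.MIN_VALUE))
    // longValueExact() throws ArithmeticException for such values, the error
    // path that typically becomes a NULLED meta change in a destination.
}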

View File

@@ -4,6 +4,7 @@
package io.airbyte.cdk.load.data.csv
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayValue
import io.airbyte.cdk.load.data.BooleanValue
import io.airbyte.cdk.load.data.DateValue
@@ -22,24 +23,26 @@ import io.airbyte.cdk.load.data.json.toJson
import io.airbyte.cdk.load.util.serializeToString
fun ObjectValue.toCsvRecord(schema: ObjectType): List<Any> {
return schema.properties.map { (key, _) ->
values[key]?.let {
when (it) {
is ObjectValue -> it.toJson().serializeToString()
is ArrayValue -> it.toJson().serializeToString()
is StringValue -> it.value
is IntegerValue -> it.value
is NumberValue -> it.value
is NullValue -> ""
is TimestampWithTimezoneValue -> it.value
is TimestampWithoutTimezoneValue -> it.value
is BooleanValue -> it.value
is DateValue -> it.value
is TimeWithTimezoneValue -> it.value
is TimeWithoutTimezoneValue -> it.value
is UnknownValue -> ""
}
}
?: ""
}
return schema.properties.map { (key, _) -> values[key].toCsvValue() }
}
fun AirbyteValue?.toCsvValue(): Any {
return this?.let {
when (it) {
is ObjectValue -> it.toJson().serializeToString()
is ArrayValue -> it.toJson().serializeToString()
is StringValue -> it.value
is IntegerValue -> it.value
is NumberValue -> it.value
is NullValue -> ""
is TimestampWithTimezoneValue -> it.value
is TimestampWithoutTimezoneValue -> it.value
is BooleanValue -> it.value
is DateValue -> it.value
is TimeWithTimezoneValue -> it.value
is TimeWithoutTimezoneValue -> it.value
is UnknownValue -> ""
}
}
?: ""
}
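
Extracting toCsvValue makes the per-value conversion reusable outside ObjectValue.toCsvRecord; the MSSQL row generator later in this commit calls it directly. A few illustrative cases (using the IntegerValue Long constructor seen in the tests above):

fun demo() {
    check(IntegerValue(42L).toCsvValue() == java.math.BigInteger.valueOf(42)) // unwraps the value
    check((null as AirbyteValue?).toCsvValue() == "") // absent values become ""
    check(NullValue.toCsvValue() == "")               // explicit nulls become ""
}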

View File

@@ -134,7 +134,7 @@ fun Map<String, EnrichedAirbyteValue>.toIcebergRecord(icebergSchema: Schema): Ge
if (value != null) {
record.setField(
field.name(),
airbyteValueToIcebergRecord.convert(value.value, field.type())
airbyteValueToIcebergRecord.convert(value.abValue, field.type())
)
}
}

View File

@@ -248,7 +248,7 @@ class IcebergUtil(private val tableIdGenerator: TableIdGenerator) {
): Operation =
if (
record.declaredFields[AIRBYTE_CDC_DELETE_COLUMN] != null &&
record.declaredFields[AIRBYTE_CDC_DELETE_COLUMN]!!.value !is NullValue
record.declaredFields[AIRBYTE_CDC_DELETE_COLUMN]!!.abValue !is NullValue
) {
Operation.DELETE
} else if (importType is Dedupe) {
@@ -306,7 +306,7 @@ fun EnrichedAirbyteValue.transformValueRecursingIntoArrays(
}
}
value = recurseArray(value, type, name)
abValue = recurseArray(abValue, type, name)
}
data class ChangeDescription(val change: Change, val reason: Reason)

View File

@@ -200,12 +200,14 @@ class AirbyteValueToIcebergRecordTest {
IntegerValue(123L),
IntegerType,
"id",
airbyteMetaField = null,
),
"name" to
EnrichedAirbyteValue(
StringValue("John Doe"),
StringType,
"name",
airbyteMetaField = null,
),
"meta" to
EnrichedAirbyteValue(
@@ -223,6 +225,7 @@ class AirbyteValueToIcebergRecordTest {
),
Meta.AirbyteMetaFields.META.type,
"meta",
airbyteMetaField = Meta.AirbyteMetaFields.META,
)
)
@@ -256,9 +259,15 @@ class AirbyteValueToIcebergRecordTest {
IntegerValue(123L),
IntegerType,
"id",
airbyteMetaField = null,
),
"name" to
EnrichedAirbyteValue(StringValue("Should be ignored"), StringType, "name"),
EnrichedAirbyteValue(
StringValue("Should be ignored"),
StringType,
"name",
airbyteMetaField = null
),
)
val result = objectValue.toIcebergRecord(schema)

View File

@@ -24,7 +24,7 @@ import io.airbyte.cdk.load.file.avro.toAvroWriter
import io.airbyte.cdk.load.file.csv.toCsvPrinterWithHeader
import io.airbyte.cdk.load.file.parquet.ParquetWriter
import io.airbyte.cdk.load.file.parquet.toParquetWriter
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.util.serializeToString
import io.airbyte.cdk.load.util.write
import io.github.oshai.kotlinlogging.KotlinLogging
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong
import org.apache.avro.Schema
interface ObjectStorageFormattingWriter : Closeable {
fun accept(record: DestinationRecordAirbyteValue)
fun accept(record: DestinationRecordRaw)
fun flush()
}
@@ -86,9 +86,13 @@ class JsonFormattingWriter(
private val rootLevelFlattening: Boolean,
) : ObjectStorageFormattingWriter {
override fun accept(record: DestinationRecordAirbyteValue) {
override fun accept(record: DestinationRecordRaw) {
val data =
record.dataWithAirbyteMeta(stream, rootLevelFlattening).toJson().serializeToString()
record
.asDestinationRecordAirbyteValue()
.dataWithAirbyteMeta(stream, rootLevelFlattening)
.toJson()
.serializeToString()
outputStream.write(data)
outputStream.write("\n")
}
@@ -110,9 +114,12 @@ class CSVFormattingWriter(
private val finalSchema = stream.schema.withAirbyteMeta(rootLevelFlattening)
private val printer = finalSchema.toCsvPrinterWithHeader(outputStream)
override fun accept(record: DestinationRecordAirbyteValue) {
override fun accept(record: DestinationRecordRaw) {
printer.printRecord(
record.dataWithAirbyteMeta(stream, rootLevelFlattening).toCsvRecord(finalSchema)
record
.asDestinationRecordAirbyteValue()
.dataWithAirbyteMeta(stream, rootLevelFlattening)
.toCsvRecord(finalSchema)
)
}
@@ -143,9 +150,11 @@ class AvroFormattingWriter(
log.info { "Generated avro schema: $avroSchema" }
}
override fun accept(record: DestinationRecordAirbyteValue) {
val dataMapped = pipeline.map(record.data, record.meta?.changes)
val withMeta = dataMapped.withAirbyteMeta(stream, record.emittedAtMs, rootLevelFlattening)
override fun accept(record: DestinationRecordRaw) {
val marshalledRecord = record.asDestinationRecordAirbyteValue()
val dataMapped = pipeline.map(marshalledRecord.data, marshalledRecord.meta?.changes)
val withMeta =
dataMapped.withAirbyteMeta(stream, marshalledRecord.emittedAtMs, rootLevelFlattening)
writer.write(withMeta.toAvroRecord(mappedSchema, avroSchema))
}
@@ -176,9 +185,11 @@ class ParquetFormattingWriter(
log.info { "Generated avro schema: $avroSchema" }
}
override fun accept(record: DestinationRecordAirbyteValue) {
val dataMapped = pipeline.map(record.data, record.meta?.changes)
val withMeta = dataMapped.withAirbyteMeta(stream, record.emittedAtMs, rootLevelFlattening)
override fun accept(record: DestinationRecordRaw) {
val marshalledRecord = record.asDestinationRecordAirbyteValue()
val dataMapped = pipeline.map(marshalledRecord.data, marshalledRecord.meta?.changes)
val withMeta =
dataMapped.withAirbyteMeta(stream, marshalledRecord.emittedAtMs, rootLevelFlattening)
writer.write(withMeta.toAvroRecord(mappedSchema, avroSchema))
}
@@ -224,7 +235,7 @@ class BufferedFormattingWriter<T : OutputStream>(
0
} else buffer.size()
override fun accept(record: DestinationRecordAirbyteValue) {
override fun accept(record: DestinationRecordRaw) {
writer.accept(record)
rowsAdded.incrementAndGet()
}
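
All four concrete writers above follow the same shape; a hypothetical minimal implementation of the updated interface looks like this, with logging standing in for real formatting:

class LoggingFormattingWriter : ObjectStorageFormattingWriter {
    override fun accept(record: DestinationRecordRaw) {
        // Marshal on demand, exactly as the JSON/CSV/Avro/Parquet writers do.
        val typed = record.asDestinationRecordAirbyteValue()
        println("record emitted at ${typed.emittedAtMs}")
    }

    override fun flush() {
        // Nothing buffered in this sketch.
    }

    override fun close() {
        // Nothing to release.
    }
}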

View File

@@ -104,7 +104,7 @@ class ObjectLoaderPartFormatter<T : OutputStream>(
input: DestinationRecordRaw,
state: State<T>
): BatchAccumulatorResult<State<T>, FormattedPart> {
state.writer.accept(input.asDestinationRecordAirbyteValue())
state.writer.accept(input)
val dataSufficient = state.writer.bufferSize >= partSizeBytes || batchSizeOverride != null
return if (dataSufficient) {
val part = makePart(state)

View File

@@ -10,8 +10,8 @@ import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriterFactory
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
import io.airbyte.cdk.load.file.object_storage.PartFactory
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.object_storage.*
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.message.object_storage.LoadablePart
import io.airbyte.cdk.load.write.BatchAccumulator
import io.github.oshai.kotlinlogging.KotlinLogging
import java.io.OutputStream
@@ -39,7 +39,7 @@ class RecordToPartAccumulator<U : OutputStream>(
private val currentObject = ConcurrentHashMap<String, ObjectInProgress<U>>()
override suspend fun processRecords(
records: Iterator<DestinationRecordAirbyteValue>,
records: Iterator<DestinationRecordRaw>,
totalSizeBytes: Long,
endOfStream: Boolean
): Batch {

View File

@@ -5,12 +5,15 @@
package io.airbyte.cdk.load.write.object_storage
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriter
import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriterFactory
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.message.object_storage.*
import io.airbyte.cdk.load.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import io.mockk.coEvery
import io.mockk.coVerify
import io.mockk.mockk
@@ -40,15 +43,18 @@ class RecordToPartAccumulatorTest {
coEvery { bufferedWriter.close() } returns Unit
}
private fun makeRecord(): DestinationRecordAirbyteValue =
DestinationRecordAirbyteValue(
private fun makeRecord(): DestinationRecordRaw =
DestinationRecordRaw(
stream,
ObjectValue(linkedMapOf()),
0L,
null,
AirbyteMessage()
.withRecord(
AirbyteRecordMessage().withEmittedAt(42).withData(Jsons.createObjectNode())
),
serialized = "",
ObjectTypeWithEmptySchema,
)
private fun makeRecords(n: Int): Iterator<DestinationRecordAirbyteValue> =
private fun makeRecords(n: Int): Iterator<DestinationRecordRaw> =
(0 until n).map { makeRecord() }.listIterator()
private fun makeBytes(n: Int): ByteArray? =

View File

@@ -16,7 +16,7 @@ data:
type: GSM
connectorType: destination
definitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
dockerImageTag: 2.0.5
dockerImageTag: 2.1.0
dockerRepository: airbyte/destination-mssql
documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql
githubIssueLabel: destination-mssql

View File

@@ -9,14 +9,15 @@ import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.util.Jsons
import io.airbyte.integrations.destination.mssql.v2.config.AzureBlobStorageClientCreator
import io.airbyte.integrations.destination.mssql.v2.config.BulkLoadConfiguration
import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfiguration
import io.airbyte.integrations.destination.mssql.v2.config.MSSQLDataSourceFactory
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import jakarta.inject.Singleton
import java.io.ByteArrayOutputStream
import java.sql.Connection
@@ -130,15 +131,23 @@ class MSSQLChecker(private val dataSourceFactory: MSSQLDataSourceFactory) :
private fun createTestCsvData(stream: DestinationStream): ByteArray {
return ByteArrayOutputStream().use { outputStream ->
MSSQLCSVFormattingWriter(stream, outputStream, true).use { csvWriter ->
// TODO this is kind of dumb
val destinationRecord =
DestinationRecordAirbyteValue(
stream,
ObjectValue(
linkedMapOf(COLUMN_NAME to IntegerValue(TEST_ID_VALUE.toBigInteger()))
),
0L,
null,
)
AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(
AirbyteRecordMessage()
.withEmittedAt(0)
.withData(Jsons.valueToTree(mapOf(COLUMN_NAME to TEST_ID_VALUE)))
)
.let { message ->
DestinationRecordRaw(
stream,
message,
Jsons.writeValueAsString(message),
stream.schema
)
}
csvWriter.accept(destinationRecord)
csvWriter.flush()
}

View File

@@ -0,0 +1,121 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.mssql.v2
import io.airbyte.cdk.load.data.ArrayType
import io.airbyte.cdk.load.data.BooleanType
import io.airbyte.cdk.load.data.BooleanValue
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.NumberType
import io.airbyte.cdk.load.data.NumberValue
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.data.UnknownType
import io.airbyte.cdk.load.data.csv.toCsvValue
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.util.serializeToString
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason
import java.math.BigInteger
import java.time.format.DateTimeFormatter
object LIMITS {
// Maximum value for BIGINT in SQL Server
val MAX_BIGINT = BigInteger("9223372036854775807")
val MIN_BIGINT = BigInteger("-9223372036854775808")
val TRUE = IntegerValue(1)
val FALSE = IntegerValue(0)
}
/**
* Creates a generator for MSSQL CSV rows.
*
* @param validateValuesPreLoad Whether to validate values before loading them into the CSV
* file. This is optional and disabled by default, as it is a computationally expensive
* operation that can significantly impact performance. Only enable it if strict data
* validation is required.
*/
class MSSQLCsvRowGenerator(private val validateValuesPreLoad: Boolean) {
fun generate(record: DestinationRecordRaw, schema: ObjectType): List<Any> {
val enrichedRecord = record.asEnrichedDestinationRecordAirbyteValue()
if (validateValuesPreLoad) {
enrichedRecord.declaredFields.values.forEach { value ->
if (value.abValue is NullValue) {
return@forEach
}
val actualValue = value.abValue
when (value.type) {
// Enforce numeric range
is IntegerType -> {
if (
(actualValue as IntegerValue).value < LIMITS.MIN_BIGINT ||
LIMITS.MAX_BIGINT < actualValue.value
) {
value.nullify(Reason.DESTINATION_FIELD_SIZE_LIMITATION)
}
}
is NumberType -> {
// Force to BigDecimal -> re-box as NumberValue
value.abValue =
NumberValue(
(actualValue as NumberValue).value.toDouble().toBigDecimal()
)
}
// SQL server expects booleans as 0 or 1
is BooleanType ->
value.abValue =
if ((actualValue as BooleanValue).value) LIMITS.TRUE else LIMITS.FALSE
// MSSQL requires a specific timestamp format - in particular,
// "2000-01-01T00:00Z" causes an error.
// MSSQL requires "2000-01-01T00:00:00Z".
is TimestampTypeWithTimezone ->
value.abValue =
StringValue(
(actualValue as TimestampWithTimezoneValue)
.value
.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)
)
is TimestampTypeWithoutTimezone ->
value.abValue =
StringValue(
(actualValue as TimestampWithoutTimezoneValue)
.value
.format(DateTimeFormatter.ISO_DATE_TIME)
)
// serialize complex types to string
is ArrayType,
is ObjectType,
is UnionType,
is UnknownType -> value.abValue = StringValue(actualValue.serializeToString())
else -> {}
}
}
}
val values = enrichedRecord.allTypedFields
return schema.properties.map { (columnName, _) ->
// Both branches reduce to the same conversion: toCsvValue() already maps
// null and NullValue to "".
values[columnName]?.abValue.toCsvValue()
}
}
}
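
A usage sketch for the new generator; record and schema construction is elided, and the real call site is the MSSQLCSVFormattingWriter shown further down:

fun toRow(record: DestinationRecordRaw, schema: ObjectType): List<Any> {
    val generator = MSSQLCsvRowGenerator(validateValuesPreLoad = true)
    // One CSV cell per schema column, with the MSSQL-specific coercions
    // (BIGINT range check, boolean -> 0/1, timestamp formatting) applied first.
    return generator.generate(record, schema)
}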

View File

@@ -1,262 +0,0 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.mssql.v2
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.BooleanType
import io.airbyte.cdk.load.data.BooleanValue
import io.airbyte.cdk.load.data.DateType
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.NumberType
import io.airbyte.cdk.load.data.NumberValue
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimeTypeWithTimezone
import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.Meta
import io.airbyte.protocol.models.Jsons
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import java.math.BigInteger
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime
private object LIMITS {
// Maximum value for BIGINT in SQL Server
val MAX_BIGINT = BigInteger("9223372036854775807")
val TRUE = IntegerValue(1)
val FALSE = IntegerValue(0)
}
/**
* Creates a validator for MSSQL CSV rows.
*
* @param validateValuesPreLoad Whether to validate string values before loading them into the
* CSV file. This is optional and disabled by default, as it is a computationally expensive
* operation that can significantly impact performance. Only enable it if strict data
* validation is required.
*/
class MSSQLCsvRowValidator(private val validateValuesPreLoad: Boolean) {
fun validate(
record: DestinationRecordAirbyteValue,
schema: ObjectType
): DestinationRecordAirbyteValue {
val objectValue = record.data as? ObjectValue ?: return record
val values = objectValue.values
schema.properties.forEach { (columnName, fieldType) ->
val oldValue = values[columnName]
if (oldValue != null && oldValue !is NullValue && record.meta != null) {
values[columnName] =
oldValue.validateAndReplace(columnName, fieldType, record.meta!!)
}
}
return record
}
private fun AirbyteValue.validateAndReplace(
columnName: String,
fieldType: FieldType,
meta: Meta
): AirbyteValue =
when (this) {
is StringValue -> validateStringValue(columnName, this, fieldType, meta)
is IntegerValue -> validateIntegerValue(columnName, this, meta)
is BooleanValue -> validateBooleanValue(this)
is NumberValue -> validateNumberValue(this)
is ObjectValue -> validateObjectValue(this, fieldType, meta)
else -> this
}
private fun validateStringValue(
columnName: String,
value: StringValue,
fieldType: FieldType,
meta: Meta
): AirbyteValue {
if (!validateValuesPreLoad || fieldType.isStringColumn()) {
return value
}
val rawString = value.value
if (fieldType.isNumericColumn()) {
return runCatching { NumberValue(rawString.toBigDecimal()) }
.fold(
onSuccess = { validateNumberValue(it) },
onFailure = {
meta.nullify(
columnName,
AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR
)
NullValue
}
)
} else if (fieldType.isBooleanColumn()) {
val asIntValue = parseBooleanAsIntValue(rawString)
if (asIntValue == null) {
meta.nullify(
columnName,
AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR
)
return NullValue
}
return asIntValue
} else if (fieldType.isDateColumn()) {
if (!safeParseDate(rawString)) {
meta.nullify(
columnName,
AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR
)
return NullValue
}
} else if (fieldType.isTimeColumn()) {
if (!safeParseTime(rawString)) {
meta.nullify(
columnName,
AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR
)
return NullValue
}
} else if (fieldType.isTimestampColumn()) {
if (!safeParseTimestamp(rawString)) {
meta.nullify(
columnName,
AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR
)
return NullValue
}
} else if (fieldType.isUnionColumn()) {
return StringValue(Jsons.serialize(rawString))
}
return value
}
private fun validateIntegerValue(
columnName: String,
value: IntegerValue,
meta: Meta
): AirbyteValue {
// If the integer is bigger than BIGINT, then we must nullify it.
if (value.value > LIMITS.MAX_BIGINT) {
meta.nullify(
columnName,
AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION
)
return NullValue
}
return value
}
private fun validateBooleanValue(value: BooleanValue): IntegerValue {
return if (value.value) LIMITS.TRUE else LIMITS.FALSE
}
private fun validateNumberValue(value: NumberValue): NumberValue {
// Force to BigDecimal -> re-box as NumberValue
return NumberValue(value.value.toDouble().toBigDecimal())
}
private fun validateObjectValue(
value: ObjectValue,
fieldType: FieldType,
meta: Meta
): ObjectValue {
// If the schema says it's actually an object, we recursively validate.
val actualObjType = fieldType.type
if (actualObjType is ObjectType) {
val convertedValues = LinkedHashMap<String, AirbyteValue>(value.values.size)
value.values.forEach { (propName, propValue) ->
val subType = actualObjType.properties[propName]
val validated =
if (subType != null) propValue.validateAndReplace(propName, subType, meta)
else propValue
convertedValues[propName] = validated
}
return ObjectValue(convertedValues)
}
return value
}
private fun parseBooleanAsIntValue(value: String): IntegerValue? {
// Accept "1", "0", or strict booleans ("true"/"false")
return when (value) {
"1" -> LIMITS.TRUE
"0" -> LIMITS.FALSE
else -> {
val bool = value.lowercase().toBooleanStrictOrNull() ?: return null
if (bool) LIMITS.TRUE else LIMITS.FALSE
}
}
}
private fun safeParseDate(value: String): Boolean {
return runCatching { LocalDate.parse(value) }.isSuccess
}
private fun safeParseTime(value: String): Boolean {
if (runCatching { OffsetTime.parse(value) }.isSuccess) return true
return runCatching { LocalTime.parse(value) }.isSuccess
}
private fun safeParseTimestamp(value: String): Boolean {
if (runCatching { OffsetDateTime.parse(value) }.isSuccess) return true
return runCatching { LocalDateTime.parse(value) }.isSuccess
}
private fun FieldType.isNumericColumn(): Boolean {
return type is NumberType || type is IntegerType
}
private fun FieldType.isStringColumn(): Boolean {
return type is StringType
}
private fun FieldType.isUnionColumn(): Boolean {
return type is UnionType
}
private fun FieldType.isBooleanColumn(): Boolean {
return type is BooleanType
}
private fun FieldType.isDateColumn(): Boolean {
return type is DateType
}
private fun FieldType.isTimeColumn(): Boolean {
return type is TimeTypeWithTimezone || type is TimeTypeWithoutTimezone
}
private fun FieldType.isTimestampColumn(): Boolean {
return type is TimestampTypeWithTimezone || type is TimestampTypeWithoutTimezone
}
private fun Meta.nullify(fieldName: String, reason: AirbyteRecordMessageMetaChange.Reason) {
val metaChange =
Meta.Change(
field = fieldName,
change = AirbyteRecordMessageMetaChange.Change.NULLED,
reason = reason
)
(this.changes as MutableList).add(metaChange)
}
}

View File

@@ -11,31 +11,51 @@ import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.Overwrite
import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayType
import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema
import io.airbyte.cdk.load.data.BooleanType
import io.airbyte.cdk.load.data.BooleanValue
import io.airbyte.cdk.load.data.DateType
import io.airbyte.cdk.load.data.DateValue
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.NumberType
import io.airbyte.cdk.load.data.NumberValue
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimeTypeWithTimezone
import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone
import io.airbyte.cdk.load.data.TimeWithTimezoneValue
import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.data.UnknownType
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.message.Meta
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_EXTRACTED_AT
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_META
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_RAW_ID
import io.airbyte.cdk.load.util.serializeToString
import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteTypeToMssqlType
import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteValueToStatement.Companion.setAsNullValue
import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteValueToStatement.Companion.setValue
import io.airbyte.integrations.destination.mssql.v2.convert.MssqlType
import io.airbyte.integrations.destination.mssql.v2.convert.ResultSetToAirbyteValue.Companion.getAirbyteNamedValue
import io.airbyte.protocol.models.Jsons
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason
import io.github.oshai.kotlinlogging.KotlinLogging
import java.lang.ArithmeticException
import java.sql.Connection
import java.sql.Date
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.util.UUID
private val logger = KotlinLogging.logger {}
@@ -198,19 +218,6 @@ class MSSQLQueryBuilder(
)
val airbyteFields = airbyteFinalTableFields.map { it.name }.toSet()
private fun AirbyteRecordMessageMeta.trackChange(
fieldName: String,
change: AirbyteRecordMessageMetaChange.Change,
reason: AirbyteRecordMessageMetaChange.Reason,
) {
this.changes.add(
AirbyteRecordMessageMetaChange()
.withField(fieldName)
.withChange(change)
.withReason(reason)
)
}
}
data class NamedField(val name: String, val type: FieldType)
@@ -336,61 +343,92 @@ class MSSQLQueryBuilder(
fun populateStatement(
statement: PreparedStatement,
record: DestinationRecordAirbyteValue,
plainRecord: DestinationRecordRaw,
schema: List<NamedField>
) {
val recordObject = record.data as ObjectValue
var airbyteMetaStatementIndex: Int? = null
val airbyteMeta =
AirbyteRecordMessageMeta().apply {
changes =
record.meta?.changes?.map { it.asProtocolObject() }?.toMutableList()
?: mutableListOf()
setAdditionalProperty("sync_id", stream.syncId)
}
val enrichedRecord = plainRecord.asEnrichedDestinationRecordAirbyteValue()
val populatedFields = enrichedRecord.allTypedFields
var airbyteMetaStatementIndex: Int? = null
schema.forEachIndexed { index, field ->
val statementIndex = index + 1
if (field.name in airbyteFields) {
when (field.name) {
COLUMN_NAME_AB_RAW_ID ->
statement.setString(statementIndex, UUID.randomUUID().toString())
COLUMN_NAME_AB_EXTRACTED_AT ->
statement.setLong(statementIndex, record.emittedAtMs)
COLUMN_NAME_AB_GENERATION_ID ->
statement.setLong(statementIndex, stream.generationId)
COLUMN_NAME_AB_META -> airbyteMetaStatementIndex = statementIndex
}
} else {
try {
val value = recordObject.values[field.name]
statement.setValue(statementIndex, value, field)
} catch (e: Exception) {
statement.setAsNullValue(statementIndex, field.type.type)
when (e) {
is ArithmeticException ->
airbyteMeta.trackChange(
field.name,
AirbyteRecordMessageMetaChange.Change.NULLED,
AirbyteRecordMessageMetaChange.Reason
.DESTINATION_FIELD_SIZE_LIMITATION,
)
else ->
airbyteMeta.trackChange(
field.name,
AirbyteRecordMessageMetaChange.Change.NULLED,
AirbyteRecordMessageMetaChange.Reason
.DESTINATION_SERIALIZATION_ERROR,
)
val value = populatedFields[field.name]
if (value == null || value.abValue == NullValue) {
statement.setAsNullValue(statementIndex, field.type.type)
return@forEachIndexed
}
if (value.airbyteMetaField == Meta.AirbyteMetaFields.META) {
// don't populate _airbyte_meta yet - we might run into errors in the other fields
// for this record.
// Instead, we grab the statement index, and populate airbyte_meta after processing
// all the other fields.
airbyteMetaStatementIndex = statementIndex
return@forEachIndexed
}
when (value.type) {
BooleanType ->
statement.setBoolean(statementIndex, (value.abValue as BooleanValue).value)
DateType ->
statement.setDate(
statementIndex,
Date.valueOf((value.abValue as DateValue).value)
)
IntegerType -> {
val intValue = (value.abValue as IntegerValue).value
if (intValue < LIMITS.MIN_BIGINT || LIMITS.MAX_BIGINT < intValue) {
value.nullify(Reason.DESTINATION_FIELD_SIZE_LIMITATION)
} else {
statement.setLong(statementIndex, intValue.longValueExact())
}
}
NumberType ->
statement.setDouble(
statementIndex,
(value.abValue as NumberValue).value.toDouble()
)
StringType ->
statement.setString(statementIndex, (value.abValue as StringValue).value)
TimeTypeWithTimezone ->
statement.setObject(
statementIndex,
(value.abValue as TimeWithTimezoneValue).value
)
TimeTypeWithoutTimezone ->
statement.setObject(
statementIndex,
(value.abValue as TimeWithoutTimezoneValue).value
)
TimestampTypeWithTimezone ->
statement.setObject(
statementIndex,
(value.abValue as TimestampWithTimezoneValue).value
)
TimestampTypeWithoutTimezone ->
statement.setObject(
statementIndex,
(value.abValue as TimestampWithoutTimezoneValue).value
)
// Serialize complex types to string
is ArrayType,
ArrayTypeWithoutSchema,
is ObjectType,
ObjectTypeWithEmptySchema,
ObjectTypeWithoutSchema,
is UnionType,
is UnknownType ->
statement.setString(statementIndex, value.abValue.serializeToString())
}
}
// Now that we're done processing the rest of the record, populate airbyte_meta into the
// prepared statement.
airbyteMetaStatementIndex?.let { statementIndex ->
if (airbyteMeta.changes.isEmpty()) {
airbyteMeta.changes = null
}
statement.setString(statementIndex, Jsons.serialize(airbyteMeta))
statement.setString(
statementIndex,
enrichedRecord.airbyteMeta.abValue.serializeToString()
)
}
}
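
A hypothetical call site for the rewritten populateStatement; the connection, SQL, and schema setup are assumptions of this sketch, not part of the commit:

fun insertOne(
    connection: java.sql.Connection,
    queryBuilder: MSSQLQueryBuilder,
    insertSql: String,
    record: DestinationRecordRaw,
    schema: List<NamedField>,
) {
    connection.prepareStatement(insertSql).use { statement ->
        // Binds every column; _airbyte_meta is deliberately bound last so that
        // changes recorded while binding the data columns end up inside it.
        queryBuilder.populateStatement(statement, record, schema)
        statement.executeUpdate()
    }
}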

View File

@@ -6,7 +6,7 @@ package io.airbyte.integrations.destination.mssql.v2
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.message.SimpleBatch
import javax.sql.DataSource
@@ -19,7 +19,7 @@ class MSSQLStreamLoader(
private val recordCommitBatchSize = 5_000
override suspend fun processRecords(
records: Iterator<DestinationRecordAirbyteValue>,
records: Iterator<DestinationRecordRaw>,
totalSizeBytes: Long,
endOfStream: Boolean
): Batch {

View File

@@ -5,31 +5,24 @@
package io.airbyte.integrations.destination.mssql.v2
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.csv.toCsvRecord
import io.airbyte.cdk.load.data.dataWithAirbyteMeta
import io.airbyte.cdk.load.data.withAirbyteMeta
import io.airbyte.cdk.load.file.csv.toCsvPrinterWithHeader
import io.airbyte.cdk.load.file.object_storage.ObjectStorageFormattingWriter
import io.airbyte.cdk.load.file.object_storage.ObjectStorageFormattingWriterFactory
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.DestinationRecordRaw
import java.io.OutputStream
import javax.inject.Singleton
class MSSQLCSVFormattingWriter(
private val stream: DestinationStream,
stream: DestinationStream,
outputStream: OutputStream,
validateValuesPreLoad: Boolean,
) : ObjectStorageFormattingWriter {
private val finalSchema = stream.schema.withAirbyteMeta(true)
private val printer = finalSchema.toCsvPrinterWithHeader(outputStream)
private val mssqlRowValidator = MSSQLCsvRowValidator(validateValuesPreLoad)
override fun accept(record: DestinationRecordAirbyteValue) {
printer.printRecord(
mssqlRowValidator
.validate(record, this.finalSchema)
.dataWithAirbyteMeta(stream, true)
.toCsvRecord(finalSchema),
)
private val mssqlRowGenerator = MSSQLCsvRowGenerator(validateValuesPreLoad)
override fun accept(record: DestinationRecordRaw) {
printer.printRecord(mssqlRowGenerator.generate(record, finalSchema))
}
override fun flush() {
printer.flush()
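The writer above delegates row shaping to `MSSQLCsvRowGenerator`, which this diff does not show. The printing layer itself exposes the `printRecord`/`flush` surface of Apache Commons CSV; here is a standalone sketch under that assumption, with a hard-coded header standing in for the schema-derived one from `toCsvPrinterWithHeader()`:

```kotlin
import java.io.StringWriter
import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVPrinter

fun main() {
    val out = StringWriter()
    // Hard-coded header; the connector derives its header from the final schema.
    val format = CSVFormat.DEFAULT.builder().setHeader("id", "name").build()
    val printer = CSVPrinter(out, format)
    printer.printRecord(42, "John Doe")
    printer.flush()
    print(out) // id,name\n42,John Doe
}
```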

View File

@@ -1,59 +0,0 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.mssql.v2.convert
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayValue
import io.airbyte.cdk.load.data.BooleanValue
import io.airbyte.cdk.load.data.DateValue
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.NumberValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimeWithTimezoneValue
import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue
import io.airbyte.cdk.load.data.UnknownValue
import io.airbyte.cdk.load.util.serializeToJsonBytes
import java.sql.Date
import java.sql.Time
import java.sql.Timestamp
/** CDK pipeline [AirbyteValue] to SQL values converter. */
class AirbyteValueToSqlValue {
/**
* Converts an [AirbyteValue] to the associated SQL value.
*
* @param airbyteValue The [AirbyteValue] from an Airbyte record
* @return The corresponding SQL value for the given [AirbyteValue].
* @throws IllegalArgumentException if the [AirbyteValue] is not supported.
*/
fun convert(airbyteValue: AirbyteValue): Any? {
return when (airbyteValue) {
is ObjectValue -> {
val convertedValues =
airbyteValue.values.entries.associate { (name, value) ->
name to convert(value)
}
convertedValues
}
is ArrayValue -> airbyteValue.values.map { convert(it) }
is BooleanValue -> airbyteValue.value
is DateValue -> Date.valueOf(airbyteValue.value)
is IntegerValue -> airbyteValue.value
is NullValue -> null
is NumberValue -> airbyteValue.value.toDouble().toBigDecimal()
is StringValue -> airbyteValue.value
is UnknownValue -> airbyteValue.value.serializeToJsonBytes()
is TimeWithTimezoneValue -> Time.valueOf(airbyteValue.value.toLocalTime())
is TimeWithoutTimezoneValue -> Time.valueOf(airbyteValue.value)
is TimestampWithTimezoneValue -> Timestamp.valueOf(airbyteValue.value.toLocalDateTime())
is TimestampWithoutTimezoneValue -> Timestamp.valueOf(airbyteValue.value)
}
}
}
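Since this converter is deleted wholesale, a short usage sketch (derived from the implementation above and from its unit tests further down) records the contract it had before this change:

```kotlin
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteValueToSqlValue

fun main() {
    val converter = AirbyteValueToSqlValue()
    // Objects recurse into a LinkedHashMap; scalars map to JDBC-friendly types.
    val row = converter.convert(
        ObjectValue(linkedMapOf("id" to IntegerValue(42L), "name" to StringValue("John Doe")))
    )
    println(row) // {id=42, name=John Doe}
}
```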

View File

@@ -5,164 +5,14 @@
package io.airbyte.integrations.destination.mssql.v2.convert
import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper
import io.airbyte.cdk.load.data.ArrayValue
import io.airbyte.cdk.load.data.BooleanValue
import io.airbyte.cdk.load.data.DateValue
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.NumberValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimeWithTimezoneValue
import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.data.UnknownType
import io.airbyte.cdk.load.data.UnknownValue
import io.airbyte.integrations.destination.mssql.v2.MSSQLQueryBuilder
import io.airbyte.protocol.models.Jsons
import java.sql.Date
import java.sql.PreparedStatement
import java.sql.Types
class AirbyteValueToStatement {
companion object {
private val toSqlType = AirbyteTypeToMssqlType()
private val toSqlValue = AirbyteValueToSqlValue()
private val valueCoercingMapper =
AirbyteValueDeepCoercingMapper(
recurseIntoObjects = false,
recurseIntoArrays = false,
recurseIntoUnions = false,
)
fun PreparedStatement.setValue(
idx: Int,
value: AirbyteValue?,
field: MSSQLQueryBuilder.NamedField
) {
if (
value != null &&
value !is NullValue &&
(field.type.type is UnionType || field.type.type is UnknownType)
) {
setAsJsonString(idx, value)
} else {
when (value) {
is ArrayValue -> setAsJsonString(idx, value)
is BooleanValue -> setAsBooleanValue(idx, value)
is DateValue -> setAsDateValue(idx, value)
is IntegerValue -> setAsIntegerValue(idx, value)
NullValue -> setAsNullValue(idx, field.type.type)
is NumberValue -> setAsNumberValue(idx, value)
is ObjectValue -> setAsJsonString(idx, value)
is StringValue -> setAsStringValue(idx, value, field.type.type)
is TimeWithTimezoneValue -> setAsTime(idx, value)
is TimeWithoutTimezoneValue -> setAsTime(idx, value)
is TimestampWithTimezoneValue -> setAsTimestamp(idx, value)
is TimestampWithoutTimezoneValue -> setAsTimestamp(idx, value)
is UnknownValue -> setAsJsonString(idx, value)
null -> setAsNullValue(idx, field.type.type)
}
}
}
fun PreparedStatement.setAsNullValue(idx: Int, type: AirbyteType) {
val sqlType = toSqlType.convert(type)
setNull(idx, sqlType.sqlType)
}
private fun PreparedStatement.setAsBooleanValue(idx: Int, value: BooleanValue) {
setBoolean(idx, value.value)
}
private fun PreparedStatement.setAsDateValue(idx: Int, value: DateValue) {
setDate(idx, Date.valueOf(value.value))
}
private fun PreparedStatement.setAsIntegerValue(idx: Int, value: IntegerValue) {
setLong(idx, value.value.longValueExact())
}
private fun PreparedStatement.setAsJsonString(idx: Int, value: AirbyteValue) {
setString(idx, Jsons.serialize(toSqlValue.convert(value)))
}
private fun PreparedStatement.setAsNumberValue(idx: Int, value: NumberValue) {
setDouble(idx, value.value.toDouble())
}
private fun PreparedStatement.setAsStringValue(
idx: Int,
value: StringValue,
type: AirbyteType
) {
val sqlType = toSqlType.convert(type).sqlType
if (sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR) {
setString(idx, value.value)
} else {
// TODO: this is a fallback because Values aren't fully typed.
// TODO: this should get refactored once the CDK interface changed wrt types and
// values
if (
sqlType in
setOf(
Types.DATE,
Types.TIME,
Types.TIME_WITH_TIMEZONE,
Types.TIMESTAMP,
Types.TIMESTAMP_WITH_TIMEZONE
)
) {
val coercedValue = valueCoercingMapper.map(value, type)
if (coercedValue.second.isEmpty()) {
when (coercedValue.first) {
is DateValue -> setAsDateValue(idx, coercedValue.first as DateValue)
is TimeWithTimezoneValue ->
setAsTime(idx, coercedValue.first as TimeWithTimezoneValue)
is TimeWithoutTimezoneValue ->
setAsTime(idx, coercedValue.first as TimeWithoutTimezoneValue)
is TimestampWithTimezoneValue ->
setAsTimestamp(
idx,
coercedValue.first as TimestampWithTimezoneValue
)
is TimestampWithoutTimezoneValue ->
setAsTimestamp(
idx,
coercedValue.first as TimestampWithoutTimezoneValue
)
else -> throw IllegalArgumentException("$value isn't a $type")
}
} else {
throw IllegalArgumentException("$value isn't a $type")
}
} else {
throw IllegalArgumentException("$value isn't a $type")
}
}
}
private fun PreparedStatement.setAsTime(idx: Int, value: TimeWithTimezoneValue) {
setObject(idx, value.value)
}
private fun PreparedStatement.setAsTime(idx: Int, value: TimeWithoutTimezoneValue) {
setObject(idx, value.value)
}
private fun PreparedStatement.setAsTimestamp(idx: Int, value: TimestampWithTimezoneValue) {
setObject(idx, value.value)
}
private fun PreparedStatement.setAsTimestamp(
idx: Int,
value: TimestampWithoutTimezoneValue
) {
setObject(idx, value.value)
}
}
}
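For the record, a sketch of how the removed extension was called; the `MSSQLQueryBuilder.NamedField` construction is elided because its shape isn't shown in this diff:

```kotlin
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.integrations.destination.mssql.v2.MSSQLQueryBuilder
import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteValueToStatement.Companion.setValue
import java.sql.PreparedStatement

// The field's declared AirbyteType selects the setter; union and unknown
// types fall back to a JSON string, per the implementation above.
fun bindName(statement: PreparedStatement, field: MSSQLQueryBuilder.NamedField) {
    statement.setValue(1, StringValue("John Doe"), field)
}
```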

View File

@@ -43,7 +43,6 @@ import java.util.UUID
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
abstract class MSSQLWriterTest(
@@ -66,7 +65,8 @@ abstract class MSSQLWriterTest(
preserveUndeclaredFields = false,
supportFileTransfer = false,
commitDataIncrementally = true,
allTypesBehavior = StronglyTyped(integerCanBeLarge = false),
allTypesBehavior =
StronglyTyped(integerCanBeLarge = false, nestedFloatLosesPrecision = false),
unknownTypesBehavior = UnknownTypesBehavior.SERIALIZE,
nullEqualsUnset = true,
configUpdater = configUpdater,
@@ -302,15 +302,6 @@ internal class BulkInsert :
)
) { spec -> MSSQLConfigurationFactory().makeWithOverrides(spec, emptyMap()) },
) {
@Disabled(
"temporarily disabling while I work on implementing better type handling in bulk inserts - https://github.com/airbytehq/airbyte-internal-issues/issues/12128 / https://github.com/airbytehq/airbyte/pull/55884"
)
@Test
override fun testUnknownTypes() {
super.testUnknownTypes()
}
companion object {
const val CONFIG_FILE = "secrets/bulk_upload_config.json"
}

View File

@@ -1,131 +0,0 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.mssql.v2.convert
import io.airbyte.cdk.load.data.ArrayValue
import io.airbyte.cdk.load.data.DateValue
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.NumberValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import io.airbyte.cdk.load.data.UnknownValue
import io.airbyte.cdk.load.util.Jsons
import io.airbyte.cdk.load.util.serializeToJsonBytes
import java.math.BigDecimal
import java.math.BigInteger
import java.sql.Date
import java.sql.Time
import java.sql.Timestamp
import java.time.ZoneOffset
import org.junit.jupiter.api.Assertions.assertArrayEquals
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.Test
internal class AirbyteValueToSqlValueTest {
private val converter = AirbyteValueToSqlValue()
@Test
fun testConvertObjectValue() {
val objectValue =
ObjectValue(linkedMapOf("id" to IntegerValue(42L), "name" to StringValue("John Doe")))
val result = converter.convert(objectValue)
assertEquals(LinkedHashMap::class.java, result?.javaClass)
assertEquals(mapOf("id" to 42.toBigInteger(), "name" to "John Doe"), result)
}
@Test
fun testConvertArrayValue() {
val arrayValue = ArrayValue(listOf(StringValue("John Doe"), IntegerValue(42L)))
val result = converter.convert(arrayValue)
assertEquals(ArrayList::class.java, result?.javaClass)
assertEquals(listOf("John Doe", 42.toBigInteger()), result)
}
@Test
fun testConvertDateValue() {
val dateValue = DateValue("2024-11-18")
val result = converter.convert(dateValue)
assertEquals(Date::class.java, result?.javaClass)
assertEquals(
dateValue.value.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli(),
(result as Date).time
)
}
@Test
fun testConvertIntegerValue() {
val intValue = IntegerValue(42)
val result = converter.convert(intValue)
assertEquals(BigInteger::class.java, result?.javaClass)
assertEquals(42.toBigInteger(), result)
}
@Test
fun testConvertNullValue() {
val nullValue = NullValue
val result = converter.convert(nullValue)
assertNull(result)
}
@Test
fun testConvertNumberValue() {
val numberValue = NumberValue(42.5.toBigDecimal())
val result = converter.convert(numberValue)
assertEquals(BigDecimal::class.java, result?.javaClass)
assertEquals(42.5.toBigDecimal(), result)
}
@Test
fun testConvertStringValue() {
val stringValue = StringValue("test")
val result = converter.convert(stringValue)
assertEquals(String::class.java, result?.javaClass)
assertEquals("test", result)
}
@Test
fun testConvertTimeValue() {
val timeValue = TimeWithoutTimezoneValue("12:34:56")
val result = converter.convert(timeValue)
assertEquals(Time::class.java, result?.javaClass)
assertEquals(Time.valueOf(timeValue.value).time, (result as Time).time)
}
@Test
fun testConvertTimestampValue() {
val timestampValue = TimestampWithTimezoneValue("2024-11-18T12:34:56Z")
val result = converter.convert(timestampValue)
assertEquals(Timestamp::class.java, result?.javaClass)
assertEquals(
Timestamp.valueOf(timestampValue.value.toLocalDateTime()).time,
(result as Timestamp).time
)
}
@Test
fun testConvertUnknownValue() {
val jsonNode = Jsons.createObjectNode().put("id", "unknownValue")
val unknownValue = UnknownValue(jsonNode)
val result = converter.convert(unknownValue)
assertEquals(ByteArray::class.java, result?.javaClass)
assertArrayEquals(Jsons.writeValueAsBytes(unknownValue.value), result as ByteArray)
}
@Test
fun testObjectMapToJsonBytes() {
val objectValue =
ObjectValue(linkedMapOf("id" to IntegerValue(42L), "name" to StringValue("John Doe")))
val objectValueMap = converter.convert(objectValue)
val jsonBytes = objectValueMap?.serializeToJsonBytes()
assertNotNull(jsonBytes)
assertArrayEquals(Jsons.writeValueAsBytes(objectValueMap), jsonBytes)
}
}

View File

@@ -26,7 +26,7 @@ data:
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 716ca874-520b-4902-9f80-9fad66754b89
dockerImageTag: 0.3.19
dockerImageTag: 0.3.20
dockerRepository: airbyte/destination-s3-data-lake
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
githubIssueLabel: destination-s3-data-lake

View File

@@ -210,8 +210,20 @@ internal class S3DataLakeUtilTest {
stream = airbyteStream,
declaredFields =
mapOf(
"id" to EnrichedAirbyteValue(IntegerValue(42L), IntegerType, "id"),
"name" to EnrichedAirbyteValue(StringValue("John Doe"), StringType, "name")
"id" to
EnrichedAirbyteValue(
IntegerValue(42L),
IntegerType,
"id",
airbyteMetaField = null
),
"name" to
EnrichedAirbyteValue(
StringValue("John Doe"),
StringType,
"name",
airbyteMetaField = null
)
),
undeclaredFields = emptyMap(),
emittedAtMs = System.currentTimeMillis(),
@@ -259,13 +271,26 @@ internal class S3DataLakeUtilTest {
stream = airbyteStream,
declaredFields =
mapOf(
"id" to EnrichedAirbyteValue(IntegerValue(42L), IntegerType, "id"),
"name" to EnrichedAirbyteValue(StringValue("John Doe"), StringType, "name"),
"id" to
EnrichedAirbyteValue(
IntegerValue(42L),
IntegerType,
"id",
airbyteMetaField = null
),
"name" to
EnrichedAirbyteValue(
StringValue("John Doe"),
StringType,
"name",
airbyteMetaField = null
),
AIRBYTE_CDC_DELETE_COLUMN to
EnrichedAirbyteValue(
TimestampWithTimezoneValue("2024-01-01T00:00:00Z"),
TimestampTypeWithTimezone,
AIRBYTE_CDC_DELETE_COLUMN
AIRBYTE_CDC_DELETE_COLUMN,
airbyteMetaField = null,
),
),
undeclaredFields = emptyMap(),
@@ -312,8 +337,20 @@ internal class S3DataLakeUtilTest {
stream = airbyteStream,
declaredFields =
mapOf(
"id" to EnrichedAirbyteValue(IntegerValue(42L), IntegerType, "id"),
"name" to EnrichedAirbyteValue(StringValue("John Doe"), StringType, "name"),
"id" to
EnrichedAirbyteValue(
IntegerValue(42L),
IntegerType,
"id",
airbyteMetaField = null
),
"name" to
EnrichedAirbyteValue(
StringValue("John Doe"),
StringType,
"name",
airbyteMetaField = null
),
),
undeclaredFields = emptyMap(),
emittedAtMs = System.currentTimeMillis(),

View File

@@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.5.5
dockerImageTag: 1.5.6
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg

View File

@@ -158,6 +158,7 @@ See the [Getting Started: Configuration section](#configuration) of this guide f
| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------|
| 2.1.0 | 2025-03-24 | [55849](https://github.com/airbytehq/airbyte/pull/55849) | Misc. bugfixes in type-handling (esp. in complex types) |
| 2.0.5 | 2025-03-24 | [55904](https://github.com/airbytehq/airbyte/pull/55904) | Fix handling of invalid schemas (correctly JSON-serialize values) |
| 2.0.4 | 2025-03-20 | [55886](https://github.com/airbytehq/airbyte/pull/55886) | Internal refactor |
| 2.0.3 | 2025-03-18 | [55811](https://github.com/airbytehq/airbyte/pull/55811) | CDK: Pass DestinationStream around vs Descriptor |

View File

@@ -1,6 +1,6 @@
# S3 Data Lake
This page guides you through setting up the S3 Data Lake destination connector.
This connector writes the Iceberg table format to S3 or an S3-compatible storage backend using a supported Iceberg catalog. This is Airbyte's preferred Iceberg integration. It replaces the older [Iceberg](iceberg) and [S3-Glue](s3-glue) destinations, performs better, and implements Airbyte's newer core features. You should switch to this destination when you can.
@@ -205,7 +205,7 @@ To authenticate with Nessie, do two things.
### How Airbyte generates the Iceberg schema
In each stream, Airbyte maps top-level fields to Iceberg fields. Airbyte maps nested fields (objects, arrays, and unions) to string columns and writes them as serialized JSON.
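A concrete illustration of that rule (Jackson is used here purely for the example; the exact serializer is an implementation detail):

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper

fun main() {
    // A nested object field such as {"city": "Oslo"} lands in a string column
    // holding its serialized JSON rather than in nested Iceberg fields.
    val columnValue = ObjectMapper().writeValueAsString(mapOf("city" to "Oslo", "zip" to "0150"))
    println(columnValue) // {"city":"Oslo","zip":"0150"}
}
```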
This is the full mapping between Airbyte types and Iceberg types.
@@ -307,6 +307,7 @@ Now, you can identify the latest version of the 'Alice' record by querying wheth
| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--------------------------------------------------------- |:-------------------------------------------------------------------------------|
| 0.3.20 | 2025-03-24 | [\#55849](https://github.com/airbytehq/airbyte/pull/55849) | Internal refactoring |
| 0.3.19 | 2025-03-19 | [\#55798](https://github.com/airbytehq/airbyte/pull/55798) | CDK: Typing improvements |
| 0.3.18 | 2025-03-18 | [\#55811](https://github.com/airbytehq/airbyte/pull/55811) | CDK: Pass DestinationStream around vs Descriptor |
| 0.3.17 | 2025-03-13 | [\#55737](https://github.com/airbytehq/airbyte/pull/55737) | CDK: Pass DestinationRecordRaw around instead of DestinationRecordAirbyteValue |

View File

@@ -544,6 +544,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou
| Version | Date | Pull Request | Subject |
|:------------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.5.6 | 2025-03-24 | [55849](https://github.com/airbytehq/airbyte/pull/55849) | Internal refactoring |
| 1.5.5 | 2025-03-20 | [55875](https://github.com/airbytehq/airbyte/pull/55875) | Bugfix: Sync Can Hang on OOM |
| 1.5.4 | 2025-03-05 | [54695](https://github.com/airbytehq/airbyte/pull/54695) | Nonfunctional changes to support performance testing |
| 1.5.3 | 2025-03-04 | [54661](https://github.com/airbytehq/airbyte/pull/54661) | Nonfunctional changes to support performance testing |