
Bulk Load CDK: Break out InputMessage from DestinationMessage (#49962)

Johnny Schmidt
2024-12-19 16:46:14 -08:00
committed by GitHub
parent 9a327de6a6
commit 5780fc8fd3
17 changed files with 235 additions and 186 deletions
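
In short: Meta (with its nested Change) moves out of DestinationRecord and becomes a top-level class in io.airbyte.cdk.load.message, and a new InputMessage hierarchy (InputRecord, InputFile, InputStreamCheckpoint) is introduced so tests and the integration-test harness feed input through these types rather than constructing DestinationMessage subtypes directly. The rest of the diff is the mechanical rename of DestinationRecord.Meta.* and DestinationRecord.Change references to Meta.* and Meta.Change, plus a version bump of destination-iceberg-v2 to 0.2.0. The sketch below is illustrative only, assembled from the test-facing signatures in the diffs that follow; the stream coordinates are placeholders.

import io.airbyte.cdk.load.message.InputRecord
import io.airbyte.cdk.load.message.Meta

// Old call sites looked like DestinationRecord("ns", "stream", "{...}", emittedAtMs)
// and referenced DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID. The equivalents are now:
val record =
    InputRecord(
        namespace = "test_namespace", // placeholder; the real tests use a randomized namespace
        name = "test_stream",
        data = """{"id": 42, "name": "first_value"}""",
        emittedAtMs = 1234,
    )
val rawIdColumn = Meta.COLUMN_NAME_AB_RAW_ID // previously DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID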

View File

@@ -4,17 +4,15 @@
package io.airbyte.cdk.load.data
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.Meta
class AirbyteTypeToAirbyteTypeWithMeta(private val flatten: Boolean) {
fun convert(schema: AirbyteType): ObjectType {
val properties =
linkedMapOf(
DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID to
FieldType(StringType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT to
FieldType(IntegerType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_META to
Meta.COLUMN_NAME_AB_RAW_ID to FieldType(StringType, nullable = false),
Meta.COLUMN_NAME_AB_EXTRACTED_AT to FieldType(IntegerType, nullable = false),
Meta.COLUMN_NAME_AB_META to
FieldType(
nullable = false,
type =
@@ -54,8 +52,7 @@ class AirbyteTypeToAirbyteTypeWithMeta(private val flatten: Boolean) {
)
)
),
DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID to
FieldType(IntegerType, nullable = false)
Meta.COLUMN_NAME_AB_GENERATION_ID to FieldType(IntegerType, nullable = false)
)
if (flatten) {
if (schema is ObjectType) {
@@ -68,8 +65,7 @@ class AirbyteTypeToAirbyteTypeWithMeta(private val flatten: Boolean) {
)
}
} else {
properties[DestinationRecord.Meta.COLUMN_NAME_DATA] =
FieldType(schema, nullable = false)
properties[Meta.COLUMN_NAME_DATA] = FieldType(schema, nullable = false)
}
return ObjectType(properties)
}

View File

@@ -4,7 +4,7 @@
package io.airbyte.cdk.load.data
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.Meta
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason
@@ -12,8 +12,8 @@ interface AirbyteValueMapper {
fun map(
value: AirbyteValue,
schema: AirbyteType,
changes: List<DestinationRecord.Change> = emptyList()
): Pair<AirbyteValue, List<DestinationRecord.Change>>
changes: List<Meta.Change> = emptyList()
): Pair<AirbyteValue, List<Meta.Change>>
}
/** An optimized identity mapper that just passes through. */
@@ -21,22 +21,22 @@ class AirbyteValueNoopMapper : AirbyteValueMapper {
override fun map(
value: AirbyteValue,
schema: AirbyteType,
changes: List<DestinationRecord.Change>
): Pair<AirbyteValue, List<DestinationRecord.Change>> = value to changes
changes: List<Meta.Change>
): Pair<AirbyteValue, List<Meta.Change>> = value to changes
}
open class AirbyteValueIdentityMapper : AirbyteValueMapper {
data class Context(
val nullable: Boolean = false,
val path: List<String> = emptyList(),
val changes: MutableSet<DestinationRecord.Change> = mutableSetOf(),
val changes: MutableSet<Meta.Change> = mutableSetOf(),
)
override fun map(
value: AirbyteValue,
schema: AirbyteType,
changes: List<DestinationRecord.Change>
): Pair<AirbyteValue, List<DestinationRecord.Change>> =
changes: List<Meta.Change>
): Pair<AirbyteValue, List<Meta.Change>> =
mapInner(value, schema, Context(changes = changes.toMutableSet())).let {
it.first to it.second.changes.toList()
}
@@ -46,9 +46,7 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper {
context: Context,
reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR
): Pair<AirbyteValue, Context> {
context.changes.add(
DestinationRecord.Change(context.path.joinToString("."), Change.NULLED, reason)
)
context.changes.add(Meta.Change(context.path.joinToString("."), Change.NULLED, reason))
return mapInner(NullValue, schema, context)
}

View File

@@ -6,7 +6,7 @@ package io.airbyte.cdk.load.data
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.DestinationRecord.Meta
import io.airbyte.cdk.load.message.Meta
import java.util.*
class DestinationRecordToAirbyteValueWithMeta(
@@ -51,7 +51,7 @@ class DestinationRecordToAirbyteValueWithMeta(
}
}
fun Pair<AirbyteValue, List<DestinationRecord.Change>>.withAirbyteMeta(
fun Pair<AirbyteValue, List<Meta.Change>>.withAirbyteMeta(
stream: DestinationStream,
emittedAtMs: Long,
flatten: Boolean = false

View File

@@ -5,7 +5,7 @@
package io.airbyte.cdk.load.data
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.DestinationRecord.Change
import io.airbyte.cdk.load.message.Meta.Change
class MapperPipeline(
inputSchema: AirbyteType,

View File

@@ -9,10 +9,9 @@ import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.json.AirbyteValueToJson
import io.airbyte.cdk.load.data.json.JsonToAirbyteValue
import io.airbyte.cdk.load.data.json.toJson
import io.airbyte.cdk.load.message.CheckpointMessage.Checkpoint
import io.airbyte.cdk.load.message.CheckpointMessage.Stats
import io.airbyte.cdk.load.util.deserializeToNode
@@ -47,49 +46,27 @@ sealed interface DestinationRecordDomainMessage : DestinationStreamAffinedMessag
sealed interface DestinationFileDomainMessage : DestinationStreamAffinedMessage
data class DestinationRecord(
override val stream: DestinationStream.Descriptor,
val data: AirbyteValue,
val emittedAtMs: Long,
val meta: Meta?,
val serialized: String,
) : DestinationRecordDomainMessage {
/** Convenience constructor, primarily intended for use in tests. */
constructor(
namespace: String?,
name: String,
data: String,
emittedAtMs: Long,
changes: MutableList<Change> = mutableListOf(),
) : this(
stream = DestinationStream.Descriptor(namespace, name),
data = JsonToAirbyteValue().convert(data.deserializeToNode(), ObjectTypeWithoutSchema),
emittedAtMs = emittedAtMs,
meta = Meta(changes),
serialized = "",
)
data class Meta(val changes: List<Change> = mutableListOf()) {
companion object {
const val COLUMN_NAME_AB_RAW_ID: String = "_airbyte_raw_id"
const val COLUMN_NAME_AB_EXTRACTED_AT: String = "_airbyte_extracted_at"
const val COLUMN_NAME_AB_META: String = "_airbyte_meta"
const val COLUMN_NAME_AB_GENERATION_ID: String = "_airbyte_generation_id"
const val COLUMN_NAME_DATA: String = "_airbyte_data"
val COLUMN_NAMES =
setOf(
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_META,
COLUMN_NAME_AB_GENERATION_ID,
)
}
fun asProtocolObject(): AirbyteRecordMessageMeta =
AirbyteRecordMessageMeta()
.withChanges(changes.map { change -> change.asProtocolObject() })
data class Meta(
val changes: List<Change> = mutableListOf(),
) {
companion object {
const val COLUMN_NAME_AB_RAW_ID: String = "_airbyte_raw_id"
const val COLUMN_NAME_AB_EXTRACTED_AT: String = "_airbyte_extracted_at"
const val COLUMN_NAME_AB_META: String = "_airbyte_meta"
const val COLUMN_NAME_AB_GENERATION_ID: String = "_airbyte_generation_id"
const val COLUMN_NAME_DATA: String = "_airbyte_data"
val COLUMN_NAMES =
setOf(
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_META,
COLUMN_NAME_AB_GENERATION_ID,
)
}
fun asProtocolObject(): AirbyteRecordMessageMeta =
AirbyteRecordMessageMeta().withChanges(changes.map { change -> change.asProtocolObject() })
data class Change(
val field: String,
// Using the raw protocol enums here.
@@ -100,7 +77,15 @@ data class DestinationRecord(
fun asProtocolObject(): AirbyteRecordMessageMetaChange =
AirbyteRecordMessageMetaChange().withField(field).withChange(change).withReason(reason)
}
}
data class DestinationRecord(
override val stream: DestinationStream.Descriptor,
val data: AirbyteValue,
val emittedAtMs: Long,
val meta: Meta?,
val serialized: String,
) : DestinationStreamAffinedMessage {
override fun asProtocolMessage(): AirbyteMessage =
AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
@@ -109,10 +94,10 @@ data class DestinationRecord(
.withStream(stream.name)
.withNamespace(stream.namespace)
.withEmittedAt(emittedAtMs)
.withData(AirbyteValueToJson().convert(data))
.withData(data.toJson())
.also {
if (meta != null) {
it.meta = meta.asProtocolObject()
it.withMeta(meta.asProtocolObject())
}
}
)
@@ -415,12 +400,12 @@ class DestinationMessageFactory(
?: ObjectValue(linkedMapOf()),
emittedAtMs = message.record.emittedAt,
meta =
DestinationRecord.Meta(
Meta(
changes =
message.record.meta
?.changes
?.map {
DestinationRecord.Change(
Meta.Change(
field = it.field,
change = it.change,
reason = it.reason,

View File

@@ -4,17 +4,16 @@
package io.airbyte.cdk.load.data
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.Meta
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
internal class AirbyteTypeToAirbyteTypeWithMetaTest {
private val expectedMeta =
linkedMapOf(
DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID to FieldType(StringType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT to
FieldType(IntegerType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_META to
Meta.COLUMN_NAME_AB_RAW_ID to FieldType(StringType, nullable = false),
Meta.COLUMN_NAME_AB_EXTRACTED_AT to FieldType(IntegerType, nullable = false),
Meta.COLUMN_NAME_AB_META to
FieldType(
ObjectType(
linkedMapOf(
@@ -42,8 +41,7 @@ internal class AirbyteTypeToAirbyteTypeWithMetaTest {
),
nullable = false
),
DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID to
FieldType(IntegerType, nullable = false)
Meta.COLUMN_NAME_AB_GENERATION_ID to FieldType(IntegerType, nullable = false)
)
@Test
@@ -58,8 +56,7 @@ internal class AirbyteTypeToAirbyteTypeWithMetaTest {
)
val withMeta = schema.withAirbyteMeta(flatten = false)
val expected = ObjectType(expectedMeta)
expected.properties[DestinationRecord.Meta.COLUMN_NAME_DATA] =
FieldType(schema, nullable = false)
expected.properties[Meta.COLUMN_NAME_DATA] = FieldType(schema, nullable = false)
assertEquals(expected, withMeta)
}

View File

@@ -6,6 +6,7 @@ package io.airbyte.cdk.load.data
import io.airbyte.cdk.load.command.MockDestinationCatalogFactory
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.Meta
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
@@ -17,15 +18,15 @@ class DestinationRecordToAirbyteValueWithMetaTest {
val expectedMeta =
linkedMapOf(
// Don't do raw_id, we'll evict it and validate that it's a uuid
DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT to IntegerValue(emittedAtMs),
DestinationRecord.Meta.COLUMN_NAME_AB_META to
Meta.COLUMN_NAME_AB_EXTRACTED_AT to IntegerValue(emittedAtMs),
Meta.COLUMN_NAME_AB_META to
ObjectValue(
linkedMapOf(
"sync_id" to IntegerValue(syncId),
"changes" to ArrayValue(emptyList())
)
),
DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID to IntegerValue(generationId)
Meta.COLUMN_NAME_AB_GENERATION_ID to IntegerValue(generationId)
)
@Test
@@ -39,18 +40,10 @@ class DestinationRecordToAirbyteValueWithMetaTest {
)
)
val expected = LinkedHashMap(expectedMeta)
expected[DestinationRecord.Meta.COLUMN_NAME_DATA] = data
val mockRecord =
DestinationRecord(
stream.descriptor,
data,
emittedAtMs,
DestinationRecord.Meta(),
"dummy"
)
expected[Meta.COLUMN_NAME_DATA] = data
val mockRecord = DestinationRecord(stream.descriptor, data, emittedAtMs, Meta(), "dummy")
val withMeta = mockRecord.dataWithAirbyteMeta(stream, flatten = false)
val uuid =
withMeta.values.remove(DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID) as StringValue
val uuid = withMeta.values.remove(Meta.COLUMN_NAME_AB_RAW_ID) as StringValue
Assertions.assertTrue(
uuid.value.matches(
Regex("[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}")
@@ -71,16 +64,9 @@ class DestinationRecordToAirbyteValueWithMetaTest {
)
val expected = LinkedHashMap(expectedMeta)
data.values.forEach { (name, value) -> expected[name] = value }
val mockRecord =
DestinationRecord(
stream.descriptor,
data,
emittedAtMs,
DestinationRecord.Meta(),
"dummy"
)
val mockRecord = DestinationRecord(stream.descriptor, data, emittedAtMs, Meta(), "dummy")
val withMeta = mockRecord.dataWithAirbyteMeta(stream, flatten = true)
withMeta.values.remove(DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID)
withMeta.values.remove(Meta.COLUMN_NAME_AB_RAW_ID)
Assertions.assertEquals(expected, withMeta.values)
}
}

View File

@@ -4,7 +4,7 @@
package io.airbyte.cdk.load.data
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.Meta
import io.airbyte.cdk.load.test.util.Root
import io.airbyte.cdk.load.test.util.ValueTestBuilder
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change
@@ -31,7 +31,7 @@ class NullOutOfRangeIntegersTest {
Assertions.assertEquals(expectedValue, actualValue)
Assertions.assertEquals(1, changes.size)
Assertions.assertEquals(
DestinationRecord.Change(
Meta.Change(
"big_integer",
Change.NULLED,
Reason.DESTINATION_FIELD_SIZE_LIMITATION,
@@ -67,12 +67,12 @@ class NullOutOfRangeIntegersTest {
Assertions.assertEquals(expectedValue, actualValue)
Assertions.assertEquals(
setOf(
DestinationRecord.Change(
Meta.Change(
"too_small",
Change.NULLED,
Reason.DESTINATION_FIELD_SIZE_LIMITATION,
),
DestinationRecord.Change(
Meta.Change(
"too_big",
Change.NULLED,
Reason.DESTINATION_FIELD_SIZE_LIMITATION,

View File

@@ -0,0 +1,101 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.load.message
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
import io.airbyte.cdk.load.data.json.JsonToAirbyteValue
import io.airbyte.cdk.load.data.json.toJson
import io.airbyte.cdk.load.message.CheckpointMessage.Checkpoint
import io.airbyte.cdk.load.message.CheckpointMessage.Stats
import io.airbyte.cdk.load.util.deserializeToNode
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
sealed interface InputMessage {
fun asProtocolMessage(): AirbyteMessage
}
data class InputRecord(
val stream: DestinationStream.Descriptor,
val data: AirbyteValue,
val emittedAtMs: Long,
val meta: Meta?,
val serialized: String,
) : InputMessage {
/** Convenience constructor, primarily intended for use in tests. */
constructor(
namespace: String?,
name: String,
data: String,
emittedAtMs: Long,
changes: MutableList<Meta.Change> = mutableListOf(),
) : this(
stream = DestinationStream.Descriptor(namespace, name),
data = JsonToAirbyteValue().convert(data.deserializeToNode(), ObjectTypeWithoutSchema),
emittedAtMs = emittedAtMs,
meta = Meta(changes),
serialized = "",
)
override fun asProtocolMessage(): AirbyteMessage =
AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(
AirbyteRecordMessage()
.withStream(stream.name)
.withNamespace(stream.namespace)
.withEmittedAt(emittedAtMs)
.withData(data.toJson())
.also {
if (meta != null) {
it.withMeta(meta.asProtocolObject())
}
}
)
}
data class InputFile(
val file: DestinationFile,
) : InputMessage {
constructor(
stream: DestinationStream.Descriptor,
emittedAtMs: Long,
fileMessage: DestinationFile.AirbyteRecordMessageFile,
serialized: String = ""
) : this(
DestinationFile(
stream,
emittedAtMs,
serialized,
fileMessage,
)
)
override fun asProtocolMessage(): AirbyteMessage = file.asProtocolMessage()
}
sealed interface InputCheckpoint : InputMessage
data class InputStreamCheckpoint(val checkpoint: StreamCheckpoint) : InputCheckpoint {
constructor(
streamNamespace: String?,
streamName: String,
blob: String,
sourceRecordCount: Long,
destinationRecordCount: Long? = null,
) : this(
StreamCheckpoint(
Checkpoint(
DestinationStream.Descriptor(streamNamespace, streamName),
state = blob.deserializeToNode()
),
Stats(sourceRecordCount),
destinationRecordCount?.let { Stats(it) },
emptyMap(),
)
)
override fun asProtocolMessage(): AirbyteMessage = checkpoint.asProtocolMessage()
}
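
As a usage note, the integration tests pair a record with a stream-level checkpoint using these new types; the following is a minimal sketch assembled from the IntegrationTest and BasicFunctionalityIntegrationTest changes further down, with placeholder namespace and stream names.

import io.airbyte.cdk.load.message.InputMessage
import io.airbyte.cdk.load.message.InputRecord
import io.airbyte.cdk.load.message.InputStreamCheckpoint

// runSync(...) in IntegrationTest now accepts List<InputMessage> instead of List<DestinationMessage>.
val messages: List<InputMessage> =
    listOf(
        InputRecord(
            namespace = "test_namespace", // placeholder; the real tests use a randomized namespace
            name = "test_stream",
            data = """{"id": 5678, "undeclared": "asdf"}""",
            emittedAtMs = 1234,
        ),
        InputStreamCheckpoint(
            streamNamespace = "test_namespace",
            streamName = "test_stream",
            blob = """{"foo": "bar"}""",
            sourceRecordCount = 1,
        ),
    )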

View File

@@ -9,7 +9,7 @@ import io.airbyte.cdk.load.data.ArrayValue
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.Meta
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import java.time.Instant
import java.util.*
@@ -17,26 +17,18 @@ import kotlin.collections.LinkedHashMap
class AirbyteValueWithMetaToOutputRecord {
fun convert(value: ObjectValue): OutputRecord {
val meta = value.values[DestinationRecord.Meta.COLUMN_NAME_AB_META] as ObjectValue
val meta = value.values[Meta.COLUMN_NAME_AB_META] as ObjectValue
return OutputRecord(
rawId =
UUID.fromString(
(value.values[DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID] as StringValue)
.value
),
UUID.fromString((value.values[Meta.COLUMN_NAME_AB_RAW_ID] as StringValue).value),
extractedAt =
Instant.ofEpochMilli(
(value.values[DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT]
as IntegerValue)
.value
.toLong()
(value.values[Meta.COLUMN_NAME_AB_EXTRACTED_AT] as IntegerValue).value.toLong()
),
loadedAt = null,
data = value.values[DestinationRecord.Meta.COLUMN_NAME_DATA] as ObjectValue,
data = value.values[Meta.COLUMN_NAME_DATA] as ObjectValue,
generationId =
(value.values[DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID] as IntegerValue)
.value
.toLong(),
(value.values[Meta.COLUMN_NAME_AB_GENERATION_ID] as IntegerValue).value.toLong(),
airbyteMeta =
OutputRecord.Meta(
syncId = (meta.values["sync_id"] as IntegerValue).value.toLong(),
@@ -44,7 +36,7 @@ class AirbyteValueWithMetaToOutputRecord {
(meta.values["changes"] as ArrayValue)
.values
.map {
DestinationRecord.Change(
Meta.Change(
field =
((it as ObjectValue).values["field"] as StringValue).value,
change =
@@ -68,11 +60,10 @@ fun AirbyteValue.maybeUnflatten(wasFlattened: Boolean): ObjectValue {
if (!wasFlattened) {
return this
}
val (meta, data) =
this.values.toList().partition { DestinationRecord.Meta.COLUMN_NAMES.contains(it.first) }
val (meta, data) = this.values.toList().partition { Meta.COLUMN_NAMES.contains(it.first) }
val properties = LinkedHashMap(meta.toMap())
val dataObject = ObjectValue(LinkedHashMap(data.toMap()))
properties[DestinationRecord.Meta.COLUMN_NAME_DATA] = dataObject
properties[Meta.COLUMN_NAME_DATA] = dataObject
return ObjectValue(properties)
}

View File

@@ -8,9 +8,9 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.DestinationMessage
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.DestinationRecordStreamComplete
import io.airbyte.cdk.load.message.InputMessage
import io.airbyte.cdk.load.message.InputRecord
import io.airbyte.cdk.load.message.StreamCheckpoint
import io.airbyte.cdk.load.test.util.destination_process.DestinationProcessFactory
import io.airbyte.cdk.load.test.util.destination_process.DestinationUncleanExitException
@@ -125,7 +125,7 @@ abstract class IntegrationTest(
fun runSync(
configContents: String,
stream: DestinationStream,
messages: List<DestinationMessage>,
messages: List<InputMessage>,
streamStatus: AirbyteStreamStatus? = AirbyteStreamStatus.COMPLETE,
useFileTransfer: Boolean = false,
): List<AirbyteMessage> =
@@ -146,7 +146,7 @@ abstract class IntegrationTest(
fun runSync(
configContents: String,
catalog: DestinationCatalog,
messages: List<DestinationMessage>,
messages: List<InputMessage>,
/**
* If you set this to anything other than `COMPLETE`, you may run into a race condition.
* It's recommended that you send an explicit state message in [messages], and run the sync
@@ -207,7 +207,7 @@ abstract class IntegrationTest(
fun runSyncUntilStateAck(
configContents: String,
stream: DestinationStream,
records: List<DestinationRecord>,
records: List<InputRecord>,
inputStateMessage: StreamCheckpoint,
allowGracefulShutdown: Boolean,
useFileTransfer: Boolean = false,

View File

@@ -5,7 +5,7 @@
package io.airbyte.cdk.load.test.util
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.message.DestinationRecord.Change
import io.airbyte.cdk.load.message.Meta.Change
import java.time.Instant
import java.util.UUID

View File

@@ -36,8 +36,10 @@ import io.airbyte.cdk.load.data.TimestampValue
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.data.UnknownType
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.DestinationRecord.Change
import io.airbyte.cdk.load.message.InputFile
import io.airbyte.cdk.load.message.InputRecord
import io.airbyte.cdk.load.message.InputStreamCheckpoint
import io.airbyte.cdk.load.message.Meta.Change
import io.airbyte.cdk.load.message.StreamCheckpoint
import io.airbyte.cdk.load.test.util.DestinationCleaner
import io.airbyte.cdk.load.test.util.DestinationDataDumper
@@ -144,7 +146,7 @@ abstract class BasicFunctionalityIntegrationTest(
configContents,
stream,
listOf(
DestinationRecord(
InputRecord(
namespace = randomizedNamespace,
name = "test_stream",
data = """{"id": 5678, "undeclared": "asdf"}""",
@@ -160,7 +162,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
)
),
StreamCheckpoint(
InputStreamCheckpoint(
streamName = "test_stream",
streamNamespace = randomizedNamespace,
blob = """{"foo": "bar"}""",
@@ -256,13 +258,12 @@ abstract class BasicFunctionalityIntegrationTest(
configContents,
stream,
listOf(
DestinationFile(
InputFile(
stream = stream.descriptor,
emittedAtMs = 1234,
serialized = "",
fileMessage = fileMessage,
),
StreamCheckpoint(
InputStreamCheckpoint(
streamName = stream.descriptor.name,
streamNamespace = stream.descriptor.namespace,
blob = """{"foo": "bar"}""",
@@ -316,7 +317,7 @@ abstract class BasicFunctionalityIntegrationTest(
configContents,
stream,
listOf(
DestinationRecord(
InputRecord(
namespace = randomizedNamespace,
name = "test_stream",
data = """{"id": 12}""",
@@ -402,13 +403,13 @@ abstract class BasicFunctionalityIntegrationTest(
)
),
listOf(
DestinationRecord(
InputRecord(
namespace = stream1.descriptor.namespace,
name = stream1.descriptor.name,
data = """{"id": 1234}""",
emittedAtMs = 1234,
),
DestinationRecord(
InputRecord(
namespace = stream2.descriptor.namespace,
name = stream2.descriptor.name,
data = """{"id": 5678}""",
@@ -502,7 +503,7 @@ abstract class BasicFunctionalityIntegrationTest(
// The id field is always 42, and the string fields are always "foo\nbar".
val messages =
catalog.streams.map { stream ->
DestinationRecord(
InputRecord(
stream.descriptor,
ObjectValue(
(stream.schema as ObjectType)
@@ -560,7 +561,7 @@ abstract class BasicFunctionalityIntegrationTest(
configContents,
makeStream(generationId = 12, minimumGenerationId = 0, syncId = 42),
listOf(
DestinationRecord(
InputRecord(
randomizedNamespace,
"test_stream",
"""{"id": 42, "name": "first_value"}""",
@@ -573,7 +574,7 @@ abstract class BasicFunctionalityIntegrationTest(
configContents,
finalStream,
listOf(
DestinationRecord(
InputRecord(
randomizedNamespace,
"test_stream",
"""{"id": 42, "name": "second_value"}""",
@@ -611,7 +612,7 @@ abstract class BasicFunctionalityIntegrationTest(
open fun testInterruptedTruncateWithPriorData() {
assumeTrue(verifyDataWriting)
fun makeInputRecord(id: Int, updatedAt: String, extractedAt: Long) =
DestinationRecord(
InputRecord(
randomizedNamespace,
"test_stream",
"""{"id": $id, "updated_at": "$updatedAt", "name": "foo_${id}_$extractedAt"}""",
@@ -780,7 +781,7 @@ abstract class BasicFunctionalityIntegrationTest(
open fun testInterruptedTruncateWithoutPriorData() {
assumeTrue(verifyDataWriting)
fun makeInputRecord(id: Int, updatedAt: String, extractedAt: Long) =
DestinationRecord(
InputRecord(
randomizedNamespace,
"test_stream",
"""{"id": $id, "updated_at": "$updatedAt", "name": "foo_${id}_$extractedAt"}""",
@@ -901,7 +902,7 @@ abstract class BasicFunctionalityIntegrationTest(
open fun resumeAfterCancelledTruncate() {
assumeTrue(verifyDataWriting)
fun makeInputRecord(id: Int, updatedAt: String, extractedAt: Long) =
DestinationRecord(
InputRecord(
randomizedNamespace,
"test_stream",
"""{"id": $id, "updated_at": "$updatedAt", "name": "foo_${id}_$extractedAt"}""",
@@ -1100,7 +1101,7 @@ abstract class BasicFunctionalityIntegrationTest(
configContents,
makeStream(syncId = 42),
listOf(
DestinationRecord(
InputRecord(
randomizedNamespace,
"test_stream",
"""{"id": 42, "name": "first_value"}""",
@@ -1113,7 +1114,7 @@ abstract class BasicFunctionalityIntegrationTest(
configContents,
finalStream,
listOf(
DestinationRecord(
InputRecord(
randomizedNamespace,
"test_stream",
"""{"id": 42, "name": "second_value"}""",
@@ -1169,7 +1170,7 @@ abstract class BasicFunctionalityIntegrationTest(
linkedMapOf("id" to intType, "to_drop" to stringType, "to_change" to intType)
),
listOf(
DestinationRecord(
InputRecord(
randomizedNamespace,
"test_stream",
"""{"id": 42, "to_drop": "val1", "to_change": 42}""",
@@ -1186,7 +1187,7 @@ abstract class BasicFunctionalityIntegrationTest(
configContents,
finalStream,
listOf(
DestinationRecord(
InputRecord(
randomizedNamespace,
"test_stream",
"""{"id": 42, "to_change": "val2", "to_add": "val3"}""",
@@ -1248,7 +1249,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = syncId,
)
fun makeRecord(data: String, extractedAt: Long) =
DestinationRecord(
InputRecord(
randomizedNamespace,
"test_stream",
data,
@@ -1393,7 +1394,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
fun makeRecord(cursorName: String) =
DestinationRecord(
InputRecord(
randomizedNamespace,
"test_stream",
data = """{"id": 1, "$cursorName": 1, "name": "foo_$cursorName"}""",
@@ -1453,7 +1454,7 @@ abstract class BasicFunctionalityIntegrationTest(
}
val messages =
(0..manyStreamCount).map { i ->
DestinationRecord(
InputRecord(
randomizedNamespace,
"test_stream_$i",
"""{"id": 1, "name": "foo_$i"}""",
@@ -1507,7 +1508,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
fun makeRecord(data: String) =
DestinationRecord(
InputRecord(
randomizedNamespace,
"test_stream",
data,
@@ -1809,7 +1810,7 @@ abstract class BasicFunctionalityIntegrationTest(
configContents,
stream,
listOf(
DestinationRecord(
InputRecord(
randomizedNamespace,
"problematic_types",
"""
@@ -1824,7 +1825,7 @@ abstract class BasicFunctionalityIntegrationTest(
}""".trimIndent(),
emittedAtMs = 1602637589100,
),
DestinationRecord(
InputRecord(
randomizedNamespace,
"problematic_types",
"""
@@ -1839,7 +1840,7 @@ abstract class BasicFunctionalityIntegrationTest(
}""".trimIndent(),
emittedAtMs = 1602637589200,
),
DestinationRecord(
InputRecord(
randomizedNamespace,
"problematic_types",
"""
@@ -2066,7 +2067,7 @@ abstract class BasicFunctionalityIntegrationTest(
configContents,
stream,
listOf(
DestinationRecord(
InputRecord(
randomizedNamespace,
"problematic_types",
"""
@@ -2080,7 +2081,7 @@ abstract class BasicFunctionalityIntegrationTest(
}""".trimIndent(),
emittedAtMs = 1602637589100,
),
DestinationRecord(
InputRecord(
randomizedNamespace,
"problematic_types",
"""
@@ -2094,7 +2095,7 @@ abstract class BasicFunctionalityIntegrationTest(
}""".trimIndent(),
emittedAtMs = 1602637589200,
),
DestinationRecord(
InputRecord(
randomizedNamespace,
"problematic_types",
"""

View File

@@ -16,7 +16,7 @@ data:
type: GSM
connectorType: destination
definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
dockerImageTag: 0.1.16
dockerImageTag: 0.2.0
dockerRepository: airbyte/destination-iceberg-v2
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3
githubIssueLabel: destination-iceberg-v2

View File

@@ -8,7 +8,7 @@ import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.*
import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.Meta
import io.airbyte.cdk.load.test.util.DestinationDataDumper
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil
@@ -53,7 +53,7 @@ object IcebergV2DataDumper : DestinationDataDumper {
private fun getMetaData(record: Record): OutputRecord.Meta {
val airbyteMeta =
record.getField(DestinationRecord.Meta.COLUMN_NAME_AB_META) as? Record
record.getField(Meta.COLUMN_NAME_AB_META) as? Record
?: throw IllegalStateException("Received no metadata in the record.")
val syncId = airbyteMeta.getField("sync_id") as? Long
@@ -73,7 +73,7 @@ object IcebergV2DataDumper : DestinationDataDumper {
AirbyteRecordMessageMetaChange.Reason.fromValue(
change.getField("reason") as String
)
DestinationRecord.Change(field, changeValue, reason)
Meta.Change(field, changeValue, reason)
}
return OutputRecord.Meta(syncId = syncId, changes = metaChanges)
@@ -100,20 +100,13 @@ object IcebergV2DataDumper : DestinationDataDumper {
outputRecords.add(
OutputRecord(
rawId =
UUID.fromString(
record
.getField(DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID)
.toString()
),
UUID.fromString(record.getField(Meta.COLUMN_NAME_AB_RAW_ID).toString()),
extractedAt =
Instant.ofEpochMilli(
record.getField(DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT)
as Long
record.getField(Meta.COLUMN_NAME_AB_EXTRACTED_AT) as Long
),
loadedAt = null,
generationId =
record.getField(DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID)
as Long,
generationId = record.getField(Meta.COLUMN_NAME_AB_GENERATION_ID) as Long,
data = getCastedData(schema, record),
airbyteMeta = getMetaData(record)
)

View File

@@ -19,10 +19,10 @@ import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergSchema
import io.airbyte.cdk.load.data.withAirbyteMeta
import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_EXTRACTED_AT
import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID
import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_META
import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_RAW_ID
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_EXTRACTED_AT
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_META
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_RAW_ID
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableWriterFactory
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil
import io.mockk.every

View File

@@ -22,10 +22,11 @@ import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimestampValue
import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_EXTRACTED_AT
import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID
import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_META
import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_RAW_ID
import io.airbyte.cdk.load.message.Meta
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_EXTRACTED_AT
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_META
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_RAW_ID
import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration
import io.mockk.every
import io.mockk.mockk
@@ -188,7 +189,7 @@ internal class IcebergUtilTest {
linkedMapOf("id" to IntegerValue(42L), "name" to StringValue("John Doe"))
),
emittedAtMs = System.currentTimeMillis(),
meta = DestinationRecord.Meta(),
meta = Meta(),
serialized = "{\"id\":42, \"name\":\"John Doe\"}"
)
val pipeline = ParquetMapperPipelineFactory().create(airbyteStream)
@@ -240,7 +241,7 @@ internal class IcebergUtilTest {
)
),
emittedAtMs = System.currentTimeMillis(),
meta = DestinationRecord.Meta(),
meta = Meta(),
serialized = "{\"id\":42, \"name\":\"John Doe\"}"
)
val pipeline = ParquetMapperPipelineFactory().create(airbyteStream)
@@ -288,7 +289,7 @@ internal class IcebergUtilTest {
linkedMapOf("id" to IntegerValue(42L), "name" to StringValue("John Doe"))
),
emittedAtMs = System.currentTimeMillis(),
meta = DestinationRecord.Meta(),
meta = Meta(),
serialized = "{\"id\":42, \"name\":\"John Doe\"}"
)
val pipeline = ParquetMapperPipelineFactory().create(airbyteStream)