Bulk load CDK: DestinationRecord has full DestinationStream (#55811)
Co-authored-by: Francis Genet <francis.genet@airbyte.io>
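In short: every stream-affined destination message now carries the full DestinationStream instead of just its DestinationStream.Descriptor, and call sites that are keyed by descriptor dereference it explicitly. A minimal, self-contained sketch of the shape of the change (the types below are simplified stand-ins for illustration, not the actual CDK classes):

    data class Descriptor(val namespace: String?, val name: String)

    data class DestinationStream(
        val descriptor: Descriptor,
        val generationId: Long = 1,
        val syncId: Long = 1,
    )

    interface StreamAffinedMessage {
        // before this change: val stream: Descriptor
        val stream: DestinationStream
    }

    data class Record(override val stream: DestinationStream, val serialized: String) : StreamAffinedMessage

    fun main() {
        val stream = DestinationStream(Descriptor("ns", "users"))
        val record = Record(stream, """{"id": 1}""")
        // Descriptor-keyed lookups (queues, stream managers) now go through the stream:
        val key = record.stream.descriptor
        println("routing ${record.serialized} to $key, generation ${record.stream.generationId}")
    }

This gives downstream components access to stream metadata (import type, generation IDs, schema) without a catalog lookup by descriptor, as the S3DataLakePartitioner change further down illustrates.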
@@ -84,7 +84,7 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
     is LocalBatch -> {
         log.info { "Persisting ${batch.records.size} records for ${stream.descriptor}" }
         batch.records.forEach {
-            val filename = getFilename(it.stream, staging = true)
+            val filename = getFilename(it.stream.descriptor, staging = true)
             val record =
                 OutputRecord(
                     UUID.randomUUID(),
@@ -45,7 +45,7 @@ sealed interface DestinationMessage {

 /** Records. */
 sealed interface DestinationStreamAffinedMessage : DestinationMessage {
-    val stream: DestinationStream.Descriptor
+    val stream: DestinationStream
 }

 sealed interface DestinationRecordDomainMessage : DestinationStreamAffinedMessage
@@ -123,7 +123,7 @@ data class Meta(
 }

 data class DestinationRecord(
-    override val stream: DestinationStream.Descriptor,
+    override val stream: DestinationStream,
     val message: AirbyteMessage,
     val serialized: String,
     val schema: AirbyteType
@@ -153,14 +153,11 @@ data class DestinationRecord(
  * Represents a record already in its serialized state. The intended use is for conveying records
  * from stdin to the spill file, where reserialization is not necessary.
  */
-data class DestinationRecordSerialized(
-    val stream: DestinationStream.Descriptor,
-    val serialized: String
-)
+data class DestinationRecordSerialized(val stream: DestinationStream, val serialized: String)

 /** Represents a record both deserialized AND marshaled to airbyte value. The marshaling */
 data class DestinationRecordAirbyteValue(
-    val stream: DestinationStream.Descriptor,
+    val stream: DestinationStream,
     val data: AirbyteValue,
     val emittedAtMs: Long,
     val meta: Meta?,
@@ -168,7 +165,7 @@ data class DestinationRecordAirbyteValue(
 )

 data class EnrichedDestinationRecordAirbyteValue(
-    val stream: DestinationStream.Descriptor,
+    val stream: DestinationStream,
     val declaredFields: Map<String, EnrichedAirbyteValue>,
     val airbyteMetaFields: Map<String, EnrichedAirbyteValue>,
     val undeclaredFields: Map<String, JsonNode>,
@@ -178,7 +175,7 @@ data class EnrichedDestinationRecordAirbyteValue(
 )

 data class DestinationRecordRaw(
-    val stream: DestinationStream.Descriptor,
+    val stream: DestinationStream,
     private val rawData: AirbyteMessage,
     private val serialized: String
 ) {
@@ -205,7 +202,7 @@ data class DestinationRecordRaw(
 }

 data class DestinationFile(
-    override val stream: DestinationStream.Descriptor,
+    override val stream: DestinationStream,
     val emittedAtMs: Long,
     val serialized: String,
     val fileMessage: AirbyteRecordMessageFile
@@ -274,8 +271,8 @@ data class DestinationFile(
         .withType(AirbyteMessage.Type.RECORD)
         .withRecord(
             AirbyteRecordMessage()
-                .withStream(stream.name)
-                .withNamespace(stream.namespace)
+                .withStream(stream.descriptor.name)
+                .withNamespace(stream.descriptor.namespace)
                 .withEmittedAt(emittedAtMs)
                 .withAdditionalProperty("file", file)
         )
@@ -301,35 +298,35 @@ private fun statusToProtocolMessage(
 )

 data class DestinationRecordStreamComplete(
-    override val stream: DestinationStream.Descriptor,
+    override val stream: DestinationStream,
     val emittedAtMs: Long,
 ) : DestinationRecordDomainMessage {
     override fun asProtocolMessage(): AirbyteMessage =
-        statusToProtocolMessage(stream, emittedAtMs, AirbyteStreamStatus.COMPLETE)
+        statusToProtocolMessage(stream.descriptor, emittedAtMs, AirbyteStreamStatus.COMPLETE)
 }

 data class DestinationRecordStreamIncomplete(
-    override val stream: DestinationStream.Descriptor,
+    override val stream: DestinationStream,
     val emittedAtMs: Long,
 ) : DestinationRecordDomainMessage {
     override fun asProtocolMessage(): AirbyteMessage =
-        statusToProtocolMessage(stream, emittedAtMs, AirbyteStreamStatus.INCOMPLETE)
+        statusToProtocolMessage(stream.descriptor, emittedAtMs, AirbyteStreamStatus.INCOMPLETE)
 }

 data class DestinationFileStreamComplete(
-    override val stream: DestinationStream.Descriptor,
+    override val stream: DestinationStream,
     val emittedAtMs: Long,
 ) : DestinationFileDomainMessage {
     override fun asProtocolMessage(): AirbyteMessage =
-        statusToProtocolMessage(stream, emittedAtMs, AirbyteStreamStatus.COMPLETE)
+        statusToProtocolMessage(stream.descriptor, emittedAtMs, AirbyteStreamStatus.COMPLETE)
 }

 data class DestinationFileStreamIncomplete(
-    override val stream: DestinationStream.Descriptor,
+    override val stream: DestinationStream,
     val emittedAtMs: Long,
 ) : DestinationFileDomainMessage {
     override fun asProtocolMessage(): AirbyteMessage =
-        statusToProtocolMessage(stream, emittedAtMs, AirbyteStreamStatus.INCOMPLETE)
+        statusToProtocolMessage(stream.descriptor, emittedAtMs, AirbyteStreamStatus.INCOMPLETE)
 }

 /** State. */
@@ -485,7 +482,7 @@ class DestinationMessageFactory(
     message.record.additionalProperties["file"] as Map<String, Any>

     DestinationFile(
-        stream = stream.descriptor,
+        stream = stream,
         emittedAtMs = message.record.emittedAt,
         serialized = serialized,
         fileMessage =
@@ -504,7 +501,7 @@ class DestinationMessageFactory(
             )
         }
     } else {
-        DestinationRecord(stream.descriptor, message, serialized, stream.schema)
+        DestinationRecord(stream, message, serialized, stream.schema)
     }
 }
 AirbyteMessage.Type.TRACE -> {
@@ -522,24 +519,24 @@ class DestinationMessageFactory(
 AirbyteStreamStatus.COMPLETE ->
     if (fileTransferEnabled) {
         DestinationFileStreamComplete(
-            stream.descriptor,
+            stream,
             message.trace.emittedAt?.toLong() ?: 0L
         )
     } else {
         DestinationRecordStreamComplete(
-            stream.descriptor,
+            stream,
             message.trace.emittedAt?.toLong() ?: 0L
         )
     }
 AirbyteStreamStatus.INCOMPLETE ->
     if (fileTransferEnabled) {
         DestinationFileStreamIncomplete(
-            stream.descriptor,
+            stream,
             message.trace.emittedAt?.toLong() ?: 0L
         )
     } else {
         DestinationRecordStreamIncomplete(
-            stream.descriptor,
+            stream,
             message.trace.emittedAt?.toLong() ?: 0L
         )
     }
@@ -96,8 +96,8 @@ class DefaultInputConsumerTask(
     sizeBytes: Long
 ) {
     val stream = reserved.value.stream
-    val manager = syncManager.getStreamManager(stream)
-    val recordQueue = recordQueueSupplier.get(stream)
+    val manager = syncManager.getStreamManager(stream.descriptor)
+    val recordQueue = recordQueueSupplier.get(stream.descriptor)
     when (val message = reserved.value) {
         is DestinationRecord -> {
             val wrapped =
@@ -125,7 +125,9 @@ class DefaultInputConsumerTask(
         is DestinationFile -> {
             val index = manager.incrementReadCount()
             // destinationTaskLauncher.handleFile(stream, message, index)
-            fileTransferQueue.publish(FileTransferQueueMessage(stream, message, index))
+            fileTransferQueue.publish(
+                FileTransferQueueMessage(stream.descriptor, message, index)
+            )
         }
         is DestinationFileStreamComplete -> {
             reserved.release() // safe because multiple calls conflate
@@ -133,9 +135,9 @@ class DefaultInputConsumerTask(
             val envelope =
                 BatchEnvelope(
                     SimpleBatch(Batch.State.COMPLETE),
-                    streamDescriptor = message.stream,
+                    streamDescriptor = message.stream.descriptor,
                 )
-            destinationTaskLauncher.handleNewBatch(stream, envelope)
+            destinationTaskLauncher.handleNewBatch(stream.descriptor, envelope)
         }
         is DestinationFileStreamIncomplete ->
             throw IllegalStateException("File stream $stream failed upstream, cannot continue.")
@@ -146,16 +148,16 @@ class DefaultInputConsumerTask(
     reserved: Reserved<DestinationStreamAffinedMessage>,
 ) {
     val stream = reserved.value.stream
-    unopenedStreams.remove(stream)?.let {
+    unopenedStreams.remove(stream.descriptor)?.let {
         log.info { "Saw first record for stream $stream; initializing" }
         // Note, since we're not spilling to disk, there is nothing to do with
         // any records before initialization is complete, so we'll wait here
         // for it to finish.
         openStreamQueue.publish(it)
-        syncManager.getOrAwaitStreamLoader(stream)
+        syncManager.getOrAwaitStreamLoader(stream.descriptor)
         log.info { "Initialization for stream $stream complete" }
     }
-    val manager = syncManager.getStreamManager(stream)
+    val manager = syncManager.getStreamManager(stream.descriptor)
     when (val message = reserved.value) {
         is DestinationRecord -> {
             val record = message.asDestinationRecordRaw()
@@ -163,7 +165,7 @@ class DefaultInputConsumerTask(
             val pipelineMessage =
                 PipelineMessage(
                     mapOf(manager.getCurrentCheckpointId() to 1),
-                    StreamKey(stream),
+                    StreamKey(stream.descriptor),
                     record
                 ) { reserved.release() }
             val partition = partitioner.getPartition(record, recordQueueForPipeline.partitions)
@@ -172,19 +174,21 @@ class DefaultInputConsumerTask(
         is DestinationRecordStreamComplete -> {
             manager.markEndOfStream(true)
             log.info { "Read COMPLETE for stream $stream" }
-            recordQueueForPipeline.broadcast(PipelineEndOfStream(stream))
+            recordQueueForPipeline.broadcast(PipelineEndOfStream(stream.descriptor))
             reserved.release()
         }
         is DestinationRecordStreamIncomplete -> {
             manager.markEndOfStream(false)
             log.info { "Read INCOMPLETE for stream $stream" }
-            recordQueueForPipeline.broadcast(PipelineEndOfStream(stream))
+            recordQueueForPipeline.broadcast(PipelineEndOfStream(stream.descriptor))
             reserved.release()
         }
         is DestinationFile -> {
             val index = manager.incrementReadCount()
             // destinationTaskLauncher.handleFile(stream, message, index)
-            fileTransferQueue.publish(FileTransferQueueMessage(stream, message, index))
+            fileTransferQueue.publish(
+                FileTransferQueueMessage(stream.descriptor, message, index)
+            )
         }
         is DestinationFileStreamComplete -> {
             reserved.release() // safe because multiple calls conflate
@@ -192,9 +196,9 @@ class DefaultInputConsumerTask(
             val envelope =
                 BatchEnvelope(
                     SimpleBatch(Batch.State.COMPLETE),
-                    streamDescriptor = message.stream,
+                    streamDescriptor = message.stream.descriptor,
                 )
-            destinationTaskLauncher.handleNewBatch(stream, envelope)
+            destinationTaskLauncher.handleNewBatch(stream.descriptor, envelope)
         }
         is DestinationFileStreamIncomplete ->
             throw IllegalStateException("File stream $stream failed upstream, cannot continue.")
@@ -41,7 +41,7 @@ class DestinationRecordAirbyteValueToAirbyteValueWithMetaTest {
     )
     val expected = LinkedHashMap(expectedMeta)
     expected[Meta.COLUMN_NAME_DATA] = data
-    val mockRecord = DestinationRecordAirbyteValue(stream.descriptor, data, emittedAtMs, Meta())
+    val mockRecord = DestinationRecordAirbyteValue(stream, data, emittedAtMs, Meta())
     val withMeta = mockRecord.dataWithAirbyteMeta(stream, flatten = false)
     val uuid = withMeta.values.remove(Meta.COLUMN_NAME_AB_RAW_ID) as StringValue
     Assertions.assertTrue(
@@ -64,7 +64,7 @@ class DestinationRecordAirbyteValueToAirbyteValueWithMetaTest {
     )
     val expected = LinkedHashMap(expectedMeta)
     data.values.forEach { (name, value) -> expected[name] = value }
-    val mockRecord = DestinationRecordAirbyteValue(stream.descriptor, data, emittedAtMs, Meta())
+    val mockRecord = DestinationRecordAirbyteValue(stream, data, emittedAtMs, Meta())
     val withMeta = mockRecord.dataWithAirbyteMeta(stream, flatten = true)
     withMeta.values.remove(Meta.COLUMN_NAME_AB_RAW_ID)
     Assertions.assertEquals(expected, withMeta.values)
@@ -66,7 +66,7 @@ class ProcessRecordsTaskTest {
     coEvery { deserializer.deserialize(any()) } answers
         {
             DestinationRecord(
-                stream = MockDestinationCatalogFactory.stream1.descriptor,
+                stream = MockDestinationCatalogFactory.stream1,
                 message =
                     AirbyteMessage()
                         .withRecord(
@@ -111,7 +111,7 @@ class InputConsumerTaskTest {
     queue1.publish(
         match {
             it.value is StreamRecordEvent &&
-                (it.value as StreamRecordEvent).payload.stream == STREAM1
+                (it.value as StreamRecordEvent).payload.stream.descriptor == STREAM1
         }
     )
 }
@@ -119,7 +119,7 @@ class InputConsumerTaskTest {
     queue2.publish(
         match {
             it.value is StreamRecordEvent &&
-                (it.value as StreamRecordEvent).payload.stream == STREAM2
+                (it.value as StreamRecordEvent).payload.stream.descriptor == STREAM2
         }
     )
 }
@@ -51,7 +51,8 @@ class InputConsumerTaskUTest {
     @MockK lateinit var partitioner: InputPartitioner
     @MockK lateinit var openStreamQueue: QueueWriter<DestinationStream>

-    private val stream = DestinationStream.Descriptor("namespace", "name")
+    private val streamDescriptor = DestinationStream.Descriptor("namespace", "name")
+    private lateinit var dstream: DestinationStream

     private fun createTask(loadPipeline: LoadPipeline?) =
         DefaultInputConsumerTask(
@@ -70,10 +71,10 @@ class InputConsumerTaskUTest {

     @BeforeEach
     fun setup() {
-        val dstream = mockk<DestinationStream>(relaxed = true)
-        every { dstream.descriptor } returns stream
+        dstream = mockk<DestinationStream>(relaxed = true)
+        every { dstream.descriptor } returns streamDescriptor
         coEvery { catalog.streams } returns listOf(dstream)
-        coEvery { recordQueueSupplier.get(stream) } returns mockk(relaxed = true)
+        coEvery { recordQueueSupplier.get(streamDescriptor) } returns mockk(relaxed = true)
         coEvery { fileTransferQueue.close() } returns Unit
         coEvery { recordQueueForPipeline.close() } returns Unit
         coEvery { openStreamQueue.close() } returns Unit
@@ -94,7 +95,7 @@ class InputConsumerTaskUTest {
     null,
     0,
     DestinationRecord(
-        stream = stream,
+        stream = dstream,
         message = mockk(relaxed = true),
         serialized = "",
         schema = ObjectTypeWithoutSchema
@@ -104,7 +105,7 @@ class InputConsumerTaskUTest {
         )
         val job = launch { inputConsumerTask.execute() }
         job.join()
-        coVerify { recordQueueSupplier.get(stream) }
+        coVerify { recordQueueSupplier.get(streamDescriptor) }
         coVerify(exactly = 0) { recordQueueForPipeline.publish(any(), any()) }
     }
 }
@@ -112,7 +113,6 @@ class InputConsumerTaskUTest {
     @Test
     fun `input consumer uses the new path when there is a load pipeline`(): Unit = runTest {
         val inputConsumerTask = createTask(mockk(relaxed = true))
-
         coEvery { inputFlow.collect(any()) } coAnswers
             {
                 val collector: FlowCollector<Pair<Long, Reserved<DestinationMessage>>> = firstArg()
@@ -123,7 +123,7 @@ class InputConsumerTaskUTest {
     null,
     0,
     DestinationRecord(
-        stream = stream,
+        stream = dstream,
         message = mockk(relaxed = true),
         serialized = "",
         schema = ObjectTypeWithoutSchema
@@ -133,7 +133,7 @@ class InputConsumerTaskUTest {
         )
         val job = launch { inputConsumerTask.execute() }
         job.join()
-        coVerify(exactly = 0) { recordQueueSupplier.get(stream) }
+        coVerify(exactly = 0) { recordQueueSupplier.get(streamDescriptor) }
         coVerify { recordQueueForPipeline.publish(any(), any()) }
     }
 }
@@ -29,7 +29,7 @@ class ReservingDeserializingInputFlowTest {
     @MockK(relaxed = true) lateinit var config: DestinationConfiguration
     @MockK(relaxed = true) lateinit var deserializer: ProtocolMessageDeserializer
     @MockK(relaxed = true) lateinit var memoryManager: ReservationManager
-    @MockK(relaxed = true) lateinit var stream: DestinationStream.Descriptor
+    @MockK(relaxed = true) lateinit var stream: DestinationStream
     lateinit var inputFlow: ReservingDeserializingInputFlow

     @BeforeEach
@@ -92,10 +92,7 @@ class SpillToDiskTaskTest {
     StreamRecordEvent(
         3L,
         2L,
-        DestinationRecordSerialized(
-            MockDestinationCatalogFactory.stream1.descriptor,
-            ""
-        )
+        DestinationRecordSerialized(MockDestinationCatalogFactory.stream1, "")
     )
     // flush strategy returns true, so we flush
     coEvery { flushStrategy.shouldFlush(any(), any(), any()) } returns true
@@ -132,10 +129,7 @@ class SpillToDiskTaskTest {
     StreamRecordEvent(
         3L,
         2L,
-        DestinationRecordSerialized(
-            MockDestinationCatalogFactory.stream1.descriptor,
-            ""
-        )
+        DestinationRecordSerialized(MockDestinationCatalogFactory.stream1, "")
     )

     // must publish 1 record message so range isn't empty
@@ -254,7 +248,7 @@ class SpillToDiskTaskTest {
     sizeBytes = Fixtures.SERIALIZED_SIZE_BYTES,
     payload =
         DestinationRecordSerialized(
-            MockDestinationCatalogFactory.stream1.descriptor,
+            MockDestinationCatalogFactory.stream1,
             "",
         ),
     ),
@@ -26,7 +26,7 @@ import io.airbyte.protocol.models.v0.AirbyteStateMessage
 object StubDestinationMessageFactory {
     fun makeRecord(stream: DestinationStream): DestinationRecord {
         return DestinationRecord(
-            stream = stream.descriptor,
+            stream = stream,
             message =
                 AirbyteMessage()
                     .withRecord(
@@ -39,7 +39,7 @@ object StubDestinationMessageFactory {

     fun makeFile(stream: DestinationStream, record: String): DestinationFile {
         return DestinationFile(
-            stream = stream.descriptor,
+            stream = stream,
             emittedAtMs = 0,
             serialized = record,
             fileMessage = nullFileMessage,
@@ -47,19 +47,19 @@ object StubDestinationMessageFactory {
     }

     fun makeStreamComplete(stream: DestinationStream): DestinationRecordStreamComplete {
-        return DestinationRecordStreamComplete(stream = stream.descriptor, emittedAtMs = 0)
+        return DestinationRecordStreamComplete(stream = stream, emittedAtMs = 0)
     }

     fun makeFileStreamComplete(stream: DestinationStream): DestinationFileStreamComplete {
-        return DestinationFileStreamComplete(stream = stream.descriptor, emittedAtMs = 0)
+        return DestinationFileStreamComplete(stream = stream, emittedAtMs = 0)
     }

     fun makeStreamIncomplete(stream: DestinationStream): DestinationRecordStreamIncomplete {
-        return DestinationRecordStreamIncomplete(stream = stream.descriptor, emittedAtMs = 0)
+        return DestinationRecordStreamIncomplete(stream = stream, emittedAtMs = 0)
     }

     fun makeFileStreamIncomplete(stream: DestinationStream): DestinationFileStreamIncomplete {
-        return DestinationFileStreamIncomplete(stream = stream.descriptor, emittedAtMs = 0)
+        return DestinationFileStreamIncomplete(stream = stream, emittedAtMs = 0)
     }

     fun makeStreamState(stream: DestinationStream, recordCount: Long): CheckpointMessage {
@@ -64,7 +64,7 @@ data class InputFile(
     val file: DestinationFile,
 ) : InputMessage {
     constructor(
-        stream: DestinationStream.Descriptor,
+        stream: DestinationStream,
         emittedAtMs: Long,
         fileMessage: DestinationFile.AirbyteRecordMessageFile,
         serialized: String = ""
@@ -216,7 +216,7 @@ abstract class IntegrationTest(
     if (streamStatus != null) {
         catalog.streams.forEach {
             destination.sendMessage(
-                DestinationRecordStreamComplete(it.descriptor, System.currentTimeMillis())
+                DestinationRecordStreamComplete(it, System.currentTimeMillis())
                     .asProtocolMessage()
             )
         }
@@ -331,7 +331,7 @@ abstract class BasicFunctionalityIntegrationTest(
     stream,
     listOf(
         InputFile(
-            stream = stream.descriptor,
+            stream = stream,
            emittedAtMs = 1234,
            fileMessage = fileMessage,
        ),
@@ -374,10 +374,7 @@ abstract class BasicPerformanceTest(
     testScenario.send(destination)
     testScenario.catalog.streams.forEach {
         destination.sendMessage(
-            DestinationRecordStreamComplete(
-                it.descriptor,
-                System.currentTimeMillis()
-            )
+            DestinationRecordStreamComplete(it, System.currentTimeMillis())
                 .asProtocolMessage()
         )
     }
@@ -166,6 +166,15 @@ class SingleStreamFileTransfer(
     private val log = KotlinLogging.logger {}

     private val descriptor = DestinationStream.Descriptor(randomizedNamespace, streamName)
+    private val stream =
+        DestinationStream(
+            descriptor = DestinationStream.Descriptor(randomizedNamespace, streamName),
+            importType = Append,
+            schema = ObjectType(linkedMapOf()),
+            generationId = 1,
+            minimumGenerationId = 0,
+            syncId = 1,
+        )

     override val catalog: DestinationCatalog =
         DestinationCatalog(
@@ -202,7 +211,7 @@ class SingleStreamFileTransfer(
     val fileName = makeFileName(it.toLong())
     val message =
         DestinationFile(
-            descriptor,
+            stream,
             System.currentTimeMillis(),
             "",
             DestinationFile.AirbyteRecordMessageFile(
@@ -82,7 +82,7 @@ class FilePartAccumulatorTest {

     private fun createFileMessage(file: File): DestinationFile {
         return DestinationFile(
-            descriptor,
+            stream,
             0,
             "",
             DestinationFile.AirbyteRecordMessageFile(
@@ -42,7 +42,7 @@ class RecordToPartAccumulatorTest {

     private fun makeRecord(): DestinationRecordAirbyteValue =
         DestinationRecordAirbyteValue(
-            DestinationStream.Descriptor("test", "stream"),
+            stream,
             ObjectValue(linkedMapOf()),
             0L,
             null,
@@ -16,7 +16,7 @@ data:
       type: GSM
   connectorType: destination
   definitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
-  dockerImageTag: 2.0.2
+  dockerImageTag: 2.0.3
   dockerRepository: airbyte/destination-mssql
   documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql
   githubIssueLabel: destination-mssql
@@ -133,7 +133,7 @@ class MSSQLChecker(private val dataSourceFactory: MSSQLDataSourceFactory) :
     MSSQLCSVFormattingWriter(stream, outputStream, true).use { csvWriter ->
         val destinationRecord =
             DestinationRecordAirbyteValue(
-                stream.descriptor,
+                stream,
                 ObjectValue(
                     linkedMapOf(COLUMN_NAME to IntegerValue(TEST_ID_VALUE.toBigInteger()))
                 ),
@@ -6,7 +6,7 @@ plugins {
 airbyteBulkConnector {
     core = 'load'
     toolkits = ['load-iceberg-parquet', 'load-aws']
-    cdk = '0.344'
+    cdk = 'local'
 }

 application {
@@ -26,7 +26,7 @@ data:
     alias: airbyte-connector-testing-secret-store
   connectorType: destination
   definitionId: 716ca874-520b-4902-9f80-9fad66754b89
-  dockerImageTag: 0.3.17
+  dockerImageTag: 0.3.18
   dockerRepository: airbyte/destination-s3-data-lake
   documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
   githubIssueLabel: destination-s3-data-lake
@@ -87,7 +87,7 @@ class S3DataLakeDirectLoader(
         record = recordAirbyteValue,
         stream = stream,
         tableSchema = schema,
-        pipeline = pipeline
+        pipeline = pipeline,
     )
     writer.write(icebergRecord)

@@ -5,23 +5,13 @@
 package io.airbyte.integrations.destination.s3_data_lake

 import io.airbyte.cdk.load.command.Dedupe
-import io.airbyte.cdk.load.command.DestinationCatalog
 import io.airbyte.cdk.load.message.DestinationRecordRaw
 import io.airbyte.cdk.load.pipeline.InputPartitioner
 import jakarta.inject.Singleton
-import kotlin.math.abs
 import kotlin.random.Random

 @Singleton
-class S3DataLakePartitioner(catalog: DestinationCatalog) : InputPartitioner {
-    private val streamToPrimaryKeyFieldNames =
-        catalog.streams.associate { stream ->
-            stream.descriptor to
-                when (stream.importType) {
-                    is Dedupe -> (stream.importType as Dedupe).primaryKey
-                    else -> null
-                }
-        }
+class S3DataLakePartitioner : InputPartitioner {
     private val random = Random(System.currentTimeMillis())

     override fun getPartition(record: DestinationRecordRaw, numParts: Int): Int {
@@ -29,22 +19,17 @@ class S3DataLakePartitioner(catalog: DestinationCatalog) : InputPartitioner {
             return 0
         }

-        streamToPrimaryKeyFieldNames[record.stream]?.let { primaryKey ->
-            val jsonData = record.asRawJson()
-
-            val primaryKeyValues =
-                primaryKey.map { keys ->
-                    keys.map { key -> if (jsonData.has(key)) jsonData.get(key) else null }
-                }
-            val hash = primaryKeyValues.hashCode()
-            /** abs(MIN_VALUE) == MIN_VALUE, so we need to handle this case separately */
-            if (hash == Int.MIN_VALUE) {
-                return 0
-            }
-            return abs(primaryKeyValues.hashCode()) % numParts
-        }
-            ?: run {
-                return abs(random.nextInt()) % numParts
-            }
+        if (record.stream.importType !is Dedupe) {
+            return random.nextInt(numParts)
+        }
+
+        val primaryKey = (record.stream.importType as Dedupe).primaryKey
+        val jsonData = record.asRawJson()
+
+        val primaryKeyValues =
+            primaryKey.map { keys ->
+                keys.map { key -> if (jsonData.has(key)) jsonData.get(key) else null }
+            }
+        return Math.floorMod(primaryKeyValues.hashCode(), numParts)
     }
 }
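A note on the partitioner rewrite above (the snippet below is an illustrative aside, not part of the diff): because the record now carries its DestinationStream, the partitioner can read the primary key straight from record.stream.importType instead of building a descriptor-keyed map from the catalog, and Math.floorMod removes the old abs()/Int.MIN_VALUE special case, since floorMod always returns a value in 0 until numParts for a positive divisor:

    fun main() {
        val numParts = 4
        val hash = Int.MIN_VALUE                 // abs(Int.MIN_VALUE) == Int.MIN_VALUE, still negative
        println(Math.floorMod(hash, numParts))   // 0
        println(Math.floorMod(-7, numParts))     // 1
        println(Math.floorMod(7, numParts))      // 3
    }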
@@ -207,7 +207,7 @@ internal class S3DataLakeUtilTest {
     )
     val airbyteRecord =
         DestinationRecordAirbyteValue(
-            stream = airbyteStream.descriptor,
+            stream = airbyteStream,
             data =
                 ObjectValue(
                     linkedMapOf("id" to IntegerValue(42L), "name" to StringValue("John Doe"))
@@ -254,7 +254,7 @@ internal class S3DataLakeUtilTest {
     )
     val airbyteRecord =
         DestinationRecordAirbyteValue(
-            stream = airbyteStream.descriptor,
+            stream = airbyteStream,
             data =
                 ObjectValue(
                     linkedMapOf(
@@ -306,7 +306,7 @@ internal class S3DataLakeUtilTest {
     )
     val airbyteRecord =
         DestinationRecordAirbyteValue(
-            stream = airbyteStream.descriptor,
+            stream = airbyteStream,
             data =
                 ObjectValue(
                     linkedMapOf("id" to IntegerValue(42L), "name" to StringValue("John Doe"))
@@ -158,6 +158,7 @@ See the [Getting Started: Configuration section](#configuration) of this guide f

 | Version | Date       | Pull Request                                              | Subject                                           |
 |:--------|:-----------|:----------------------------------------------------------|:--------------------------------------------------|
+| 2.0.3   | 2025-03-18 | [55811](https://github.com/airbytehq/airbyte/pull/55811)  | CDK: Pass DestinationStream around vs Descriptor  |
 | 2.0.2   | 2025-03-12 | [55720](https://github.com/airbytehq/airbyte/pull/55720)  | Restore definition ID                             |
 | 2.0.1   | 2025-03-12 | [55718](https://github.com/airbytehq/airbyte/pull/55718)  | Fix breaking change information in metadata.yaml  |
 | 2.0.0   | 2025-03-11 | [55684](https://github.com/airbytehq/airbyte/pull/55684)  | Release 2.0.0                                     |
@@ -305,41 +305,44 @@ Now, you can identify the latest version of the 'Alice' record by querying wheth
 <details>
 <summary>Expand to review</summary>

-| Version | Date       | Pull Request                                               | Subject |
-| :------ | :--------- | :--------------------------------------------------------- | :------ |
-| 0.3.15  | 2025-02-28 | [\#54724](https://github.com/airbytehq/airbyte/pull/54724) | Certify connector |
-| 0.3.14  | 2025-02-14 | [\#53241](https://github.com/airbytehq/airbyte/pull/53241) | New CDK interface; perf improvements, skip initial record staging |
-| 0.3.13  | 2025-02-14 | [\#53697](https://github.com/airbytehq/airbyte/pull/53697) | Internal refactor |
-| 0.3.12  | 2025-02-12 | [\#53170](https://github.com/airbytehq/airbyte/pull/53170) | Improve documentation, tweak error handling of invalid schema evolution |
-| 0.3.11  | 2025-02-12 | [\#53216](https://github.com/airbytehq/airbyte/pull/53216) | Support arbitrary schema change in overwrite / truncate refresh / clear sync |
-| 0.3.10  | 2025-02-11 | [\#53622](https://github.com/airbytehq/airbyte/pull/53622) | Enable the Nessie integration tests |
-| 0.3.9   | 2025-02-10 | [\#53165](https://github.com/airbytehq/airbyte/pull/53165) | Very basic usability improvements and documentation |
-| 0.3.8   | 2025-02-10 | [\#52666](https://github.com/airbytehq/airbyte/pull/52666) | Change the chunk size to 1.5Gb |
-| 0.3.7   | 2025-02-07 | [\#53141](https://github.com/airbytehq/airbyte/pull/53141) | Adding integration tests around the Rest catalog |
-| 0.3.6   | 2025-02-06 | [\#53172](https://github.com/airbytehq/airbyte/pull/53172) | Internal refactor |
-| 0.3.5   | 2025-02-06 | [\#53164](https://github.com/airbytehq/airbyte/pull/53164) | Improve error message on null primary key in dedup mode |
-| 0.3.4   | 2025-02-05 | [\#53173](https://github.com/airbytehq/airbyte/pull/53173) | Tweak spec wording |
-| 0.3.3   | 2025-02-05 | [\#53176](https://github.com/airbytehq/airbyte/pull/53176) | Fix time_with_timezone handling (values are now adjusted to UTC) |
-| 0.3.2   | 2025-02-04 | [\#52690](https://github.com/airbytehq/airbyte/pull/52690) | Handle special characters in stream name/namespace when using AWS Glue |
-| 0.3.1   | 2025-02-03 | [\#52633](https://github.com/airbytehq/airbyte/pull/52633) | Fix dedup |
-| 0.3.0   | 2025-01-31 | [\#52639](https://github.com/airbytehq/airbyte/pull/52639) | Make the database/namespace a required field |
-| 0.2.23  | 2025-01-27 | [\#51600](https://github.com/airbytehq/airbyte/pull/51600) | Internal refactor |
-| 0.2.22  | 2025-01-22 | [\#52081](https://github.com/airbytehq/airbyte/pull/52081) | Implement support for REST catalog |
-| 0.2.21  | 2025-01-27 | [\#52564](https://github.com/airbytehq/airbyte/pull/52564) | Fix crash on stream with 0 records |
-| 0.2.20  | 2025-01-23 | [\#52068](https://github.com/airbytehq/airbyte/pull/52068) | Add support for default namespace (/database name) |
-| 0.2.19  | 2025-01-16 | [\#51595](https://github.com/airbytehq/airbyte/pull/51595) | Clarifications in connector config options |
-| 0.2.18  | 2025-01-15 | [\#51042](https://github.com/airbytehq/airbyte/pull/51042) | Write structs as JSON strings instead of Iceberg structs. |
-| 0.2.17  | 2025-01-14 | [\#51542](https://github.com/airbytehq/airbyte/pull/51542) | New identifier fields should be marked as required. |
-| 0.2.16  | 2025-01-14 | [\#51538](https://github.com/airbytehq/airbyte/pull/51538) | Update identifier fields if incoming fields are different than existing ones |
-| 0.2.15  | 2025-01-14 | [\#51530](https://github.com/airbytehq/airbyte/pull/51530) | Set AWS region for S3 bucket for nessie catalog |
-| 0.2.14  | 2025-01-14 | [\#50413](https://github.com/airbytehq/airbyte/pull/50413) | Update existing table schema based on the incoming schema |
-| 0.2.13  | 2025-01-14 | [\#50412](https://github.com/airbytehq/airbyte/pull/50412) | Implement logic to determine super types between iceberg types |
-| 0.2.12  | 2025-01-10 | [\#50876](https://github.com/airbytehq/airbyte/pull/50876) | Add support for AWS instance profile auth |
-| 0.2.11  | 2025-01-10 | [\#50971](https://github.com/airbytehq/airbyte/pull/50971) | Internal refactor in AWS auth flow |
-| 0.2.10  | 2025-01-09 | [\#50400](https://github.com/airbytehq/airbyte/pull/50400) | Add S3DataLakeTypesComparator |
-| 0.2.9   | 2025-01-09 | [\#51022](https://github.com/airbytehq/airbyte/pull/51022) | Rename all classes and files from Iceberg V2 |
-| 0.2.8   | 2025-01-09 | [\#51012](https://github.com/airbytehq/airbyte/pull/51012) | Rename/Cleanup package from Iceberg V2 |
-| 0.2.7   | 2025-01-09 | [\#50957](https://github.com/airbytehq/airbyte/pull/50957) | Add support for GLUE RBAC (Assume role) |
-| 0.2.6   | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. |
+| Version | Date       | Pull Request                                               | Subject |
+|:--------|:-----------|:------------------------------------------------------------|:--------|
+| 0.3.18  | 2025-03-18 | [\#55811](https://github.com/airbytehq/airbyte/pull/55811) | CDK: Pass DestinationStream around vs Descriptor |
+| 0.3.17  | 2025-03-13 | [\#55737](https://github.com/airbytehq/airbyte/pull/55737) | CDK: Pass DestinationRecordRaw around instead of DestinationRecordAirbyteValue |
+| 0.3.16  | 2025-03-13 | [\#55755](https://github.com/airbytehq/airbyte/pull/55755) | Exclude number fields from identifier fields |
+| 0.3.15  | 2025-02-28 | [\#54724](https://github.com/airbytehq/airbyte/pull/54724) | Certify connector |
+| 0.3.14  | 2025-02-14 | [\#53241](https://github.com/airbytehq/airbyte/pull/53241) | New CDK interface; perf improvements, skip initial record staging |
+| 0.3.13  | 2025-02-14 | [\#53697](https://github.com/airbytehq/airbyte/pull/53697) | Internal refactor |
+| 0.3.12  | 2025-02-12 | [\#53170](https://github.com/airbytehq/airbyte/pull/53170) | Improve documentation, tweak error handling of invalid schema evolution |
+| 0.3.11  | 2025-02-12 | [\#53216](https://github.com/airbytehq/airbyte/pull/53216) | Support arbitrary schema change in overwrite / truncate refresh / clear sync |
+| 0.3.10  | 2025-02-11 | [\#53622](https://github.com/airbytehq/airbyte/pull/53622) | Enable the Nessie integration tests |
+| 0.3.9   | 2025-02-10 | [\#53165](https://github.com/airbytehq/airbyte/pull/53165) | Very basic usability improvements and documentation |
+| 0.3.8   | 2025-02-10 | [\#52666](https://github.com/airbytehq/airbyte/pull/52666) | Change the chunk size to 1.5Gb |
+| 0.3.7   | 2025-02-07 | [\#53141](https://github.com/airbytehq/airbyte/pull/53141) | Adding integration tests around the Rest catalog |
+| 0.3.6   | 2025-02-06 | [\#53172](https://github.com/airbytehq/airbyte/pull/53172) | Internal refactor |
+| 0.3.5   | 2025-02-06 | [\#53164](https://github.com/airbytehq/airbyte/pull/53164) | Improve error message on null primary key in dedup mode |
+| 0.3.4   | 2025-02-05 | [\#53173](https://github.com/airbytehq/airbyte/pull/53173) | Tweak spec wording |
+| 0.3.3   | 2025-02-05 | [\#53176](https://github.com/airbytehq/airbyte/pull/53176) | Fix time_with_timezone handling (values are now adjusted to UTC) |
+| 0.3.2   | 2025-02-04 | [\#52690](https://github.com/airbytehq/airbyte/pull/52690) | Handle special characters in stream name/namespace when using AWS Glue |
+| 0.3.1   | 2025-02-03 | [\#52633](https://github.com/airbytehq/airbyte/pull/52633) | Fix dedup |
+| 0.3.0   | 2025-01-31 | [\#52639](https://github.com/airbytehq/airbyte/pull/52639) | Make the database/namespace a required field |
+| 0.2.23  | 2025-01-27 | [\#51600](https://github.com/airbytehq/airbyte/pull/51600) | Internal refactor |
+| 0.2.22  | 2025-01-22 | [\#52081](https://github.com/airbytehq/airbyte/pull/52081) | Implement support for REST catalog |
+| 0.2.21  | 2025-01-27 | [\#52564](https://github.com/airbytehq/airbyte/pull/52564) | Fix crash on stream with 0 records |
+| 0.2.20  | 2025-01-23 | [\#52068](https://github.com/airbytehq/airbyte/pull/52068) | Add support for default namespace (/database name) |
+| 0.2.19  | 2025-01-16 | [\#51595](https://github.com/airbytehq/airbyte/pull/51595) | Clarifications in connector config options |
+| 0.2.18  | 2025-01-15 | [\#51042](https://github.com/airbytehq/airbyte/pull/51042) | Write structs as JSON strings instead of Iceberg structs. |
+| 0.2.17  | 2025-01-14 | [\#51542](https://github.com/airbytehq/airbyte/pull/51542) | New identifier fields should be marked as required. |
+| 0.2.16  | 2025-01-14 | [\#51538](https://github.com/airbytehq/airbyte/pull/51538) | Update identifier fields if incoming fields are different than existing ones |
+| 0.2.15  | 2025-01-14 | [\#51530](https://github.com/airbytehq/airbyte/pull/51530) | Set AWS region for S3 bucket for nessie catalog |
+| 0.2.14  | 2025-01-14 | [\#50413](https://github.com/airbytehq/airbyte/pull/50413) | Update existing table schema based on the incoming schema |
+| 0.2.13  | 2025-01-14 | [\#50412](https://github.com/airbytehq/airbyte/pull/50412) | Implement logic to determine super types between iceberg types |
+| 0.2.12  | 2025-01-10 | [\#50876](https://github.com/airbytehq/airbyte/pull/50876) | Add support for AWS instance profile auth |
+| 0.2.11  | 2025-01-10 | [\#50971](https://github.com/airbytehq/airbyte/pull/50971) | Internal refactor in AWS auth flow |
+| 0.2.10  | 2025-01-09 | [\#50400](https://github.com/airbytehq/airbyte/pull/50400) | Add S3DataLakeTypesComparator |
+| 0.2.9   | 2025-01-09 | [\#51022](https://github.com/airbytehq/airbyte/pull/51022) | Rename all classes and files from Iceberg V2 |
+| 0.2.8   | 2025-01-09 | [\#51012](https://github.com/airbytehq/airbyte/pull/51012) | Rename/Cleanup package from Iceberg V2 |
+| 0.2.7   | 2025-01-09 | [\#50957](https://github.com/airbytehq/airbyte/pull/50957) | Add support for GLUE RBAC (Assume role) |
+| 0.2.6   | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. |

 </details>