1
0
mirror of synced 2025-12-29 09:03:46 -05:00

Rbroughan/dont fail fast on stream incomplete (#49455)

This commit is contained in:
Ryan Br...
2024-12-13 15:37:05 -08:00
committed by GitHub
parent 526c159758
commit bdaae730ab
36 changed files with 235 additions and 263 deletions

View File

@@ -12,7 +12,7 @@ import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.state.StreamIncompleteResult
import io.airbyte.cdk.load.state.StreamProcessingFailed
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
@@ -42,7 +42,7 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
override val state = Batch.State.PERSISTED
}
override suspend fun close(streamFailure: StreamIncompleteResult?) {
override suspend fun close(streamFailure: StreamProcessingFailed?) {
if (streamFailure == null) {
when (val importType = stream.importType) {
is Append -> {

View File

@@ -37,6 +37,8 @@ data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()
fun asProtocolObject(): ConfiguredAirbyteCatalog =
ConfiguredAirbyteCatalog().withStreams(streams.map { it.asProtocolObject() })
fun size(): Int = streams.size
}
interface DestinationCatalogFactory {

View File

@@ -4,6 +4,7 @@
package io.airbyte.cdk.load.config
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.message.MultiProducerChannel
import io.airbyte.cdk.load.state.ReservationManager
@@ -51,7 +52,9 @@ class SyncBeanFactory {
fun fileAggregateQueue(
@Value("\${airbyte.resources.disk.bytes}") availableBytes: Long,
config: DestinationConfiguration,
catalog: DestinationCatalog
): MultiProducerChannel<FileAggregateMessage> {
val streamCount = catalog.size()
// total batches by disk capacity
val maxBatchesThatFitOnDisk = (availableBytes / config.recordBatchSizeBytes).toInt()
// account for batches in flight processing by the workers
@@ -64,6 +67,6 @@ class SyncBeanFactory {
val capacity = min(maxBatchesMinusUploadOverhead, idealDepth)
log.info { "Creating file aggregate queue with limit $capacity" }
val channel = Channel<FileAggregateMessage>(capacity)
return MultiProducerChannel(channel)
return MultiProducerChannel(streamCount.toLong(), channel)
}
}

View File

@@ -26,18 +26,26 @@ interface Sized {
*/
sealed class DestinationStreamEvent : Sized
/** Contains a record to be aggregated and processed. */
data class StreamRecordEvent(
val index: Long,
override val sizeBytes: Long,
val record: DestinationRecord
) : DestinationStreamEvent()
data class StreamCompleteEvent(
/**
* Indicates the stream is in a terminal (complete or incomplete) state as signalled by upstream.
*/
data class StreamEndEvent(
val index: Long,
) : DestinationStreamEvent() {
override val sizeBytes: Long = 0L
}
/**
* Emitted to trigger evaluation of the conditional flush logic of a stream. The consumer may or may
* not decide to flush.
*/
data class StreamFlushEvent(
val tickedAtMs: Long,
) : DestinationStreamEvent() {

View File

@@ -5,37 +5,29 @@
package io.airbyte.cdk.load.message
import io.github.oshai.kotlinlogging.KotlinLogging
import java.lang.IllegalStateException
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.channels.Channel
/**
* A channel designed for use with a dynamic amount of producers. Close will only close the
* A channel designed for use with a fixed amount of producers. Close will be called on the
* underlying channel, when there are no remaining registered producers.
*/
class MultiProducerChannel<T>(override val channel: Channel<T>) : ChannelMessageQueue<T>() {
class MultiProducerChannel<T>(
producerCount: Long,
override val channel: Channel<T>,
) : ChannelMessageQueue<T>() {
private val log = KotlinLogging.logger {}
private val producerCount = AtomicLong(0)
private val closed = AtomicBoolean(false)
fun registerProducer(): MultiProducerChannel<T> {
if (closed.get()) {
throw IllegalStateException("Attempted to register producer for closed channel.")
}
val count = producerCount.incrementAndGet()
log.info { "Registering producer (count=$count)" }
return this
}
private val initializedProducerCount = producerCount
private val producerCount = AtomicLong(producerCount)
override suspend fun close() {
val count = producerCount.decrementAndGet()
log.info { "Closing producer (count=$count)" }
log.info {
"Closing producer (active count=$count, initialized count: $initializedProducerCount)"
}
if (count == 0L) {
log.info { "Closing queue" }
log.info { "Closing underlying queue" }
channel.close()
closed.getAndSet(true)
}
}
}

View File

@@ -20,13 +20,9 @@ import kotlinx.coroutines.CompletableDeferred
sealed interface StreamResult
sealed interface StreamIncompleteResult : StreamResult
data class StreamProcessingFailed(val streamException: Exception) : StreamResult
data class StreamFailed(val streamException: Exception) : StreamIncompleteResult
data class StreamKilled(val syncException: Exception) : StreamIncompleteResult
data object StreamSucceeded : StreamResult
data object StreamProcessingSucceeded : StreamResult
/** Manages the state of a single stream. */
interface StreamManager {
@@ -38,13 +34,17 @@ interface StreamManager {
fun recordCount(): Long
/**
* Mark the end-of-stream and return the record count. Expect this exactly once. Expect no
* further `countRecordIn`, and expect that [markSucceeded] or [markFailed] or [markKilled] will
* alway occur after this.
* Mark the end-of-stream, set the end of stream variant (complete or incomplete) and return the
* record count. Expect this exactly once. Expect no further `countRecordIn`, and expect that
* [markProcessingSucceeded] will always occur after this, while [markProcessingFailed] can
* occur before or after.
*/
fun markEndOfStream(): Long
fun markEndOfStream(receivedStreamCompleteMessage: Boolean): Long
fun endOfStreamRead(): Boolean
/** Whether we received a stream complete message for the managed stream. */
fun isComplete(): Boolean
/**
* Mark a checkpoint in the stream and return the current index and the number of records since
* the last one.
@@ -72,22 +72,23 @@ interface StreamManager {
*/
fun areRecordsPersistedUntil(index: Long): Boolean
/** Mark the stream as closed. This should only be called after all records have been read. */
fun markSucceeded()
/**
* Indicates destination processing of the stream succeeded, regardless of complete/incomplete
* status. This should only be called after all records and end of stream messages have been
* read.
*/
fun markProcessingSucceeded()
/**
* Mark that the stream was killed due to failure elsewhere. Returns false if task was already
* complete.
* Indicates destination processing of the stream failed. Returns false if task was already
* complete
*/
fun markKilled(causedBy: Exception): Boolean
/** Mark that the stream itself failed. Return false if task was already complete */
fun markFailed(causedBy: Exception): Boolean
fun markProcessingFailed(causedBy: Exception): Boolean
/** Suspend until the stream completes, returning the result. */
suspend fun awaitStreamResult(): StreamResult
/** True if the stream has not yet been marked successful, failed, or killed. */
/** True if the stream processing has not yet been marked as successful or failed. */
fun isActive(): Boolean
}
@@ -105,6 +106,7 @@ class DefaultStreamManager(
private val lastCheckpoint = AtomicLong(0L)
private val markedEndOfStream = AtomicBoolean(false)
private val receivedComplete = AtomicBoolean(false)
private val rangesState: ConcurrentHashMap<Batch.State, RangeSet<Long>> = ConcurrentHashMap()
@@ -124,10 +126,11 @@ class DefaultStreamManager(
return recordCount.get()
}
override fun markEndOfStream(): Long {
override fun markEndOfStream(receivedStreamCompleteMessage: Boolean): Long {
if (markedEndOfStream.getAndSet(true)) {
throw IllegalStateException("Stream is closed for reading")
}
receivedComplete.getAndSet(receivedStreamCompleteMessage)
return recordCount.get()
}
@@ -136,6 +139,10 @@ class DefaultStreamManager(
return markedEndOfStream.get()
}
override fun isComplete(): Boolean {
return receivedComplete.get()
}
override fun markCheckpoint(): Pair<Long, Long> {
val index = recordCount.get()
val lastCheckpoint = lastCheckpoint.getAndSet(index)
@@ -220,19 +227,15 @@ class DefaultStreamManager(
return isProcessingCompleteForState(index, Batch.State.PERSISTED)
}
override fun markSucceeded() {
override fun markProcessingSucceeded() {
if (!markedEndOfStream.get()) {
throw IllegalStateException("Stream is not closed for reading")
}
streamResult.complete(StreamSucceeded)
streamResult.complete(StreamProcessingSucceeded)
}
override fun markKilled(causedBy: Exception): Boolean {
return streamResult.complete(StreamKilled(causedBy))
}
override fun markFailed(causedBy: Exception): Boolean {
return streamResult.complete(StreamFailed(causedBy))
override fun markProcessingFailed(causedBy: Exception): Boolean {
return streamResult.complete(StreamProcessingFailed(causedBy))
}
override suspend fun awaitStreamResult(): StreamResult {

View File

@@ -14,14 +14,14 @@ import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CompletableDeferred
sealed interface SyncResult
sealed interface DestinationResult
data object SyncSuccess : SyncResult
data object DestinationSuccess : DestinationResult
data class SyncFailure(
val syncFailure: Exception,
data class DestinationFailure(
val cause: Exception,
val streamResults: Map<DestinationStream.Descriptor, StreamResult>
) : SyncResult
) : DestinationResult
/** Manages the state of all streams in the destination. */
interface SyncManager {
@@ -35,18 +35,26 @@ interface SyncManager {
suspend fun getOrAwaitStreamLoader(stream: DestinationStream.Descriptor): StreamLoader
suspend fun getStreamLoaderOrNull(stream: DestinationStream.Descriptor): StreamLoader?
/** Suspend until all streams are complete. Returns false if any stream was failed/killed. */
suspend fun awaitAllStreamsCompletedSuccessfully(): Boolean
/**
* Suspend until all streams are processed successfully. Returns false if processing failed for
* any stream.
*/
suspend fun awaitAllStreamsProcessedSuccessfully(): Boolean
suspend fun markInputConsumed()
suspend fun markCheckpointsProcessed()
suspend fun markFailed(causedBy: Exception): SyncFailure
suspend fun markSucceeded()
suspend fun markDestinationFailed(causedBy: Exception): DestinationFailure
suspend fun markDestinationSucceeded()
/**
* Whether we received stream complete messages for all streams in the catalog from upstream.
*/
suspend fun allStreamsComplete(): Boolean
fun isActive(): Boolean
suspend fun awaitInputProcessingComplete(): Unit
suspend fun awaitSyncResult(): SyncResult
suspend fun awaitInputProcessingComplete()
suspend fun awaitDestinationResult(): DestinationResult
}
@SuppressFBWarnings(
@@ -56,7 +64,7 @@ interface SyncManager {
class DefaultSyncManager(
private val streamManagers: ConcurrentHashMap<DestinationStream.Descriptor, StreamManager>
) : SyncManager {
private val syncResult = CompletableDeferred<SyncResult>()
private val destinationResult = CompletableDeferred<DestinationResult>()
private val streamLoaders =
ConcurrentHashMap<DestinationStream.Descriptor, CompletableDeferred<Result<StreamLoader>>>()
private val inputConsumed = CompletableDeferred<Boolean>()
@@ -87,32 +95,38 @@ class DefaultSyncManager(
return streamLoaders[stream]?.await()?.getOrNull()
}
override suspend fun awaitAllStreamsCompletedSuccessfully(): Boolean {
return streamManagers.all { (_, manager) -> manager.awaitStreamResult() is StreamSucceeded }
override suspend fun awaitAllStreamsProcessedSuccessfully(): Boolean {
return streamManagers.all { (_, manager) ->
manager.awaitStreamResult() is StreamProcessingSucceeded
}
}
override suspend fun markFailed(causedBy: Exception): SyncFailure {
override suspend fun markDestinationFailed(causedBy: Exception): DestinationFailure {
val result =
SyncFailure(causedBy, streamManagers.mapValues { it.value.awaitStreamResult() })
syncResult.complete(result)
DestinationFailure(causedBy, streamManagers.mapValues { it.value.awaitStreamResult() })
destinationResult.complete(result)
return result
}
override suspend fun markSucceeded() {
override suspend fun markDestinationSucceeded() {
if (streamManagers.values.any { it.isActive() }) {
throw IllegalStateException(
"Cannot mark sync as succeeded until all streams are complete"
)
}
syncResult.complete(SyncSuccess)
destinationResult.complete(DestinationSuccess)
}
override suspend fun allStreamsComplete(): Boolean {
return streamManagers.all { it.value.isComplete() }
}
override fun isActive(): Boolean {
return syncResult.isActive
return destinationResult.isActive
}
override suspend fun awaitSyncResult(): SyncResult {
return syncResult.await()
override suspend fun awaitDestinationResult(): DestinationResult {
return destinationResult.await()
}
override suspend fun awaitInputProcessingComplete() {

View File

@@ -125,7 +125,7 @@ class DefaultDestinationTaskLauncher(
// File transfer
@Value("\${airbyte.file-transfer.enabled}") private val fileTransferEnabled: Boolean,
// Input Comsumer requirements
// Input Consumer requirements
private val inputFlow: SizedInputFlow<Reserved<DestinationMessage>>,
private val recordQueueSupplier:
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationStreamEvent>>,

View File

@@ -16,7 +16,8 @@ interface CloseStreamTask : ImplementorScope
/**
* Wraps @[StreamLoader.close] and marks the stream as closed in the stream manager. Also starts the
* teardown task.
* teardown task. Called after the end of stream message (complete OR incomplete) has been received
* and all record messages have been processed.
*/
class DefaultCloseStreamTask(
private val syncManager: SyncManager,
@@ -27,7 +28,7 @@ class DefaultCloseStreamTask(
override suspend fun execute() {
val streamLoader = syncManager.getOrAwaitStreamLoader(streamDescriptor)
streamLoader.close()
syncManager.getStreamManager(streamDescriptor).markSucceeded()
syncManager.getStreamManager(streamDescriptor).markProcessingSucceeded()
taskLauncher.handleStreamClosed(streamLoader.stream.descriptor)
}
}

View File

@@ -5,8 +5,8 @@
package io.airbyte.cdk.load.task.implementor
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.state.StreamIncompleteResult
import io.airbyte.cdk.load.state.StreamSucceeded
import io.airbyte.cdk.load.state.StreamProcessingFailed
import io.airbyte.cdk.load.state.StreamProcessingSucceeded
import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.load.task.ImplementorScope
@@ -17,8 +17,8 @@ import jakarta.inject.Singleton
interface FailStreamTask : ImplementorScope
/**
* FailStreamTask is a task that is executed when a stream fails. It is responsible for cleaning up
* resources and reporting the failure.
* FailStreamTask is a task that is executed when the processing of a stream fails in the
* destination. It is responsible for cleaning up resources and reporting the failure.
*/
class DefaultFailStreamTask(
private val taskLauncher: DestinationTaskLauncher,
@@ -30,12 +30,12 @@ class DefaultFailStreamTask(
override suspend fun execute() {
val streamManager = syncManager.getStreamManager(stream)
streamManager.markFailed(exception)
streamManager.markProcessingFailed(exception)
when (val streamResult = streamManager.awaitStreamResult()) {
is StreamSucceeded -> {
is StreamProcessingSucceeded -> {
log.info { "Cannot fail stream $stream, which is already complete, doing nothing." }
}
is StreamIncompleteResult -> {
is StreamProcessingFailed -> {
syncManager.getStreamLoaderOrNull(stream)?.close(streamResult)
?: log.warn { "StreamLoader not found for stream $stream, cannot call close." }
}

View File

@@ -16,8 +16,9 @@ import jakarta.inject.Singleton
interface FailSyncTask : ImplementorScope
/**
* FailSyncTask is a task that is executed when a sync fails. It is responsible for cleaning up
* resources and reporting the failure.
* FailSyncTask is a task that is executed only when the destination itself fails during a sync. If
* the sync is failed by upstream (e.g. an incomplete stream message is received), we do not call
* this task. It is responsible for cleaning up resources and reporting the failure.
*/
class DefaultFailSyncTask(
private val taskLauncher: DestinationTaskLauncher,
@@ -31,7 +32,7 @@ class DefaultFailSyncTask(
override suspend fun execute() {
// Ensure any remaining ready state gets captured: don't waste work!
checkpointManager.flushReadyCheckpointMessages()
val result = syncManager.markFailed(exception) // awaits stream completion
val result = syncManager.markDestinationFailed(exception) // awaits stream completion
log.info { "Calling teardown with failure result $result" }
destinationWriter.teardown(result)
taskLauncher.handleTeardownComplete(success = false)

View File

@@ -31,9 +31,9 @@ class DefaultTeardownTask(
override suspend fun execute() {
syncManager.awaitInputProcessingComplete()
log.info { "Teardown task awaiting stream completion" }
if (!syncManager.awaitAllStreamsCompletedSuccessfully()) {
log.info { "Streams failed to complete successfully, doing nothing." }
log.info { "Teardown task awaiting stream processing completion" }
if (!syncManager.awaitAllStreamsProcessedSuccessfully()) {
log.info { "Streams failed to be processed successfully, doing nothing." }
return
}
@@ -41,7 +41,7 @@ class DefaultTeardownTask(
log.info { "Starting teardown task" }
destination.teardown()
log.info { "Teardown task complete, marking sync succeeded." }
syncManager.markSucceeded()
syncManager.markDestinationSucceeded()
taskLauncher.handleTeardownComplete()
}
}

View File

@@ -27,7 +27,7 @@ import io.airbyte.cdk.load.message.QueueWriter
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.message.StreamCheckpoint
import io.airbyte.cdk.load.message.StreamCheckpointWrapped
import io.airbyte.cdk.load.message.StreamCompleteEvent
import io.airbyte.cdk.load.message.StreamEndEvent
import io.airbyte.cdk.load.message.StreamRecordEvent
import io.airbyte.cdk.load.message.Undefined
import io.airbyte.cdk.load.state.Reserved
@@ -83,19 +83,23 @@ class DefaultInputConsumerTask(
}
is DestinationRecordStreamComplete -> {
reserved.release() // safe because multiple calls conflate
val wrapped = StreamCompleteEvent(index = manager.markEndOfStream())
val wrapped = StreamEndEvent(index = manager.markEndOfStream(true))
recordQueue.publish(reserved.replace(wrapped))
recordQueue.close()
}
is DestinationRecordStreamIncomplete -> {
reserved.release() // safe because multiple calls conflate
val wrapped = StreamEndEvent(index = manager.markEndOfStream(false))
recordQueue.publish(reserved.replace(wrapped))
recordQueue.close()
}
is DestinationRecordStreamIncomplete ->
throw IllegalStateException("Stream $stream failed upstream, cannot continue.")
is DestinationFile -> {
val index = manager.countRecordIn()
destinationTaskLauncher.handleFile(stream, message, index)
}
is DestinationFileStreamComplete -> {
reserved.release() // safe because multiple calls conflate
manager.markEndOfStream()
manager.markEndOfStream(true)
val envelope = BatchEnvelope(SimpleBatch(Batch.State.COMPLETE))
destinationTaskLauncher.handleNewBatch(stream, envelope)
}

View File

@@ -15,7 +15,7 @@ import io.airbyte.cdk.load.message.MessageQueueSupplier
import io.airbyte.cdk.load.message.MultiProducerChannel
import io.airbyte.cdk.load.message.QueueReader
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.message.StreamCompleteEvent
import io.airbyte.cdk.load.message.StreamEndEvent
import io.airbyte.cdk.load.message.StreamFlushEvent
import io.airbyte.cdk.load.message.StreamRecordEvent
import io.airbyte.cdk.load.state.FlushStrategy
@@ -61,13 +61,12 @@ class DefaultSpillToDiskTask(
override suspend fun execute() {
val initialAccumulator = fileAccFactory.make()
val registration = outputQueue.registerProducer()
registration.use {
outputQueue.use {
inputQueue.consume().fold(initialAccumulator) { acc, reserved ->
reserved.use {
when (val event = it.value) {
is StreamRecordEvent -> accRecordEvent(acc, event)
is StreamCompleteEvent -> accStreamCompleteEvent(acc, event)
is StreamEndEvent -> accStreamEndEvent(acc, event)
is StreamFlushEvent -> accFlushEvent(acc)
}
}
@@ -117,12 +116,12 @@ class DefaultSpillToDiskTask(
}
/**
* Handles accumulation of stream completion events, triggering a final flush if the aggregate
* isn't empty.
* Handles accumulation of stream end events (complete or incomplete), triggering a final flush
* if the aggregate isn't empty.
*/
private suspend fun accStreamCompleteEvent(
private suspend fun accStreamEndEvent(
acc: FileAccumulator,
event: StreamCompleteEvent,
event: StreamEndEvent,
): FileAccumulator {
val (spillFile, outputStream, timeWindow, range, sizeBytes) = acc
if (sizeBytes == 0L) {

View File

@@ -5,7 +5,7 @@
package io.airbyte.cdk.load.write
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.state.SyncFailure
import io.airbyte.cdk.load.state.DestinationFailure
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
@@ -22,7 +22,7 @@ interface DestinationWriter {
// Called once at the end of the job, unconditionally.
// NOTE: we don't pass Success here, because it depends on this completing successfully.
suspend fun teardown(syncFailure: SyncFailure? = null) {}
suspend fun teardown(destinationFailure: DestinationFailure? = null) {}
}
@Singleton

View File

@@ -9,7 +9,7 @@ import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.state.StreamIncompleteResult
import io.airbyte.cdk.load.state.StreamProcessingFailed
/**
* Implementor interface. The framework calls open and close once per stream at the beginning and
@@ -39,5 +39,5 @@ interface StreamLoader {
suspend fun processRecords(records: Iterator<DestinationRecord>, totalSizeBytes: Long): Batch
suspend fun processFile(file: DestinationFile): Batch
suspend fun processBatch(batch: Batch): Batch = SimpleBatch(Batch.State.COMPLETE)
suspend fun close(streamFailure: StreamIncompleteResult? = null) {}
suspend fun close(streamFailure: StreamProcessingFailed? = null) {}
}

View File

@@ -0,0 +1,16 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.load.write
/**
* Thrown when the destination completes successfully, but some streams were indicated as incomplete
* by upstream. Without throwing an exception the sync will not be marked as succeed by the
* platform.
*
* TODO: Once the API with platform is updated to not require an exceptional exit code, remove this.
*/
class StreamsIncompleteException : Exception() {
override val message = "Some streams were indicated as incomplete by upstream."
}

View File

@@ -5,9 +5,9 @@
package io.airbyte.cdk.load.write
import io.airbyte.cdk.Operation
import io.airbyte.cdk.load.state.SyncFailure
import io.airbyte.cdk.load.state.DestinationFailure
import io.airbyte.cdk.load.state.DestinationSuccess
import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.load.state.SyncSuccess
import io.airbyte.cdk.load.task.TaskLauncher
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory
@@ -32,13 +32,19 @@ class WriteOperation(
override fun execute() = runBlocking {
taskLauncher.run()
when (val result = syncManager.awaitSyncResult()) {
is SyncSuccess -> {
log.info { "Sync completed successfully" }
when (val result = syncManager.awaitDestinationResult()) {
is DestinationSuccess -> {
if (!syncManager.allStreamsComplete()) {
log.info {
"Destination completed successfully but some streams were incomplete. Throwing to exit non-zero..."
}
throw StreamsIncompleteException()
}
log.info { "Destination completed successfully and all streams were complete." }
}
is SyncFailure -> {
log.info { "Sync failed with stream results ${result.streamResults}" }
throw result.syncFailure
is DestinationFailure -> {
log.info { "Destination failed with stream results ${result.streamResults}" }
throw result.cause
}
}
}

View File

@@ -7,12 +7,10 @@ package io.airbyte.cdk.load.message
import io.mockk.coVerify
import io.mockk.impl.annotations.MockK
import io.mockk.junit5.MockKExtension
import java.lang.IllegalStateException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.api.extension.ExtendWith
@ExtendWith(MockKExtension::class)
@@ -21,33 +19,23 @@ class MultiProducerChannelTest {
private lateinit var channel: MultiProducerChannel<String>
val size = 3L
@BeforeEach
fun setup() {
channel = MultiProducerChannel(wrapped)
channel = MultiProducerChannel(size, wrapped)
}
@Test
fun `cannot register a producer if channel already closed`() = runTest {
channel.registerProducer()
fun `does not close until the expected number of producers have closed`() = runTest {
channel.close()
assertThrows<IllegalStateException> { channel.registerProducer() }
}
@Test
fun `does not close underlying channel while registered producers exist`() = runTest {
channel.registerProducer()
channel.registerProducer()
channel.close()
coVerify(exactly = 0) { wrapped.close() }
}
@Test
fun `closes underlying channel when no producers are registered`() = runTest {
channel.registerProducer()
channel.registerProducer()
channel.registerProducer()
channel.close()
channel.close()
channel.close()
@@ -56,9 +44,7 @@ class MultiProducerChannelTest {
@Test
fun `subsequent calls to to close are idempotent`() = runTest {
channel.registerProducer()
channel.registerProducer()
channel.close()
channel.close()
channel.close()
channel.close()

View File

@@ -62,17 +62,19 @@ class StreamManagerTest {
val manager = DefaultStreamManager(stream1)
val channel = Channel<Boolean>(Channel.UNLIMITED)
launch { channel.send(manager.awaitStreamResult() is StreamSucceeded) }
launch { channel.send(manager.awaitStreamResult() is StreamProcessingSucceeded) }
delay(500)
Assertions.assertTrue(channel.tryReceive().isFailure)
Assertions.assertThrows(IllegalStateException::class.java) { manager.markSucceeded() }
manager.markEndOfStream()
Assertions.assertThrows(IllegalStateException::class.java) {
manager.markProcessingSucceeded()
}
manager.markEndOfStream(true)
manager.markSucceeded()
manager.markProcessingSucceeded()
Assertions.assertTrue(channel.receive())
Assertions.assertEquals(StreamSucceeded, manager.awaitStreamResult())
Assertions.assertEquals(StreamProcessingSucceeded, manager.awaitStreamResult())
}
@Test
@@ -80,29 +82,14 @@ class StreamManagerTest {
val manager = DefaultStreamManager(stream1)
val channel = Channel<Boolean>(Channel.UNLIMITED)
launch { channel.send(manager.awaitStreamResult() is StreamSucceeded) }
launch { channel.send(manager.awaitStreamResult() is StreamProcessingSucceeded) }
delay(500)
Assertions.assertTrue(channel.tryReceive().isFailure)
manager.markFailed(Exception("test"))
manager.markProcessingFailed(Exception("test"))
Assertions.assertFalse(channel.receive())
Assertions.assertTrue(manager.awaitStreamResult() is StreamFailed)
}
@Test
fun testMarkKilled() = runTest {
val manager = DefaultStreamManager(stream1)
val channel = Channel<Boolean>(Channel.UNLIMITED)
launch { channel.send(manager.awaitStreamResult() is StreamSucceeded) }
delay(500)
Assertions.assertTrue(channel.tryReceive().isFailure)
manager.markKilled(Exception("test"))
Assertions.assertFalse(channel.receive())
Assertions.assertTrue(manager.awaitStreamResult() is StreamKilled)
Assertions.assertTrue(manager.awaitStreamResult() is StreamProcessingFailed)
}
class TestUpdateBatchStateProvider : ArgumentsProvider {
@@ -274,7 +261,7 @@ class StreamManagerTest {
val manager = managers[stream.descriptor]!!
when (event) {
is SetRecordCount -> repeat(event.count.toInt()) { manager.countRecordIn() }
is SetEndOfStream -> manager.markEndOfStream()
is SetEndOfStream -> manager.markEndOfStream(true)
is AddPersisted ->
manager.updateBatchState(
BatchEnvelope(
@@ -310,23 +297,25 @@ class StreamManagerTest {
val manager = DefaultStreamManager(stream1)
// Can't mark success before end-of-stream
Assertions.assertThrows(IllegalStateException::class.java) { manager.markSucceeded() }
Assertions.assertThrows(IllegalStateException::class.java) {
manager.markProcessingSucceeded()
}
manager.countRecordIn()
manager.markEndOfStream()
manager.markEndOfStream(true)
// Can't update after end-of-stream
Assertions.assertThrows(IllegalStateException::class.java) { manager.countRecordIn() }
Assertions.assertThrows(IllegalStateException::class.java) { manager.markEndOfStream() }
Assertions.assertThrows(IllegalStateException::class.java) { manager.markEndOfStream(true) }
// Can close now
Assertions.assertDoesNotThrow(manager::markSucceeded)
Assertions.assertDoesNotThrow(manager::markProcessingSucceeded)
}
@Test
fun testEmptyCompletedStreamYieldsBatchProcessingComplete() {
val manager = DefaultStreamManager(stream1)
manager.markEndOfStream()
manager.markEndOfStream(true)
Assertions.assertTrue(manager.isBatchProcessingComplete())
}
@@ -407,7 +396,7 @@ class StreamManagerTest {
val range2 = Range.closed(10, 19L)
val batch2 = BatchEnvelope(SimpleBatch(Batch.State.PERSISTED, groupId = "foo"), range2)
manager.markEndOfStream()
manager.markEndOfStream(true)
manager.updateBatchState(batch2)
manager.updateBatchState(batch1)

View File

@@ -41,43 +41,43 @@ class SyncManagerTest {
// deferred; B) It should probably move into a writer wrapper.
@Test
fun testAwaitAllStreamsCompletedSuccessfully() = runTest {
fun testAwaitAllStreamsProcessedSuccessfully() = runTest {
val manager1 = syncManager.getStreamManager(stream1.descriptor)
val manager2 = syncManager.getStreamManager(stream2.descriptor)
val completionChannel = Channel<Boolean>(Channel.UNLIMITED)
manager1.markEndOfStream()
manager2.markEndOfStream()
manager1.markEndOfStream(true)
manager2.markEndOfStream(true)
launch { completionChannel.send(syncManager.awaitAllStreamsCompletedSuccessfully()) }
launch { completionChannel.send(syncManager.awaitAllStreamsProcessedSuccessfully()) }
delay(500)
Assertions.assertTrue(completionChannel.tryReceive().isFailure)
manager1.markSucceeded()
manager1.markProcessingSucceeded()
delay(500)
Assertions.assertTrue(completionChannel.tryReceive().isFailure)
manager2.markSucceeded()
manager2.markProcessingSucceeded()
Assertions.assertTrue(completionChannel.receive())
}
@Test
fun testAwaitAllStreamsCompletedSuccessfullyWithFailure() = runTest {
fun testAwaitAllStreamsProcessedSuccessfullyWithFailure() = runTest {
val manager1 = syncManager.getStreamManager(stream1.descriptor)
val manager2 = syncManager.getStreamManager(stream2.descriptor)
val completionChannel = Channel<Boolean>(Channel.UNLIMITED)
launch { completionChannel.send(syncManager.awaitAllStreamsCompletedSuccessfully()) }
launch { completionChannel.send(syncManager.awaitAllStreamsProcessedSuccessfully()) }
manager1.markEndOfStream()
manager2.markEndOfStream()
manager1.markEndOfStream(true)
manager2.markEndOfStream(true)
delay(500)
Assertions.assertTrue(completionChannel.tryReceive().isFailure)
manager1.markSucceeded()
manager1.markProcessingSucceeded()
delay(500)
Assertions.assertTrue(completionChannel.tryReceive().isFailure)
manager2.markFailed(RuntimeException())
manager2.markProcessingFailed(RuntimeException())
Assertions.assertFalse(completionChannel.receive())
}
@@ -86,15 +86,15 @@ class SyncManagerTest {
val manager1 = syncManager.getStreamManager(stream1.descriptor)
val manager2 = syncManager.getStreamManager(stream2.descriptor)
manager1.markEndOfStream()
manager2.markEndOfStream()
manager1.markEndOfStream(true)
manager2.markEndOfStream(true)
Assertions.assertTrue(syncManager.isActive())
manager1.markSucceeded()
manager1.markProcessingSucceeded()
Assertions.assertTrue(syncManager.isActive())
manager2.markSucceeded()
manager2.markProcessingSucceeded()
Assertions.assertTrue(syncManager.isActive())
syncManager.markSucceeded()
syncManager.markDestinationSucceeded()
Assertions.assertFalse(syncManager.isActive())
}
@@ -103,35 +103,35 @@ class SyncManagerTest {
val manager1 = syncManager.getStreamManager(stream1.descriptor)
val manager2 = syncManager.getStreamManager(stream2.descriptor)
manager1.markEndOfStream()
manager2.markEndOfStream()
manager1.markEndOfStream(true)
manager2.markEndOfStream(true)
val completionChannel = Channel<SyncResult>(Channel.UNLIMITED)
val completionChannel = Channel<DestinationResult>(Channel.UNLIMITED)
launch { completionChannel.send(syncManager.awaitSyncResult()) }
launch { completionChannel.send(syncManager.awaitDestinationResult()) }
CoroutineTestUtils.assertThrows(IllegalStateException::class) {
syncManager.markSucceeded()
syncManager.markDestinationSucceeded()
}
Assertions.assertTrue(completionChannel.tryReceive().isFailure)
manager1.markSucceeded()
manager1.markProcessingSucceeded()
CoroutineTestUtils.assertThrows(IllegalStateException::class) {
syncManager.markSucceeded()
syncManager.markDestinationSucceeded()
}
Assertions.assertTrue(completionChannel.tryReceive().isFailure)
manager2.markSucceeded()
manager2.markProcessingSucceeded()
Assertions.assertTrue(completionChannel.tryReceive().isFailure)
syncManager.markSucceeded()
Assertions.assertEquals(SyncSuccess, completionChannel.receive())
syncManager.markDestinationSucceeded()
Assertions.assertEquals(DestinationSuccess, completionChannel.receive())
}
@Test
fun testCrashOnNoEndOfStream() = runTest {
val manager1 = syncManager.getStreamManager(stream1.descriptor)
manager1.markEndOfStream()
manager1.markEndOfStream(true)
// This should fail, because stream2 was not marked with end of stream
val e = assertThrows<IllegalStateException> { syncManager.markInputConsumed() }
assertEquals(

View File

@@ -419,7 +419,7 @@ class DestinationTaskLauncherTest<T : ScopedTask> {
syncManager.getStreamManager(MockDestinationCatalogFactory.stream1.descriptor)
repeat(100) { streamManager.countRecordIn() }
streamManager.markEndOfStream()
streamManager.markEndOfStream(true)
// Verify incomplete batch triggers process batch
val incompleteBatch = BatchEnvelope(MockBatch(Batch.State.LOCAL), range)
@@ -468,7 +468,7 @@ class DestinationTaskLauncherTest<T : ScopedTask> {
val range = TreeRangeSet.create(listOf(Range.closed(0L, 0L)))
val streamManager =
syncManager.getStreamManager(MockDestinationCatalogFactory.stream1.descriptor)
streamManager.markEndOfStream()
streamManager.markEndOfStream(true)
val emptyBatch = BatchEnvelope(MockBatch(Batch.State.COMPLETE), range)
taskLauncher.handleNewBatch(MockDestinationCatalogFactory.stream1.descriptor, emptyBatch)

View File

@@ -14,7 +14,7 @@ import io.airbyte.cdk.load.message.GlobalCheckpointWrapped
import io.airbyte.cdk.load.message.MessageQueue
import io.airbyte.cdk.load.message.MessageQueueSupplier
import io.airbyte.cdk.load.message.StreamCheckpointWrapped
import io.airbyte.cdk.load.message.StreamCompleteEvent
import io.airbyte.cdk.load.message.StreamEndEvent
import io.airbyte.cdk.load.message.StreamRecordEvent
import io.airbyte.cdk.load.state.ReservationManager
import io.airbyte.cdk.load.state.Reserved
@@ -144,11 +144,11 @@ class InputConsumerTaskTest {
Assertions.assertEquals(expectedRecords, messages1.map { it.value })
Assertions.assertEquals(expectedRecords.map { _ -> 1L }, messages1.map { it.bytesReserved })
Assertions.assertEquals(StreamCompleteEvent(10), streamComplete1.value)
Assertions.assertEquals(StreamEndEvent(10), streamComplete1.value)
Assertions.assertEquals(1, streamComplete1.bytesReserved)
Assertions.assertEquals(10L, manager1.recordCount())
Assertions.assertEquals(emptyList<DestinationStreamEvent>(), queue1.consume().toList())
Assertions.assertEquals(StreamCompleteEvent(0), streamComplete2.value)
Assertions.assertEquals(StreamEndEvent(0), streamComplete2.value)
Assertions.assertEquals(emptyList<DestinationStreamEvent>(), queue2.consume().toList())
Assertions.assertEquals(0L, manager2.recordCount())
mockInputFlow.stop()
@@ -208,7 +208,7 @@ class InputConsumerTaskTest {
"test"
)
),
StreamCompleteEvent(1)
StreamEndEvent(1)
),
queue2.consume().toList().map { it.value }
)
@@ -220,7 +220,7 @@ class InputConsumerTaskTest {
queue1.close()
val messages1 = queue1.consume().toList()
Assertions.assertEquals(11, messages1.size)
Assertions.assertEquals(messages1[10].value, StreamCompleteEvent(10))
Assertions.assertEquals(messages1[10].value, StreamEndEvent(10))
Assertions.assertEquals(
mockInputFlow.initialMemory - 11,
mockInputFlow.memoryManager.remainingCapacityBytes,
@@ -353,30 +353,6 @@ class InputConsumerTaskTest {
mockInputFlow.stop()
}
@Test
fun testStreamIncompleteThrows() = runTest {
mockInputFlow.addMessage(
StubDestinationMessageFactory.makeRecord(MockDestinationCatalogFactory.stream1, "test"),
1L
)
mockInputFlow.addMessage(
StubDestinationMessageFactory.makeStreamIncomplete(
MockDestinationCatalogFactory.stream1
),
0L
)
val task =
taskFactory.make(
mockCatalogFactory.make(),
mockInputFlow,
recordQueueSupplier,
checkpointQueue,
mockk(),
)
CoroutineTestUtils.assertThrows(IllegalStateException::class) { task.execute() }
mockInputFlow.stop()
}
@Test
fun testFileStreamIncompleteThrows() = runTest {
mockInputFlow.addMessage(

View File

@@ -17,7 +17,7 @@ import io.airbyte.cdk.load.message.DestinationStreamEventQueue
import io.airbyte.cdk.load.message.DestinationStreamQueueSupplier
import io.airbyte.cdk.load.message.MessageQueueSupplier
import io.airbyte.cdk.load.message.MultiProducerChannel
import io.airbyte.cdk.load.message.StreamCompleteEvent
import io.airbyte.cdk.load.message.StreamEndEvent
import io.airbyte.cdk.load.message.StreamFlushEvent
import io.airbyte.cdk.load.message.StreamRecordEvent
import io.airbyte.cdk.load.state.FlushStrategy
@@ -111,7 +111,7 @@ class SpillToDiskTaskTest {
@Test
fun `publishes 'spilled file' aggregates on stream complete event`() = runTest {
val completeMsg = StreamCompleteEvent(0L)
val completeMsg = StreamEndEvent(0L)
inputQueue.publish(Reserved(value = completeMsg))
val job = launch {
@@ -267,7 +267,7 @@ class SpillToDiskTaskTest {
queue.publish(
memoryManager.reserve(
0L,
StreamCompleteEvent(index = maxRecords),
StreamEndEvent(index = maxRecords),
),
)
return recordsWritten

View File

@@ -18,7 +18,7 @@ import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.state.DestinationStateManager
import io.airbyte.cdk.load.state.StreamIncompleteResult
import io.airbyte.cdk.load.state.StreamProcessingFailed
import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState
import io.airbyte.cdk.load.write.StreamLoader
import io.github.oshai.kotlinlogging.KotlinLogging
@@ -160,7 +160,7 @@ class ObjectStorageStreamLoader<T : RemoteObject<*>, U : OutputStream>(
)
}
override suspend fun close(streamFailure: StreamIncompleteResult?) {
override suspend fun close(streamFailure: StreamProcessingFailed?) {
if (streamFailure != null) {
log.info { "Sync failed, persisting destination state for next run" }
destinationStateManager.persistState(stream)

View File

@@ -491,7 +491,7 @@ protected constructor(
* both syncs are preserved.
*/
@Test
open fun testOverwriteSyncFailedResumedGeneration() {
fun testOverwriteSyncFailedResumedGeneration() {
assumeTrue(
implementsOverwrite(),
"Destination's spec.json does not support overwrite sync mode."
@@ -525,7 +525,7 @@ protected constructor(
/** Test runs 2 failed syncs and verifies the previous sync objects are not cleaned up. */
@Test
open fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {
fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {
assumeTrue(
implementsOverwrite(),
"Destination's spec.json does not support overwrite sync mode."

View File

@@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
dockerImageTag: 0.1.12
dockerImageTag: 0.1.13
dockerRepository: airbyte/destination-iceberg-v2
githubIssueLabel: destination-iceberg-v2
icon: s3.svg

View File

@@ -11,7 +11,7 @@ import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.state.StreamIncompleteResult
import io.airbyte.cdk.load.state.StreamProcessingFailed
import io.airbyte.cdk.load.write.StreamLoader
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableCleaner
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableWriterFactory
@@ -74,7 +74,7 @@ class IcebergStreamLoader(
throw NotImplementedError("Destination Iceberg does not support universal file transfer.")
}
override suspend fun close(streamFailure: StreamIncompleteResult?) {
override suspend fun close(streamFailure: StreamProcessingFailed?) {
if (streamFailure == null) {
// Doing it first to make sure that data coming in the current batch is written to the
// main branch

View File

@@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
dockerImageTag: 0.3.1
dockerImageTag: 0.3.2
dockerRepository: airbyte/destination-s3-v2
githubIssueLabel: destination-s3-v2
icon: s3.svg

View File

@@ -21,8 +21,4 @@ class S3V2AvroDestinationAcceptanceTest : S3BaseAvroDestinationAcceptanceTest()
override val baseConfigJson: JsonNode
get() = S3V2DestinationTestUtils.baseConfigJsonFilePath
// Disable these tests until we fix the incomplete stream handling behavior.
override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {}
override fun testOverwriteSyncFailedResumedGeneration() {}
}

View File

@@ -22,8 +22,4 @@ class S3V2CsvAssumeRoleDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanc
override fun testFakeFileTransfer() {
super.testFakeFileTransfer()
}
// Disable these tests until we fix the incomplete stream handling behavior.
override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {}
override fun testOverwriteSyncFailedResumedGeneration() {}
}

View File

@@ -15,8 +15,4 @@ class S3V2CsvDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() {
override val baseConfigJson: JsonNode
get() = S3V2DestinationTestUtils.baseConfigJsonFilePath
// Disable these tests until we fix the incomplete stream handling behavior.
override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {}
override fun testOverwriteSyncFailedResumedGeneration() {}
}

View File

@@ -15,8 +15,4 @@ class S3V2CsvGzipDestinationAcceptanceTest : S3BaseCsvGzipDestinationAcceptanceT
override val baseConfigJson: JsonNode
get() = S3V2DestinationTestUtils.baseConfigJsonFilePath
// Disable these tests until we fix the incomplete stream handling behavior.
override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {}
override fun testOverwriteSyncFailedResumedGeneration() {}
}

View File

@@ -15,8 +15,4 @@ class S3V2JsonlDestinationAcceptanceTest : S3BaseJsonlDestinationAcceptanceTest(
override val baseConfigJson: JsonNode
get() = S3V2DestinationTestUtils.baseConfigJsonFilePath
// Disable these tests until we fix the incomplete stream handling behavior.
override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {}
override fun testOverwriteSyncFailedResumedGeneration() {}
}

View File

@@ -15,8 +15,4 @@ class S3V2JsonlGzipDestinationAcceptanceTest : S3BaseJsonlGzipDestinationAccepta
override val baseConfigJson: JsonNode
get() = S3V2DestinationTestUtils.baseConfigJsonFilePath
// Disable these tests until we fix the incomplete stream handling behavior.
override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {}
override fun testOverwriteSyncFailedResumedGeneration() {}
}

View File

@@ -73,8 +73,4 @@ class S3V2ParquetDestinationAcceptanceTest : S3BaseParquetDestinationAcceptanceT
runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false)
}
// Disable these tests until we fix the incomplete stream handling behavior.
override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {}
override fun testOverwriteSyncFailedResumedGeneration() {}
}