Destination MSSQL / bulk load CDK: tweak how we process metadata (#56402)
@@ -87,6 +87,7 @@ abstract class DestinationConfiguration : Configuration {
const val DEFAULT_RECORD_BATCH_SIZE_BYTES = 200L * 1024L * 1024L
const val DEFAULT_HEARTBEAT_INTERVAL_SECONDS = 60L
const val DEFAULT_MAX_TIME_WITHOUT_FLUSHING_DATA_SECONDS = 15 * 60L
+const val DEFAULT_GENERATION_ID_METADATA_KEY = "ab-generation-id"
}

// DEPRECATED: Old interface config. TODO: Drop when we're totally migrated.
@@ -98,6 +99,8 @@ abstract class DestinationConfiguration : Configuration {
open val numProcessBatchWorkersForFileTransfer: Int = 3
open val batchQueueDepth: Int = 10

+open val generationIdMetadataKey: String = DEFAULT_GENERATION_ID_METADATA_KEY

/**
 * Micronaut factory which glues [ConfigurationSpecificationSupplier] and
 * [DestinationConfigurationFactory] together to produce a [DestinationConfiguration] singleton.
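
The new `generationIdMetadataKey` hook is what the MSSQL changes further down rely on: a connector's configuration can override the dashed default when its storage backend restricts metadata key syntax. A minimal sketch of such an override, assuming no other members of `DestinationConfiguration` need to be overridden (the class name here is illustrative):

```kotlin
import io.airbyte.cdk.load.command.DestinationConfiguration

// Sketch only: a connector configuration overriding the generation-id metadata key,
// mirroring what MSSQLConfiguration does later in this diff.
class HypotheticalBlobDestinationConfiguration : DestinationConfiguration() {
    // Some object stores restrict metadata key syntax; swap the dashed default
    // for an underscore form.
    override val generationIdMetadataKey: String = "ab_generation_id"
}
```
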
@@ -82,7 +82,7 @@ class AzureBlobClient(
return props?.metadata ?: emptyMap()
}

-suspend fun getProperties(key: String): OffsetDateTime? {
+fun getProperties(key: String): OffsetDateTime? {
val blobClient =
serviceClient.getBlobContainerClient(blobConfig.containerName).getBlobClient(key)
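
For context, the metadata read that this hunk sits next to boils down to a plain Azure SDK lookup. A hedged sketch of that read, with placeholder container and key names and the same `props?.metadata ?: emptyMap()` fallback shown in the context lines above (not the CDK's exact method):

```kotlin
import com.azure.storage.blob.BlobServiceClient

// Sketch of the metadata read implied by the context lines above; just the
// underlying SDK calls, not the CDK's AzureBlobClient implementation.
fun blobMetadata(
    serviceClient: BlobServiceClient,
    containerName: String,
    key: String,
): Map<String, String> {
    val blobClient = serviceClient.getBlobContainerClient(containerName).getBlobClient(key)
    val props = if (blobClient.exists()) blobClient.properties else null
    return props?.metadata ?: emptyMap()
}
```
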
@@ -15,7 +15,6 @@ import java.util.concurrent.ConcurrentSkipListMap
import java.util.concurrent.atomic.AtomicBoolean

private const val BLOB_ID_PREFIX = "block"
-private const val RESERVED_PREFIX = "x-ms-"

class AzureBlobStreamingUpload(
private val blockBlobClient: BlockBlobClient,
@@ -26,7 +25,6 @@ class AzureBlobStreamingUpload(
private val log = KotlinLogging.logger {}
private val isComplete = AtomicBoolean(false)
private val blockIds = ConcurrentSkipListMap<Int, String>()
-private val invalidCharsRegex = Regex("[<>:\"/\\\\?#*\\-]")

/**
 * Each part that arrives is treated as a new block. We must generate unique block IDs for each
@@ -68,15 +66,13 @@ class AzureBlobStreamingUpload(
} else {
val blockList = blockIds.values.toList()
log.info { "Committing block list for ${blockBlobClient.blobName}: $blockList" }
-blockBlobClient.commitBlockList(blockIds.values.toList(), true) // Overwrite = true
}

+blockBlobClient.commitBlockList(blockIds.values.toList(), true) // Overwrite = true

// Set any metadata
if (metadata.isNotEmpty()) {
-val filteredMetadata = filterInvalidMetadata(metadata)
-if (filteredMetadata.isNotEmpty()) {
-blockBlobClient.setMetadata(filteredMetadata)
-}
+blockBlobClient.setMetadata(metadata)
}
} else {
log.warn { "Complete called multiple times for ${blockBlobClient.blobName}" }
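
Read together, the hunk above changes two things: the block list is now committed unconditionally (an empty list simply yields an empty blob), and the metadata map is applied as-is instead of being routed through `filterInvalidMetadata`. A rough sketch of the resulting commit path, using Azure SDK types and leaving out the logging and double-completion guard the real method keeps (the helper name is illustrative):

```kotlin
import com.azure.storage.blob.specialized.BlockBlobClient

// Hedged reconstruction of the post-change commit path, not the CDK's actual method.
fun commitAndTagBlob(
    blockBlobClient: BlockBlobClient,
    blockIds: List<String>,
    metadata: Map<String, String>,
) {
    // Always commit: an empty block list produces an empty blob rather than skipping the call.
    blockBlobClient.commitBlockList(blockIds, true) // overwrite = true
    // Keys are assumed Azure-safe by construction now, so no filtering before setMetadata.
    if (metadata.isNotEmpty()) {
        blockBlobClient.setMetadata(metadata)
    }
}
```
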
@@ -102,43 +98,4 @@ class AzureBlobStreamingUpload(
// Encode the entire fixed-length buffer to Base64
return Base64.getEncoder().encodeToString(buffer.array())
}
-/**
- * Return a new map containing only valid key/value pairs according to Azure metadata
- * constraints.
- */
-private fun filterInvalidMetadata(metadata: Map<String, String>): Map<String, String> {
-return metadata.filter { (key, value) -> isValidKey(key) && isValidValue(value) }
-}
-/**
- * Validates if the provided key string meets the required criteria.
- *
- * @param key The string to validate as a key
- * @return Boolean indicating whether the key is valid
- */
-private fun isValidKey(key: String): Boolean {
-// Reject empty keys or keys that start with reserved prefix
-if (key.isBlank() || key.startsWith(RESERVED_PREFIX)) return false
-
-// Reject keys containing any characters matching the invalid pattern
-if (invalidCharsRegex.containsMatchIn(key)) return false
-
-// Ensure all characters are within the printable ASCII range (32-126)
-// This includes letters, numbers, and common symbols
-return key.all { it.code in 32..126 }
-}
-
-/**
- * Validates if the provided value string meets the required criteria.
- *
- * @param value The string to validate as a value
- * @return Boolean indicating whether the value is valid
- */
-private fun isValidValue(value: String): Boolean {
-// Reject values containing any characters matching the invalid pattern
-if (invalidCharsRegex.containsMatchIn(value)) return false
-
-// Ensure all characters are within the printable ASCII range (32-126)
-// This includes letters, numbers, and common symbols
-return value.all { it.code in 32..126 }
-}
}
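
The `// Encode the entire fixed-length buffer to Base64` context line is the tail of the block-ID scheme described in the class KDoc above: every uploaded part becomes a block, and Azure requires all block IDs within a blob to be Base64 strings of identical length. A hedged sketch of one way to produce such IDs (the `blockIdFor` helper and the 32-byte width are illustrative, not the CDK's exact layout):

```kotlin
import java.nio.ByteBuffer
import java.util.Base64

// Illustrative only: fixed-width, Base64-encoded block IDs, unique per part index.
fun blockIdFor(index: Int, prefix: String = "block"): String {
    val buffer = ByteBuffer.allocate(32)           // fixed length -> every ID encodes to the same width
    buffer.put(prefix.toByteArray(Charsets.UTF_8))
    buffer.putInt(index)                           // distinguishes one part's block from another
    return Base64.getEncoder().encodeToString(buffer.array())
}
```
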
@@ -39,7 +39,7 @@ class AzureBlobStreamingUploadTest {
containerName = "fakeContainer",
sharedAccessSignature = "null"
)
-metadata = mapOf("env" to "dev", "author" to "testUser", "ab-generation-id" to "0")
+metadata = mapOf("env" to "dev", "author" to "testUser", "ab_generation_id" to "0")

// By default, let's assume blobName returns something
every { blockBlobClient.blobName } returns "testBlob"
@@ -85,18 +85,24 @@ class AzureBlobStreamingUploadTest {
// We want to ensure commitBlockList is NOT called
val blobItem = mockk<BlockBlobItem>()
every { blockBlobClient.commitBlockList(any(), any()) } returns blobItem
-every { blockBlobClient.setMetadata(mapOf("env" to "dev", "author" to "testUser")) } just
-runs
+// note that generation ID metadata is changed from ab-generation-id -> ab_generation_id
+every {
+blockBlobClient.setMetadata(
+mapOf("env" to "dev", "author" to "testUser", "ab_generation_id" to "0")
+)
+} just runs

// Act
val resultBlob = streamingUpload.complete()

// Assert
-// 1) No block list calls
-verify(exactly = 0) { blockBlobClient.commitBlockList(any(), any()) }
+// 1) We committed the empty blob
+verify(exactly = 1) { blockBlobClient.commitBlockList(emptyList(), true) }
// 2) Metadata still set (the code checks for empty map, but here it's non-empty).
verify(exactly = 1) {
-blockBlobClient.setMetadata(mapOf("env" to "dev", "author" to "testUser"))
+blockBlobClient.setMetadata(
+mapOf("env" to "dev", "author" to "testUser", "ab_generation_id" to "0")
+)
}

// 3) Return object is AzureBlob
@@ -133,7 +139,9 @@ class AzureBlobStreamingUploadTest {
)
}
verify(exactly = 1) {
-blockBlobClient.setMetadata(mapOf("env" to "dev", "author" to "testUser"))
+blockBlobClient.setMetadata(
+mapOf("env" to "dev", "author" to "testUser", "ab_generation_id" to "0")
+)
}
// Confirm the returned object
assertEquals("testBlob", resultBlob.key)
@@ -161,7 +169,9 @@ class AzureBlobStreamingUploadTest {
verify(exactly = 1) { blockBlobClient.commitBlockList(any(), true) }
// setMetadata also only once
verify(exactly = 1) {
-blockBlobClient.setMetadata(mapOf("env" to "dev", "author" to "testUser"))
+blockBlobClient.setMetadata(
+mapOf("env" to "dev", "author" to "testUser", "ab_generation_id" to "0")
+)
}
// Both calls return the same AzureBlob reference
assertEquals("testBlob", firstCall.key)

@@ -5,6 +5,7 @@
package io.airbyte.cdk.load.pipline.object_storage

import io.airbyte.cdk.load.command.DestinationCatalog
+import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
import io.airbyte.cdk.load.file.object_storage.RemoteObject
import io.airbyte.cdk.load.file.object_storage.StreamingUpload
@@ -14,8 +15,8 @@ import io.airbyte.cdk.load.pipeline.BatchAccumulator
import io.airbyte.cdk.load.pipeline.BatchAccumulatorResult
import io.airbyte.cdk.load.pipeline.FinalOutput
import io.airbyte.cdk.load.pipeline.IntermediateOutput
-import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState
import io.airbyte.cdk.load.write.object_storage.ObjectLoader
+import io.airbyte.cdk.load.write.object_storage.metadataFor
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton
@@ -41,6 +42,7 @@ class ObjectLoaderPartLoader<T : RemoteObject<*>>(
private val client: ObjectStorageClient<T>,
private val catalog: DestinationCatalog,
private val uploads: UploadsInProgress<T>,
+private val destinationConfig: DestinationConfiguration,
) :
BatchAccumulator<
ObjectLoaderPartLoader.State<T>,
@@ -82,7 +84,7 @@ class ObjectLoaderPartLoader<T : RemoteObject<*>>(
CoroutineScope(Dispatchers.IO).async {
client.startStreamingUpload(
key.objectKey,
-metadata = ObjectStorageDestinationState.metadataFor(stream)
+metadata = destinationConfig.metadataFor(stream)
)
},
)

@@ -5,6 +5,7 @@
package io.airbyte.cdk.load.state.object_storage

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
+import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
import io.airbyte.cdk.load.file.object_storage.PathFactory
@@ -25,6 +26,7 @@ class ObjectStorageDestinationState(
private val stream: DestinationStream,
private val client: ObjectStorageClient<*>,
private val pathFactory: PathFactory,
+private val destinationConfig: DestinationConfiguration,
) : DestinationState {
private val log = KotlinLogging.logger {}

@@ -34,11 +36,7 @@ class ObjectStorageDestinationState(
pathFactory.getPathMatcher(stream, suffixPattern = OPTIONAL_ORDINAL_SUFFIX_PATTERN)

companion object {
-const val METADATA_GENERATION_ID_KEY = "ab-generation-id"
const val OPTIONAL_ORDINAL_SUFFIX_PATTERN = "(-[0-9]+)?"
-
-fun metadataFor(stream: DestinationStream): Map<String, String> =
-mapOf(METADATA_GENERATION_ID_KEY to stream.generationId.toString())
}

/**
@@ -64,7 +62,10 @@ class ObjectStorageDestinationState(
.toList() // Force the list call to complete before initiating metadata calls
.mapNotNull { obj ->
val generationId =
-client.getMetadata(obj.key)[METADATA_GENERATION_ID_KEY]?.toLongOrNull() ?: 0L
+client
+.getMetadata(obj.key)[destinationConfig.generationIdMetadataKey]
+?.toLongOrNull()
+?: 0L
if (generationId < stream.minimumGenerationId) {
Pair(generationId, obj)
} else {
@@ -118,10 +119,11 @@ class ObjectStorageDestinationState(
@Singleton
class ObjectStorageFallbackPersister(
private val client: ObjectStorageClient<*>,
-private val pathFactory: PathFactory
+private val pathFactory: PathFactory,
+private val destinationConfig: DestinationConfiguration,
) : DestinationStatePersister<ObjectStorageDestinationState> {
override suspend fun load(stream: DestinationStream): ObjectStorageDestinationState {
-return ObjectStorageDestinationState(stream, client, pathFactory)
+return ObjectStorageDestinationState(stream, client, pathFactory, destinationConfig)
}

override suspend fun persist(stream: DestinationStream, state: ObjectStorageDestinationState) {
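
The `@@ -64,7 +62,10 @@` hunk is the main consumer of the configurable key: during a refresh, any object whose stored generation ID falls below the stream's `minimumGenerationId` is a leftover from a stale generation and gets queued for deletion. A hedged, standalone sketch of that decision (the `StoredObject` type and `objectsToDelete` helper are illustrative; the `?: 0L` fallback for missing or unparsable values mirrors the diff):

```kotlin
// Illustrative types; the CDK's RemoteObject and client calls are not used here.
data class StoredObject(val key: String, val metadata: Map<String, String>)

fun objectsToDelete(
    objects: List<StoredObject>,
    generationIdMetadataKey: String,
    minimumGenerationId: Long,
): List<StoredObject> =
    objects.filter { obj ->
        // Missing or non-numeric metadata is treated as generation 0, i.e. always stale.
        val generationId = obj.metadata[generationIdMetadataKey]?.toLongOrNull() ?: 0L
        generationId < minimumGenerationId
    }
```
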
@@ -4,6 +4,8 @@

package io.airbyte.cdk.load.write.object_storage

+import io.airbyte.cdk.load.command.DestinationConfiguration
+import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.BatchState
import io.airbyte.cdk.load.write.LoadStrategy

@@ -88,3 +90,6 @@ interface ObjectLoader : LoadStrategy {
val stateAfterUpload: BatchState
get() = BatchState.COMPLETE
}

+fun DestinationConfiguration.metadataFor(stream: DestinationStream): Map<String, String> =
+mapOf(this.generationIdMetadataKey to stream.generationId.toString())
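
This extension is now the single place the metadata map is built, so every caller that previously used `ObjectStorageDestinationState.metadataFor(stream)` picks up whatever key the active configuration declares. A small, hedged illustration of the resulting maps, with the key and generation ID passed directly instead of coming from `DestinationConfiguration` and `DestinationStream`:

```kotlin
// Standalone illustration of the extension's behavior for the two keys seen in this diff.
fun metadataFor(generationIdMetadataKey: String, generationId: Long): Map<String, String> =
    mapOf(generationIdMetadataKey to generationId.toString())

fun main() {
    println(metadataFor("ab-generation-id", 12L))  // CDK default key
    println(metadataFor("ab_generation_id", 12L))  // MSSQL/Azure-safe override
}
```
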
@@ -6,6 +6,7 @@ package io.airbyte.cdk.load.write.object_storage

import com.google.common.annotations.VisibleForTesting
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
+import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfiguration
import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfigurationProvider
@@ -17,7 +18,6 @@ import io.airbyte.cdk.load.file.object_storage.RemoteObject
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.load.message.MultiProducerChannel
-import io.airbyte.cdk.load.message.object_storage.*
import io.airbyte.cdk.load.state.DestinationStateManager
import io.airbyte.cdk.load.state.StreamProcessingFailed
import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState
@@ -43,7 +43,8 @@ class ObjectStorageStreamLoaderFactory<T : RemoteObject<*>, U : OutputStream>(
private val uploadConfigurationProvider: ObjectStorageUploadConfigurationProvider,
private val destinationStateManager: DestinationStateManager<ObjectStorageDestinationState>,
@Value("\${airbyte.destination.core.record-batch-size-override}")
-private val recordBatchSizeOverride: Long? = null
+private val recordBatchSizeOverride: Long? = null,
+private val destinationConfig: DestinationConfiguration,
) {
fun create(stream: DestinationStream): StreamLoader {
return ObjectStorageStreamLoader(
@@ -55,7 +56,8 @@ class ObjectStorageStreamLoaderFactory<T : RemoteObject<*>, U : OutputStream>(
destinationStateManager,
uploadConfigurationProvider.objectStorageUploadConfiguration.uploadPartSizeBytes,
recordBatchSizeOverride
-?: uploadConfigurationProvider.objectStorageUploadConfiguration.fileSizeBytes
+?: uploadConfigurationProvider.objectStorageUploadConfiguration.fileSizeBytes,
+destinationConfig,
)
}
}
@@ -73,10 +75,11 @@ class ObjectStorageStreamLoader<T : RemoteObject<*>, U : OutputStream>(
private val destinationStateManager: DestinationStateManager<ObjectStorageDestinationState>,
private val partSizeBytes: Long,
private val fileSizeBytes: Long,
+destinationConfig: DestinationConfiguration,
) : StreamLoader {
private val log = KotlinLogging.logger {}

-private val objectAccumulator = PartToObjectAccumulator(stream, client)
+private val objectAccumulator = PartToObjectAccumulator(stream, client, destinationConfig)

override suspend fun createBatchAccumulator(): BatchAccumulator {
val state = destinationStateManager.getState(stream)

@@ -5,6 +5,7 @@
package io.airbyte.cdk.load.write.object_storage

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
+import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
import io.airbyte.cdk.load.file.object_storage.PartBookkeeper
@@ -14,7 +15,6 @@ import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.object_storage.IncompletePartialUpload
import io.airbyte.cdk.load.message.object_storage.LoadablePart
import io.airbyte.cdk.load.message.object_storage.LoadedObject
-import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState
import io.airbyte.cdk.load.util.setOnce
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.concurrent.ConcurrentHashMap
@@ -25,6 +25,7 @@ import kotlinx.coroutines.CompletableDeferred
class PartToObjectAccumulator<T : RemoteObject<*>>(
private val stream: DestinationStream,
private val client: ObjectStorageClient<T>,
+private val destinationConfig: DestinationConfiguration,
) {
private val log = KotlinLogging.logger {}

@@ -41,7 +42,7 @@ class PartToObjectAccumulator<T : RemoteObject<*>>(
if (upload.hasStarted.setOnce()) {
// Start the upload if we haven't already. Note that the `complete`
// here refers to the completable deferred, not the streaming upload.
-val metadata = ObjectStorageDestinationState.metadataFor(stream)
+val metadata = destinationConfig.metadataFor(stream)
val streamingUpload = client.startStreamingUpload(batch.part.key, metadata)
upload.streamingUpload.complete(streamingUpload)
}

@@ -4,6 +4,7 @@

package io.airbyte.cdk.load.state.object_storage

+import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
@@ -25,6 +26,11 @@ class ObjectStorageDestinationStateUTest {
data class MockObj(override val key: String, override val storageConfig: Unit = Unit) :
RemoteObject<Unit>

+private val destinationConfig =
+object : DestinationConfiguration() {
+override val generationIdMetadataKey = "test-ab-generation-id"
+}

@MockK lateinit var stream: DestinationStream
@MockK lateinit var client: ObjectStorageClient<*>
@MockK lateinit var pathFactory: ObjectStoragePathFactory
@@ -59,7 +65,7 @@ class ObjectStorageDestinationStateUTest {
PathMatcher(Regex("(dog|cat|turtle-1)$suffix"), mapOf("suffix" to 2))
}

-val persister = ObjectStorageFallbackPersister(client, pathFactory)
+val persister = ObjectStorageFallbackPersister(client, pathFactory, destinationConfig)
val state = persister.load(stream)

assertEquals("dog-4", state.ensureUnique("dog"))
@@ -97,7 +103,7 @@ class ObjectStorageDestinationStateUTest {
)
}

-val persister = ObjectStorageFallbackPersister(client, pathFactory)
+val persister = ObjectStorageFallbackPersister(client, pathFactory, destinationConfig)
val state = persister.load(stream)

assertEquals(2L, state.getPartIdCounter("dog/").get())
@@ -140,10 +146,12 @@ class ObjectStorageDestinationStateUTest {
coEvery { client.getMetadata(any()) } answers
{
val key = firstArg<String>()
-mapOf("ab-generation-id" to key.split("/").last())
+// Note that because we override the generation ID metadata key, the
+// "ab-generation-id" key is replaced with "test-ab-generation-id"
+mapOf("test-ab-generation-id" to key.split("/").last())
}

-val persister = ObjectStorageFallbackPersister(client, pathFactory)
+val persister = ObjectStorageFallbackPersister(client, pathFactory, destinationConfig)

val dogStream = mockk<DestinationStream>(relaxed = true)
every { dogStream.descriptor } returns DestinationStream.Descriptor("test", "dog")

@@ -4,12 +4,12 @@

package io.airbyte.cdk.load.write.object_storage

+import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
import io.airbyte.cdk.load.file.object_storage.Part
import io.airbyte.cdk.load.file.object_storage.StreamingUpload
import io.airbyte.cdk.load.message.object_storage.LoadablePart
-import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState
import io.mockk.coEvery
import io.mockk.coVerify
import io.mockk.mockk
@@ -19,6 +19,7 @@ import org.junit.jupiter.api.Test

class PartToObjectAccumulatorTest {
private val streamDescriptor = DestinationStream.Descriptor("test", "stream")
+private val destinationConfig = object : DestinationConfiguration() {}

private lateinit var stream: DestinationStream
private lateinit var client: ObjectStorageClient<*>
@@ -31,7 +32,7 @@ class PartToObjectAccumulatorTest {
client = mockk(relaxed = true)
streamingUpload = mockk(relaxed = true)
coEvery { stream.descriptor } returns streamDescriptor
-metadata = ObjectStorageDestinationState.metadataFor(stream)
+metadata = destinationConfig.metadataFor(stream)
coEvery { client.startStreamingUpload(any(), any()) } returns streamingUpload
coEvery { streamingUpload.uploadPart(any(), any()) } returns Unit
coEvery { streamingUpload.complete() } returns mockk(relaxed = true)
@@ -59,7 +60,7 @@ class PartToObjectAccumulatorTest {

@Test
fun `test part accumulation`() = runTest {
-val acc = PartToObjectAccumulator(stream, client)
+val acc = PartToObjectAccumulator(stream, client, destinationConfig)

// First part triggers starting the upload
val firstPartFile1 = makePart(1, 1)

@@ -16,7 +16,7 @@ data:
type: GSM
connectorType: destination
definitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
-dockerImageTag: 2.2.0
+dockerImageTag: 2.2.1
dockerRepository: airbyte/destination-mssql
documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql
githubIssueLabel: destination-mssql

@@ -6,6 +6,7 @@ package io.airbyte.integrations.destination.mssql.v2

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.load.command.Dedupe
+import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfiguration
import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfigurationProvider
@@ -41,12 +42,14 @@ class MSSQLBulkLoadStreamLoader(
private val azureBlobClient: AzureBlobClient,
private val validateValuesPreLoad: Boolean,
private val recordBatchSizeOverride: Long? = null,
-private val streamStateStore: StreamStateStore<MSSQLStreamState>
+private val streamStateStore: StreamStateStore<MSSQLStreamState>,
+destinationConfig: DestinationConfiguration,
) : AbstractMSSQLStreamLoader(dataSource, stream, sqlBuilder) {

// Bulk-load related collaborators
private val mssqlFormatFileCreator = MSSQLFormatFileCreator(dataSource, stream, azureBlobClient)
-private val objectAccumulator = PartToObjectAccumulator(stream, azureBlobClient)
+private val objectAccumulator =
+PartToObjectAccumulator(stream, azureBlobClient, destinationConfig)
private val mssqlBulkLoadHandler =
MSSQLBulkLoadHandler(
dataSource,

@@ -4,6 +4,7 @@

package io.airbyte.integrations.destination.mssql.v2

+import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.state.DestinationFailure
import io.airbyte.cdk.load.write.DestinationWriter
@@ -24,7 +25,8 @@ class MSSQLWriter(
private val dataSourceFactory: MSSQLDataSourceFactory,
@Value("\${airbyte.destination.core.record-batch-size-override:null}")
private val recordBatchSizeOverride: Long? = null,
-private val streamStateStore: StreamStateStore<MSSQLStreamState>
+private val streamStateStore: StreamStateStore<MSSQLStreamState>,
+private val destinationConfig: DestinationConfiguration,
) : DestinationWriter {

/** Lazily initialized when [setup] is called. */
@@ -53,7 +55,8 @@ class MSSQLWriter(
AzureBlobStorageClientCreator.createAzureBlobClient(loadConfig),
validateValuesPreLoad = loadConfig.validateValuesPreLoad ?: false,
recordBatchSizeOverride = recordBatchSizeOverride,
-streamStateStore = streamStateStore
+streamStateStore = streamStateStore,
+destinationConfig,
)
}
is InsertLoadTypeConfiguration -> {

@@ -27,6 +27,13 @@ data class MSSQLConfiguration(
override val numProcessBatchWorkers: Int = 1
override val processEmptyFiles: Boolean = true
override val recordBatchSizeBytes = ObjectStorageUploadConfiguration.DEFAULT_PART_SIZE_BYTES
+
+/**
+ * Azure requires blob metadata keys to be alphanumeric+underscores, so replace the dashes with
+ * underscores.
+ */
+override val generationIdMetadataKey: String
+get() = "ab_generation_id"
}

@Singleton
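
The KDoc added here states the constraint driving the whole change: Azure rejects blob metadata keys containing dashes, so the MSSQL connector swaps in an underscore form. A rough, hedged check of that rule as the comment phrases it (the helper name and regex are illustrative, not Azure's exact validation):

```kotlin
// Illustrative check of the "alphanumeric + underscores" rule described above.
private val azureSafeKey = Regex("[A-Za-z_][A-Za-z0-9_]*")

fun isAzureSafeMetadataKey(key: String): Boolean = azureSafeKey.matches(key)

fun main() {
    println(isAzureSafeMetadataKey("ab-generation-id")) // false: the dash is rejected
    println(isAzureSafeMetadataKey("ab_generation_id")) // true
}
```
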
@@ -158,6 +158,7 @@ See the [Getting Started: Configuration section](#configuration) of this guide f

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------|
+| 2.2.1 | 2025-03-27 | [56402](https://github.com/airbytehq/airbyte/pull/56402) | Improve Azure blob storage load logic. |
| 2.2.0 | 2025-03-23 | [56353](https://github.com/airbytehq/airbyte/pull/56353) | Bulk Load performance improvements |
| 2.1.2 | 2025-03-25 | [56346](https://github.com/airbytehq/airbyte/pull/56346) | Internal refactor |
| 2.1.1 | 2025-03-24 | [56355](https://github.com/airbytehq/airbyte/pull/56355) | Upgrade to airbyte/java-connector-base:2.0.1 to be M4 compatible. |