1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Resource config for the CDK (#64885)

This commit is contained in:
Benoit Moriceau
2025-08-12 13:09:30 -07:00
committed by GitHub
parent c2ca2e4084
commit 94dceae348
6 changed files with 85 additions and 9 deletions

View File

@@ -6,7 +6,7 @@ import javax.xml.xpath.XPathFactory
import org.w3c.dom.Document
allprojects {
version = "0.1.12"
version = "0.1.13"
apply plugin: 'java-library'
apply plugin: 'maven-publish'

View File

@@ -1,3 +1,9 @@
## Version 0.1.13
**Load CDK**
* **Changed:** Make the resources being used by the dataflow CDK configurable.
## Version 0.1.12
**Load CDK**

View File

@@ -4,6 +4,7 @@
package io.airbyte.cdk.load.dataflow
import io.airbyte.cdk.load.dataflow.config.MemoryAndParallelismConfig
import io.airbyte.cdk.load.dataflow.stages.AggregateStage
import jakarta.inject.Named
import jakarta.inject.Singleton
@@ -23,13 +24,14 @@ class DataFlowPipeline(
@Named("state") private val state: DataFlowStage,
private val startHandler: PipelineStartHandler,
private val completionHandler: PipelineCompletionHandler,
private val memoryAndParallelismConfig: MemoryAndParallelismConfig,
) {
suspend fun run() {
input
.onStart { startHandler.run() }
.map(parse::apply)
.transform { aggregate.apply(it, this) }
.buffer(capacity = 5)
.buffer(capacity = memoryAndParallelismConfig.maxBufferedAggregates)
.map(flush::apply)
.map(state::apply)
.onCompletion { completionHandler.apply(it) }

View File

@@ -5,10 +5,14 @@
package io.airbyte.cdk.load.dataflow
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.dataflow.config.MemoryAndParallelismConfig
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
@@ -18,6 +22,7 @@ class DestinationLifecycle(
private val destinationInitializer: DestinationWriter,
private val destinationCatalog: DestinationCatalog,
private val pipeline: DataFlowPipeline,
private val memoryAndParallelismConfig: MemoryAndParallelismConfig,
) {
private val log = KotlinLogging.logger {}
@@ -46,12 +51,18 @@ class DestinationLifecycle(
}
}
@OptIn(ExperimentalCoroutinesApi::class)
private fun initializeIndividualStreams(): List<StreamLoader> {
val initDispatcher: CoroutineDispatcher =
Dispatchers.Default.limitedParallelism(
memoryAndParallelismConfig.maxConcurrentLifecycleOperations
)
return runBlocking {
val result = mutableListOf<StreamLoader>()
destinationCatalog.streams
.map {
async {
async(initDispatcher) {
log.info {
"Starting stream loader for stream ${it.mappedDescriptor.namespace}:${it.mappedDescriptor.name}"
}
@@ -69,11 +80,17 @@ class DestinationLifecycle(
}
}
@OptIn(ExperimentalCoroutinesApi::class)
private fun finalizeIndividualStreams(streamLoaders: List<StreamLoader>) {
val finalizeDispatcher: CoroutineDispatcher =
Dispatchers.Default.limitedParallelism(
memoryAndParallelismConfig.maxConcurrentLifecycleOperations
)
runBlocking {
streamLoaders
.map {
async {
async(finalizeDispatcher) {
log.info {
"Finalizing stream ${it.stream.mappedDescriptor.namespace}:${it.stream.mappedDescriptor.name}"
}

View File

@@ -6,6 +6,7 @@ package io.airbyte.cdk.load.dataflow.aggregate
import com.google.common.annotations.VisibleForTesting
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.dataflow.config.MemoryAndParallelismConfig
import io.airbyte.cdk.load.dataflow.state.PartitionHistogram
import io.airbyte.cdk.load.dataflow.transform.RecordDTO
import io.github.oshai.kotlinlogging.KotlinLogging
@@ -17,14 +18,15 @@ typealias StoreKey = DestinationStream.Descriptor
@Singleton
class AggregateStore(
private val aggFactory: AggregateFactory,
private val memoryAndParallelismConfig: MemoryAndParallelismConfig,
) {
private val log = KotlinLogging.logger {}
// TODO: Inject
private val maxConcurrentAggregates = 5L
private val stalenessDeadlinePerAggMs = 5L * 60000
private val maxRecordsPerAgg = 100_000L
private val maxEstBytesPerAgg = 70_000_000L
private val maxConcurrentAggregates = memoryAndParallelismConfig.maxOpenAggregates
private val stalenessDeadlinePerAggMs =
memoryAndParallelismConfig.stalenessDeadlinePerAgg.inWholeMilliseconds
private val maxRecordsPerAgg = memoryAndParallelismConfig.maxRecordsPerAgg
private val maxEstBytesPerAgg = memoryAndParallelismConfig.maxEstBytesPerAgg
private val aggregates = ConcurrentHashMap<StoreKey, AggregateEntry>()

View File

@@ -0,0 +1,49 @@
/*
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.load.dataflow.config
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
/**
* This class configures the parallelism and the memory consumption for the dataflow job.
* - maxConcurrentAggregates configures the number of ongoing aggregates.
* - maxBufferedFlushes configures the number of aggregates being buffered if another flush is in
* progress.
* - maxEstBytesPerAgg configures the estimated size of each aggregate.
* - The max memory consumption is (maxEstBytesPerAgg * maxConcurrentAggregates) +
* (maxEstBytesPerAgg * 2). Example with default values: (70,000,000 * 5) + (70,000,000 * 2) =
* 350,000,000 + 140,000,000 = 490,000,000 bytes (approx 0.49 GB).
* - stalenessDeadlinePerAggMs is how long we will wait to flush an aggregate if it is not
* fulfilling the requirement of entry count or max memory.
* - maxRecordsPerAgg configures the max number of records in an aggregate.
* - initConcurrentOperation configures the concurrency in the init phase
*/
data class MemoryAndParallelismConfig(
val maxOpenAggregates: Int = 5,
val maxBufferedAggregates: Int = 5,
val stalenessDeadlinePerAgg: Duration = 5.minutes,
val maxRecordsPerAgg: Long = 100_000L,
val maxEstBytesPerAgg: Long = 70_000_000L,
val maxConcurrentLifecycleOperations: Int = 10,
) {
init {
require(maxOpenAggregates > 0) { "maxOpenAggregates must be greater than 0" }
require(maxBufferedAggregates > 0) { "maxBufferedFlushes must be greater than 0" }
require(maxRecordsPerAgg > 0) { "maxRecordsPerAgg must be greater than 0" }
require(maxEstBytesPerAgg > 0) { "maxEstBytesPerAgg must be greater than 0" }
require(maxConcurrentLifecycleOperations > 0) {
"maxConcurrentLifecycleOperations must be greater than 0"
}
}
}
@Factory
class MemoryAndParallelismConfigFactory {
@Singleton @Secondary fun getMemoryAndParallelismConfig() = MemoryAndParallelismConfig()
}