22 KiB
22 KiB
Dataflow CDK Coding Standards
Summary: Best practices for implementing Airbyte destination connectors. Covers Kotlin style, Micronaut DI patterns, async/coroutines, SQL generation, error handling, and common pitfalls. Follow these standards for maintainable, production-ready code.
Code Organization
Package Structure
destination-{db}/src/main/kotlin/io/airbyte/integrations/destination/{db}/
├── {DB}Destination.kt # Entry point (main())
├── {DB}BeanFactory.kt # Micronaut DI setup
├── check/{DB}Checker.kt # Connection validation
├── client/
│ ├── {DB}AirbyteClient.kt # Database operations
│ └── {DB}SqlGenerator.kt # SQL generation
├── config/
│ ├── {DB}NameGenerators.kt # Table/column naming
│ └── {DB}DirectLoadDatabaseInitialStatusGatherer.kt
├── dataflow/
│ ├── {DB}Aggregate.kt # Record accumulation
│ └── {DB}AggregateFactory.kt # Create aggregates
├── spec/
│ ├── {DB}Specification.kt # Config schema
│ └── {DB}Configuration.kt # Runtime config
└── write/
├── {DB}Writer.kt # Orchestration
└── load/{DB}InsertBuffer.kt # Batch writes
File Naming
Pattern: {DatabaseName}{ComponentType}.kt
Examples:
SnowflakeAirbyteClient.kt, ClickhouseSqlGenerator.kt, PostgresInsertBuffer.kt
Avoid: Generic names like Client.kt, Generator.kt
Kotlin Style
Naming Conventions
// Classes: PascalCase with prefix
class SnowflakeAirbyteClient
class ClickhouseInsertBuffer
// Functions: camelCase, verb-based
fun createTable(...): String
fun toSnowflakeCompatibleName(): String
suspend fun flush()
// Variables: camelCase, descriptive
val snowflakeClient: SnowflakeAirbyteClient
private val columnNameMapping: ColumnNameMapping
// Constants: SCREAMING_SNAKE_CASE
internal const val DATA_SOURCE_CONNECTION_TIMEOUT_MS = 30000L
internal const val CSV_FIELD_SEPARATOR = ','
const val DEFAULT_FLUSH_LIMIT = 1000
// Companion object constants
companion object {
const val DATETIME_WITH_PRECISION = "DateTime64(3)"
const val PROTOCOL = "http"
}
Data Classes vs Regular Classes
// ✅ Data classes: Immutable config/value objects
data class SnowflakeConfiguration(
val host: String,
val database: String,
val schema: String,
) : DestinationConfiguration()
// ✅ Regular classes: Stateful components
@Singleton
class SnowflakeAirbyteClient(
private val dataSource: DataSource,
private val sqlGenerator: SqlGenerator,
) : TableOperationsClient {
// Methods with side effects
}
Extension Functions
// ✅ SQL logging
fun String.andLog(): String {
log.info { this.trim() }
return this
}
// ✅ Name compatibility
fun String.toSnowflakeCompatibleName(): String {
return this.uppercase().replace(Regex("[^A-Z0-9_]"), "_")
}
// ✅ Type utilities
fun String.sqlNullable(): String = "Nullable($this)"
fun ColumnType.typeDecl() = typeDecl(this.type, this.nullable)
// ✅ Quoting
fun String.quote() = "\"$this\"" // Snowflake
Dependency Injection (Micronaut)
Always Use Constructor Injection
// ✅ DO: Constructor injection
@Singleton
class SnowflakeAirbyteClient(
private val dataSource: DataSource,
private val sqlGenerator: SnowflakeDirectLoadSqlGenerator,
private val columnUtils: SnowflakeColumnUtils,
) : TableOperationsClient
// ❌ DON'T: Field injection
@Singleton
class BadExample {
@Inject lateinit var dataSource: DataSource // Avoid!
}
BeanFactory Pattern
@Factory
class SnowflakeBeanFactory {
@Singleton
fun snowflakeConfiguration(
configFactory: SnowflakeConfigurationFactory,
specFactory: SnowflakeMigratingConfigurationSpecificationSupplier,
): SnowflakeConfiguration {
val spec = specFactory.get()
return configFactory.makeWithoutExceptionHandling(spec)
}
@Singleton
@Requires(property = Operation.PROPERTY, notEquals = "spec")
fun snowflakeDataSource(
config: SnowflakeConfiguration,
): HikariDataSource {
return HikariDataSource().apply {
jdbcUrl = buildConnectionString(config)
username = config.username
// ... complex setup
}
}
@Singleton
@Requires(property = Operation.PROPERTY, value = "spec")
fun emptyDataSource(): DataSource {
return object : DataSource {
override fun getConnection(): Connection? = null
}
}
}
Conditional Beans
// ✅ Operation-specific beans
@Singleton
@Requires(property = Operation.PROPERTY, value = "spec")
fun specDataSource(): DataSource {
// Only for --spec operation
}
@Singleton
@Requires(property = Operation.PROPERTY, notEquals = "spec")
fun realDataSource(config: MyConfiguration): HikariDataSource {
// For --check and --write operations
}
lateinit var Usage
// ✅ For lifecycle-initialized state
@Singleton
class SnowflakeWriter(...) : DestinationWriter {
private lateinit var initialStatuses: Map<DestinationStream, DirectLoadInitialStatus>
override suspend fun setup() {
initialStatuses = stateGatherer.gatherInitialStatus(names)
}
override fun createStreamLoader(stream: DestinationStream): StreamLoader {
val status = initialStatuses[stream]!! // Guaranteed after setup()
// ...
}
}
// ✅ For optional state
class InsertBuffer {
private var csvWriter: CsvWriter? = null
private var csvFilePath: Path? = null
}
// ❌ Don't use with nullables or primitives
private lateinit var config: Config? // Compile error!
private lateinit var count: Int // Compile error!
Async & Coroutines
suspend fun Usage
// ✅ Mark I/O operations as suspend
override suspend fun countTable(tableName: TableName): Long? { ... }
override suspend fun createNamespace(namespace: String) { ... }
override suspend fun flush() { ... }
// ✅ Propagate suspend through call chain
override suspend fun flush() {
buffer.flush() // Also suspend
}
// ❌ DON'T block in suspend functions
suspend fun bad() {
Thread.sleep(1000) // Blocks thread!
future.get() // Blocks thread!
}
// ✅ DO use suspend functions
suspend fun good() {
delay(1000) // Suspends without blocking
future.await() // Suspends without blocking
}
runBlocking Usage
// ✅ Use only at non-suspend boundaries
@Singleton
class SnowflakeChecker(...) : DestinationCheckerV2 {
override fun check() { // Can't be suspend (interface constraint)
runBlocking {
try {
client.createNamespace(namespace)
client.createTable(...)
} finally {
client.dropTable(tableName)
}
}
}
}
// ❌ DON'T use inside suspend functions
suspend fun bad() {
runBlocking { // Creates new scope - wrong!
client.execute(query)
}
}
Convert Java Futures
import kotlinx.coroutines.future.await
// ✅ Use .await() for CompletableFuture
internal suspend fun execute(query: String): CommandResponse {
return client.execute(query).await()
}
suspend fun flush() {
val result = clickhouseClient
.insert(tableName, data)
.await() // Not .get()!
}
Error Handling in Async
// ✅ try-finally for cleanup
suspend fun flush() {
csvFilePath?.let { filePath ->
try {
csvWriter?.flush()
csvWriter?.close()
client.putInStage(tableName, filePath.pathString)
client.copyFromStage(tableName, filePath.fileName.toString())
} catch (e: Exception) {
logger.error(e) { "Unable to flush accumulated data." }
throw e
} finally {
// Always cleanup
filePath.deleteIfExists()
csvWriter = null
csvFilePath = null
recordCount = 0
}
}
}
SQL Generation Patterns
Separation of Concerns
// ✅ SqlGenerator: Pure SQL strings, no execution
@Singleton
class SnowflakeDirectLoadSqlGenerator(
private val columnUtils: SnowflakeColumnUtils,
private val config: SnowflakeConfiguration,
) {
fun createTable(...): String { ... }
fun upsertTable(...): String { ... }
fun dropTable(tableName: TableName): String {
return "DROP TABLE IF EXISTS ${fullyQualifiedName(tableName)}"
}
}
// ✅ Client: Execution only, delegates SQL generation
@Singleton
class SnowflakeAirbyteClient(
private val dataSource: DataSource,
private val sqlGenerator: SnowflakeDirectLoadSqlGenerator,
) : TableOperationsClient {
override suspend fun dropTable(tableName: TableName) {
execute(sqlGenerator.dropTable(tableName))
}
private suspend fun execute(sql: String) {
dataSource.connection.use { conn ->
conn.createStatement().execute(sql) // execute(), not executeQuery(): DDL returns no ResultSet
}
}
}
Always Log SQL
private val log = KotlinLogging.logger {}
// ✅ Extension function pattern
fun String.andLog(): String {
log.info { this.trim() }
return this
}
// Usage
fun createTable(...): String {
return """
CREATE TABLE ${fullyQualifiedName(tableName)} (
${columnDeclarations}
)
""".trimIndent().andLog()
}
Quoting and Escaping
// Snowflake: double quotes
internal const val QUOTE = "\""
fun String.quote() = "$QUOTE$this$QUOTE"
"${columnName.quote()}" // "column_name"
// ClickHouse: backticks
"`${tableName.namespace}`.`${tableName.name}`"
// Escape special characters
fun escapeJsonIdentifier(identifier: String): String {
return identifier.replace(QUOTE, "$QUOTE$QUOTE")
}
Multi-line SQL
// ✅ Use trimIndent()
fun createTable(...): String {
return """
CREATE TABLE `${tableName.namespace}`.`${tableName.name}` (
$COLUMN_NAME_AB_RAW_ID String NOT NULL,
$COLUMN_NAME_AB_EXTRACTED_AT DateTime64(3) NOT NULL,
${columnDeclarations}
)
ENGINE = ${engine}
ORDER BY (${orderByColumns})
""".trimIndent().andLog()
}
// ✅ Use StringBuilder for complex statements
fun alterTable(changeset: ColumnChangeset, tableName: TableName): String {
val builder = StringBuilder()
.append("ALTER TABLE `${tableName.namespace}`.`${tableName.name}`")
.appendLine()
changeset.columnsToAdd.forEach { (columnName, columnType) ->
builder.append(" ADD COLUMN `$columnName` ${columnType.typeDecl()},")
}
changeset.columnsToChange.forEach { (columnName, typeChange) ->
builder.append(" MODIFY COLUMN `$columnName` ${typeChange.newType.typeDecl()},")
}
return builder.dropLast(1).toString().andLog() // Remove trailing comma
}
Error Handling
Exception Types
// ✅ ConfigErrorException: User-fixable
throw ConfigErrorException(
"Permission denied: Cannot CREATE tables in schema '$schema'. " +
"Grant CREATE privileges to your role.",
cause
)
throw ConfigErrorException(
"Table '$tableName' exists but lacks Airbyte internal columns. " +
"Delete the table or use a different table prefix."
)
// ✅ TransientErrorException: Retryable
throw TransientErrorException(
"Network timeout connecting to database. Will retry automatically.",
cause
)
// ✅ SystemErrorException: Internal errors
throw SystemErrorException(
"Cannot execute hybrid refresh - unsupported sync mode"
)
Wrap Database Exceptions
private fun handleSnowflakePermissionError(e: SnowflakeSQLException): Nothing {
val errorMessage = e.message?.lowercase() ?: ""
when {
errorMessage.contains("current role has no privileges") -> {
throw ConfigErrorException(
"Permission denied. Grant privileges: " +
"GRANT CREATE, DROP, ALTER ON SCHEMA ${schema} TO ROLE ${role};",
e
)
}
errorMessage.contains("insufficient privileges") -> {
throw ConfigErrorException(
"Insufficient privileges. Contact your database administrator.",
e
)
}
else -> throw e // System error
}
}
// Usage
override suspend fun createNamespace(namespace: String) {
try {
execute(sqlGenerator.createNamespace(namespace))
} catch (e: SnowflakeSQLException) {
handleSnowflakePermissionError(e)
}
}
Return Null for Expected Missing Data
// ✅ Return null when table doesn't exist (expected)
override suspend fun countTable(tableName: TableName): Long? =
try {
dataSource.connection.use { connection ->
val statement = connection.createStatement()
statement.use {
val resultSet = it.executeQuery(sqlGenerator.countTable(tableName))
if (resultSet.next()) resultSet.getLong("count") else 0L
}
}
} catch (e: SQLException) {
log.debug(e) {
"Table ${tableName.toPrettyString()} does not exist. Returning null."
}
null // Expected - not an error
}
Actionable Error Messages
// ✅ DO: Specific and actionable
throw ConfigErrorException(
"Permission denied: Cannot CREATE tables in schema 'public'.\n\n" +
"Required permission:\n" +
" GRANT CREATE ON SCHEMA public TO ROLE your_role;\n\n" +
"Alternatively:\n" +
" 1. Use a different schema where you have permissions\n" +
" 2. Contact your database administrator\n\n" +
"Current user: ${config.username}\n" +
"Current role: ${config.role}",
e
)
// ❌ DON'T: Vague
throw ConfigErrorException("Access denied", e)
throw ConfigErrorException(e.message, e) // Raw database message
Logging Patterns
Log Levels
private val log = KotlinLogging.logger {}
// info: Normal operations
log.info { "Beginning insert into ${tableName.toPrettyString()}..." }
log.info { "Finished insert of $recordCount row(s)..." }
// warn: Unexpected but recoverable
log.warn { "CSV file path is not set: nothing to upload to staging." }
// error: Errors that will fail the operation
log.error(e) { "Unable to flush accumulated data." }
// debug: Detailed diagnostics
log.debug(e) { "Table does not exist. Returning null..." }
Structured Logging
// ✅ Use lambda syntax for lazy evaluation
log.info { "Expensive computation: ${computeExpensiveValue()}" }
// Only called if INFO enabled
// ❌ DON'T: Eager evaluation
log.info("Expensive computation: ${computeExpensiveValue()}")
// Always called, even if INFO disabled
// ✅ Include context
log.info {
"overwriteTable: source=${source.toPrettyString()}, " +
"target=${target.toPrettyString()}, targetExists=$targetExists"
}
log.error(e) {
"Failed to execute query on table ${tableName.toPrettyString()}. " +
"Operation: $operation, SQL State: ${e.sqlState}"
}
Resource Management
JDBC Resources
// ✅ Use .use {} for automatic closing
override suspend fun countTable(tableName: TableName): Long? =
dataSource.connection.use { connection ->
val statement = connection.createStatement()
statement.use {
val resultSet = it.executeQuery(sqlGenerator.countTable(tableName))
if (resultSet.next()) resultSet.getLong("count") else 0L
}
}
// ❌ DON'T: Manual close
val connection = dataSource.connection
try {
// use connection
} finally {
connection.close() // Verbose and error-prone: close() can throw and mask the original exception
}
File Cleanup
// ✅ Use deleteOnExit() for temp files
val tempFile = File.createTempFile("prefix", ".tmp")
tempFile.deleteOnExit()
tempFile.writeText(data)
// ✅ Use deleteIfExists() in finally
suspend fun flush() {
csvFilePath?.let { filePath ->
try {
csvWriter?.flush()
csvWriter?.close()
client.putInStage(tableName, filePath.pathString)
} finally {
filePath.deleteIfExists()
csvWriter = null
csvFilePath = null
}
}
}
Common Gotchas
1. Thread Safety
// ❌ Mutable state in singleton without synchronization
@Singleton
class Bad {
private var counter = 0 // Race condition!
fun increment() { counter++ }
}
// ✅ Use lateinit only for lifecycle initialization (set once)
@Singleton
class Good {
private lateinit var initialStatuses: Map<...> // Set once in setup()
}
2. Micronaut Pitfalls
// ❌ Forgetting @Singleton
class MyService { // Won't be managed by DI!
}
// ❌ Operation beans without @Requires
@Singleton
fun specDataSource(): DataSource {
// Created for ALL operations!
}
// ✅ Use @Requires
@Singleton
@Requires(property = Operation.PROPERTY, value = "spec")
fun specDataSource(): DataSource {
// Only for spec operation
}
3. SQL Injection & Batching
// ❌ Not batching inserts
records.forEach { insertRecord(it) } // One at a time!
// ✅ Use buffering
class InsertBuffer(flushLimit: Int = 1000) {
private val buffer = mutableListOf<Record>()
fun accumulate(record: Record) {
buffer.add(record)
if (buffer.size >= flushLimit) {
runBlocking { flush() }
}
}
suspend fun flush() {
writeBatchToDatabase(buffer)
buffer.clear()
}
}
4. Resource Leaks
// ❌ Not closing resources
val connection = dataSource.connection
// use connection
// Forgot to close!
// ✅ Use .use {}
dataSource.connection.use { connection ->
// Automatically closed even if exception
}
5. Async Pitfalls
// ❌ Blocking in suspend functions
suspend fun bad() {
Thread.sleep(1000) // Blocks!
future.get() // Blocks!
}
// ✅ Use suspend functions
suspend fun good() {
delay(1000) // Suspends
future.await() // Suspends
}
// ❌ Not propagating suspend
class Bad {
fun flush() { // Not suspend
runBlocking { client.execute() } // Creates new scope!
}
}
// ✅ Keep suspend throughout
class Good {
suspend fun flush() { // Propagate suspend
client.execute()
}
}
6. CDK-Specific Gotchas
// ❌ Not using TableCatalog for column mapping
val mappedColumn = columnName.toUpperCase() // Manual!
// ✅ Use TableCatalog
val mappedColumn = columnNameMapping[columnName]!!
// ❌ Creating tables in Writer.setup()
override suspend fun setup() {
client.createTable(...) // StreamLoader does this!
}
// ✅ Only create namespaces in setup()
override suspend fun setup() {
namespaces.forEach { client.createNamespace(it) }
}
// ❌ Finalizing in InsertBuffer.flush()
suspend fun flush() {
writeToDatabase()
client.upsertTable(temp, final) // DON'T - StreamLoader does this!
}
// ✅ Just write records
suspend fun flush() {
writeToDatabase() // That's it!
}
7. Null Safety
// ❌ Unnecessary !! operator
val value = map[key]!! // Throws if missing
// ✅ Handle null explicitly
val value = map[key] ?: throw IllegalStateException("Key not found: $key")
val value = map[key]?.let { process(it) } // Safe call
// ❌ lateinit with wrong types
private lateinit var config: Config? // Compile error!
private lateinit var count: Int // Compile error!
// ✅ Use nullable var or non-nullable
private var config: Config? = null
private var count: Int = 0
Testing Patterns
Test Structure
internal class SnowflakeDirectLoadSqlGeneratorTest {
private lateinit var sqlGenerator: SnowflakeDirectLoadSqlGenerator
private val uuidGenerator: UUIDGenerator = mockk()
@BeforeEach
fun setUp() {
sqlGenerator = SnowflakeDirectLoadSqlGenerator(
columnUtils,
uuidGenerator,
config,
nameUtils,
)
}
@Test
fun testGenerateCountTableQuery() {
val sql = sqlGenerator.countTable(tableName)
assertTrue(sql.contains("SELECT COUNT(*)"))
assertTrue(sql.contains(tableName.name))
}
}
Test Naming
// ✅ Descriptive test names
@Test
fun testGenerateCountTableQuery() { ... }
@Test
fun testGenerateCreateTableStatement() { ... }
// ✅ Backtick format for readability
@Test
fun `test extractPks with single primary key`() { ... }
@Test
fun `test extractPks with multiple primary keys`() { ... }
Test Cleanup
// ✅ Use try-finally
override fun check() {
runBlocking {
try {
client.createNamespace(namespace)
client.createTable(table, ...)
val count = client.countTable(table)
require(count == 1L)
} finally {
client.dropTable(table)
}
}
}
Style Summary
Naming:
- Classes: PascalCase with prefix (SnowflakeAirbyteClient)
- Functions: camelCase, verbs (createTable, flush)
- Variables: camelCase, descriptive (columnNameMapping)
- Constants: SCREAMING_SNAKE_CASE (DEFAULT_FLUSH_LIMIT)
DI:
- Always constructor injection
- Use @Factory for complex setup
- Use @Requires for conditional beans
- Use lateinit var only for lifecycle state
Async:
- Mark I/O as suspend
- Use .use {} for resources
- Use .await() for futures
- Propagate suspend through the call chain
SQL:
- Separate generation from execution
- Always log SQL (.andLog())
- Use .trimIndent() for multi-line statements
- Quote all identifiers
Error:
- ConfigErrorException for user-fixable errors
- TransientErrorException for retryable errors
- SystemErrorException for internal errors
- Return null for expected missing data
Logging:
- Use lambda syntax (lazy evaluation)
- Include context (table names, operations)
- Appropriate levels (info/warn/error/debug)
Resources:
- .use {} for JDBC and other closeables
- deleteOnExit() and finally for temp files
- Clean up in finally blocks
Quick Reference: Most Common Mistakes
- ❌ Forgetting @Singleton on service classes
- ❌ Not logging generated SQL
- ❌ Blocking in suspend functions (Thread.sleep, .get())
- ❌ Not using .use {} for resource cleanup
- ❌ Missing .trimIndent() on SQL strings
- ❌ Using !! without proving non-null
- ❌ Not batching database operations
- ❌ Creating tables in Writer.setup() (StreamLoader does this)
- ❌ Finalizing in InsertBuffer.flush() (StreamLoader does this)
- ❌ Not using TableCatalog for column mapping