diff --git a/connector-writer/destination/step-by-step/0-introduction.md b/connector-writer/destination/step-by-step/0-introduction.md index e4fffc855d2..1dd95c22c9e 100644 --- a/connector-writer/destination/step-by-step/0-introduction.md +++ b/connector-writer/destination/step-by-step/0-introduction.md @@ -96,15 +96,15 @@ ## Milestone Summary -| Guide | Phases | What Works | Lines | Time | Prerequisites | -|-------|--------|------------|-------|------|---------------| -| **1-getting-started.md** | Setup 1-2 | --spec | ~626 | 4h | None | -| **2-database-setup.md** | Database 1-2 | --check | ~1180 | 6h | Guide 1 | -| **3-write-infrastructure.md** | Infrastructure 1-2 | DI ready | ~600 | 4h | Guide 2 | -| **4-write-operations.md** | Write 1-4 | --write (append, overwrite) | ~780 | 8h | Guide 3 | -| **5-advanced-features.md** | Advanced 1-4 | All features | ~900 | 12h | Guide 4 | -| **6-testing.md** | Testing 1 | All tests pass | ~730 | 2h | Guide 5 | -| **7-troubleshooting.md** | Reference | Debug help | ~280 | As needed | Any | +| Guide | Phases | What Works | Tests | Prerequisites | +|-------|--------|------------|-------|---------------| +| **1-getting-started.md** | Setup 1-2 | --spec | SpecTest | None | +| **2-database-setup.md** | Database 1-2 | --check | TableOperationsSuite, CheckTest | Guide 1 | +| **3-write-infrastructure.md** | Infrastructure 1-3 | DI ready | WriteInitTest | Guide 2 | +| **4-write-operations.md** | Write 1-4 | --write (append, overwrite) | ConnectorWiringSuite | Guide 3 | +| **5-advanced-features.md** | Advanced 1-3 | All features | TableSchemaEvolutionSuite | Guide 4 | +| **6-testing.md** | Testing 1 | All tests pass | BasicFunctionalityIntegrationTest | Guide 5 | +| **7-troubleshooting.md** | Reference | Debug help | - | Any | --- @@ -122,6 +122,7 @@ - ✅ `--check` operation validates configuration ### After Guide 3 (Write Infrastructure) +- ✅ TableSchemaMapper (unified schema transformation) - ✅ Name generators (table, column, temp table) - ✅ TableCatalog DI setup - ✅ Write operation entry point @@ -166,6 +167,7 @@ - Component vs integration tests ### Guide 3: Write Infrastructure +- TableSchemaMapper (unified schema transformation) - Name generators and column mapping - StreamStateStore pattern - Test contexts (component vs integration vs basic functionality) diff --git a/connector-writer/destination/step-by-step/1-getting-started.md b/connector-writer/destination/step-by-step/1-getting-started.md index 9c446aa9740..5411c28fb7d 100644 --- a/connector-writer/destination/step-by-step/1-getting-started.md +++ b/connector-writer/destination/step-by-step/1-getting-started.md @@ -329,7 +329,7 @@ package io.airbyte.integrations.destination.{db}.spec import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.annotation.JsonPropertyDescription import io.airbyte.cdk.command.ConfigurationSpecification -import io.micronaut.context.annotation.Singleton +import jakarta.inject.Singleton @Singleton open class {DB}Specification : ConfigurationSpecification() { @@ -372,7 +372,7 @@ package io.airbyte.integrations.destination.{db}.spec import io.airbyte.cdk.load.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationConfigurationFactory -import io.micronaut.context.annotation.Singleton +import jakarta.inject.Singleton // Runtime configuration (used by your code) data class {DB}Configuration( @@ -422,7 +422,7 @@ package io.airbyte.integrations.destination.{db}.spec import io.airbyte.cdk.load.spec.DestinationSpecificationExtension 
import io.airbyte.protocol.models.v0.DestinationSyncMode -import io.micronaut.context.annotation.Singleton +import jakarta.inject.Singleton @Singleton class {DB}SpecificationExtension : DestinationSpecificationExtension { diff --git a/connector-writer/destination/step-by-step/2-database-setup.md b/connector-writer/destination/step-by-step/2-database-setup.md index 8884b7914eb..0175aa1e799 100644 --- a/connector-writer/destination/step-by-step/2-database-setup.md +++ b/connector-writer/destination/step-by-step/2-database-setup.md @@ -56,10 +56,11 @@ This file contains two phases: package io.airbyte.integrations.destination.{db} import io.airbyte.cdk.Operation +import io.airbyte.cdk.command.ConfigurationSpecificationSupplier import io.airbyte.integrations.destination.{db}.spec.* import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Requires -import io.micronaut.context.annotation.Singleton +import jakarta.inject.Singleton import javax.sql.DataSource import com.zaxxer.hikari.HikariDataSource @@ -69,7 +70,7 @@ class {DB}BeanFactory { @Singleton fun configuration( configFactory: {DB}ConfigurationFactory, - specFactory: MigratingConfigurationSpecificationSupplier<{DB}Specification>, + specFactory: ConfigurationSpecificationSupplier<{DB}Specification>, ): {DB}Configuration { val spec = specFactory.get() return configFactory.makeWithoutExceptionHandling(spec) @@ -151,12 +152,11 @@ dependencies { ```kotlin package io.airbyte.integrations.destination.{db}.component -import io.airbyte.cdk.command.MigratingConfigurationSpecificationSupplier import io.airbyte.integrations.destination.{db}.spec.* import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Requires -import io.micronaut.context.annotation.Singleton +import jakarta.inject.Singleton import org.testcontainers.containers.{DB}Container // e.g., PostgreSQLContainer @Factory @@ -199,32 +199,6 @@ class {DB}TestConfigFactory { password = container.password, ) } - - @Singleton - @Primary - fun testSpecSupplier( - config: {DB}Configuration - ): MigratingConfigurationSpecificationSupplier<{DB}Specification> { - return object : MigratingConfigurationSpecificationSupplier<{DB}Specification> { - override fun get() = {DB}Specification() - } - } -} -``` - -**Alternative: Environment variables** (for local development with existing database) - -```kotlin -@Singleton -@Primary -fun testConfig(): {DB}Configuration { - return {DB}Configuration( - hostname = System.getenv("DB_HOSTNAME") ?: "localhost", - port = System.getenv("DB_PORT")?.toInt() ?: 5432, - database = System.getenv("DB_DATABASE") ?: "test", - username = System.getenv("DB_USERNAME") ?: "test", - password = System.getenv("DB_PASSWORD") ?: "test", - ) } ``` @@ -235,6 +209,79 @@ fun testConfig(): {DB}Configuration { - ✅ Automatic cleanup - ✅ No manual database installation +#### Part D: Testing Without Testcontainers + +**Use this approach when:** +- No Testcontainers module exists for your database (Snowflake, BigQuery, Databricks) +- Testing against a cloud-hosted or managed database +- Testcontainers doesn't work in your environment + +**Prerequisites:** + +Before running tests, `secrets/config.json` must exist with valid database credentials. 
+ +**File:** `destination-{db}/secrets/config.json` + +```json +{ + "hostname": "your-database-host.example.com", + "port": 5432, + "database": "your_database", + "username": "your_username", + "password": "your_password" +} +``` + +⚠️ This file is gitignored - never commit credentials. + +**TestConfigFactory (reads from secrets file):** + +**File:** `src/test-integration/kotlin/.../component/{DB}TestConfigFactory.kt` + +```kotlin +package io.airbyte.integrations.destination.{db}.component + +import io.airbyte.cdk.load.component.config.TestConfigLoader.loadTestConfig +import io.airbyte.integrations.destination.{db}.spec.* +import io.micronaut.context.annotation.Factory +import io.micronaut.context.annotation.Primary +import io.micronaut.context.annotation.Requires +import jakarta.inject.Singleton + +@Factory +@Requires(env = ["component"]) +class {DB}TestConfigFactory { + + @Singleton + @Primary + fun testConfig(): {DB}Configuration { + return loadTestConfig( + {DB}Specification::class.java, + {DB}ConfigurationFactory::class.java, + "test-instance.json", // or "config.json" in secrets/ + ) + } +} +``` + +**Alternative: Environment variables** (for CI or when you prefer not to use files) + +Replace `testConfig()` with: + +```kotlin +@Singleton +@Primary +fun testConfig(): {DB}Configuration { + return {DB}Configuration( + hostname = System.getenv("DB_HOSTNAME") ?: error("DB_HOSTNAME not set"), + port = System.getenv("DB_PORT")?.toInt() ?: error("DB_PORT not set"), + database = System.getenv("DB_DATABASE") ?: error("DB_DATABASE not set"), + username = System.getenv("DB_USERNAME") ?: error("DB_USERNAME not set"), + password = System.getenv("DB_PASSWORD") ?: error("DB_PASSWORD not set"), + ) +} +``` + **Validate infrastructure setup:** ```bash $ ./gradlew :destination-{db}:compileKotlin @@ -252,7 +299,7 @@ Expected: BUILD SUCCESSFUL package io.airbyte.integrations.destination.{db}.client import io.airbyte.cdk.load.data.* -import io.micronaut.context.annotation.Singleton +import jakarta.inject.Singleton @Singleton class {DB}ColumnUtils { @@ -311,9 +358,9 @@ package io.airbyte.integrations.destination.{db}.client import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.table.ColumnNameMapping -import io.airbyte.cdk.load.table.TableName +import io.airbyte.cdk.load.schema.model.TableName import io.github.oshai.kotlinlogging.KotlinLogging -import io.micronaut.context.annotation.Singleton +import jakarta.inject.Singleton private val log = KotlinLogging.logger {} @@ -470,10 +517,10 @@ import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.component.TableOperationsClient import io.airbyte.cdk.load.component.TableSchemaEvolutionClient import io.airbyte.cdk.load.table.ColumnNameMapping -import io.airbyte.cdk.load.table.TableName +import io.airbyte.cdk.load.schema.model.TableName import io.airbyte.integrations.destination.{db}.spec.{DB}Configuration import io.github.oshai.kotlinlogging.KotlinLogging -import io.micronaut.context.annotation.Singleton +import jakarta.inject.Singleton import java.sql.SQLException import javax.sql.DataSource @@ -651,9 +698,9 @@ package io.airbyte.integrations.destination.{db}.component import io.airbyte.cdk.load.component.TestTableOperationsClient import io.airbyte.cdk.load.data.* -import io.airbyte.cdk.load.table.TableName +import io.airbyte.cdk.load.schema.model.TableName import io.micronaut.context.annotation.Requires -import io.micronaut.context.annotation.Singleton +import jakarta.inject.Singleton import java.sql.Date import 
java.sql.PreparedStatement import java.sql.Timestamp @@ -991,10 +1038,10 @@ import io.airbyte.cdk.load.check.DestinationCheckerV2 import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.* import io.airbyte.cdk.load.table.ColumnNameMapping -import io.airbyte.cdk.load.table.TableName +import io.airbyte.cdk.load.schema.model.TableName import io.airbyte.integrations.destination.{db}.client.{DB}AirbyteClient import io.airbyte.integrations.destination.{db}.spec.{DB}Configuration -import io.micronaut.context.annotation.Singleton +import jakarta.inject.Singleton import kotlinx.coroutines.runBlocking import java.util.UUID diff --git a/connector-writer/destination/step-by-step/3-write-infrastructure.md b/connector-writer/destination/step-by-step/3-write-infrastructure.md index 983fd91ba5f..c7f001e1a87 100644 --- a/connector-writer/destination/step-by-step/3-write-infrastructure.md +++ b/connector-writer/destination/step-by-step/3-write-infrastructure.md @@ -5,6 +5,7 @@ ## What You'll Build After completing this guide, you'll have: +- TableSchemaMapper (unified schema transformation) - Name generators (table, column, temp) - TableCatalog DI setup - Write operation entry point @@ -12,195 +13,269 @@ After completing this guide, you'll have: --- -## Infrastructure Phase 1: Name Generators & TableCatalog DI +## Infrastructure Phase 1: TableSchemaMapper + +**Goal:** Define how Airbyte schemas transform to your database's conventions + +**Checkpoint:** TableSchemaMapper implemented (validated later via TableSchemaEvolutionSuite) + +**📋 What TableSchemaMapper Does:** + +TableSchemaMapper defines schema transformations: +- **Table names:** Stream descriptor → database table name +- **Column names:** Airbyte column → database column (case, special chars) +- **Column types:** Airbyte types → database types (INTEGER → BIGINT, etc.) 
+- **Temp tables:** Generate staging table names + +This interface is used by: +- `TableNameResolver` / `ColumnNameResolver` (CDK collision handling) +- `TableSchemaEvolutionClient` (schema evolution in Phase 5) + +### Infrastructure Step 1: Create TableSchemaMapper + +**File:** `schema/{DB}TableSchemaMapper.kt` + +```kotlin +package io.airbyte.integrations.destination.{db}.schema + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.component.ColumnType +import io.airbyte.cdk.load.data.ArrayType +import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema +import io.airbyte.cdk.load.data.BooleanType +import io.airbyte.cdk.load.data.DateType +import io.airbyte.cdk.load.data.FieldType +import io.airbyte.cdk.load.data.IntegerType +import io.airbyte.cdk.load.data.NumberType +import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema +import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema +import io.airbyte.cdk.load.data.StringType +import io.airbyte.cdk.load.data.TimeTypeWithTimezone +import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone +import io.airbyte.cdk.load.data.UnionType +import io.airbyte.cdk.load.data.UnknownType +import io.airbyte.cdk.load.schema.TableSchemaMapper +import io.airbyte.cdk.load.schema.model.TableName +import io.airbyte.cdk.load.table.TempTableNameGenerator +import io.airbyte.integrations.destination.{db}.config.toDbCompatibleName +import io.airbyte.integrations.destination.{db}.spec.{DB}Configuration +import jakarta.inject.Singleton + +@Singleton +class {DB}TableSchemaMapper( + private val config: {DB}Configuration, + private val tempTableNameGenerator: TempTableNameGenerator, +) : TableSchemaMapper { + + override fun toFinalTableName(desc: DestinationStream.Descriptor): TableName { + val namespace = (desc.namespace ?: config.database).toDbCompatibleName() + val name = desc.name.toDbCompatibleName() + return TableName(namespace, name) + } + + override fun toTempTableName(tableName: TableName): TableName { + return tempTableNameGenerator.generate(tableName) + } + + override fun toColumnName(name: String): String { + return name.toDbCompatibleName() + } + + override fun toColumnType(fieldType: FieldType): ColumnType { + val dbType = when (fieldType.type) { + BooleanType -> {DB}SqlTypes.BOOLEAN + DateType -> {DB}SqlTypes.DATE + IntegerType -> {DB}SqlTypes.BIGINT + NumberType -> {DB}SqlTypes.DECIMAL + StringType -> {DB}SqlTypes.VARCHAR + TimeTypeWithTimezone, + TimeTypeWithoutTimezone -> {DB}SqlTypes.TIME + TimestampTypeWithTimezone, + TimestampTypeWithoutTimezone -> {DB}SqlTypes.TIMESTAMP + is ArrayType, + ArrayTypeWithoutSchema, + is UnionType, + is UnknownType -> {DB}SqlTypes.JSON + ObjectTypeWithEmptySchema, + ObjectTypeWithoutSchema, + is ObjectType -> {DB}SqlTypes.JSON + } + return ColumnType(dbType, fieldType.nullable) + } +} +``` + +**Database-specific type mappings:** + +| Airbyte Type | Postgres | MySQL | Snowflake | ClickHouse | +|--------------|----------|-------|-----------|------------| +| BooleanType | BOOLEAN | TINYINT(1) | BOOLEAN | Bool | +| IntegerType | BIGINT | BIGINT | NUMBER(38,0) | Int64 | +| NumberType | DECIMAL(38,9) | DECIMAL(38,9) | FLOAT | Decimal(38,9) | +| StringType | VARCHAR | VARCHAR(65535) | VARCHAR | String | +| TimestampTypeWithTimezone | TIMESTAMPTZ | TIMESTAMP | TIMESTAMP_TZ | DateTime64(3) | +| ObjectType | JSONB | JSON | VARIANT | String/JSON 
| + +**Optional: Override toFinalSchema() for Dedupe Mode** + +Some databases need to adjust column nullability for dedupe mode (e.g., ClickHouse's ReplacingMergeTree requires non-null PK/cursor columns): + +```kotlin +override fun toFinalSchema(tableSchema: StreamTableSchema): StreamTableSchema { + if (tableSchema.importType !is Dedupe) { + return tableSchema // No changes for append/overwrite + } + + // Make PK and cursor columns non-nullable for dedupe + val pks = tableSchema.getPrimaryKey().flatten() + val cursor = tableSchema.getCursor().firstOrNull() + val nonNullCols = buildSet { + addAll(pks) + cursor?.let { add(it) } + } + + val finalSchema = tableSchema.columnSchema.finalSchema + .mapValues { (name, type) -> + if (name in nonNullCols) type.copy(nullable = false) else type + } + + return tableSchema.copy( + columnSchema = tableSchema.columnSchema.copy(finalSchema = finalSchema) + ) +} +``` + +Most databases don't need this override - the default implementation returns the schema unchanged. + +### Infrastructure Step 2: Validate Compilation + +```bash +$ ./gradlew :destination-{db}:compileKotlin +``` + +Expected: BUILD SUCCESSFUL (may have unresolved reference to `toDbCompatibleName` until Phase 2) + +**Note:** TableSchemaMapper is validated via `TableSchemaEvolutionSuite` in [5-advanced-features.md](./5-advanced-features.md). No separate tests needed now. + +✅ **Checkpoint:** TableSchemaMapper implemented + +--- + +## Infrastructure Phase 2: Name Generators & TableCatalog DI **Goal:** Create name generator beans required for TableCatalog instantiation **Checkpoint:** Compilation succeeds without DI errors -**📋 Dependency Context:** TableCatalog (auto-instantiated by CDK) requires these three @Singleton beans: -- RawTableNameGenerator +**📋 Dependency Context:** TableCatalog (auto-instantiated by CDK) requires these @Singleton beans: - FinalTableNameGenerator - ColumnNameGenerator +- TempTableNameGenerator -Without these beans, you'll get **"Error instantiating TableCatalog"** or **"No bean of type [FinalTableNameGenerator]"** errors in Phase 7 write tests. +Without these beans, you'll get **"Error instantiating TableCatalog"** or **"No bean of type [FinalTableNameGenerator]"** errors in write tests. 
-### Infrastructure Step 1: Create RawTableNameGenerator +### Infrastructure Step 1: Create Name Generators -**File:** `config/{DB}NameGenerators.kt` +**Add to file:** `config/{DB}NameGenerators.kt` (same file as the helper function) ```kotlin package io.airbyte.integrations.destination.{db}.config import io.airbyte.cdk.load.command.DestinationStream -import io.airbyte.cdk.load.orchestration.db.RawTableNameGenerator -import io.airbyte.cdk.load.table.TableName +import io.airbyte.cdk.load.data.Transformations.Companion.toAlphanumericAndUnderscore +import io.airbyte.cdk.load.schema.model.TableName +import io.airbyte.cdk.load.table.ColumnNameGenerator +import io.airbyte.cdk.load.table.FinalTableNameGenerator import io.airbyte.integrations.destination.{db}.spec.{DB}Configuration -import io.micronaut.context.annotation.Singleton +import jakarta.inject.Singleton +import java.util.Locale +import java.util.UUID -@Singleton -class {DB}RawTableNameGenerator( - private val config: {DB}Configuration, -) : RawTableNameGenerator { - override fun getTableName(descriptor: DestinationStream.Descriptor): TableName { - // Raw tables typically go to internal schema - // Modern CDK uses final tables directly, so raw tables are rarely used - val namespace = config.database // Or config.internalSchema if you have one - val name = "_airbyte_raw_${descriptor.namespace}_${descriptor.name}".toDbCompatible() - return TableName(namespace, name) - } -} -``` - -**Notes:** -- `@Singleton` annotation is **REQUIRED** - without it, Micronaut cannot inject this bean -- RawTableNameGenerator is legacy from two-stage sync (raw → final tables) -- Modern connectors typically use final tables only, but interface must be implemented -- Keep implementation simple (identity mapping is fine) - -### Infrastructure Step 2: Create FinalTableNameGenerator - -**Add to same file:** `config/{DB}NameGenerators.kt` - -```kotlin @Singleton class {DB}FinalTableNameGenerator( private val config: {DB}Configuration, ) : FinalTableNameGenerator { - override fun getTableName(descriptor: DestinationStream.Descriptor): TableName { - val namespace = descriptor.namespace?.toDbCompatible() - ?: config.database - val name = descriptor.name.toDbCompatible() + override fun getTableName(streamDescriptor: DestinationStream.Descriptor): TableName { + val namespace = (streamDescriptor.namespace ?: config.database).toDbCompatibleName() + val name = streamDescriptor.name.toDbCompatibleName() return TableName(namespace, name) } } -``` -**What this does:** -- Maps Airbyte stream descriptor → database table name -- Handles namespace mapping (if source has schemas/databases) -- Applies database-specific name transformation rules - -**Example transforms:** -```kotlin -// Input: descriptor(namespace="public", name="users") -// Output: TableName("public", "users") - -// Input: descriptor(namespace=null, name="customers") -// Output: TableName("my_database", "customers") // Uses config.database as fallback -``` - -### Infrastructure Step 3: Create ColumnNameGenerator - -**Add to same file:** `config/{DB}NameGenerators.kt` - -```kotlin @Singleton class {DB}ColumnNameGenerator : ColumnNameGenerator { override fun getColumnName(column: String): ColumnNameGenerator.ColumnName { - val dbName = column.toDbCompatible() return ColumnNameGenerator.ColumnName( - canonicalName = dbName, - displayName = dbName, + column.toDbCompatibleName(), + column.lowercase(Locale.getDefault()).toDbCompatibleName(), ) } } -``` -**What this does:** -- Maps Airbyte column names → database column 
names -- Applies database-specific transformations (case, special chars) +/** + * Transforms a string to be compatible with {DB} table and column names. + */ +fun String.toDbCompatibleName(): String { + // 1. Replace non-alphanumeric characters with underscore + var transformed = toAlphanumericAndUnderscore(this) -**Example transforms:** -```kotlin -// Snowflake: uppercase -"userId" → "USERID" - -// Postgres/ClickHouse: lowercase -"userId" → "userid" - -// MySQL: preserve case -"userId" → "userId" -``` - -### Infrastructure Step 4: Add Name Transformation Helper - -**Add to same file:** `config/{DB}NameGenerators.kt` - -```kotlin -// Helper function for database-specific name transformations -private fun String.toDbCompatible(): String { - // Snowflake: uppercase - return this.uppercase() - - // ClickHouse/Postgres: lowercase - return this.lowercase() - - // MySQL: preserve case, but sanitize special chars - return this.replace(Regex("[^a-zA-Z0-9_]"), "_") - - // Custom rules: Apply your database's naming conventions - // - Max length limits - // - Reserved word handling - // - Character restrictions -} -``` - -**Database-specific examples:** - -**Snowflake:** -```kotlin -private fun String.toDbCompatible() = this.uppercase() -``` - -**ClickHouse:** -```kotlin -private fun String.toDbCompatible() = this.lowercase() -``` - -**Postgres (strict):** -```kotlin -private fun String.toDbCompatible(): String { - val sanitized = this - .lowercase() - .replace(Regex("[^a-z0-9_]"), "_") - .take(63) // Postgres identifier limit - - // Handle reserved words - return if (sanitized in POSTGRES_RESERVED_WORDS) { - "_$sanitized" - } else { - sanitized + // 2. Ensure identifier does not start with a digit + if (transformed.isNotEmpty() && transformed[0].isDigit()) { + transformed = "_$transformed" } -} -private val POSTGRES_RESERVED_WORDS = setOf("user", "table", "select", ...) + // 3. 
Handle empty strings + if (transformed.isEmpty()) { + return "default_name_${UUID.randomUUID()}" + } + + return transformed +} ``` -### Infrastructure Step 5: Register TempTableNameGenerator in BeanFactory +**Notes:** +- `@Singleton` annotation is **REQUIRED** - without it, Micronaut cannot inject these beans +- `canonicalName` is used for collision detection (usually lowercase) +- `displayName` is what appears in queries +- Both generators use the same `toDbCompatibleName()` helper as `TableSchemaMapper` + +### Infrastructure Step 2: Register TempTableNameGenerator in BeanFactory **File:** Update `{DB}BeanFactory.kt` +Choose the pattern that fits your database: + +**Pattern A: Simple (no separate internal schema)** +```kotlin +@Singleton +fun tempTableNameGenerator(): TempTableNameGenerator { + return DefaultTempTableNameGenerator() +} +``` + +**Pattern B: With internal schema (Postgres, Snowflake)** ```kotlin @Singleton fun tempTableNameGenerator(config: {DB}Configuration): TempTableNameGenerator { return DefaultTempTableNameGenerator( - internalNamespace = config.database // Or config.internalSchema if you have one + internalNamespace = config.internalSchema ) } ``` -**What this does:** -- Temp tables are used during overwrite/dedupe operations -- CDK provides `DefaultTempTableNameGenerator` implementation -- Just needs to know which namespace to use for temp tables +**Which pattern to use:** +- **Pattern A:** Temp tables in same namespace as final tables (ClickHouse) +- **Pattern B:** Dedicated internal/staging schema for temp tables (Postgres, Snowflake) **Why register as bean?** - TempTableNameGenerator is an interface, not a class - CDK provides implementation, but YOU must register it - Used by Writer to create staging tables -### Infrastructure Step 6: Verify Compilation +### Infrastructure Step 3: Verify Compilation **Validate:** ```bash @@ -210,12 +285,11 @@ $ ./gradlew :destination-{db}:integrationTest # testSpecOss, testSuccessConfigs ``` **If you see DI errors:** -- Check all three classes have `@Singleton` annotation +- Check all classes have `@Singleton` annotation - Verify package name matches your connector structure - Ensure classes implement correct interfaces: - - `RawTableNameGenerator` (from `io.airbyte.cdk.load.orchestration.db`) - - `FinalTableNameGenerator` (from `io.airbyte.cdk.load.orchestration.db`) - - `ColumnNameGenerator` (from `io.airbyte.cdk.load.orchestration.db`) + - `FinalTableNameGenerator` (from `io.airbyte.cdk.load.table`) + - `ColumnNameGenerator` (from `io.airbyte.cdk.load.table`) ✅ **Checkpoint:** Name generators registered + all previous phases still work @@ -223,11 +297,11 @@ $ ./gradlew :destination-{db}:integrationTest # testSpecOss, testSuccessConfigs --- -⚠️ **IMPORTANT: Before starting Phase 7, read [Understanding Test Contexts](./7-troubleshooting.md#understanding-test-contexts) in the troubleshooting guide. Phase 7 introduces integration tests which behave differently than the component tests you've been using.** +⚠️ **IMPORTANT: Before starting Phase 3, read [Understanding Test Contexts](./7-troubleshooting.md#understanding-test-contexts) in the troubleshooting guide. 
This phase introduces integration tests which behave differently than the component tests you've been using.** --- -## Infrastructure Phase 2: Write Operation Infrastructure +## Infrastructure Phase 3: Write Operation Infrastructure **Goal:** Create write operation infrastructure beans (no business logic yet) @@ -253,7 +327,7 @@ import io.airbyte.cdk.Operation import io.airbyte.cdk.load.dataflow.DestinationLifecycle import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Requires -import io.micronaut.context.annotation.Singleton +import jakarta.inject.Singleton @Primary @Singleton @@ -300,17 +374,18 @@ IllegalStateException: A legal sync requires a declared @Singleton of a type tha ```kotlin package io.airbyte.integrations.destination.{db}.config +import io.airbyte.cdk.load.command.DestinationCatalog import io.airbyte.cdk.load.component.TableOperationsClient -import io.airbyte.cdk.load.orchestration.db.* -import io.micronaut.context.annotation.Singleton +import io.airbyte.cdk.load.table.BaseDirectLoadInitialStatusGatherer +import jakarta.inject.Singleton @Singleton class {DB}DirectLoadDatabaseInitialStatusGatherer( tableOperationsClient: TableOperationsClient, - tempTableNameGenerator: TempTableNameGenerator, + catalog: DestinationCatalog, ) : BaseDirectLoadInitialStatusGatherer( tableOperationsClient, - tempTableNameGenerator, + catalog, ) ``` @@ -335,35 +410,9 @@ DirectLoadInitialStatus( ) ``` -⚠️ **MISSING IN V1 GUIDE:** This step existed as code but bean registration was missing! +**Note:** The `@Singleton` annotation on the class is sufficient - no separate BeanFactory registration needed. Micronaut will auto-discover this bean. -### Infrastructure Step 3: Register DatabaseInitialStatusGatherer in BeanFactory - -**File:** Update `{DB}BeanFactory.kt` - -```kotlin -@Singleton -fun initialStatusGatherer( - client: TableOperationsClient, - tempTableNameGenerator: TempTableNameGenerator, -): DatabaseInitialStatusGatherer { - return {DB}DirectLoadDatabaseInitialStatusGatherer(client, tempTableNameGenerator) -} -``` - -⚠️ **CRITICAL:** This bean registration was MISSING in V1 guide! - -**Why this is needed:** -- Writer requires `DatabaseInitialStatusGatherer` injection -- Without this bean: `No bean of type [DatabaseInitialStatusGatherer] exists` -- Class exists but bean registration forgotten → DI error - -**Why use factory method instead of class @Singleton?** -- DatabaseInitialStatusGatherer is generic: `DatabaseInitialStatusGatherer` -- Micronaut needs explicit return type for generic beans -- Factory method provides type safety - -### Infrastructure Step 4: Create ColumnNameMapper +### Infrastructure Step 3: Create ColumnNameMapper **File:** `write/transform/{DB}ColumnNameMapper.kt` @@ -373,7 +422,7 @@ package io.airbyte.integrations.destination.{db}.write.transform import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.dataflow.transform.ColumnNameMapper import io.airbyte.cdk.load.table.TableCatalog -import io.micronaut.context.annotation.Singleton +import jakarta.inject.Singleton @Singleton class {DB}ColumnNameMapper( @@ -407,7 +456,7 @@ class {DB}ColumnNameMapper( - ColumnNameMapper: Uses mappings during transform (Phase 7) - Separation of concerns: generation vs. 
application -### Infrastructure Step 5: Register AggregatePublishingConfig in BeanFactory +### Infrastructure Step 4: Register AggregatePublishingConfig in BeanFactory **File:** Update `{DB}BeanFactory.kt` @@ -449,7 +498,7 @@ fun aggregatePublishingConfig(dataChannelMedium: DataChannelMedium): AggregatePu - Tune later based on performance requirements - Start with defaults - they work for most databases -### Infrastructure Step 6: Create WriteInitializationTest +### Infrastructure Step 5: Create WriteInitializationTest **File:** `src/test-integration/kotlin/.../write/{DB}WriteInitTest.kt` @@ -498,7 +547,7 @@ Phase 7: WriteInitTest validates they work with real catalog Phase 8: ConnectorWiringSuite validates full write path with mock catalog ``` -### Infrastructure Step 7: Create Test Config File +### Infrastructure Step 6: Create Test Config File **File:** `secrets/config.json` @@ -525,7 +574,7 @@ $ mkdir -p destination-{db}/secrets **Note:** Add `secrets/` to `.gitignore` to avoid committing credentials -### Infrastructure Step 8: Validate WriteInitializationTest +### Infrastructure Step 7: Validate WriteInitializationTest **Validate:** ```bash diff --git a/connector-writer/destination/step-by-step/4-write-operations.md b/connector-writer/destination/step-by-step/4-write-operations.md index 02711c088b0..217d57c383f 100644 --- a/connector-writer/destination/step-by-step/4-write-operations.md +++ b/connector-writer/destination/step-by-step/4-write-operations.md @@ -36,7 +36,7 @@ Phase 7 validates "can we start?" Phase 8 validates "can we write data?" package io.airbyte.integrations.destination.{db}.write.load import io.airbyte.cdk.load.data.AirbyteValue -import io.airbyte.cdk.load.table.TableName +import io.airbyte.cdk.load.schema.model.TableName import io.airbyte.integrations.destination.{db}.client.{DB}AirbyteClient import io.github.oshai.kotlinlogging.KotlinLogging @@ -208,13 +208,13 @@ package io.airbyte.integrations.destination.{db}.dataflow import io.airbyte.cdk.load.dataflow.aggregate.Aggregate import io.airbyte.cdk.load.dataflow.aggregate.AggregateFactory -import io.airbyte.cdk.load.orchestration.db.DirectLoadTableExecutionConfig import io.airbyte.cdk.load.state.StoreKey -import io.airbyte.cdk.load.state.StreamStateStore +import io.airbyte.cdk.load.table.directload.DirectLoadTableExecutionConfig +import io.airbyte.cdk.load.write.StreamStateStore import io.airbyte.integrations.destination.{db}.client.{DB}AirbyteClient import io.airbyte.integrations.destination.{db}.write.load.{DB}InsertBuffer import io.micronaut.context.annotation.Factory -import io.micronaut.context.annotation.Singleton +import jakarta.inject.Singleton @Factory class {DB}AggregateFactory( @@ -256,90 +256,101 @@ class {DB}AggregateFactory( ```kotlin package io.airbyte.integrations.destination.{db}.write +import io.airbyte.cdk.SystemErrorException +import io.airbyte.cdk.load.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationStream -import io.airbyte.cdk.load.orchestration.db.* -import io.airbyte.cdk.load.state.StreamStateStore -import io.airbyte.cdk.load.table.TableCatalog +import io.airbyte.cdk.load.table.ColumnNameMapping +import io.airbyte.cdk.load.table.DatabaseInitialStatusGatherer +import io.airbyte.cdk.load.table.directload.DirectLoadInitialStatus +import io.airbyte.cdk.load.table.directload.DirectLoadTableAppendStreamLoader +import io.airbyte.cdk.load.table.directload.DirectLoadTableAppendTruncateStreamLoader +import io.airbyte.cdk.load.table.directload.DirectLoadTableExecutionConfig 
 import io.airbyte.cdk.load.write.DestinationWriter
 import io.airbyte.cdk.load.write.StreamLoader
+import io.airbyte.cdk.load.write.StreamStateStore
 import io.airbyte.integrations.destination.{db}.client.{DB}AirbyteClient
-import io.micronaut.context.annotation.Singleton
+import jakarta.inject.Singleton
 
 @Singleton
 class {DB}Writer(
-    private val names: TableCatalog,
+    private val catalog: DestinationCatalog,
     private val stateGatherer: DatabaseInitialStatusGatherer<DirectLoadInitialStatus>,
     private val streamStateStore: StreamStateStore<DirectLoadTableExecutionConfig>,
     private val client: {DB}AirbyteClient,
-    private val tempTableNameGenerator: TempTableNameGenerator,
 ) : DestinationWriter {
 
     private lateinit var initialStatuses: Map<DestinationStream, DirectLoadInitialStatus>
 
     override suspend fun setup() {
         // Create all namespaces
-        names.values
-            .map { it.tableNames.finalTableName!!.namespace }
+        catalog.streams
+            .map { it.tableSchema.tableNames.finalTableName!!.namespace }
             .toSet()
             .forEach { client.createNamespace(it) }
 
         // Gather initial state (which tables exist, generation IDs, etc.)
-        initialStatuses = stateGatherer.gatherInitialStatus(names)
+        initialStatuses = stateGatherer.gatherInitialStatus()
     }
 
     override fun createStreamLoader(stream: DestinationStream): StreamLoader {
-        // Defensive: Handle streams not in catalog (for test compatibility)
-        val initialStatus = if (::initialStatuses.isInitialized) {
-            initialStatuses[stream] ?: DirectLoadInitialStatus(null, null)
-        } else {
-            DirectLoadInitialStatus(null, null)
-        }
+        val initialStatus = initialStatuses[stream]!!
 
-        val tableNameInfo = names[stream]
-        val (realTableName, tempTableName, columnNameMapping) = if (tableNameInfo != null) {
-            // Stream in catalog - use configured names
-            Triple(
-                tableNameInfo.tableNames.finalTableName!!,
-                tempTableNameGenerator.generate(tableNameInfo.tableNames.finalTableName!!),
-                tableNameInfo.columnNameMapping
-            )
-        } else {
-            // Dynamic stream (test-generated) - use descriptor names directly
-            val tableName = TableName(
-                namespace = stream.mappedDescriptor.namespace ?: "test",
-                name = stream.mappedDescriptor.name
-            )
-            Triple(tableName, tempTableNameGenerator.generate(tableName), ColumnNameMapping(emptyMap()))
-        }
-
-        // Phase 8: Append mode only
-        // Phase 10: Add truncate mode (minimumGenerationId = generationId)
-        // Phase 13: Add dedupe mode (importType is Dedupe)
-        return DirectLoadTableAppendStreamLoader(
-            stream,
-            initialStatus,
-            realTableName,
-            tempTableName,
-            columnNameMapping,
-            client, // TableOperationsClient
-            client, // TableSchemaEvolutionClient
-            streamStateStore,
-        )
+
+        // Access schema directly from stream (modern CDK pattern)
+        val realTableName = stream.tableSchema.tableNames.finalTableName!!
+        val tempTableName = stream.tableSchema.tableNames.tempTableName!!
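+        // Both names above come from stream.tableSchema, which the CDK
+        // populates during catalog initialization, so the !! assertions are safe.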
+ val columnNameMapping = ColumnNameMapping( + stream.tableSchema.columnSchema.inputToFinalColumnNames ) + + // Choose StreamLoader based on sync mode + return when (stream.minimumGenerationId) { + 0L -> + // Append mode: just insert records + DirectLoadTableAppendStreamLoader( + stream, + initialStatus, + realTableName = realTableName, + tempTableName = tempTableName, + columnNameMapping, + client, // TableOperationsClient + client, // TableSchemaEvolutionClient + streamStateStore, + ) + stream.generationId -> + // Overwrite/truncate mode: replace table contents + DirectLoadTableAppendTruncateStreamLoader( + stream, + initialStatus, + realTableName = realTableName, + tempTableName = tempTableName, + columnNameMapping, + client, + client, + streamStateStore, + ) + else -> + throw SystemErrorException( + "Cannot execute a hybrid refresh - current generation ${stream.generationId}; minimum generation ${stream.minimumGenerationId}" + ) + } } } ``` **What this does:** - **setup()**: Creates namespaces, gathers initial table state -- **createStreamLoader()**: Creates StreamLoader for each stream - - AppendStreamLoader: Just insert records (this phase) - - TruncateStreamLoader: Overwrite table (Phase 10) - - DedupStreamLoader: Upsert with primary key (Phase 13) +- **createStreamLoader()**: Creates StreamLoader for each stream based on sync mode -**Defensive pattern (lines 27-52):** -- Handles ConnectorWiringSuite creating dynamic test streams -- Test streams not in TableCatalog → use descriptor names directly -- Prevents NullPointerException in tests +**Modern CDK pattern (stream.tableSchema):** +- Schema info is embedded in `stream.tableSchema` (set by CDK) +- Access via `stream.tableSchema.tableNames.finalTableName!!` +- Column mappings via `stream.tableSchema.columnSchema.inputToFinalColumnNames` +- No need for defensive null checks (CDK guarantees schema exists) + +**StreamLoader selection:** +- `minimumGenerationId == 0`: Append mode (DirectLoadTableAppendStreamLoader) +- `minimumGenerationId == generationId`: Overwrite mode (DirectLoadTableAppendTruncateStreamLoader) +- Other combinations: Error (hybrid refresh not supported) **StreamLoader responsibilities:** - start(): Create/prepare table diff --git a/connector-writer/destination/step-by-step/5-advanced-features.md b/connector-writer/destination/step-by-step/5-advanced-features.md index 7a25bd5268a..87efd99e7b7 100644 --- a/connector-writer/destination/step-by-step/5-advanced-features.md +++ b/connector-writer/destination/step-by-step/5-advanced-features.md @@ -100,23 +100,21 @@ override fun computeSchema( stream: DestinationStream, columnNameMapping: ColumnNameMapping ): TableSchema { - val columns = stream.schema.asColumns() - .filter { (name, _) -> name !in AIRBYTE_META_COLUMNS } - .mapKeys { (name, _) -> columnNameMapping[name]!! 
} - .mapValues { (_, field) -> - val dbType = columnUtils.toDialectType(field.type) - .takeWhile { it != '(' } // Strip precision - ColumnType(dbType, field.nullable) - } - - return TableSchema(columns) + // Modern CDK pattern: schema is pre-computed by CDK using TableSchemaMapper + return TableSchema(stream.tableSchema.columnSchema.finalSchema) } ``` **What this does:** -- Converts Airbyte schema → database schema -- Applies column name mapping (Phase 6 generators) -- Uses ColumnUtils.toDialectType() from Phase 4 +- Returns the pre-computed final schema from the stream +- CDK has already applied `TableSchemaMapper.toColumnType()` and `toColumnName()` to compute this +- No manual type conversion needed - TableSchemaMapper handles it + +**Why this is simpler than manual conversion:** +- TableSchemaMapper (from Phase 3.1) defines type mappings +- CDK calls mapper during catalog initialization +- Result is stored in `stream.tableSchema.columnSchema.finalSchema` +- `computeSchema()` just returns this pre-computed value ### Advanced Step 3: Implement alterTable() - ADD COLUMN @@ -304,14 +302,140 @@ override suspend fun ensureSchemaMatches( - If source schema changed since last sync, applies schema changes - Automatic - no user intervention needed -### Advanced Step 7: Validate Schema Evolution +### Advanced Step 7: Create TableSchemaEvolutionTest + +**File:** `src/test-integration/kotlin/.../component/{DB}TableSchemaEvolutionTest.kt` + +```kotlin +package io.airbyte.integrations.destination.{db}.component + +import io.airbyte.cdk.load.component.TableOperationsClient +import io.airbyte.cdk.load.component.TableSchema +import io.airbyte.cdk.load.component.TableSchemaEvolutionClient +import io.airbyte.cdk.load.component.TableSchemaEvolutionSuite +import io.airbyte.cdk.load.component.TestTableOperationsClient +import io.airbyte.cdk.load.schema.TableSchemaFactory +import io.micronaut.test.extensions.junit5.annotation.MicronautTest +import org.junit.jupiter.api.Test + +@MicronautTest(environments = ["component"]) +class {DB}TableSchemaEvolutionTest( + override val client: TableSchemaEvolutionClient, + override val opsClient: TableOperationsClient, + override val testClient: TestTableOperationsClient, + override val schemaFactory: TableSchemaFactory, +) : TableSchemaEvolutionSuite { + + // Provide expected schema for your database's type representations + // This validates that discoverSchema() and computeSchema() produce consistent results + private val expectedAllTypesSchema: TableSchema by lazy { + // Build expected schema based on your TableSchemaMapper.toColumnType() implementation + // Example for Postgres: + // TableSchema(mapOf( + // "boolean_col" to ColumnType("boolean", true), + // "integer_col" to ColumnType("bigint", true), + // "number_col" to ColumnType("numeric", true), + // "string_col" to ColumnType("character varying", true), + // ... 
+ // )) + TODO("Define expected schema for all types") + } + + @Test + override fun `discover recognizes all data types`() { + super.`discover recognizes all data types`(expectedAllTypesSchema) + } + + @Test + override fun `computeSchema handles all data types`() { + super.`computeSchema handles all data types`(expectedAllTypesSchema) + } + + @Test + override fun `noop diff`() { + super.`noop diff`() + } + + @Test + override fun `changeset is correct when adding a column`() { + super.`changeset is correct when adding a column`() + } + + @Test + override fun `changeset is correct when dropping a column`() { + super.`changeset is correct when dropping a column`() + } + + @Test + override fun `changeset is correct when changing a column's type`() { + super.`changeset is correct when changing a column's type`() + } + + @Test + override fun `apply changeset - handle sync mode append`() { + super.`apply changeset - handle sync mode append`() + } + + @Test + override fun `apply changeset - handle changing sync mode from append to dedup`() { + super.`apply changeset - handle changing sync mode from append to dedup`() + } + + @Test + override fun `apply changeset - handle changing sync mode from dedup to append`() { + super.`apply changeset - handle changing sync mode from dedup to append`() + } + + @Test + override fun `apply changeset - handle sync mode dedup`() { + super.`apply changeset - handle sync mode dedup`() + } + + @Test + override fun `change from string type to unknown type`() { + super.`change from string type to unknown type`() + } + + @Test + override fun `change from unknown type to string type`() { + super.`change from unknown type to string type`() + } +} +``` + +**What each test validates:** + +| Test | What It Validates | +|------|-------------------| +| `discover recognizes all data types` | discoverSchema() correctly identifies existing table columns | +| `computeSchema handles all data types` | computeSchema() produces correct schema from stream definition | +| `noop diff` | No changes detected when schemas match | +| `changeset is correct when adding a column` | Detects when new columns need to be added | +| `changeset is correct when dropping a column` | Detects when columns should be dropped | +| `changeset is correct when changing a column's type` | Detects type changes | +| `apply changeset - handle sync mode *` | Schema evolution works across sync mode changes | +| `change from string/unknown type` | Complex type transformations work | + +**Note:** This test validates both `TableSchemaEvolutionClient` AND `TableSchemaMapper` (via `computeSchema`). 
+
+### Advanced Step 8: Validate Schema Evolution
 
 **Validate:**
 ```bash
-$ ./gradlew :destination-{db}:componentTest  # 12 tests should pass
-$ ./gradlew :destination-{db}:integrationTest  # 3 tests should pass
+$ ./gradlew :destination-{db}:componentTest --tests "*TableSchemaEvolutionTest*"
+$ ./gradlew :destination-{db}:componentTest    # All component tests should pass
+$ ./gradlew :destination-{db}:integrationTest  # Integration tests should pass
 ```
 
+**Common failures and fixes:**
+
+| Failure | Likely Cause | Fix |
+|---------|--------------|-----|
+| `discover recognizes all data types` fails | discoverSchema() returns wrong types | Check information_schema query and type name normalization |
+| `computeSchema handles all data types` fails | TableSchemaMapper.toColumnType() returns wrong types | Update type mapping in TableSchemaMapper |
+| Type mismatch between discover/compute | Inconsistent type names (e.g., "VARCHAR" vs "varchar") | Normalize type names in both methods (see the sketch after the checkpoint) |
+| `apply changeset` fails | ALTER TABLE syntax wrong for your database | Check SqlGenerator.alterTable() implementation |
+
 ✅ **Checkpoint:** Schema evolution works + all previous phases still work
 
 ---
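+
+As referenced in the failures table above, one way to keep discover/compute type names consistent is a single normalization helper used by both `discoverSchema()` and `TableSchemaMapper.toColumnType()`. A minimal sketch, assuming a Postgres-style dialect; the alias mappings are illustrative and must be filled in for your database:
+
+```kotlin
+// Hypothetical helper: lowercases the type name, strips precision/length
+// suffixes, and collapses dialect aliases so that "VARCHAR(255)" and
+// "character varying" compare equal. The alias list is an assumption.
+fun normalizeTypeName(raw: String): String {
+    val base = raw.lowercase().substringBefore('(').trim()
+    return when (base) {
+        "character varying" -> "varchar"
+        "timestamp with time zone" -> "timestamptz"
+        "int8" -> "bigint"
+        else -> base
+    }
+}
+```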