1
0
mirror of synced 2025-12-19 18:14:56 -05:00

fix clickhouse cdc cursor issue (#70853)

This commit is contained in:
Ryan Br...
2025-12-15 10:27:48 -08:00
committed by GitHub
parent b2afd6e91e
commit ef5ab0482b
4 changed files with 15 additions and 5 deletions

View File

@@ -54,8 +54,11 @@ class ClickhouseSqlGenerator {
// Check if cursor column type is valid for ClickHouse ReplacingMergeTree
val cursor = tableSchema.getCursor().firstOrNull()
val cursorType = cursor?.let { finalSchema[it]?.type }
val useCursorAsVersion =
cursorType != null && isValidVersionColumn(cursor, cursorType)
val versionColumn =
if (cursorType?.isValidVersionColumnType() ?: false) {
if (useCursorAsVersion) {
"`$cursor`"
} else {
// Fallback to _airbyte_extracted_at if no cursor is specified or cursor

View File

@@ -4,6 +4,7 @@
package io.airbyte.integrations.destination.clickhouse.client
import io.airbyte.cdk.load.table.CDC_CURSOR_COLUMN
import io.airbyte.integrations.destination.clickhouse.client.ClickhouseSqlTypes.VALID_VERSION_COLUMN_TYPES
object ClickhouseSqlTypes {
@@ -23,4 +24,9 @@ object ClickhouseSqlTypes {
)
}
fun String.isValidVersionColumnType() = VALID_VERSION_COLUMN_TYPES.contains(this)
// Warning: if any munging changes the name of the CDC column name this will break.
// Currently, that is not the case.
fun isValidVersionColumn(name: String, type: String) =
// CDC cursors cannot be used as a version column since they are null
// during the initial CDC snapshot.
name != CDC_CURSOR_COLUMN && VALID_VERSION_COLUMN_TYPES.contains(type)

View File

@@ -29,7 +29,7 @@ import io.airbyte.cdk.load.schema.model.StreamTableSchema
import io.airbyte.cdk.load.schema.model.TableName
import io.airbyte.cdk.load.table.TempTableNameGenerator
import io.airbyte.integrations.destination.clickhouse.client.ClickhouseSqlTypes
import io.airbyte.integrations.destination.clickhouse.client.isValidVersionColumnType
import io.airbyte.integrations.destination.clickhouse.client.isValidVersionColumn
import io.airbyte.integrations.destination.clickhouse.config.toClickHouseCompatibleName
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseConfiguration
import jakarta.inject.Singleton
@@ -100,7 +100,7 @@ class ClickhouseTableSchemaMapper(
if (cursor != null) {
// Check if the cursor column type is valid for ClickHouse ReplacingMergeTree
val cursorColumnType = tableSchema.columnSchema.finalSchema[cursor]!!.type
if (cursorColumnType.isValidVersionColumnType()) {
if (isValidVersionColumn(cursor, cursorColumnType)) {
// Cursor column is valid, use it as version column
add(cursor) // Make cursor column non-nullable too
}

View File

@@ -173,7 +173,8 @@ The connector converts arrays and unions to strings for compatibility. If you ne
| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:-----------------------------------------------------------|:-------------------------------------------------------------------------------|
| 2.1.16 | 2025-12-12 | [70897](https://github.com/airbytehq/airbyte/pull/70897) | Promoting release candidate 2.1.16-rc.3 to a main version. |
| 2.1.17 | 2025-12-12 | [70835](https://github.com/airbytehq/airbyte/pull/70835) | Fix: Skip CDC cursor for version column consideration for dedupe. |
| 2.1.16 | 2025-12-12 | [70897](https://github.com/airbytehq/airbyte/pull/70897) | Promoting release candidate 2.1.16-rc.3 to a main version. |
| 2.1.16-rc.3| 2025-12-09 | [70835](https://github.com/airbytehq/airbyte/pull/70835) | Pick up CDK fixes for namespace / prefix handling |
| 2.1.16-rc.2| 2025-12-09 | [70358](https://github.com/airbytehq/airbyte/pull/70358) | Internal refactor: Use TableSchemaMapper for schema operations cont. |
| 2.1.16-rc.1| 2025-12-04 | [70279](https://github.com/airbytehq/airbyte/pull/70279) | Internal refactor: Use TableSchemaMapper for schema operations |