1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Bulk load CDK: schema evolution test suite (#69234)

This commit is contained in:
Edward Gao
2025-11-13 15:17:28 -08:00
committed by GitHub
parent 02822b6364
commit f943c0f299
5 changed files with 552 additions and 2 deletions

View File

@@ -1,3 +1,7 @@
## Version 0.1.78
load cdk: add basic schema evolution test cases
## Version 0.1.77
**Extract CDK**

View File

@@ -9,14 +9,23 @@ import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.NamespaceMapper
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayType
import io.airbyte.cdk.load.data.BooleanType
import io.airbyte.cdk.load.data.DateType
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NumberType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimeTypeWithTimezone
import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import io.airbyte.cdk.load.data.UnknownType
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_EXTRACTED_AT
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_META
@@ -24,6 +33,7 @@ import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_RAW_ID
import io.airbyte.cdk.load.table.CDC_DELETED_AT_COLUMN
import io.airbyte.cdk.load.table.ColumnNameMapping
import io.airbyte.cdk.load.table.TableName
import io.airbyte.cdk.load.util.Jsons
import java.util.UUID
/**
@@ -37,6 +47,7 @@ object TableOperationsFixtures {
// Common schemas
/** Schema with a single [TEST_FIELD] column of [IntegerType]. */
val TEST_INTEGER_SCHEMA: ObjectType =
    ObjectType(
        linkedMapOf(
            TEST_FIELD to FieldType(IntegerType, true),
        ),
    )

/** Same shape as [TEST_INTEGER_SCHEMA], but the [TEST_FIELD] column is a [StringType]. */
val TEST_STRING_SCHEMA: ObjectType =
    ObjectType(
        linkedMapOf(
            TEST_FIELD to FieldType(StringType, true),
        ),
    )
val ID_AND_TEST_SCHEMA =
ObjectType(
@@ -55,8 +66,46 @@ object TableOperationsFixtures {
),
)
/**
 * Schema with one column per data type used by the schema evolution tests. Column names are the
 * same keys that [ALL_TYPES_MAPPING] maps to themselves.
 */
val ALL_TYPES_SCHEMA: ObjectType =
    ObjectType(
        linkedMapOf<String, FieldType>().apply {
            // Scalar types
            put("string", FieldType(StringType, true))
            put("boolean", FieldType(BooleanType, true))
            put("integer", FieldType(IntegerType, true))
            put("number", FieldType(NumberType, true))
            // Temporal types
            put("date", FieldType(DateType, true))
            put("timestamp_tz", FieldType(TimestampTypeWithTimezone, true))
            put("timestamp_ntz", FieldType(TimestampTypeWithoutTimezone, true))
            put("time_tz", FieldType(TimeTypeWithTimezone, true))
            put("time_ntz", FieldType(TimeTypeWithoutTimezone, true))
            // Structured types
            put("array", FieldType(ArrayType(FieldType(StringType, true)), true))
            put(
                "object",
                FieldType(ObjectType(linkedMapOf("key" to FieldType(StringType, true))), true),
            )
            // A JSON schema with a type name that is not one of the types above
            put("unknown", FieldType(UnknownType(Jsons.readTree("""{"type": "potato"}""")), true))
        },
    )
/** Identity column name mapping covering every column of [ALL_TYPES_SCHEMA]. */
val ALL_TYPES_MAPPING =
    ColumnNameMapping(
        listOf(
                "string",
                "boolean",
                "integer",
                "number",
                "date",
                "timestamp_tz",
                "timestamp_ntz",
                "time_tz",
                "time_ntz",
                "array",
                "object",
                "unknown",
            )
            .associateWith { columnName -> columnName }
    )
// Common column mappings
/** Identity column name mapping for [TEST_FIELD] only. */
val TEST_MAPPING = ColumnNameMapping(listOf(TEST_FIELD).associateWith { field -> field })
/** Identity column name mapping for [TEST_FIELD] and [ID_FIELD]. */
val ID_AND_TEST_MAPPING =
    ColumnNameMapping(listOf(TEST_FIELD, ID_FIELD).associateWith { field -> field })
val ID_TEST_WITH_CDC_MAPPING =
ColumnNameMapping(
@@ -276,11 +325,21 @@ object TableOperationsFixtures {
prefix: String,
namespace: String,
): TableName {
return TableName(namespace, "$prefix-${UUID.randomUUID()}")
return TableName(
namespace,
"$prefix-${UUID.randomUUID()}"
// this is a hack for now - eventually we probably want to plumb in a
// TableNameGenerator,
// but until then - underscores are generally nicer than hyphens.
.replace('-', '_'),
)
}
/**
 * Builds a namespace name of the form `<prefix>-<random UUID>` with every hyphen (including any
 * in [prefix]) replaced by an underscore.
 */
fun generateTestNamespace(prefix: String): String {
    val hyphenatedName = "$prefix-${UUID.randomUUID()}"
    // this is a hack for now - eventually we probably want to plumb in a TableNameGenerator,
    // but until then - underscores are generally nicer than hyphens.
    return hyphenatedName.replace('-', '_')
}
// Create common destination stream configurations
@@ -342,12 +401,21 @@ object TableOperationsFixtures {
return map { record -> record.mapKeys { (k, _) -> totalMapping.originalName(k) ?: k } }
}
/** Returns a new list in which every record has had its null-valued entries removed. */
fun <V> List<Map<String, V>>.removeNulls() =
    map { record -> record.filter { (_, value) -> value != null } }
/**
 * Applies [columnNameMapping] to [records] via `applyColumnNameMapping`, then inserts the mapped
 * records into [table].
 */
suspend fun TestTableOperationsClient.insertRecords(
    table: TableName,
    records: List<Map<String, AirbyteValue>>,
    columnNameMapping: ColumnNameMapping,
) {
    val mappedRecords = records.applyColumnNameMapping(columnNameMapping)
    insertRecords(table, mappedRecords)
}

/** Vararg convenience overload: collects [records] into a list and delegates to the list-based overload. */
suspend fun TestTableOperationsClient.insertRecords(
    table: TableName,
    columnNameMapping: ColumnNameMapping,
    vararg records: Map<String, AirbyteValue>,
) {
    val recordList = records.toList()
    insertRecords(table, recordList, columnNameMapping)
}
fun inputRecord(
rawId: String,
extractedAt: String,

View File

@@ -0,0 +1,474 @@
/*
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.load.component
import io.airbyte.cdk.load.component.TableOperationsFixtures as Fixtures
import io.airbyte.cdk.load.component.TableOperationsFixtures.ID_FIELD
import io.airbyte.cdk.load.component.TableOperationsFixtures.TEST_FIELD
import io.airbyte.cdk.load.component.TableOperationsFixtures.insertRecords
import io.airbyte.cdk.load.component.TableOperationsFixtures.removeNulls
import io.airbyte.cdk.load.component.TableOperationsFixtures.reverseColumnNameMapping
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import io.airbyte.cdk.load.message.Meta
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_EXTRACTED_AT
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_META
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_RAW_ID
import io.airbyte.cdk.load.table.ColumnNameMapping
import io.airbyte.cdk.load.table.TableName
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import kotlin.test.assertEquals
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.assertAll
@MicronautTest(environments = ["component"])
interface TableSchemaEvolutionSuite {
/** The schema evolution client under test. */
val client: TableSchemaEvolutionClient

/**
 * Column name mapping for the Airbyte meta columns. By default every name in
 * [Meta.COLUMN_NAMES] maps to itself.
 */
val airbyteMetaColumnMapping: Map<String, String>
    get() = Meta.COLUMN_NAMES.associateWith { columnName -> columnName }

/** Client used by the tests to create namespaces and tables. */
val opsClient: TableOperationsClient

/** Test-only client; the tests use it to insert records. */
val testClient: TestTableOperationsClient

/** Builds a new harness from [opsClient], [testClient] and [airbyteMetaColumnMapping] on every access. */
private val harness: TableOperationsTestHarness
    get() = TableOperationsTestHarness(opsClient, testClient, airbyteMetaColumnMapping)
/**
 * Test that the connector can correctly discover all of its own data types. This test creates a
 * table with a column for each data type, then tries to discover its schema. Uses
 * [TableOperationsFixtures.ALL_TYPES_MAPPING] as the column name mapping.
 *
 * @param expectedDiscoveredSchema The schema to expect. This should be the same schema as
 * [`computeSchema handles all data types`].
 */
fun `discover recognizes all data types`(expectedDiscoveredSchema: TableSchema) {
    `discover recognizes all data types`(
        expectedDiscoveredSchema = expectedDiscoveredSchema,
        columnNameMapping = Fixtures.ALL_TYPES_MAPPING,
    )
}

/**
 * Same check as the single-argument overload, but the table is created with the supplied
 * [columnNameMapping].
 */
fun `discover recognizes all data types`(
    expectedDiscoveredSchema: TableSchema,
    columnNameMapping: ColumnNameMapping
) = runTest {
    val namespace = Fixtures.generateTestNamespace("namespace-test")
    val table = Fixtures.generateTestTableName("table-test-table", namespace)
    val allTypesStream =
        Fixtures.createAppendStream(
            namespace = table.namespace,
            name = table.name,
            schema = Fixtures.ALL_TYPES_SCHEMA,
        )

    // Create the table, then ask the client what it sees.
    opsClient.createNamespace(namespace)
    opsClient.createTable(
        stream = allTypesStream,
        tableName = table,
        columnNameMapping = columnNameMapping,
        replace = false,
    )

    assertEquals(expectedDiscoveredSchema, client.discoverSchema(table))
}
/**
 * Test that the connector can correctly compute a schema for a stream containing all data
 * types. Uses [TableOperationsFixtures.ALL_TYPES_MAPPING] as the column name mapping.
 *
 * @param expectedComputedSchema The schema to expect. This should be the same schema as
 * [`discover recognizes all data types`].
 */
fun `computeSchema handles all data types`(expectedComputedSchema: TableSchema) {
    `computeSchema handles all data types`(
        expectedComputedSchema = expectedComputedSchema,
        columnNameMapping = Fixtures.ALL_TYPES_MAPPING,
    )
}

/**
 * Same check as the single-argument overload, but the schema is computed with the supplied
 * [columnNameMapping]. No namespace or table is created by this test.
 */
fun `computeSchema handles all data types`(
    expectedComputedSchema: TableSchema,
    columnNameMapping: ColumnNameMapping
) {
    val namespace = Fixtures.generateTestNamespace("namespace-test")
    val table = Fixtures.generateTestTableName("table-test-table", namespace)
    val allTypesStream =
        Fixtures.createAppendStream(
            namespace = table.namespace,
            name = table.name,
            schema = Fixtures.ALL_TYPES_SCHEMA,
        )

    assertEquals(expectedComputedSchema, client.computeSchema(allTypesStream, columnNameMapping))
}
/**
 * Test that the connector correctly detects a no-change situation. This test just creates a
 * table, discovers its schema, and computes a changeset against that same schema. Uses
 * [TableOperationsFixtures.TEST_MAPPING] as the column name mapping.
 */
fun `noop diff`() {
    `noop diff`(columnNameMapping = Fixtures.TEST_MAPPING)
}

/** Same check as the no-argument overload, using the supplied [columnNameMapping]. */
fun `noop diff`(
    columnNameMapping: ColumnNameMapping,
) = runTest {
    val namespace = Fixtures.generateTestNamespace("namespace-test")
    val table = Fixtures.generateTestTableName("table-test-table", namespace)

    // Initial and modified schema/mapping are identical, so nothing should need to change.
    val columnChangeset =
        computeSchemaEvolution(
                testTable = table,
                initialSchema = Fixtures.TEST_INTEGER_SCHEMA,
                initialColumnNameMapping = columnNameMapping,
                modifiedSchema = Fixtures.TEST_INTEGER_SCHEMA,
                modifiedColumnNameMapping = columnNameMapping,
            )
            .columnChangeset

    assertTrue(columnChangeset.isNoop(), "Expected changeset to be noop. Got $columnChangeset")
}
/**
 * Test that the connector can correctly detect when a new column needs to be added. Uses
 * [TableOperationsFixtures.TEST_MAPPING] for the initial table and
 * [TableOperationsFixtures.ID_AND_TEST_MAPPING] for the modified stream.
 */
fun `changeset is correct when adding a column`() {
    `changeset is correct when adding a column`(
        initialColumnNameMapping = Fixtures.TEST_MAPPING,
        modifiedColumnNameMapping = Fixtures.ID_AND_TEST_MAPPING
    )
}

/**
 * Creates a table from [TableOperationsFixtures.TEST_INTEGER_SCHEMA], computes the changeset
 * against [TableOperationsFixtures.ID_AND_TEST_SCHEMA], and asserts that the only difference is
 * the added [ID_FIELD] column.
 *
 * @param initialColumnNameMapping mapping used when creating the table
 * @param modifiedColumnNameMapping mapping used when computing the modified schema; the expected
 * column names are looked up in this mapping
 */
fun `changeset is correct when adding a column`(
    initialColumnNameMapping: ColumnNameMapping,
    modifiedColumnNameMapping: ColumnNameMapping
) = runTest {
    val testNamespace = Fixtures.generateTestNamespace("namespace-test")
    val testTable = Fixtures.generateTestTableName("table-test-table", testNamespace)
    val (_, _, columnChangeset) =
        computeSchemaEvolution(
            testTable,
            Fixtures.TEST_INTEGER_SCHEMA,
            initialColumnNameMapping,
            Fixtures.ID_AND_TEST_SCHEMA,
            modifiedColumnNameMapping,
        )
    // The changeset should indicate that we're trying to add a column
    assertAll(
        {
            // Expected value goes first, so a failure reports expected/actual the right way round.
            assertEquals(
                setOf(modifiedColumnNameMapping[ID_FIELD]),
                columnChangeset.columnsToAdd.keys,
                "Expected to add exactly one column. Got ${columnChangeset.columnsToAdd}"
            )
        },
        {
            assertEquals(
                0,
                columnChangeset.columnsToDrop.size,
                "Expected to not drop any columns. Got ${columnChangeset.columnsToDrop}"
            )
        },
        {
            assertEquals(
                0,
                columnChangeset.columnsToChange.size,
                "Expected to not change any columns. Got ${columnChangeset.columnsToChange}"
            )
        },
        {
            assertEquals(
                setOf(modifiedColumnNameMapping[TEST_FIELD]),
                columnChangeset.columnsToRetain.keys,
                "Expected to retain the original column. Got ${columnChangeset.columnsToRetain}"
            )
        },
    )
}
/**
 * Test that the connector can correctly detect when a column needs to be dropped. Uses
 * [TableOperationsFixtures.ID_AND_TEST_MAPPING] for the initial table and
 * [TableOperationsFixtures.TEST_MAPPING] for the modified stream.
 */
fun `changeset is correct when dropping a column`() {
    `changeset is correct when dropping a column`(
        Fixtures.ID_AND_TEST_MAPPING,
        Fixtures.TEST_MAPPING,
    )
}

/**
 * Creates a table from [TableOperationsFixtures.ID_AND_TEST_SCHEMA], computes the changeset
 * against [TableOperationsFixtures.TEST_INTEGER_SCHEMA], and asserts that the only difference is
 * the dropped [ID_FIELD] column. Expected column names are looked up in
 * [initialColumnNameMapping].
 */
fun `changeset is correct when dropping a column`(
    initialColumnNameMapping: ColumnNameMapping,
    modifiedColumnNameMapping: ColumnNameMapping
) = runTest {
    val namespace = Fixtures.generateTestNamespace("namespace-test")
    val table = Fixtures.generateTestTableName("table-test-table", namespace)
    val columnChangeset =
        computeSchemaEvolution(
                testTable = table,
                initialSchema = Fixtures.ID_AND_TEST_SCHEMA,
                initialColumnNameMapping = initialColumnNameMapping,
                modifiedSchema = Fixtures.TEST_INTEGER_SCHEMA,
                modifiedColumnNameMapping = modifiedColumnNameMapping,
            )
            .columnChangeset

    val expectedDroppedColumns = setOf(initialColumnNameMapping[ID_FIELD])
    val expectedRetainedColumns = setOf(initialColumnNameMapping[TEST_FIELD])

    // The changeset should indicate that we're trying to drop a column
    assertAll(
        {
            assertEquals(
                0,
                columnChangeset.columnsToAdd.size,
                "Expected to not add any columns. Got ${columnChangeset.columnsToAdd}"
            )
        },
        {
            assertEquals(
                expectedDroppedColumns,
                columnChangeset.columnsToDrop.keys,
                "Expected to drop exactly one column. Got ${columnChangeset.columnsToDrop}"
            )
        },
        {
            assertEquals(
                0,
                columnChangeset.columnsToChange.size,
                "Expected to not change any columns. Got ${columnChangeset.columnsToChange}"
            )
        },
        {
            assertEquals(
                expectedRetainedColumns,
                columnChangeset.columnsToRetain.keys,
                "Expected to retain the original column. Got ${columnChangeset.columnsToRetain}"
            )
        },
    )
}
/**
 * Test that the connector can correctly detect when a column's type needs to be changed. Note
 * that this only tests changing the actual type, _not_ changing the column's nullability. Uses
 * [TableOperationsFixtures.TEST_MAPPING] as the column name mapping.
 */
fun `changeset is correct when changing a column's type`() {
    `changeset is correct when changing a column's type`(Fixtures.TEST_MAPPING)
}

/**
 * Creates a table from [TableOperationsFixtures.TEST_INTEGER_SCHEMA], computes the changeset
 * against [TableOperationsFixtures.TEST_STRING_SCHEMA], and asserts that the only difference is a
 * type change on the [TEST_FIELD] column.
 *
 * @param columnNameMapping mapping used both for creating the table and for computing the
 * modified schema
 */
fun `changeset is correct when changing a column's type`(
    columnNameMapping: ColumnNameMapping,
) = runTest {
    val testNamespace = Fixtures.generateTestNamespace("namespace-test")
    val testTable = Fixtures.generateTestTableName("table-test-table", testNamespace)
    val (actualSchema, expectedSchema, columnChangeset) =
        computeSchemaEvolution(
            testTable,
            Fixtures.TEST_INTEGER_SCHEMA,
            columnNameMapping,
            Fixtures.TEST_STRING_SCHEMA,
            columnNameMapping,
        )
    // Fail with a descriptive message (rather than a bare NPE) if the column is missing.
    val actualType =
        checkNotNull(actualSchema.columns[columnNameMapping[TEST_FIELD]]) {
            "Discovered schema has no column for $TEST_FIELD. Got ${actualSchema.columns}"
        }
    val expectedType =
        checkNotNull(expectedSchema.columns[columnNameMapping[TEST_FIELD]]) {
            "Computed schema has no column for $TEST_FIELD. Got ${expectedSchema.columns}"
        }
    // The changeset should indicate that we're trying to change a column's type
    assertAll(
        {
            assertEquals(
                0,
                columnChangeset.columnsToAdd.size,
                "Expected to not add any columns. Got ${columnChangeset.columnsToAdd}"
            )
        },
        {
            assertEquals(
                0,
                columnChangeset.columnsToDrop.size,
                "Expected to not drop any columns. Got ${columnChangeset.columnsToDrop}"
            )
        },
        {
            assertEquals(
                1,
                columnChangeset.columnsToChange.size,
                "Expected to change exactly one column. Got ${columnChangeset.columnsToChange}"
            )
        },
        {
            assertEquals(
                ColumnTypeChange(actualType, expectedType),
                columnChangeset.columnsToChange[columnNameMapping[TEST_FIELD]],
                "Expected column to change from $actualType to $expectedType. Got ${columnChangeset.columnsToChange}"
            )
        },
        {
            // The only column is being changed, so nothing should be reported as retained.
            assertEquals(
                0,
                columnChangeset.columnsToRetain.size,
                "Expected to not retain any columns. Got ${columnChangeset.columnsToRetain}"
            )
        },
    )
}
/**
 * Runs the two-argument overload with identity mappings: `to_retain`, `to_change` and `to_drop`
 * for the initial table, and `to_retain`, `to_change` and `to_add` for the modified stream.
 */
fun `basic apply changeset`() {
    val identityMappingBefore =
        ColumnNameMapping(listOf("to_retain", "to_change", "to_drop").associateWith { it })
    val identityMappingAfter =
        ColumnNameMapping(listOf("to_retain", "to_change", "to_add").associateWith { it })
    `basic apply changeset`(
        initialColumnNameMapping = identityMappingBefore,
        modifiedColumnNameMapping = identityMappingAfter,
    )
}

/**
 * Execute a basic set of schema changes. We're not changing the sync mode, the types are just
 * string/int (i.e. no JSON), and there's no funky characters anywhere.
 *
 * The test creates the table, inserts one record, applies the changeset, then checks both the
 * surviving record and that a freshly computed changeset is a noop.
 */
fun `basic apply changeset`(
    initialColumnNameMapping: ColumnNameMapping,
    modifiedColumnNameMapping: ColumnNameMapping
) = runTest {
    val namespace = Fixtures.generateTestNamespace("namespace-test")
    val table = Fixtures.generateTestTableName("table-test-table", namespace)
    val schemaBeforeChange =
        ObjectType(
            linkedMapOf(
                "to_retain" to FieldType(StringType, true),
                "to_change" to FieldType(IntegerType, true),
                "to_drop" to FieldType(StringType, true),
            ),
        )
    val schemaAfterChange =
        ObjectType(
            linkedMapOf(
                "to_retain" to FieldType(StringType, true),
                "to_change" to FieldType(StringType, true),
                "to_add" to FieldType(StringType, true),
            ),
        )
    val streamAfterChange =
        Fixtures.createAppendStream(
            namespace = table.namespace,
            name = table.name,
            schema = schemaAfterChange,
        )

    // Create the table and compute the schema changeset
    val evolution =
        computeSchemaEvolution(
            testTable = table,
            initialSchema = schemaBeforeChange,
            initialColumnNameMapping = initialColumnNameMapping,
            modifiedSchema = schemaAfterChange,
            modifiedColumnNameMapping = modifiedColumnNameMapping,
        )
    val targetSchema = evolution.computedSchema
    val changeset = evolution.columnChangeset

    // Insert a record before applying the changeset
    testClient.insertRecords(
        table,
        initialColumnNameMapping,
        mapOf(
            COLUMN_NAME_AB_RAW_ID to StringValue("fcc784dd-bf06-468e-ad59-666d5aaceae8"),
            COLUMN_NAME_AB_EXTRACTED_AT to TimestampWithTimezoneValue("2025-01-22T00:00:00Z"),
            COLUMN_NAME_AB_META to ObjectValue(linkedMapOf()),
            COLUMN_NAME_AB_GENERATION_ID to IntegerValue(1),
            "to_retain" to StringValue("to_retain original value"),
            "to_change" to IntegerValue(42),
            "to_drop" to StringValue("to_drop original value"),
        ),
    )

    client.applyChangeset(
        streamAfterChange,
        modifiedColumnNameMapping,
        table,
        targetSchema.columns,
        changeset,
    )

    // Read the table back, drop null values, and translate column names back to the originals.
    val recordsAfterChange =
        harness
            .readTableWithoutMetaColumns(table)
            .removeNulls()
            .reverseColumnNameMapping(modifiedColumnNameMapping, airbyteMetaColumnMapping)
    val expectedRecords =
        listOf(
            mapOf(
                "to_retain" to "to_retain original value",
                // changed from int to string
                "to_change" to "42",
                // note the lack of `to_add` - new columns should be initialized to null
            )
        )
    Assertions.assertEquals(expectedRecords, recordsAfterChange) {
        "Expected records were not in the overwritten table."
    }

    // Re-discover the altered table; it should now match the computed schema exactly.
    val schemaAfterApply = client.discoverSchema(table)
    val remainingChangeset =
        client.computeChangeset(schemaAfterApply.columns, targetSchema.columns)
    assertTrue(
        remainingChangeset.isNoop(),
        "After applying the changeset, we should be a noop against the expected schema"
    )
}
/**
 * Shared setup for a typical schema evolution test. Creates the namespace and a table with
 * [initialSchema] using [initialColumnNameMapping], discovers the resulting schema, computes the
 * schema for [modifiedSchema] with [modifiedColumnNameMapping], and computes the column
 * changeset between the two. This method does _not_ actually apply the changeset.
 *
 * @return a [SchemaEvolutionComputation] holding, in order, the discovered initial schema, the
 * computed modified schema, and the changeset
 */
private suspend fun computeSchemaEvolution(
    testTable: TableName,
    initialSchema: ObjectType,
    initialColumnNameMapping: ColumnNameMapping,
    modifiedSchema: ObjectType,
    modifiedColumnNameMapping: ColumnNameMapping,
): SchemaEvolutionComputation {
    // Both streams point at the same table and differ only in their schema.
    fun appendStreamWith(schema: ObjectType) =
        Fixtures.createAppendStream(
            namespace = testTable.namespace,
            name = testTable.name,
            schema = schema,
        )
    val streamBeforeChange = appendStreamWith(initialSchema)
    val streamAfterChange = appendStreamWith(modifiedSchema)

    opsClient.createNamespace(testTable.namespace)
    opsClient.createTable(
        stream = streamBeforeChange,
        tableName = testTable,
        columnNameMapping = initialColumnNameMapping,
        replace = false,
    )

    val discovered = client.discoverSchema(testTable)
    val computed = client.computeSchema(streamAfterChange, modifiedColumnNameMapping)
    return SchemaEvolutionComputation(
        discoveredSchema = discovered,
        computedSchema = computed,
        columnChangeset = client.computeChangeset(discovered.columns, computed.columns),
    )
}
/**
 * Result of `computeSchemaEvolution`. Tests destructure it positionally, so the property order
 * matters.
 */
data class SchemaEvolutionComputation(
    // Schema discovered from the table after it was created with the initial schema.
    val discoveredSchema: TableSchema,
    // Schema computed from the modified stream and column name mapping.
    val computedSchema: TableSchema,
    // Changeset computed from the discovered columns to the computed columns.
    val columnChangeset: ColumnChangeset,
)
}

View File

@@ -20,6 +20,10 @@ interface TestTableOperationsClient {
*/
suspend fun insertRecords(table: TableName, records: List<Map<String, AirbyteValue>>) {
    // Default implementation does nothing.
}

/** Vararg convenience overload: collects [records] into a list and delegates to the list-based overload. */
suspend fun insertRecords(table: TableName, vararg records: Map<String, AirbyteValue>) {
    val recordList = records.toList()
    insertRecords(table, recordList)
}
/**
* Reads all records from a table for test verification. Do not use in production code - this
* loads entire table into memory.

View File

@@ -1 +1 @@
version=0.1.77
version=0.1.78