diff --git a/airbyte-cdk/bulk/changelog.md b/airbyte-cdk/bulk/changelog.md index 3cd6975d9c8..94892237d77 100644 --- a/airbyte-cdk/bulk/changelog.md +++ b/airbyte-cdk/bulk/changelog.md @@ -1,3 +1,7 @@ +## Version 0.1.81 + +load cdk: components tests: more coverage on upsert + ## Version 0.1.80 **Extract CDK** diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsFixtures.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsFixtures.kt index 9672dc980cd..1579d205a32 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsFixtures.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsFixtures.kt @@ -35,6 +35,7 @@ import io.airbyte.cdk.load.table.ColumnNameMapping import io.airbyte.cdk.load.table.TableName import io.airbyte.cdk.load.util.Jsons import java.util.UUID +import org.junit.jupiter.api.Assertions /** * Common test fixtures and constants used across table operations test suites. 
Provides reusable @@ -44,6 +45,7 @@ object TableOperationsFixtures { // Common field names const val TEST_FIELD = "test" const val ID_FIELD = "id" + const val DESCRIPTION_FIELD = "description" // Common schemas val TEST_INTEGER_SCHEMA = ObjectType(linkedMapOf(TEST_FIELD to FieldType(IntegerType, true))) @@ -63,6 +65,7 @@ object TableOperationsFixtures { ID_FIELD to FieldType(StringType, true), TEST_FIELD to FieldType(IntegerType, true), CDC_DELETED_AT_COLUMN to FieldType(IntegerType, true), + DESCRIPTION_FIELD to FieldType(StringType, true), ), ) @@ -113,6 +116,7 @@ object TableOperationsFixtures { ID_FIELD to ID_FIELD, TEST_FIELD to TEST_FIELD, CDC_DELETED_AT_COLUMN to CDC_DELETED_AT_COLUMN, + DESCRIPTION_FIELD to DESCRIPTION_FIELD, ), ) @@ -198,12 +202,16 @@ object TableOperationsFixtures { val UPSERT_SOURCE_RECORDS: List> = listOf( inputRecord( - "5499cdef-1411-4c7e-987c-b22fe1284a49", + "109d38b9-e001-4f62-86ce-4a457ab013a1", "2025-01-23T00:00:00Z", linkedMapOf(), generationId = 1, - ID_FIELD to StringValue("2"), - TEST_FIELD to IntegerValue(1001), + ID_FIELD to StringValue("0"), + TEST_FIELD to IntegerValue(1000), + DESCRIPTION_FIELD to + StringValue( + "New record, no existing record. Upsert should insert this record." + ), ), inputRecord( "295eb05d-da91-4cf5-8d26-a2bf8b6e8ef7", @@ -213,24 +221,22 @@ object TableOperationsFixtures { ID_FIELD to StringValue("3"), TEST_FIELD to IntegerValue(1002), CDC_DELETED_AT_COLUMN to IntegerValue(1234), + DESCRIPTION_FIELD to + StringValue( + "New deletion record with later cursor and extracted_at than existing record. Upsert should delete the existing record." + ), ), - inputRecord( - "9110dcf0-2171-4daa-a934-695163950d98", - "2025-01-23T00:00:00Z", - linkedMapOf(), - generationId = 1, - ID_FIELD to StringValue("4"), - TEST_FIELD to IntegerValue(4), - ), - // There are two records with id=5, which differ only in extracted_at. - // The second record has non-null deleted_at, so we expect the record to be deleted. 
inputRecord( "35295b83-302f-49c3-af0f-cf093bc46def", "2025-01-23T00:00:00Z", linkedMapOf(), generationId = 1, ID_FIELD to StringValue("5"), - TEST_FIELD to IntegerValue(1004), + TEST_FIELD to IntegerValue(5), + DESCRIPTION_FIELD to + StringValue( + "Incoming record with no existing record, but there's a second incoming deletion record with later extracted_at. Upsert should discard this record." + ), ), inputRecord( "5773cf6f-f8b7-48f2-8f23-728a4a4eb56d", @@ -238,8 +244,155 @@ object TableOperationsFixtures { linkedMapOf(), generationId = 1, ID_FIELD to StringValue("5"), - TEST_FIELD to IntegerValue(1005), + TEST_FIELD to IntegerValue(5), CDC_DELETED_AT_COLUMN to IntegerValue(1234), + DESCRIPTION_FIELD to + StringValue("Incoming deletion record. This record should be discarded."), + ), + inputRecord( + "1c4d0fc5-1e1e-4f7e-87c8-a46a722ee984", + "2025-01-23T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("6"), + TEST_FIELD to IntegerValue(6), + DESCRIPTION_FIELD to + StringValue( + "Incoming record with no existing record, but there's a second incoming record with later extracted_at. Upsert should discard this record." + ), + ), + inputRecord( + "2ddf5ee9-08a1-4319-824d-187d878edac5", + "2025-01-23T01:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("6"), + TEST_FIELD to IntegerValue(6), + DESCRIPTION_FIELD to + StringValue( + "Incoming record with no existing record. Upsert should insert this record." + ), + ), + inputRecord( + "e8379b8f-e437-4d55-9d16-76f5e6e942d6", + "2025-01-23T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("7"), + TEST_FIELD to IntegerValue(7), + CDC_DELETED_AT_COLUMN to IntegerValue(1234), + DESCRIPTION_FIELD to + StringValue( + "Incoming deletion record, but there's a second incoming record with later extracted_at. Upsert should discard this record." 
+ ), + ), + inputRecord( + "e56fc753-b55a-439b-9b16-528596e2ca3a", + "2025-01-23T01:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("7"), + TEST_FIELD to IntegerValue(7), + DESCRIPTION_FIELD to + StringValue( + "Incoming record with no existing record. Upsert should insert this record." + ), + ), + inputRecord( + "645efad2-f1e6-438a-b29f-15ae5d096015", + "2025-01-23T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("8"), + TEST_FIELD to IntegerValue(8), + DESCRIPTION_FIELD to + StringValue( + "Incoming record with earlier cursor and later extracted_at than existing record. Upsert should discard this record (prefer cursor over extracted_at)." + ), + ), + inputRecord( + "f74b8ddb-45d0-4e30-af25-66885e57a0e6", + "2025-01-23T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("9"), + TEST_FIELD to IntegerValue(9), + DESCRIPTION_FIELD to + StringValue( + "Incoming record with equal cursor and later extracted_at than existing record. Upsert should update with this record (break ties with extracted_at)." + ), + ), + inputRecord( + "877cceb6-23a6-4e7b-92e3-59ca46f8fd6c", + "2025-01-23T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("10"), + TEST_FIELD to IntegerValue(1010), + DESCRIPTION_FIELD to + StringValue( + "Incoming record with later cursor and later extracted_at than existing record. Upsert should update with this record." + ), + ), + inputRecord( + "20410b34-7bb0-4ba5-9c61-0dd23bfeee6d", + "2025-01-22T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("11"), + TEST_FIELD to IntegerValue(11), + DESCRIPTION_FIELD to + StringValue( + "Incoming record with earlier cursor and equal extracted_at than existing record. Upsert should discard this record." 
+ ), + ), + inputRecord( + "70fdf9b0-ade0-4d30-9131-ba217ef506da", + "2025-01-22T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("12"), + TEST_FIELD to IntegerValue(1012), + DESCRIPTION_FIELD to + StringValue( + "Incoming record with later cursor and equal extracted_at than existing record. Upsert should update with this record." + ), + ), + inputRecord( + "20949d9b-8ffc-4497-85e4-cda14abc4049", + "2025-01-21T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("13"), + TEST_FIELD to IntegerValue(13), + DESCRIPTION_FIELD to + StringValue( + "Incoming record with earlier cursor and earlier extracted_at than existing record. Upsert should discard this record." + ), + ), + inputRecord( + "5808a0ef-3c6d-4d9a-851c-edbbc4852e18", + "2025-01-21T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("14"), + TEST_FIELD to IntegerValue(14), + DESCRIPTION_FIELD to + StringValue( + "Incoming record with equal cursor and earlier extracted_at than existing record. Upsert should discard this record." + ), + ), + inputRecord( + "373127a7-a40e-4e23-890b-1a52114686ee", + "2025-01-21T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("15"), + TEST_FIELD to IntegerValue(1015), + DESCRIPTION_FIELD to + StringValue( + "Incoming record with later cursor and earlier extracted_at than existing record. Upsert should update with this record." + ), ), ) @@ -249,7 +402,6 @@ object TableOperationsFixtures { */ val UPSERT_TARGET_RECORDS: List> = listOf( - // id=1 has no incoming record, so it should remain untouched. inputRecord( "6317026e-12f9-4713-976e-ce43901bd7ce", "2025-01-22T00:00:00Z", @@ -257,18 +409,11 @@ object TableOperationsFixtures { 1, ID_FIELD to StringValue("1"), TEST_FIELD to IntegerValue(1), + DESCRIPTION_FIELD to + StringValue( + "Existing record, no incoming record. Upsert should preserve this record." + ), ), - // id=2 has a normal incoming record, which will overwrite this one. 
- inputRecord( - "46159e3a-9bf9-42d9-8bb7-9f47d37bd663", - "2025-01-22T00:00:00Z", - linkedMapOf(), - generationId = 1, - ID_FIELD to StringValue("2"), - TEST_FIELD to IntegerValue(2), - ), - // id=3 has an incoming record with nonnull deleted_at, so this record should be - // deleted. // TODO what about destinations with CDC soft deletes? // https://github.com/airbytehq/airbyte-internal-issues/issues/14911 inputRecord( @@ -278,22 +423,121 @@ object TableOperationsFixtures { generationId = 1, ID_FIELD to StringValue("3"), TEST_FIELD to IntegerValue(3), + DESCRIPTION_FIELD to + StringValue( + "Existing record with incoming deletion record with later cursor and extracted_at. Upsert should delete this record." + ), ), - // id=4 has an incoming record with the same cursor value (test=4) but later - // extracted_at. - // That record should replace this one. inputRecord( - "02e22e03-587f-4d30-9718-994357407b65", + "8086bdd6-6cf5-479e-a819-e5f347373804", "2025-01-22T00:00:00Z", linkedMapOf(), generationId = 1, - ID_FIELD to StringValue("4"), - TEST_FIELD to IntegerValue(4), + ID_FIELD to StringValue("8"), + TEST_FIELD to IntegerValue(1008), + DESCRIPTION_FIELD to + StringValue( + "Existing record with later cursor and earlier extracted_at than incoming record. Upsert should preserve this record (prefer cursor over extracted_at)." + ), + ), + inputRecord( + "b60e8b33-32f4-4da0-934b-87d14d9ed354", + "2025-01-22T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("9"), + TEST_FIELD to IntegerValue(9), + DESCRIPTION_FIELD to + StringValue( + "Existing record with equal cursor and earlier extracted_at than incoming record. Upsert should discard this record (break ties with extracted_at)." 
+ ), + ), + inputRecord( + "e79d163e-b594-4016-89b9-a85e385778bd", + "2025-01-22T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("10"), + TEST_FIELD to IntegerValue(10), + DESCRIPTION_FIELD to + StringValue( + "Existing record with earlier cursor and earlier extracted_at than incoming record. Upsert should discard this record." + ), + ), + inputRecord( + "3d345fb2-254e-4968-89a6-f896a05fb831", + "2025-01-22T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("11"), + TEST_FIELD to IntegerValue(1011), + DESCRIPTION_FIELD to + StringValue( + "Existing record with later cursor and equal extracted_at than incoming record. Upsert should preserve this record." + ), + ), + inputRecord( + "9c5262e6-44e3-41de-9a5a-c31bc0efdb68", + "2025-01-22T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("12"), + TEST_FIELD to IntegerValue(12), + DESCRIPTION_FIELD to + StringValue( + "Existing record with earlier cursor and equal extracted_at than incoming record. Upsert should discard this record." + ), + ), + inputRecord( + "739a9347-267b-48af-a172-2030320e2193", + "2025-01-22T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("13"), + TEST_FIELD to IntegerValue(1013), + DESCRIPTION_FIELD to + StringValue( + "Existing record with later cursor and later extracted_at than incoming record. Upsert should preserve this record." + ), + ), + inputRecord( + "70243c59-eadb-4840-90fa-be4ed57609fc", + "2025-01-22T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("14"), + TEST_FIELD to IntegerValue(14), + DESCRIPTION_FIELD to + StringValue( + "Existing record with equal cursor and later extracted_at than incoming record. Upsert should preserve this record." 
+ ), + ), + inputRecord( + "966e89ec-c0d2-4358-b8e5-bf9c713f5396", + "2025-01-22T00:00:00Z", + linkedMapOf(), + generationId = 1, + ID_FIELD to StringValue("15"), + TEST_FIELD to IntegerValue(15), + DESCRIPTION_FIELD to + StringValue( + "Existing record with earlier cursor and later extracted_at than incoming record. Upsert should discard this record." + ), ), ) val UPSERT_EXPECTED_RECORDS: List> = listOf( + outputRecord( + "109d38b9-e001-4f62-86ce-4a457ab013a1", + "2025-01-23T00:00:00Z", + linkedMapOf(), + generationId = 1L, + ID_FIELD to "0", + TEST_FIELD to 1000L, + DESCRIPTION_FIELD to + "New record, no existing record. Upsert should insert this record.", + ), outputRecord( "6317026e-12f9-4713-976e-ce43901bd7ce", "2025-01-22T00:00:00Z", @@ -301,22 +545,108 @@ object TableOperationsFixtures { generationId = 1L, ID_FIELD to "1", TEST_FIELD to 1L, + DESCRIPTION_FIELD to + "Existing record, no incoming record. Upsert should preserve this record.", ), outputRecord( - "5499cdef-1411-4c7e-987c-b22fe1284a49", + "2ddf5ee9-08a1-4319-824d-187d878edac5", + "2025-01-23T01:00:00Z", + linkedMapOf(), + 1L, + ID_FIELD to "6", + TEST_FIELD to 6L, + DESCRIPTION_FIELD to + "Incoming record with no existing record. Upsert should insert this record.", + ), + outputRecord( + "e56fc753-b55a-439b-9b16-528596e2ca3a", + "2025-01-23T01:00:00Z", + linkedMapOf(), + 1L, + ID_FIELD to "7", + TEST_FIELD to 7L, + DESCRIPTION_FIELD to + "Incoming record with no existing record. Upsert should insert this record.", + ), + outputRecord( + "8086bdd6-6cf5-479e-a819-e5f347373804", + "2025-01-22T00:00:00Z", + linkedMapOf(), + 1L, + ID_FIELD to "8", + TEST_FIELD to 1008L, + DESCRIPTION_FIELD to + "Existing record with later cursor and earlier extracted_at than incoming record. 
Upsert should preserve this record (prefer cursor over extracted_at).", + ), + outputRecord( + "f74b8ddb-45d0-4e30-af25-66885e57a0e6", "2025-01-23T00:00:00Z", linkedMapOf(), 1L, - ID_FIELD to "2", - TEST_FIELD to 1001L, + ID_FIELD to "9", + TEST_FIELD to 9L, + DESCRIPTION_FIELD to + "Incoming record with equal cursor and later extracted_at than existing record. Upsert should update with this record (break ties with extracted_at).", ), outputRecord( - "9110dcf0-2171-4daa-a934-695163950d98", + "877cceb6-23a6-4e7b-92e3-59ca46f8fd6c", "2025-01-23T00:00:00Z", linkedMapOf(), 1L, - ID_FIELD to "4", - TEST_FIELD to 4L, + ID_FIELD to "10", + TEST_FIELD to 1010L, + DESCRIPTION_FIELD to + "Incoming record with later cursor and later extracted_at than existing record. Upsert should update with this record.", + ), + outputRecord( + "3d345fb2-254e-4968-89a6-f896a05fb831", + "2025-01-22T00:00:00Z", + linkedMapOf(), + 1L, + ID_FIELD to "11", + TEST_FIELD to 1011L, + DESCRIPTION_FIELD to + "Existing record with later cursor and equal extracted_at than incoming record. Upsert should preserve this record.", + ), + outputRecord( + "70fdf9b0-ade0-4d30-9131-ba217ef506da", + "2025-01-22T00:00:00Z", + linkedMapOf(), + 1L, + ID_FIELD to "12", + TEST_FIELD to 1012L, + DESCRIPTION_FIELD to + "Incoming record with later cursor and equal extracted_at than existing record. Upsert should update with this record.", + ), + outputRecord( + "739a9347-267b-48af-a172-2030320e2193", + "2025-01-22T00:00:00Z", + linkedMapOf(), + 1L, + ID_FIELD to "13", + TEST_FIELD to 1013L, + DESCRIPTION_FIELD to + "Existing record with later cursor and later extracted_at than incoming record. Upsert should preserve this record.", + ), + outputRecord( + "70243c59-eadb-4840-90fa-be4ed57609fc", + "2025-01-22T00:00:00Z", + linkedMapOf(), + 1L, + ID_FIELD to "14", + TEST_FIELD to 14L, + DESCRIPTION_FIELD to + "Existing record with equal cursor and later extracted_at than incoming record. 
Upsert should preserve this record.", + ), + outputRecord( + "373127a7-a40e-4e23-890b-1a52114686ee", + "2025-01-21T00:00:00Z", + linkedMapOf(), + 1L, + ID_FIELD to "15", + TEST_FIELD to 1015L, + DESCRIPTION_FIELD to + "Incoming record with later cursor and earlier extracted_at than existing record. Upsert should update with this record.", ), ) @@ -387,7 +717,11 @@ object TableOperationsFixtures { namespaceMapper = NamespaceMapper(), ) - fun List>.sortByTestField() = this.sortedBy { it["test"] as Long } + fun List>.sortBy(key: String) = + // sketchy unchecked cast is intentional, we're assuming that the tests are written such + // that the sort key is always comparable. + // In practice, it's generally some sort of ID column (int/string/etc.). + @Suppress("UNCHECKED_CAST") this.sortedBy { it[key] as Comparable } fun List>.applyColumnNameMapping(mapping: ColumnNameMapping) = map { record -> @@ -445,4 +779,16 @@ object TableOperationsFixtures { COLUMN_NAME_AB_GENERATION_ID to generationId, *pairs, ) + + fun assertEquals( + expectedRecords: List>, + actualRecords: List>, + sortKey: String, + message: String, + ) = + Assertions.assertEquals( + expectedRecords.sortBy(sortKey).joinToString("\n"), + actualRecords.sortBy(sortKey).joinToString("\n"), + message, + ) } diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsSuite.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsSuite.kt index e3fcb79096d..7babf55656a 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsSuite.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsSuite.kt @@ -5,9 +5,9 @@ package io.airbyte.cdk.load.component import io.airbyte.cdk.load.component.TableOperationsFixtures as Fixtures +import io.airbyte.cdk.load.component.TableOperationsFixtures.assertEquals import 
io.airbyte.cdk.load.component.TableOperationsFixtures.insertRecords import io.airbyte.cdk.load.component.TableOperationsFixtures.reverseColumnNameMapping -import io.airbyte.cdk.load.component.TableOperationsFixtures.sortByTestField import io.airbyte.cdk.load.data.AirbyteValue import io.airbyte.cdk.load.data.IntegerValue import io.airbyte.cdk.load.data.ObjectValue @@ -402,13 +402,14 @@ interface TableOperationsSuite { val overwrittenTableRecords = harness.readTableWithoutMetaColumns(targetTable) assertEquals( - expectedRecords.sortByTestField(), - overwrittenTableRecords - .reverseColumnNameMapping(columnNameMapping, airbyteMetaColumnMapping) - .sortByTestField(), - ) { - "Expected records were not in the overwritten table." - } + expectedRecords, + overwrittenTableRecords.reverseColumnNameMapping( + columnNameMapping, + airbyteMetaColumnMapping + ), + "test", + "Expected records were not in the overwritten table.", + ) assert(!client.tableExists(sourceTable)) { "Source table: ${sourceTable.namespace}.${sourceTable.name} was not dropped as expected." @@ -473,13 +474,14 @@ interface TableOperationsSuite { val copyTableRecords = harness.readTableWithoutMetaColumns(targetTable) assertEquals( - expectedRecords.sortByTestField(), - copyTableRecords - .reverseColumnNameMapping(columnNameMapping, airbyteMetaColumnMapping) - .sortByTestField(), - ) { + expectedRecords, + copyTableRecords.reverseColumnNameMapping( + columnNameMapping, + airbyteMetaColumnMapping + ), + "test", "Expected source records were not copied to the target table." 
- } + ) } finally { harness.cleanupTable(sourceTable) harness.cleanupTable(targetTable) @@ -560,13 +562,14 @@ interface TableOperationsSuite { val upsertTableRecords = testClient.readTable(targetTable) assertEquals( - expectedRecords.sortByTestField(), - upsertTableRecords - .reverseColumnNameMapping(columnNameMapping, airbyteMetaColumnMapping) - .sortByTestField(), - ) { + expectedRecords, + upsertTableRecords.reverseColumnNameMapping( + columnNameMapping, + airbyteMetaColumnMapping + ), + "id", "Upserted table did not contain expected records." - } + ) } finally { harness.cleanupTable(sourceTable) harness.cleanupTable(targetTable) diff --git a/airbyte-cdk/bulk/version.properties b/airbyte-cdk/bulk/version.properties index d27370b02e6..6c53c6700d5 100644 --- a/airbyte-cdk/bulk/version.properties +++ b/airbyte-cdk/bulk/version.properties @@ -1 +1 @@ -version=0.1.80 +version=0.1.81 diff --git a/build.gradle b/build.gradle index d0896457e9e..b9ec2aceb44 100644 --- a/build.gradle +++ b/build.gradle @@ -192,7 +192,7 @@ allprojects { if (project.hasProperty('JunitMethodExecutionTimeout')) { junitMethodExecutionTimeout = project.property('JunitMethodExecutionTimeout').toString() } else { - junitMethodExecutionTimeout = '1 m' + junitMethodExecutionTimeout = '1m' } systemProperty 'junit.jupiter.execution.timeout.default', junitMethodExecutionTimeout