
Bulk load cdk: improve component tests' upsert test coverage (#69338)

Author: Edward Gao
Date: 2025-11-19 16:43:33 -08:00
Committed by: GitHub
Parent: faada3189b
Commit: aacf63d66a
5 changed files with 415 additions and 62 deletions

View File

@@ -1,3 +1,7 @@
## Version 0.1.81

load cdk: components tests: more coverage on upsert

## Version 0.1.80
**Extract CDK**

View File

@@ -35,6 +35,7 @@ import io.airbyte.cdk.load.table.ColumnNameMapping
import io.airbyte.cdk.load.table.TableName
import io.airbyte.cdk.load.util.Jsons
import java.util.UUID
import org.junit.jupiter.api.Assertions

/**
 * Common test fixtures and constants used across table operations test suites. Provides reusable
@@ -44,6 +45,7 @@ object TableOperationsFixtures {
// Common field names
const val TEST_FIELD = "test"
const val ID_FIELD = "id"
const val DESCRIPTION_FIELD = "description"

// Common schemas
val TEST_INTEGER_SCHEMA = ObjectType(linkedMapOf(TEST_FIELD to FieldType(IntegerType, true)))
@@ -63,6 +65,7 @@ object TableOperationsFixtures {
ID_FIELD to FieldType(StringType, true),
TEST_FIELD to FieldType(IntegerType, true),
CDC_DELETED_AT_COLUMN to FieldType(IntegerType, true),
DESCRIPTION_FIELD to FieldType(StringType, true),
),
)
@@ -113,6 +116,7 @@ object TableOperationsFixtures {
ID_FIELD to ID_FIELD,
TEST_FIELD to TEST_FIELD,
CDC_DELETED_AT_COLUMN to CDC_DELETED_AT_COLUMN,
DESCRIPTION_FIELD to DESCRIPTION_FIELD,
),
)
@@ -198,12 +202,16 @@ object TableOperationsFixtures {
val UPSERT_SOURCE_RECORDS: List<Map<String, AirbyteValue>> =
listOf(
inputRecord(
-"5499cdef-1411-4c7e-987c-b22fe1284a49",
"109d38b9-e001-4f62-86ce-4a457ab013a1",
"2025-01-23T00:00:00Z",
linkedMapOf(),
generationId = 1,
-ID_FIELD to StringValue("2"),
ID_FIELD to StringValue("0"),
-TEST_FIELD to IntegerValue(1001),
TEST_FIELD to IntegerValue(1000),
DESCRIPTION_FIELD to
StringValue(
"New record, no existing record. Upsert should insert this record."
),
),
inputRecord(
"295eb05d-da91-4cf5-8d26-a2bf8b6e8ef7",
@@ -213,24 +221,22 @@ object TableOperationsFixtures {
ID_FIELD to StringValue("3"), ID_FIELD to StringValue("3"),
TEST_FIELD to IntegerValue(1002), TEST_FIELD to IntegerValue(1002),
CDC_DELETED_AT_COLUMN to IntegerValue(1234), CDC_DELETED_AT_COLUMN to IntegerValue(1234),
DESCRIPTION_FIELD to
StringValue(
"New deletion record with later cursor and extracted_at than existing record. Upsert should delete the existing record."
),
), ),
inputRecord(
"9110dcf0-2171-4daa-a934-695163950d98",
"2025-01-23T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("4"),
TEST_FIELD to IntegerValue(4),
),
// There are two records with id=5, which differ only in extracted_at.
// The second record has non-null deleted_at, so we expect the record to be deleted.
inputRecord( inputRecord(
"35295b83-302f-49c3-af0f-cf093bc46def", "35295b83-302f-49c3-af0f-cf093bc46def",
"2025-01-23T00:00:00Z", "2025-01-23T00:00:00Z",
linkedMapOf(), linkedMapOf(),
generationId = 1, generationId = 1,
ID_FIELD to StringValue("5"), ID_FIELD to StringValue("5"),
TEST_FIELD to IntegerValue(1004), TEST_FIELD to IntegerValue(5),
DESCRIPTION_FIELD to
StringValue(
"Incoming record with no existing record, but there's a second incoming deletion record with later extracted_at. Upsert should discard this record."
),
), ),
inputRecord( inputRecord(
"5773cf6f-f8b7-48f2-8f23-728a4a4eb56d", "5773cf6f-f8b7-48f2-8f23-728a4a4eb56d",
@@ -238,8 +244,155 @@ object TableOperationsFixtures {
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("5"),
-TEST_FIELD to IntegerValue(1005),
TEST_FIELD to IntegerValue(5),
CDC_DELETED_AT_COLUMN to IntegerValue(1234),
DESCRIPTION_FIELD to
StringValue("Incoming deletion record. This record should be discarded."),
),
inputRecord(
"1c4d0fc5-1e1e-4f7e-87c8-a46a722ee984",
"2025-01-23T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("6"),
TEST_FIELD to IntegerValue(6),
DESCRIPTION_FIELD to
StringValue(
"Incoming record with no existing record, but there's a second incoming record with later extracted_at. Upsert should discard this record."
),
),
inputRecord(
"2ddf5ee9-08a1-4319-824d-187d878edac5",
"2025-01-23T01:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("6"),
TEST_FIELD to IntegerValue(6),
DESCRIPTION_FIELD to
StringValue(
"Incoming record with no existing record. Upsert should insert this record."
),
),
inputRecord(
"e8379b8f-e437-4d55-9d16-76f5e6e942d6",
"2025-01-23T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("7"),
TEST_FIELD to IntegerValue(7),
CDC_DELETED_AT_COLUMN to IntegerValue(1234),
DESCRIPTION_FIELD to
StringValue(
"Incoming deletion record, but there's a second incoming record with later extracted_at. Upsert should discard this record."
),
),
inputRecord(
"e56fc753-b55a-439b-9b16-528596e2ca3a",
"2025-01-23T01:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("7"),
TEST_FIELD to IntegerValue(7),
DESCRIPTION_FIELD to
StringValue(
"Incoming record with no existing record. Upsert should insert this record."
),
),
inputRecord(
"645efad2-f1e6-438a-b29f-15ae5d096015",
"2025-01-23T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("8"),
TEST_FIELD to IntegerValue(8),
DESCRIPTION_FIELD to
StringValue(
"Incoming record with earlier cursor and later extracted_at than existing record. Upsert should discard this record (prefer cursor over extracted_at)."
),
),
inputRecord(
"f74b8ddb-45d0-4e30-af25-66885e57a0e6",
"2025-01-23T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("9"),
TEST_FIELD to IntegerValue(9),
DESCRIPTION_FIELD to
StringValue(
"Incoming record with equal cursor and later extracted_at than existing record. Upsert should update with this record (break ties with extracted_at)."
),
),
inputRecord(
"877cceb6-23a6-4e7b-92e3-59ca46f8fd6c",
"2025-01-23T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("10"),
TEST_FIELD to IntegerValue(1010),
DESCRIPTION_FIELD to
StringValue(
"Incoming record with later cursor and later extracted_at than existing record. Upsert should update with this record."
),
),
inputRecord(
"20410b34-7bb0-4ba5-9c61-0dd23bfeee6d",
"2025-01-22T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("11"),
TEST_FIELD to IntegerValue(11),
DESCRIPTION_FIELD to
StringValue(
"Incoming record with earlier cursor and equal extracted_at than existing record. Upsert should discard this record."
),
),
inputRecord(
"70fdf9b0-ade0-4d30-9131-ba217ef506da",
"2025-01-22T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("12"),
TEST_FIELD to IntegerValue(1012),
DESCRIPTION_FIELD to
StringValue(
"Incoming record with later cursor and equal extracted_at than existing record. Upsert should update with this record."
),
),
inputRecord(
"20949d9b-8ffc-4497-85e4-cda14abc4049",
"2025-01-21T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("13"),
TEST_FIELD to IntegerValue(13),
DESCRIPTION_FIELD to
StringValue(
"Incoming record with earlier cursor and earlier extracted_at than existing record. Upsert should discard this record."
),
),
inputRecord(
"5808a0ef-3c6d-4d9a-851c-edbbc4852e18",
"2025-01-21T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("14"),
TEST_FIELD to IntegerValue(14),
DESCRIPTION_FIELD to
StringValue(
"Incoming record with equal cursor and earlier extracted_at than existing record. Upsert should discard this record."
),
),
inputRecord(
"373127a7-a40e-4e23-890b-1a52114686ee",
"2025-01-21T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("15"),
TEST_FIELD to IntegerValue(1015),
DESCRIPTION_FIELD to
StringValue(
"Incoming record with later cursor and earlier extracted_at than existing record. Upsert should update with this record."
),
),
)
@@ -249,7 +402,6 @@ object TableOperationsFixtures {
*/
val UPSERT_TARGET_RECORDS: List<Map<String, AirbyteValue>> =
listOf(
-// id=1 has no incoming record, so it should remain untouched.
inputRecord(
"6317026e-12f9-4713-976e-ce43901bd7ce",
"2025-01-22T00:00:00Z",
@@ -257,18 +409,11 @@ object TableOperationsFixtures {
1,
ID_FIELD to StringValue("1"),
TEST_FIELD to IntegerValue(1),
DESCRIPTION_FIELD to
StringValue(
"Existing record, no incoming record. Upsert should preserve this record."
),
),
-// id=2 has a normal incoming record, which will overwrite this one.
-inputRecord(
-"46159e3a-9bf9-42d9-8bb7-9f47d37bd663",
-"2025-01-22T00:00:00Z",
-linkedMapOf(),
-generationId = 1,
-ID_FIELD to StringValue("2"),
-TEST_FIELD to IntegerValue(2),
-),
-// id=3 has an incoming record with nonnull deleted_at, so this record should be
-// deleted.
// TODO what about destinations with CDC soft deletes?
// https://github.com/airbytehq/airbyte-internal-issues/issues/14911
inputRecord(
@@ -278,22 +423,121 @@ object TableOperationsFixtures {
generationId = 1,
ID_FIELD to StringValue("3"),
TEST_FIELD to IntegerValue(3),
DESCRIPTION_FIELD to
StringValue(
"Existing record with incoming deletion record with later cursor and extracted_at. Upsert should delete this record."
),
),
-// id=4 has an incoming record with the same cursor value (test=4) but later
-// extracted_at.
-// That record should replace this one.
inputRecord(
-"02e22e03-587f-4d30-9718-994357407b65",
"8086bdd6-6cf5-479e-a819-e5f347373804",
"2025-01-22T00:00:00Z",
linkedMapOf(),
generationId = 1,
-ID_FIELD to StringValue("4"),
ID_FIELD to StringValue("8"),
-TEST_FIELD to IntegerValue(4),
TEST_FIELD to IntegerValue(1008),
DESCRIPTION_FIELD to
StringValue(
"Existing record with later cursor and earlier extracted_at than incoming record. Upsert should preserve this record (prefer cursor over extracted_at)."
),
),
inputRecord(
"b60e8b33-32f4-4da0-934b-87d14d9ed354",
"2025-01-22T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("9"),
TEST_FIELD to IntegerValue(9),
DESCRIPTION_FIELD to
StringValue(
"Existing record with equal cursor and earlier extracted_at than incoming record. Upsert should discard this record (break ties with extracted_at)."
),
),
inputRecord(
"e79d163e-b594-4016-89b9-a85e385778bd",
"2025-01-22T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("10"),
TEST_FIELD to IntegerValue(10),
DESCRIPTION_FIELD to
StringValue(
"Existing record with earlier cursor and earlier extracted_at than incoming record. Upsert should discard this record."
),
),
inputRecord(
"3d345fb2-254e-4968-89a6-f896a05fb831",
"2025-01-22T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("11"),
TEST_FIELD to IntegerValue(1011),
DESCRIPTION_FIELD to
StringValue(
"Existing record with later cursor and equal extracted_at than incoming record. Upsert should preserve this record."
),
),
inputRecord(
"9c5262e6-44e3-41de-9a5a-c31bc0efdb68",
"2025-01-22T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("12"),
TEST_FIELD to IntegerValue(12),
DESCRIPTION_FIELD to
StringValue(
"Existing record with earlier cursor and equal extracted_at than incoming record. Upsert should discard this record."
),
),
inputRecord(
"739a9347-267b-48af-a172-2030320e2193",
"2025-01-22T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("13"),
TEST_FIELD to IntegerValue(1013),
DESCRIPTION_FIELD to
StringValue(
"Existing record with later cursor and later extracted_at than incoming record. Upsert should preserve this record."
),
),
inputRecord(
"70243c59-eadb-4840-90fa-be4ed57609fc",
"2025-01-22T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("14"),
TEST_FIELD to IntegerValue(14),
DESCRIPTION_FIELD to
StringValue(
"Existing record with equal cursor and later extracted_at than incoming record. Upsert should preserve this record."
),
),
inputRecord(
"966e89ec-c0d2-4358-b8e5-bf9c713f5396",
"2025-01-22T00:00:00Z",
linkedMapOf(),
generationId = 1,
ID_FIELD to StringValue("15"),
TEST_FIELD to IntegerValue(15),
DESCRIPTION_FIELD to
StringValue(
"Existing record with earlier cursor and later extracted_at than existing record. Upsert should discard this record."
),
),
)
val UPSERT_EXPECTED_RECORDS: List<Map<String, Any>> =
listOf(
outputRecord(
"109d38b9-e001-4f62-86ce-4a457ab013a1",
"2025-01-23T00:00:00Z",
linkedMapOf(),
generationId = 1L,
ID_FIELD to "0",
TEST_FIELD to 1000L,
DESCRIPTION_FIELD to
"New record, no existing record. Upsert should insert this record.",
),
outputRecord(
"6317026e-12f9-4713-976e-ce43901bd7ce",
"2025-01-22T00:00:00Z",
@@ -301,22 +545,108 @@ object TableOperationsFixtures {
generationId = 1L,
ID_FIELD to "1",
TEST_FIELD to 1L,
DESCRIPTION_FIELD to
"Existing record, no incoming record. Upsert should preserve this record.",
),
outputRecord(
-"5499cdef-1411-4c7e-987c-b22fe1284a49",
"2ddf5ee9-08a1-4319-824d-187d878edac5",
"2025-01-23T01:00:00Z",
linkedMapOf(),
1L,
ID_FIELD to "6",
TEST_FIELD to 6L,
DESCRIPTION_FIELD to
"Incoming record with no existing record. Upsert should insert this record.",
),
outputRecord(
"e56fc753-b55a-439b-9b16-528596e2ca3a",
"2025-01-23T01:00:00Z",
linkedMapOf(),
1L,
ID_FIELD to "7",
TEST_FIELD to 7L,
DESCRIPTION_FIELD to
"Incoming record with no existing record. Upsert should insert this record.",
),
outputRecord(
"8086bdd6-6cf5-479e-a819-e5f347373804",
"2025-01-22T00:00:00Z",
linkedMapOf(),
1L,
ID_FIELD to "8",
TEST_FIELD to 1008L,
DESCRIPTION_FIELD to
"Existing record with later cursor and earlier extracted_at than incoming record. Upsert should preserve this record (prefer cursor over extracted_at).",
),
outputRecord(
"f74b8ddb-45d0-4e30-af25-66885e57a0e6",
"2025-01-23T00:00:00Z", "2025-01-23T00:00:00Z",
linkedMapOf(), linkedMapOf(),
1L, 1L,
ID_FIELD to "2", ID_FIELD to "9",
TEST_FIELD to 1001L, TEST_FIELD to 9L,
DESCRIPTION_FIELD to
"Incoming record with equal cursor and later extracted_at than existing record. Upsert should update with this record (break ties with extracted_at).",
),
outputRecord(
-"9110dcf0-2171-4daa-a934-695163950d98",
"877cceb6-23a6-4e7b-92e3-59ca46f8fd6c",
"2025-01-23T00:00:00Z",
linkedMapOf(),
1L,
-ID_FIELD to "4",
ID_FIELD to "10",
-TEST_FIELD to 4L,
TEST_FIELD to 1010L,
DESCRIPTION_FIELD to
"Incoming record with later cursor and later extracted_at than existing record. Upsert should update with this record.",
),
outputRecord(
"3d345fb2-254e-4968-89a6-f896a05fb831",
"2025-01-22T00:00:00Z",
linkedMapOf(),
1L,
ID_FIELD to "11",
TEST_FIELD to 1011L,
DESCRIPTION_FIELD to
"Existing record with later cursor and equal extracted_at than incoming record. Upsert should preserve this record.",
),
outputRecord(
"70fdf9b0-ade0-4d30-9131-ba217ef506da",
"2025-01-22T00:00:00Z",
linkedMapOf(),
1L,
ID_FIELD to "12",
TEST_FIELD to 1012L,
DESCRIPTION_FIELD to
"Incoming record with later cursor and equal extracted_at than existing record. Upsert should update with this record.",
),
outputRecord(
"739a9347-267b-48af-a172-2030320e2193",
"2025-01-22T00:00:00Z",
linkedMapOf(),
1L,
ID_FIELD to "13",
TEST_FIELD to 1013L,
DESCRIPTION_FIELD to
"Existing record with later cursor and later extracted_at than incoming record. Upsert should preserve this record.",
),
outputRecord(
"70243c59-eadb-4840-90fa-be4ed57609fc",
"2025-01-22T00:00:00Z",
linkedMapOf(),
1L,
ID_FIELD to "14",
TEST_FIELD to 14L,
DESCRIPTION_FIELD to
"Existing record with equal cursor and later extracted_at than incoming record. Upsert should preserve this record.",
),
outputRecord(
"373127a7-a40e-4e23-890b-1a52114686ee",
"2025-01-21T00:00:00Z",
linkedMapOf(),
1L,
ID_FIELD to "15",
TEST_FIELD to 1015L,
DESCRIPTION_FIELD to
"Incoming record with later cursor and earlier extracted_at than existing record. Upsert should update with this record.",
),
)
@@ -387,7 +717,11 @@ object TableOperationsFixtures {
namespaceMapper = NamespaceMapper(),
)

-fun <V> List<Map<String, V>>.sortByTestField() = this.sortedBy { it["test"] as Long }
fun <V> List<Map<String, V>>.sortBy(key: String) =
// sketchy unchecked cast is intentional, we're assuming that the tests are written such
// that the sort key is always comparable.
// In practice, it's generally some sort of ID column (int/string/etc.).
@Suppress("UNCHECKED_CAST") this.sortedBy { it[key] as Comparable<Any> }
fun <V> List<Map<String, V>>.applyColumnNameMapping(mapping: ColumnNameMapping) =
map { record ->
@@ -445,4 +779,16 @@ object TableOperationsFixtures {
COLUMN_NAME_AB_GENERATION_ID to generationId,
*pairs,
)
fun assertEquals(
expectedRecords: List<Map<String, Any>>,
actualRecords: List<Map<String, Any>>,
sortKey: String,
message: String,
) =
Assertions.assertEquals(
expectedRecords.sortBy(sortKey).joinToString("\n"),
actualRecords.sortBy(sortKey).joinToString("\n"),
message,
)
}
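
Taken together, the DESCRIPTION_FIELD strings above spell out the resolution rule the upsert test now covers: for each primary key, the record with the highest cursor wins, ties are broken by _airbyte_extracted_at, and a winning CDC deletion means no row should remain. A minimal Kotlin sketch of that rule, using hypothetical Row/resolve names rather than any actual CDK type:

// Hypothetical sketch only -- not CDK code. It encodes the rule described by the
// fixture strings: prefer the higher cursor, break ties with extracted_at, and
// treat a winning CDC deletion as "no row should remain".
data class Row(
    val id: String,
    val cursor: Long,                   // the "test" field in the fixtures
    val extractedAt: java.time.Instant, // _airbyte_extracted_at
    val deleted: Boolean,               // non-null _ab_cdc_deleted_at
)

// Returns the row that should survive for one primary key, or null if the key
// should be absent (nothing exists, or the winning record is a deletion).
fun resolve(existing: Row?, incoming: List<Row>): Row? {
    val byCursorThenExtractedAt = compareBy<Row>({ it.cursor }, { it.extractedAt })
    val winner =
        (listOfNotNull(existing) + incoming).maxWithOrNull(byCursorThenExtractedAt)
            ?: return null
    return if (winner.deleted) null else winner
}

Under this reading, ids 0, 6, 7, 9, 10, 12, and 15 resolve to the incoming record, ids 1, 8, 11, 13, and 14 keep the existing record, and ids 3 and 5 resolve to deletions, which matches the id set in UPSERT_EXPECTED_RECORDS above.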

View File

@@ -5,9 +5,9 @@
package io.airbyte.cdk.load.component

import io.airbyte.cdk.load.component.TableOperationsFixtures as Fixtures
import io.airbyte.cdk.load.component.TableOperationsFixtures.assertEquals
import io.airbyte.cdk.load.component.TableOperationsFixtures.insertRecords
import io.airbyte.cdk.load.component.TableOperationsFixtures.reverseColumnNameMapping
-import io.airbyte.cdk.load.component.TableOperationsFixtures.sortByTestField
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.ObjectValue
@@ -402,13 +402,14 @@ interface TableOperationsSuite {
val overwrittenTableRecords = harness.readTableWithoutMetaColumns(targetTable)
assertEquals(
-expectedRecords.sortByTestField(),
expectedRecords,
-overwrittenTableRecords
overwrittenTableRecords.reverseColumnNameMapping(
-.reverseColumnNameMapping(columnNameMapping, airbyteMetaColumnMapping)
columnNameMapping,
-.sortByTestField(),
airbyteMetaColumnMapping
-) {
),
-"Expected records were not in the overwritten table."
"test",
-}
"Expected records were not in the overwritten table.",
)
assert(!client.tableExists(sourceTable)) {
"Source table: ${sourceTable.namespace}.${sourceTable.name} was not dropped as expected."
@@ -473,13 +474,14 @@ interface TableOperationsSuite {
val copyTableRecords = harness.readTableWithoutMetaColumns(targetTable)
assertEquals(
-expectedRecords.sortByTestField(),
expectedRecords,
-copyTableRecords
copyTableRecords.reverseColumnNameMapping(
-.reverseColumnNameMapping(columnNameMapping, airbyteMetaColumnMapping)
columnNameMapping,
-.sortByTestField(),
airbyteMetaColumnMapping
-) {
),
"test",
"Expected source records were not copied to the target table."
-}
)
} finally {
harness.cleanupTable(sourceTable)
harness.cleanupTable(targetTable)
@@ -560,13 +562,14 @@ interface TableOperationsSuite {
val upsertTableRecords = testClient.readTable(targetTable)
assertEquals(
-expectedRecords.sortByTestField(),
expectedRecords,
-upsertTableRecords
upsertTableRecords.reverseColumnNameMapping(
-.reverseColumnNameMapping(columnNameMapping, airbyteMetaColumnMapping)
columnNameMapping,
-.sortByTestField(),
airbyteMetaColumnMapping
-) {
),
"id",
"Upserted table did not contain expected records."
-}
)
} finally {
harness.cleanupTable(sourceTable)
harness.cleanupTable(targetTable)
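
The assertions above now delegate to the fixtures' new helpers: sortBy(key) orders records by an arbitrary column (hence "test" for the overwrite and copy tests but "id" for the upsert test, whose expected records no longer share a single monotonic test value), and assertEquals compares the newline-joined string forms of both sorted lists, so failures print the full record diff. A small illustrative call, with made-up records rather than the real fixtures:

import io.airbyte.cdk.load.component.TableOperationsFixtures

fun example() {
    // Made-up records; only the shape matters here.
    val expected: List<Map<String, Any>> =
        listOf(mapOf("id" to "1", "test" to 1L), mapOf("id" to "2", "test" to 2L))
    val actual: List<Map<String, Any>> =
        listOf(mapOf("id" to "2", "test" to 2L), mapOf("id" to "1", "test" to 1L))

    // Both sides are sorted by "id" and compared as newline-joined strings, so the
    // check is order-insensitive and a failure shows every record on both sides.
    TableOperationsFixtures.assertEquals(expected, actual, "id", "records mismatch")
}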

View File

@@ -1 +1 @@
-version=0.1.80
version=0.1.81

View File

@@ -192,7 +192,7 @@ allprojects {
if (project.hasProperty('JunitMethodExecutionTimeout')) {
junitMethodExecutionTimeout = project.property('JunitMethodExecutionTimeout').toString()
} else {
junitMethodExecutionTimeout = '1 m'
}
systemProperty 'junit.jupiter.execution.timeout.default', junitMethodExecutionTimeout