diff --git a/airbyte-cdk/bulk/changelog.md b/airbyte-cdk/bulk/changelog.md index 11b00070a4f..d7fc766d6b5 100644 --- a/airbyte-cdk/bulk/changelog.md +++ b/airbyte-cdk/bulk/changelog.md @@ -1,3 +1,7 @@ +## Version 0.1.89 + +load cdk: components tests: data coercion tests for int+number + ## Version 0.1.88 **Load CDK** diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/DataCoercionFixtures.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/DataCoercionFixtures.kt new file mode 100644 index 00000000000..4114408b6ce --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/DataCoercionFixtures.kt @@ -0,0 +1,475 @@ +/* + * Copyright (c) 2025 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.component + +import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.IntegerValue +import io.airbyte.cdk.load.data.NumberValue +import io.airbyte.cdk.load.dataflow.transform.ValueCoercer +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason +import java.math.BigDecimal +import java.math.BigInteger +import org.junit.jupiter.params.provider.Arguments + +/* + * This file defines "interesting values" for all data types, along with expected behavior for those values. + * You're free to define your own values/behavior depending on the destination, but it's recommended + * that you try to match behavior to an existing fixture. + * + * Classes also include some convenience functions for JUnit. For example, you could annotate your + * method with: + * ```kotlin + * @ParameterizedTest + * @MethodSource("io.airbyte.cdk.load.component.DataCoercionIntegerFixtures#int64") + * ``` + * + * By convention, all fixtures are declared as: + * 1. One or more `val : List>` (each pair representing the input value, + * and the expected output value) + * 2. One or more `fun (): List = .toArgs()`, which can be provided to JUnit's MethodSource + * + * If you need to mutate fixtures in some way, you should reference the `val`, and use the `toArgs()` + * extension function to convert it to JUnit's Arguments class. See [DataCoercionIntegerFixtures.int64AsBigInteger] + * for an example. + */ + +object DataCoercionIntegerFixtures { + // "9".repeat(38) + val numeric38_0Max = bigint("99999999999999999999999999999999999999") + val numeric38_0Min = bigint("-99999999999999999999999999999999999999") + + const val ZERO = "0" + const val ONE = "1" + const val NEGATIVE_ONE = "-1" + const val FORTY_TWO = "42" + const val NEGATIVE_FORTY_TWO = "-42" + const val INT32_MAX = "int32 max" + const val INT32_MIN = "int32 min" + const val INT32_MAX_PLUS_ONE = "int32_max + 1" + const val INT32_MIN_MINUS_ONE = "int32_min - 1" + const val INT64_MAX = "int64 max" + const val INT64_MIN = "int64 min" + const val INT64_MAX_PLUS_ONE = "int64_max + 1" + const val INT64_MIN_MINUS_1 = "int64_min - 1" + const val NUMERIC_38_0_MAX = "numeric(38,0) max" + const val NUMERIC_38_0_MIN = "numeric(38,0) min" + const val NUMERIC_38_0_MAX_PLUS_ONE = "numeric(38,0)_max + 1" + const val NUMERIC_38_0_MIN_MINUS_ONE = "numeric(38,0)_min - 1" + + /** + * Many destinations use int64 to represent integers. In this case, we null out any value beyond + * Long.MIN/MAX_VALUE. + */ + val int64 = + listOf( + case(ZERO, IntegerValue(0), 0L), + case(ONE, IntegerValue(1), 1L), + case(NEGATIVE_ONE, IntegerValue(-1), -1L), + case(FORTY_TWO, IntegerValue(42), 42L), + case(NEGATIVE_FORTY_TWO, IntegerValue(-42), -42L), + // int32 bounds, and slightly out of bounds + case(INT32_MAX, IntegerValue(Integer.MAX_VALUE.toLong()), Integer.MAX_VALUE.toLong()), + case(INT32_MIN, IntegerValue(Integer.MIN_VALUE.toLong()), Integer.MIN_VALUE.toLong()), + case( + INT32_MAX_PLUS_ONE, + IntegerValue(Integer.MAX_VALUE.toLong() + 1), + Integer.MAX_VALUE.toLong() + 1 + ), + case( + INT32_MIN_MINUS_ONE, + IntegerValue(Integer.MIN_VALUE.toLong() - 1), + Integer.MIN_VALUE.toLong() - 1 + ), + // int64 bounds, and slightly out of bounds + case(INT64_MAX, IntegerValue(Long.MAX_VALUE), Long.MAX_VALUE), + case(INT64_MIN, IntegerValue(Long.MIN_VALUE), Long.MIN_VALUE), + // values out of int64 bounds are nulled + case( + INT64_MAX_PLUS_ONE, + IntegerValue(bigint(Long.MAX_VALUE) + BigInteger.ONE), + null, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + INT64_MIN_MINUS_1, + IntegerValue(bigint(Long.MIN_VALUE) - BigInteger.ONE), + null, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + // NUMERIC(38, 9) bounds, and slightly out of bounds + // (these are all out of bounds for an int64 value, so they all get nulled) + case( + NUMERIC_38_0_MAX, + IntegerValue(numeric38_0Max), + null, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + NUMERIC_38_0_MIN, + IntegerValue(numeric38_0Min), + null, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + NUMERIC_38_0_MAX_PLUS_ONE, + IntegerValue(numeric38_0Max + BigInteger.ONE), + null, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + NUMERIC_38_0_MIN_MINUS_ONE, + IntegerValue(numeric38_0Min - BigInteger.ONE), + null, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + ) + + /** + * Many destination warehouses represent integers as a fixed-point type with 38 digits of + * precision. In this case, we only need to null out numbers larger than `1e38 - 1` / smaller + * than `-1e38 + 1`. + */ + val numeric38_0 = + listOf( + case(ZERO, IntegerValue(0), bigint(0L)), + case(ONE, IntegerValue(1), bigint(1L)), + case(NEGATIVE_ONE, IntegerValue(-1), bigint(-1L)), + case(FORTY_TWO, IntegerValue(42), bigint(42L)), + case(NEGATIVE_FORTY_TWO, IntegerValue(-42), bigint(-42L)), + // int32 bounds, and slightly out of bounds + case( + INT32_MAX, + IntegerValue(Integer.MAX_VALUE.toLong()), + bigint(Integer.MAX_VALUE.toLong()) + ), + case( + INT32_MIN, + IntegerValue(Integer.MIN_VALUE.toLong()), + bigint(Integer.MIN_VALUE.toLong()) + ), + case( + INT32_MAX_PLUS_ONE, + IntegerValue(Integer.MAX_VALUE.toLong() + 1), + bigint(Integer.MAX_VALUE.toLong() + 1) + ), + case( + INT32_MIN_MINUS_ONE, + IntegerValue(Integer.MIN_VALUE.toLong() - 1), + bigint(Integer.MIN_VALUE.toLong() - 1) + ), + // int64 bounds, and slightly out of bounds + case(INT64_MAX, IntegerValue(Long.MAX_VALUE), bigint(Long.MAX_VALUE)), + case(INT64_MIN, IntegerValue(Long.MIN_VALUE), bigint(Long.MIN_VALUE)), + case( + INT64_MAX_PLUS_ONE, + IntegerValue(bigint(Long.MAX_VALUE) + BigInteger.ONE), + bigint(Long.MAX_VALUE) + BigInteger.ONE + ), + case( + INT64_MIN_MINUS_1, + IntegerValue(bigint(Long.MIN_VALUE) - BigInteger.ONE), + bigint(Long.MIN_VALUE) - BigInteger.ONE + ), + // NUMERIC(38, 9) bounds, and slightly out of bounds + case(NUMERIC_38_0_MAX, IntegerValue(numeric38_0Max), numeric38_0Max), + case(NUMERIC_38_0_MIN, IntegerValue(numeric38_0Min), numeric38_0Min), + // These values exceed the 38-digit range, so they get nulled out + case( + NUMERIC_38_0_MAX_PLUS_ONE, + IntegerValue(numeric38_0Max + BigInteger.ONE), + null, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + NUMERIC_38_0_MIN_MINUS_ONE, + IntegerValue(numeric38_0Min - BigInteger.ONE), + null, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + ) + + @JvmStatic fun int64() = int64.toArgs() + + /** + * Convenience fixture if your [TestTableOperationsClient] returns integers as [BigInteger] + * rather than [Long]. + */ + @JvmStatic + fun int64AsBigInteger() = + int64.map { it.copy(outputValue = it.outputValue?.let { bigint(it as Long) }) } + + /** + * Convenience fixture if your [TestTableOperationsClient] returns integers as [BigDecimal] + * rather than [Long]. + */ + @JvmStatic + fun int64AsBigDecimal() = + int64.map { it.copy(outputValue = it.outputValue?.let { BigDecimal.valueOf(it as Long) }) } + + @JvmStatic fun numeric38_0() = numeric38_0.toArgs() +} + +object DataCoercionNumberFixtures { + val numeric38_9Max = bigdec("99999999999999999999999999999.999999999") + val numeric38_9Min = bigdec("-99999999999999999999999999999.999999999") + + const val ZERO = "0" + const val ONE = "1" + const val NEGATIVE_ONE = "-1" + const val ONE_HUNDRED_TWENTY_THREE_POINT_FOUR = "123.4" + const val NEGATIVE_ONE_HUNDRED_TWENTY_THREE_POINT_FOUR = "123.4" + const val POSITIVE_HIGH_PRECISION_FLOAT = "positive high-precision float" + const val NEGATIVE_HIGH_PRECISION_FLOAT = "negative high-precision float" + const val NUMERIC_38_9_MAX = "numeric(38,9) max" + const val NUMERIC_38_9_MIN = "numeric(38,9) min" + const val SMALLEST_POSITIVE_FLOAT32 = "smallest positive float32" + const val SMALLEST_NEGATIVE_FLOAT32 = "smallest negative float32" + const val LARGEST_POSITIVE_FLOAT32 = "largest positive float32" + const val LARGEST_NEGATIVE_FLOAT32 = "largest negative float32" + const val SMALLEST_POSITIVE_FLOAT64 = "smallest positive float64" + const val SMALLEST_NEGATIVE_FLOAT64 = "smallest negative float64" + const val LARGEST_POSITIVE_FLOAT64 = "largest positive float64" + const val LARGEST_NEGATIVE_FLOAT64 = "largest negative float64" + const val SLIGHTLY_ABOVE_LARGEST_POSITIVE_FLOAT64 = "slightly above largest positive float64" + const val SLIGHTLY_BELOW_LARGEST_NEGATIVE_FLOAT64 = "slightly below largest negative float64" + + val float64 = + listOf( + case(ZERO, NumberValue(bigdec(0)), 0.0), + case(ONE, NumberValue(bigdec(1)), 1.0), + case(NEGATIVE_ONE, NumberValue(bigdec(-1)), -1.0), + // This value isn't exactly representable as a float64 + // (the exact value is `123.400000000000005684341886080801486968994140625`) + // but we should preserve the canonical representation + case(ONE_HUNDRED_TWENTY_THREE_POINT_FOUR, NumberValue(bigdec("123.4")), 123.4), + case( + NEGATIVE_ONE_HUNDRED_TWENTY_THREE_POINT_FOUR, + NumberValue(bigdec("-123.4")), + -123.4 + ), + // These values have too much precision for a float64, so we round them + case( + POSITIVE_HIGH_PRECISION_FLOAT, + NumberValue(bigdec("1234567890.1234567890123456789")), + 1234567890.1234567, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + NEGATIVE_HIGH_PRECISION_FLOAT, + NumberValue(bigdec("-1234567890.1234567890123456789")), + -1234567890.1234567, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + NUMERIC_38_9_MAX, + NumberValue(numeric38_9Max), + 1.0E29, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + NUMERIC_38_9_MIN, + NumberValue(numeric38_9Min), + -1.0E29, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + // min/max_value are all positive values, so we need to manually test their negative + // version + case( + SMALLEST_POSITIVE_FLOAT32, + NumberValue(bigdec(Float.MIN_VALUE.toDouble())), + Float.MIN_VALUE.toDouble() + ), + case( + SMALLEST_NEGATIVE_FLOAT32, + NumberValue(bigdec(-Float.MIN_VALUE.toDouble())), + -Float.MIN_VALUE.toDouble() + ), + case( + LARGEST_POSITIVE_FLOAT32, + NumberValue(bigdec(Float.MAX_VALUE.toDouble())), + Float.MAX_VALUE.toDouble() + ), + case( + LARGEST_NEGATIVE_FLOAT32, + NumberValue(bigdec(-Float.MAX_VALUE.toDouble())), + -Float.MAX_VALUE.toDouble() + ), + case( + SMALLEST_POSITIVE_FLOAT64, + NumberValue(bigdec(Double.MIN_VALUE)), + Double.MIN_VALUE + ), + case( + SMALLEST_NEGATIVE_FLOAT64, + NumberValue(bigdec(-Double.MIN_VALUE)), + -Double.MIN_VALUE + ), + case(LARGEST_POSITIVE_FLOAT64, NumberValue(bigdec(Double.MAX_VALUE)), Double.MAX_VALUE), + case( + LARGEST_NEGATIVE_FLOAT64, + NumberValue(bigdec(-Double.MAX_VALUE)), + -Double.MAX_VALUE + ), + // These values are out of bounds, so we null them + case( + SLIGHTLY_ABOVE_LARGEST_POSITIVE_FLOAT64, + NumberValue(bigdec(Double.MAX_VALUE) + bigdec(Double.MIN_VALUE)), + null, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + SLIGHTLY_BELOW_LARGEST_NEGATIVE_FLOAT64, + NumberValue(bigdec(-Double.MAX_VALUE) - bigdec(Double.MIN_VALUE)), + null, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + ) + + val numeric38_9 = + listOf( + case(ZERO, NumberValue(bigdec(0)), bigdec(0.0)), + case(ONE, NumberValue(bigdec(1)), bigdec(1.0)), + case(NEGATIVE_ONE, NumberValue(bigdec(-1)), bigdec(-1.0)), + // This value isn't exactly representable as a float64 + // (the exact value is `123.400000000000005684341886080801486968994140625`) + // but it's perfectly fine as a numeric(38, 9) + case( + ONE_HUNDRED_TWENTY_THREE_POINT_FOUR, + NumberValue(bigdec("123.4")), + bigdec("123.4") + ), + case( + NEGATIVE_ONE_HUNDRED_TWENTY_THREE_POINT_FOUR, + NumberValue(bigdec("-123.4")), + bigdec("-123.4") + ), + // These values have too much precision for a numeric(38, 9), so we round them + case( + POSITIVE_HIGH_PRECISION_FLOAT, + NumberValue(bigdec("1234567890.1234567890123456789")), + bigdec("1234567890.123456789"), + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + NEGATIVE_HIGH_PRECISION_FLOAT, + NumberValue(bigdec("-1234567890.1234567890123456789")), + bigdec("-1234567890.123456789"), + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + SMALLEST_POSITIVE_FLOAT32, + NumberValue(bigdec(Float.MIN_VALUE.toDouble())), + bigdec(0), + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + SMALLEST_NEGATIVE_FLOAT32, + NumberValue(bigdec(-Float.MIN_VALUE.toDouble())), + bigdec(0), + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + SMALLEST_POSITIVE_FLOAT64, + NumberValue(bigdec(Double.MIN_VALUE)), + bigdec(0), + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + SMALLEST_NEGATIVE_FLOAT64, + NumberValue(bigdec(-Double.MIN_VALUE)), + bigdec(0), + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + // numeric bounds are perfectly fine + case(NUMERIC_38_9_MAX, NumberValue(numeric38_9Max), numeric38_9Max), + case(NUMERIC_38_9_MIN, NumberValue(numeric38_9Min), numeric38_9Min), + // These values are out of bounds, so we null them + case( + LARGEST_POSITIVE_FLOAT32, + NumberValue(bigdec(Float.MAX_VALUE.toDouble())), + null, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + LARGEST_NEGATIVE_FLOAT32, + NumberValue(bigdec(-Float.MAX_VALUE.toDouble())), + null, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + LARGEST_POSITIVE_FLOAT64, + NumberValue(bigdec(Double.MAX_VALUE)), + null, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + LARGEST_NEGATIVE_FLOAT64, + NumberValue(bigdec(-Double.MAX_VALUE)), + null, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + SLIGHTLY_ABOVE_LARGEST_POSITIVE_FLOAT64, + NumberValue(bigdec(Double.MAX_VALUE) + bigdec(Double.MIN_VALUE)), + null, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + case( + SLIGHTLY_BELOW_LARGEST_NEGATIVE_FLOAT64, + NumberValue(bigdec(-Double.MAX_VALUE) - bigdec(Double.MIN_VALUE)), + null, + Reason.DESTINATION_FIELD_SIZE_LIMITATION + ), + ) + .map { it.copy(outputValue = (it.outputValue as BigDecimal?)?.setScale(9)) } + + @JvmStatic fun float64() = float64.toArgs() + @JvmStatic fun numeric38_9() = numeric38_9.toArgs() +} + +fun List.toArgs(): List = + this.map { Arguments.argumentSet(it.name, it.inputValue, it.outputValue, it.changeReason) } + .toList() + +/** + * Utility method to use the BigDecimal constructor (supports exponential notation like `1e38`) to + * construct a BigInteger. + */ +fun bigint(str: String): BigInteger = BigDecimal(str).toBigIntegerExact() + +/** Shorthand utility method to construct a bigint from a long */ +fun bigint(long: Long): BigInteger = BigInteger.valueOf(long) + +fun bigdec(str: String): BigDecimal = BigDecimal(str) + +fun bigdec(double: Double): BigDecimal = BigDecimal.valueOf(double) + +fun bigdec(int: Int): BigDecimal = BigDecimal.valueOf(int.toDouble()) + +/** + * Represents a single data coercion test case. You probably want to use [case] as a shorthand + * constructor. + * + * @param name A short human-readable name for the test. Primarily useful for tests where + * [inputValue] is either very long, or otherwise hard to read. + * @param inputValue The value to pass into [ValueCoercer.validate] + * @param outputValue The value that we expect to read back from the destination. Should be + * basically equivalent to the output of [ValueCoercer.validate] + * @param changeReason If `validate` returns Truncate/Nullify, the reason for that + * truncation/nullification. If `validate` returns Valid, this should be null. + */ +data class DataCoercionTestCase( + val name: String, + val inputValue: AirbyteValue, + val outputValue: Any?, + val changeReason: Reason? = null, +) + +fun case( + name: String, + inputValue: AirbyteValue, + outputValue: Any?, + changeReason: Reason? = null, +) = DataCoercionTestCase(name, inputValue, outputValue, changeReason) diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/DataCoercionSuite.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/DataCoercionSuite.kt new file mode 100644 index 00000000000..d8530cc7161 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/DataCoercionSuite.kt @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2025 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.component + +import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.FieldType +import io.airbyte.cdk.load.data.IntegerType +import io.airbyte.cdk.load.data.NumberType +import io.airbyte.cdk.load.dataflow.transform.ValueCoercer +import io.airbyte.cdk.load.message.Meta +import io.airbyte.cdk.load.schema.TableSchemaFactory +import io.airbyte.cdk.load.table.ColumnNameMapping +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason +import io.micronaut.test.extensions.junit5.annotation.MicronautTest +import kotlinx.coroutines.test.runTest + +/** + * The tests in this class are designed to reference the parameters defined in + * `DataCoercionFixtures.kt`. For example, you might annotate [`handle integer values`] with + * `@MethodSource("io.airbyte.cdk.load.component.DataCoercionIntegerFixtures#int32")`. See each + * fixture class for explanations of what behavior they are exercising. + * + * Note that this class _only_ exercises [ValueCoercer.validate]. You should write separate unit + * tests for [ValueCoercer.map]. For now, the `map` function is primarily intended for transforming + * `UnionType` fields into other types (typically `StringType`), at which point your `validate` + * implementation should be able to handle any StringValue (regardless of whether it was originally + * a StringType or UnionType). + */ +@MicronautTest(environments = ["component"], resolveParameters = false) +interface DataCoercionSuite { + val coercer: ValueCoercer + val airbyteMetaColumnMapping: Map + get() = Meta.COLUMN_NAMES.associateWith { it } + val columnNameMapping: ColumnNameMapping + get() = ColumnNameMapping(mapOf("test" to "test")) + + val opsClient: TableOperationsClient + val testClient: TestTableOperationsClient + val schemaFactory: TableSchemaFactory + + val harness: TableOperationsTestHarness + get() = + TableOperationsTestHarness( + opsClient, + testClient, + schemaFactory, + airbyteMetaColumnMapping + ) + + /** Fixtures are defined in [DataCoercionIntegerFixtures]. */ + fun `handle integer values`( + inputValue: AirbyteValue, + expectedValue: Any?, + expectedChangeReason: Reason? + ) = runTest { + harness.testValueCoercion( + coercer, + columnNameMapping, + FieldType(IntegerType, nullable = true), + inputValue, + expectedValue, + expectedChangeReason, + ) + } + + /** Fixtures are defined in [DataCoercionNumberFixtures]. */ + fun `handle number values`( + inputValue: AirbyteValue, + expectedValue: Any?, + expectedChangeReason: Reason? + ) = runTest { + harness.testValueCoercion( + coercer, + columnNameMapping, + FieldType(NumberType, nullable = true), + inputValue, + expectedValue, + expectedChangeReason, + ) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsFixtures.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsFixtures.kt index 7ab3956976b..907f507e0e6 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsFixtures.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsFixtures.kt @@ -714,6 +714,11 @@ object TableOperationsFixtures { return map { record -> record.mapKeys { (k, _) -> totalMapping.invert()[k] ?: k } } } + fun List>.removeAirbyteColumns( + airbyteMetaColumnMapping: Map + ): List> = + this.map { rec -> rec.filter { !airbyteMetaColumnMapping.containsValue(it.key) } } + fun List>.removeNulls() = this.map { record -> record.filterValues { it != null } } diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsSuite.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsSuite.kt index f7e83aa85e0..490baffe267 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsSuite.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsSuite.kt @@ -58,7 +58,8 @@ interface TableOperationsSuite { get() = Meta.COLUMN_NAMES.associateWith { it } private val harness: TableOperationsTestHarness - get() = TableOperationsTestHarness(client, testClient, airbyteMetaColumnMapping) + get() = + TableOperationsTestHarness(client, testClient, schemaFactory, airbyteMetaColumnMapping) /** Tests basic database connectivity by pinging the database. */ fun `connect to database`() = runTest { assertDoesNotThrow { testClient.ping() } } diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsTestHarness.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsTestHarness.kt index 5289d439a58..22e96f730b0 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsTestHarness.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableOperationsTestHarness.kt @@ -4,11 +4,24 @@ package io.airbyte.cdk.load.component +import io.airbyte.cdk.load.command.Append import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.component.TableOperationsFixtures.inputRecord import io.airbyte.cdk.load.component.TableOperationsFixtures.insertRecords +import io.airbyte.cdk.load.component.TableOperationsFixtures.removeAirbyteColumns +import io.airbyte.cdk.load.component.TableOperationsFixtures.removeNulls +import io.airbyte.cdk.load.component.TableOperationsFixtures.reverseColumnNameMapping import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.EnrichedAirbyteValue +import io.airbyte.cdk.load.data.FieldType +import io.airbyte.cdk.load.data.NullValue +import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.dataflow.transform.ValidationResult +import io.airbyte.cdk.load.dataflow.transform.ValueCoercer +import io.airbyte.cdk.load.schema.TableSchemaFactory import io.airbyte.cdk.load.schema.model.TableName import io.airbyte.cdk.load.table.ColumnNameMapping +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason import io.github.oshai.kotlinlogging.KotlinLogging import org.junit.jupiter.api.Assertions.assertEquals @@ -21,6 +34,7 @@ private val log = KotlinLogging.logger {} class TableOperationsTestHarness( private val client: TableOperationsClient, private val testClient: TestTableOperationsClient, + private val schemaFactory: TableSchemaFactory, private val airbyteMetaColumnMapping: Map, ) { @@ -100,8 +114,74 @@ class TableOperationsTestHarness( /** Reads records from a table, filtering out Meta columns. */ suspend fun readTableWithoutMetaColumns(tableName: TableName): List> { val tableRead = testClient.readTable(tableName) - return tableRead.map { rec -> - rec.filter { !airbyteMetaColumnMapping.containsValue(it.key) } + return tableRead.removeAirbyteColumns(airbyteMetaColumnMapping) + } + + /** Apply the coercer to a value and verify that we can write the coerced value correctly */ + suspend fun testValueCoercion( + coercer: ValueCoercer, + columnNameMapping: ColumnNameMapping, + fieldType: FieldType, + inputValue: AirbyteValue, + expectedValue: Any?, + expectedChangeReason: Reason?, + ) { + val testNamespace = TableOperationsFixtures.generateTestNamespace("test") + val tableName = + TableOperationsFixtures.generateTestTableName("table-test-table", testNamespace) + val schema = ObjectType(linkedMapOf("test" to fieldType)) + val tableSchema = schemaFactory.make(tableName, schema.properties, Append) + val stream = + TableOperationsFixtures.createStream( + namespace = tableName.namespace, + name = tableName.name, + tableSchema = tableSchema, + ) + + val inputValueAsEnrichedAirbyteValue = + EnrichedAirbyteValue( + inputValue, + fieldType.type, + "test", + airbyteMetaField = null, + ) + val validatedValue = coercer.validate(inputValueAsEnrichedAirbyteValue) + val valueToInsert: AirbyteValue + val changeReason: Reason? + when (validatedValue) { + is ValidationResult.ShouldNullify -> { + valueToInsert = NullValue + changeReason = validatedValue.reason + } + is ValidationResult.ShouldTruncate -> { + valueToInsert = validatedValue.truncatedValue + changeReason = validatedValue.reason + } + ValidationResult.Valid -> { + valueToInsert = inputValue + changeReason = null + } } + + client.createNamespace(testNamespace) + client.createTable(stream, tableName, columnNameMapping, replace = false) + testClient.insertRecords( + tableName, + columnNameMapping, + inputRecord("test" to valueToInsert), + ) + + val actualRecords = + testClient + .readTable(tableName) + .removeAirbyteColumns(airbyteMetaColumnMapping) + .reverseColumnNameMapping(columnNameMapping, airbyteMetaColumnMapping) + .removeNulls() + assertEquals( + listOf(mapOf("test" to expectedValue)).removeNulls(), + actualRecords, + "For input $inputValue, expected $expectedValue. Coercer output was $validatedValue.", + ) + assertEquals(expectedChangeReason, changeReason) } } diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableSchemaEvolutionSuite.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableSchemaEvolutionSuite.kt index 34568395285..07fb4ddae24 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableSchemaEvolutionSuite.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/component/TableSchemaEvolutionSuite.kt @@ -44,7 +44,13 @@ interface TableSchemaEvolutionSuite { val schemaFactory: TableSchemaFactory private val harness: TableOperationsTestHarness - get() = TableOperationsTestHarness(opsClient, testClient, airbyteMetaColumnMapping) + get() = + TableOperationsTestHarness( + opsClient, + testClient, + schemaFactory, + airbyteMetaColumnMapping + ) /** * Test that the connector can correctly discover all of its own data types. This test creates a diff --git a/airbyte-cdk/bulk/version.properties b/airbyte-cdk/bulk/version.properties index 1dd51a3a32d..75629880cd3 100644 --- a/airbyte-cdk/bulk/version.properties +++ b/airbyte-cdk/bulk/version.properties @@ -1 +1 @@ -version=0.1.88 +version=0.1.89