
Destination S3 Data Lake: Write objects+unions as stringified json (#51042)

Edward Gao authored 2025-01-15 09:41:01 -08:00; committed by GitHub
parent 85e7f3daa8
commit 5c90a7e968
19 changed files with 868 additions and 225 deletions

View File

@@ -8,6 +8,8 @@ import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopNameMapper
import io.airbyte.cdk.load.test.util.UncoercedExpectedRecordMapper
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior
import io.airbyte.cdk.load.write.UnionBehavior
import io.airbyte.cdk.load.write.Untyped
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
@@ -23,7 +25,9 @@ class MockBasicFunctionalityIntegrationTest :
isStreamSchemaRetroactive = false,
supportsDedup = true,
stringifySchemalessObjects = false,
promoteUnionToObject = false,
unionBehavior = UnionBehavior.PASS_THROUGH,
schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
schematizedArrayBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
preserveUndeclaredFields = true,
commitDataIncrementally = false,
allTypesBehavior = Untyped,

View File

@@ -0,0 +1,146 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.load.data
import io.airbyte.cdk.load.data.json.JsonToAirbyteValue
import io.airbyte.cdk.load.util.serializeToString
import java.math.BigDecimal
import java.math.BigInteger
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime
import java.time.ZoneOffset
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
/**
* Utility class to coerce AirbyteValue to specific types. Does **not** support recursive coercion.
*
* More specifically: This class coerces the output of [JsonToAirbyteValue] to strongly-typed
 * [AirbyteValue]. In particular, this class parses temporal types, performs some common-sense
 * conversions among numeric types, and upcasts any value to StringValue.
*/
object AirbyteValueCoercer {
fun coerceBoolean(value: AirbyteValue): BooleanValue? = requireType<BooleanValue>(value)
fun coerceInt(value: AirbyteValue): IntegerValue? =
when (value) {
// Maybe we should truncate non-int values?
// But to match existing behavior, let's just null for now.
is NumberValue -> IntegerValue(value.value.toBigIntegerExact())
is IntegerValue -> value
is StringValue -> IntegerValue(BigInteger(value.value))
else -> null
}
fun coerceNumber(value: AirbyteValue): NumberValue? =
when (value) {
is NumberValue -> value
is IntegerValue -> NumberValue(value.value.toBigDecimal())
is StringValue -> NumberValue(BigDecimal(value.value))
else -> null
}
fun coerceString(value: AirbyteValue): StringValue {
val stringified =
when (value) {
// this should never happen, because we handle `value is NullValue`
// in the top-level if statement
NullValue -> throw IllegalStateException("Unexpected NullValue")
is StringValue -> value.value
is NumberValue -> value.value.toString()
is IntegerValue -> value.value.toString()
is BooleanValue -> value.value.toString()
is ArrayValue,
is ObjectValue -> value.serializeToString()
// JsonToAirbyteValue never outputs these values, so don't handle them.
is DateValue,
is TimeWithTimezoneValue,
is TimeWithoutTimezoneValue,
is TimestampWithTimezoneValue,
is TimestampWithoutTimezoneValue,
is UnknownValue ->
throw IllegalArgumentException(
"Invalid value type ${value.javaClass.canonicalName}"
)
}
return StringValue(stringified)
}
fun coerceDate(value: AirbyteValue): DateValue? =
requireType<StringValue, DateValue>(value) {
DateValue(LocalDate.parse(it.value, DATE_TIME_FORMATTER))
}
fun coerceTimeTz(value: AirbyteValue): TimeWithTimezoneValue? =
requireType<StringValue, TimeWithTimezoneValue>(value) {
val ot =
try {
OffsetTime.parse(it.value, TIME_FORMATTER)
} catch (e: Exception) {
LocalTime.parse(it.value, TIME_FORMATTER).atOffset(ZoneOffset.UTC)
}
TimeWithTimezoneValue(ot)
}
fun coerceTimeNtz(value: AirbyteValue): TimeWithoutTimezoneValue? =
requireType<StringValue, TimeWithoutTimezoneValue>(value) {
TimeWithoutTimezoneValue(LocalTime.parse(it.value, TIME_FORMATTER))
}
fun coerceTimestampTz(value: AirbyteValue): TimestampWithTimezoneValue? =
requireType<StringValue, TimestampWithTimezoneValue>(value) {
TimestampWithTimezoneValue(offsetDateTime(it))
}
fun coerceTimestampNtz(value: AirbyteValue): TimestampWithoutTimezoneValue? =
requireType<StringValue, TimestampWithoutTimezoneValue>(value) {
TimestampWithoutTimezoneValue(offsetDateTime(it).toLocalDateTime())
}
private fun offsetDateTime(it: StringValue): OffsetDateTime {
val odt =
try {
ZonedDateTime.parse(it.value, AirbyteValueDeepCoercingMapper.DATE_TIME_FORMATTER)
.toOffsetDateTime()
} catch (e: Exception) {
LocalDateTime.parse(it.value, AirbyteValueDeepCoercingMapper.DATE_TIME_FORMATTER)
.atOffset(ZoneOffset.UTC)
}
return odt
}
// In theory, we could e.g. Jsons.readTree((value as StringValue).value).
// But for now, just require that the source emits an actual ObjectNode.
fun coerceObject(value: AirbyteValue): ObjectValue? = requireType<ObjectValue>(value)
fun coerceArray(value: AirbyteValue): ArrayValue? = requireType<ArrayValue>(value)
private inline fun <reified T : AirbyteValue> requireType(
value: AirbyteValue,
): T? = requireType<T, T>(value) { it }
private inline fun <reified InputType : AirbyteValue, OutputType : AirbyteValue> requireType(
value: AirbyteValue,
convertToOutputType: (InputType) -> OutputType,
): OutputType? {
return if (value is InputType) {
convertToOutputType(value)
} else {
null
}
}
val DATE_TIME_FORMATTER: DateTimeFormatter =
DateTimeFormatter.ofPattern(
"[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d][[' '][G]][[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X][[' '][G]]]]"
)
val TIME_FORMATTER: DateTimeFormatter =
DateTimeFormatter.ofPattern(
"HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X]]"
)
}
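
A minimal usage sketch (not part of the diff), using only the value constructors exercised in the tests later in this change; a failed coercion returns Kotlin null, and the caller decides whether to null the field out or pass it through:

val asInt = AirbyteValueCoercer.coerceInt(StringValue("42"))
// -> IntegerValue(42): integer-looking strings are parsed
val asString = AirbyteValueCoercer.coerceString(ObjectValue(linkedMapOf("foo" to IntegerValue(1))))
// -> StringValue("""{"foo":1}"""): objects/arrays are upcast via JSON serialization
val mismatch = AirbyteValueCoercer.coerceBoolean(IntegerValue(1))
// -> null: there is no common-sense conversion from integer to boolean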

View File

@@ -4,16 +4,9 @@
package io.airbyte.cdk.load.data
import io.airbyte.cdk.load.util.serializeToString
import java.math.BigDecimal
import java.math.BigInteger
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime
import java.time.ZoneOffset
import java.time.ZonedDateTime
import io.airbyte.cdk.load.message.Meta
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason
import java.time.format.DateTimeFormatter
/**
@@ -28,147 +21,112 @@ import java.time.format.DateTimeFormatter
* This mapper performs common-sense type coercions. For example, it will promote IntegerValue to
* NumberValue, or parse StringValue to TimestampValue.
*/
class AirbyteValueDeepCoercingMapper : AirbyteValueIdentityMapper() {
class AirbyteValueDeepCoercingMapper(
recurseIntoObjects: Boolean,
recurseIntoArrays: Boolean,
recurseIntoUnions: Boolean,
) :
AirbyteValueIdentityMapper(
recurseIntoObjects = recurseIntoObjects,
recurseIntoArrays = recurseIntoArrays,
recurseIntoUnions = recurseIntoUnions,
) {
override fun mapObject(
value: AirbyteValue,
schema: ObjectType,
context: Context
context: Context,
): Pair<AirbyteValue, Context> =
// force to object, and then use the superclass recursion
requireType<ObjectValue>(value, schema, context) { super.mapObject(it, schema, context) }
// We should inspect the object's fields if we're doing full recursion,
// or if this is the root object.
if (recurseIntoObjects || context.path.isEmpty()) {
// force to object, and then use the superclass recursion
AirbyteValueCoercer.coerceObject(value)?.let { super.mapObject(it, schema, context) }
?: nulledOut(schema, context)
} else {
// otherwise, try to get an ObjectValue out of this value, but don't recurse.
withContext(AirbyteValueCoercer.coerceObject(value), context)
}
override fun mapObjectWithEmptySchema(
value: AirbyteValue,
schema: ObjectTypeWithEmptySchema,
context: Context
): Pair<AirbyteValue, Context> = requireType<ObjectValue>(value, schema, context)
): Pair<AirbyteValue, Context> = withContext(AirbyteValueCoercer.coerceObject(value), context)
override fun mapObjectWithoutSchema(
value: AirbyteValue,
schema: ObjectTypeWithoutSchema,
context: Context
): Pair<AirbyteValue, Context> = requireType<ObjectValue>(value, schema, context)
): Pair<AirbyteValue, Context> = withContext(AirbyteValueCoercer.coerceObject(value), context)
override fun mapArray(
value: AirbyteValue,
schema: ArrayType,
context: Context
): Pair<AirbyteValue, Context> =
// force to array, and then use the superclass recursion
requireType<ArrayValue>(value, schema, context) { super.mapArray(it, schema, context) }
// similar to mapObject, recurse if needed.
// Realistically, the root node is _never_ an array, i.e. `context.path.isEmpty()` is
// always false.
// But might as well be consistent.
if (recurseIntoArrays || context.path.isEmpty()) {
// force to array, and then use the superclass recursion
AirbyteValueCoercer.coerceArray(value)?.let { super.mapArray(it, schema, context) }
?: nulledOut(schema, context)
} else {
withContext(AirbyteValueCoercer.coerceArray(value), context)
}
override fun mapArrayWithoutSchema(
value: AirbyteValue,
schema: ArrayTypeWithoutSchema,
context: Context
): Pair<AirbyteValue, Context> = requireType<ArrayValue>(value, schema, context)
): Pair<AirbyteValue, Context> = withContext(AirbyteValueCoercer.coerceArray(value), context)
override fun mapBoolean(value: AirbyteValue, context: Context): Pair<AirbyteValue, Context> =
requireType<BooleanValue>(value, BooleanType, context)
withContext(AirbyteValueCoercer.coerceBoolean(value), context)
override fun mapNumber(value: AirbyteValue, context: Context): Pair<AirbyteValue, Context> =
when (value) {
is NumberValue -> value to context
is IntegerValue -> NumberValue(value.value.toBigDecimal()) to context
is StringValue -> NumberValue(BigDecimal(value.value)) to context
else -> nulledOut(NumberType, context)
}
withContext(AirbyteValueCoercer.coerceNumber(value), context)
override fun mapInteger(value: AirbyteValue, context: Context): Pair<AirbyteValue, Context> =
when (value) {
// Maybe we should truncate non-int values?
// But to match existing behavior, let's just null for now.
is NumberValue -> IntegerValue(value.value.toBigIntegerExact()) to context
is IntegerValue -> value to context
is StringValue -> IntegerValue(BigInteger(value.value)) to context
else -> nulledOut(IntegerType, context)
}
withContext(AirbyteValueCoercer.coerceInt(value), context)
override fun mapString(value: AirbyteValue, context: Context): Pair<AirbyteValue, Context> {
val stringified =
when (value) {
// this should never happen, because we handle `value is NullValue`
// in the top-level if statement
NullValue -> throw IllegalStateException("Unexpected NullValue")
is StringValue -> value.value
is NumberValue -> value.value.toString()
is IntegerValue -> value.value.toString()
is BooleanValue -> value.value.toString()
is ArrayValue,
is ObjectValue -> value.serializeToString()
// JsonToAirbyteValue never outputs these values, so don't handle them.
is DateValue,
is TimeWithTimezoneValue,
is TimeWithoutTimezoneValue,
is TimestampWithTimezoneValue,
is TimestampWithoutTimezoneValue,
is UnknownValue ->
throw IllegalArgumentException(
"Invalid value type ${value.javaClass.canonicalName}"
)
}
return StringValue(stringified) to context
}
override fun mapString(value: AirbyteValue, context: Context): Pair<AirbyteValue, Context> =
withContext(AirbyteValueCoercer.coerceString(value), context)
override fun mapDate(value: AirbyteValue, context: Context): Pair<AirbyteValue, Context> =
requireType<StringValue>(value, DateType, context) {
DateValue(LocalDate.parse(it.value, DATE_TIME_FORMATTER)) to context
}
withContext(AirbyteValueCoercer.coerceDate(value), context)
override fun mapTimeWithTimezone(
value: AirbyteValue,
context: Context
): Pair<AirbyteValue, Context> =
requireType<StringValue>(value, TimeTypeWithTimezone, context) {
val ot =
try {
OffsetTime.parse(it.value, TIME_FORMATTER)
} catch (e: Exception) {
LocalTime.parse(it.value, TIME_FORMATTER).atOffset(ZoneOffset.UTC)
}
TimeWithTimezoneValue(ot) to context
}
): Pair<AirbyteValue, Context> = withContext(AirbyteValueCoercer.coerceTimeTz(value), context)
override fun mapTimeWithoutTimezone(
value: AirbyteValue,
context: Context
): Pair<AirbyteValue, Context> =
requireType<StringValue>(value, TimeTypeWithoutTimezone, context) {
TimeWithoutTimezoneValue(LocalTime.parse(it.value, TIME_FORMATTER)) to context
}
): Pair<AirbyteValue, Context> = withContext(AirbyteValueCoercer.coerceTimeNtz(value), context)
override fun mapTimestampWithTimezone(
value: AirbyteValue,
context: Context
): Pair<AirbyteValue, Context> =
requireType<StringValue>(value, TimestampTypeWithTimezone, context) {
TimestampWithTimezoneValue(offsetDateTime(it)) to context
}
withContext(AirbyteValueCoercer.coerceTimestampTz(value), context)
override fun mapTimestampWithoutTimezone(
value: AirbyteValue,
context: Context
): Pair<AirbyteValue, Context> =
requireType<StringValue>(value, TimestampTypeWithoutTimezone, context) {
TimestampWithoutTimezoneValue(offsetDateTime(it).toLocalDateTime()) to context
}
private fun offsetDateTime(it: StringValue): OffsetDateTime {
val odt =
try {
ZonedDateTime.parse(it.value, DATE_TIME_FORMATTER).toOffsetDateTime()
} catch (e: Exception) {
LocalDateTime.parse(it.value, DATE_TIME_FORMATTER).atOffset(ZoneOffset.UTC)
}
return odt
}
withContext(AirbyteValueCoercer.coerceTimestampNtz(value), context)
override fun mapUnion(
value: AirbyteValue,
schema: UnionType,
context: Context
): Pair<AirbyteValue, Context> =
if (schema.options.isEmpty()) {
if (!recurseIntoUnions) {
value to context
} else if (schema.options.isEmpty()) {
nulledOut(schema, context)
} else {
val option =
@@ -205,18 +163,21 @@ class AirbyteValueDeepCoercingMapper : AirbyteValueIdentityMapper() {
return mappedValue !is NullValue
}
private inline fun <reified T : AirbyteValue> requireType(
value: AirbyteValue,
schema: AirbyteType,
context: Context,
f: (T) -> Pair<AirbyteValue, Context> = { value to context },
): Pair<AirbyteValue, Context> {
return if (value is T) {
f(value)
private fun withContext(value: AirbyteValue?, context: Context): Pair<AirbyteValue, Context> =
if (value != null) {
// Note: This only triggers if the value was explicitly nulled out.
// If the value was originally null, then value would be NullValue.
value to context
} else {
nulledOut(schema, context)
context.changes.add(
Meta.Change(
context.path.joinToString("."),
Change.NULLED,
Reason.DESTINATION_SERIALIZATION_ERROR
)
)
NullValue to context
}
}
companion object {
val DATE_TIME_FORMATTER: DateTimeFormatter =

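A rough usage sketch (assumptions: the map(value, schema) shape used in the tests below, plus a parseable timestamp string). With full recursion enabled, nested temporal strings are parsed, and unparseable ones are nulled with a DESTINATION_SERIALIZATION_ERROR change:

val mapper = AirbyteValueDeepCoercingMapper(
    recurseIntoObjects = true,
    recurseIntoArrays = true,
    recurseIntoUnions = true,
)
val (mapped, changes) = mapper.map(
    ObjectValue(linkedMapOf("ts" to StringValue("2025-01-15T09:41:01Z"))),
    ObjectType(linkedMapOf("ts" to FieldType(TimestampTypeWithTimezone, nullable = true))),
)
// "ts" inside `mapped` is now a TimestampWithTimezoneValue; an invalid string
// would instead become NullValue, with the nulled path recorded in `changes`.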
View File

@@ -25,7 +25,11 @@ class AirbyteValueNoopMapper : AirbyteValueMapper {
): Pair<AirbyteValue, List<Meta.Change>> = value to changes
}
open class AirbyteValueIdentityMapper : AirbyteValueMapper {
open class AirbyteValueIdentityMapper(
protected val recurseIntoObjects: Boolean = true,
protected val recurseIntoArrays: Boolean = true,
protected val recurseIntoUnions: Boolean = true,
) : AirbyteValueMapper {
data class Context(
val nullable: Boolean = false,
val path: List<String> = emptyList(),
@@ -91,7 +95,8 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper {
schema: ObjectType,
context: Context
): Pair<AirbyteValue, Context> {
if (value !is ObjectValue) {
val shouldRecurse = recurseIntoObjects || context.path.isEmpty()
if (value !is ObjectValue || !shouldRecurse) {
return value to context
}
val values = LinkedHashMap<String, AirbyteValue>()
@@ -124,7 +129,7 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper {
schema: ArrayType,
context: Context
): Pair<AirbyteValue, Context> {
if (value !is ArrayValue) {
if (value !is ArrayValue || !recurseIntoArrays) {
return value to context
}
val mapped =
@@ -153,6 +158,9 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper {
schema: UnionType,
context: Context
): Pair<AirbyteValue, Context> {
if (!recurseIntoUnions) {
return value to context
}
/*
This mapper should not perform validation, so make a best-faith effort to recurse,
but if nothing matches the union, pass the value through unchanged. If clients validated

View File

@@ -8,6 +8,7 @@ import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper.Companion.DATE_TI
import io.airbyte.cdk.load.data.json.toAirbyteValue
import io.airbyte.cdk.load.message.Meta
import io.airbyte.cdk.load.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import java.math.BigDecimal
import java.time.LocalDate
import java.time.LocalDateTime
@@ -21,7 +22,12 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
class AirbyteValueDeepCoercingMapperTest {
private val mapper = AirbyteValueDeepCoercingMapper()
private val mapper =
AirbyteValueDeepCoercingMapper(
recurseIntoObjects = false,
recurseIntoArrays = false,
recurseIntoUnions = false,
)
@Test
fun testBasicCoerce() {
@@ -286,5 +292,127 @@ class AirbyteValueDeepCoercingMapperTest {
}
}
@Test
fun testCoerceNestedValue() {
val (mappedValue, changes) =
mapper.map(
Jsons.readTree(
"""
{
"sub_object": {
"undeclared": 42,
"timestamptz": "invalid"
},
"sub_array": ["invalid"]
}
""".trimIndent()
)
.toAirbyteValue(),
ObjectType(
linkedMapOf(
"sub_object" to
f(
ObjectType(
linkedMapOf("timestamptz" to f(TimestampTypeWithTimezone))
)
),
"sub_array" to f(ArrayType(f(IntegerType)))
)
),
)
assertAll(
{
assertEquals(
ObjectValue(
linkedMapOf(
"sub_object" to
ObjectValue(
linkedMapOf(
"undeclared" to IntegerValue(42),
"timestamptz" to StringValue("invalid"),
)
),
"sub_array" to ArrayValue(listOf(StringValue("invalid")))
)
),
mappedValue
)
},
{ assertEquals(emptyList<Meta.Change>(), changes) },
)
}
/**
* Identical to [testCoerceNestedValue], but uses a mapper with
* [AirbyteValueDeepCoercingMapper.recurseIntoObjects] and
* [AirbyteValueDeepCoercingMapper.recurseIntoArrays] enabled.
*/
@Test
fun testCoerceNestedValueRecursing() {
val mapper =
AirbyteValueDeepCoercingMapper(
recurseIntoObjects = true,
recurseIntoArrays = true,
recurseIntoUnions = true,
)
val (mappedValue, changes) =
mapper.map(
Jsons.readTree(
"""
{
"sub_object": {
"undeclared": 42,
"timestamptz": "invalid"
},
"sub_array": ["invalid"]
}
""".trimIndent()
)
.toAirbyteValue(),
ObjectType(
linkedMapOf(
"sub_object" to
f(
ObjectType(
linkedMapOf("timestamptz" to f(TimestampTypeWithTimezone))
)
),
"sub_array" to f(ArrayType(f(IntegerType)))
)
),
)
assertAll(
// Note: undeclared field is gone, and we null the invalid timestamp
{
assertEquals(
ObjectValue(
linkedMapOf(
"sub_object" to ObjectValue(linkedMapOf("timestamptz" to NullValue)),
"sub_array" to ArrayValue(listOf(NullValue))
)
),
mappedValue
)
},
{
assertEquals(
listOf(
Meta.Change(
"sub_object.timestamptz",
AirbyteRecordMessageMetaChange.Change.NULLED,
AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR
),
Meta.Change(
"sub_array.[0]",
AirbyteRecordMessageMetaChange.Change.NULLED,
AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR
)
),
changes
)
},
)
}
private fun f(type: AirbyteType) = FieldType(type, nullable = true)
}

View File

@@ -8,6 +8,9 @@ import io.airbyte.cdk.load.message.Meta
import io.airbyte.cdk.load.test.util.Root
import io.airbyte.cdk.load.test.util.SchemaRecordBuilder
import io.airbyte.cdk.load.test.util.ValueTestBuilder
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason
import kotlin.test.assertEquals
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
@@ -62,4 +65,75 @@ class AirbyteValueIdentityMapperTest {
{ Assertions.assertEquals(IntegerValue(1000), (values as ObjectValue).values["bad"]) },
)
}
@Test
fun testNonRecursiveMapping() {
val type =
ObjectType(
linkedMapOf(
"int" to f(IntegerType),
"object" to
f(
ObjectType(
linkedMapOf("sub_int" to FieldType(IntegerType, nullable = true))
),
),
"array" to f(ArrayType(f(IntegerType))),
"union" to f(UnionType(setOf(IntegerType, BooleanType))),
)
)
val value =
ObjectValue(
linkedMapOf(
"int" to StringValue("invalid1"),
"object" to ObjectValue(linkedMapOf("sub_int" to StringValue("invalid2"))),
"array" to ArrayValue(listOf(StringValue("invalid3"))),
"union" to IntegerValue(42),
)
)
// Dumb mapper, which nulls all root-level integer fields
val mapper =
object :
AirbyteValueIdentityMapper(
recurseIntoObjects = false,
recurseIntoArrays = false,
recurseIntoUnions = false,
) {
override fun mapInteger(
value: AirbyteValue,
context: Context
): Pair<AirbyteValue, Context> = nulledOut(IntegerType, context)
}
val (mappedValue, changes) = mapper.map(value, type)
assertAll(
{
assertEquals(
ObjectValue(
linkedMapOf(
// The root int was nulled
"int" to NullValue,
// The nested ints were not nulled
"object" to
ObjectValue(linkedMapOf("sub_int" to StringValue("invalid2"))),
"array" to ArrayValue(listOf(StringValue("invalid3"))),
"union" to IntegerValue(42),
)
),
mappedValue
)
},
{
assertEquals(
listOf(
Meta.Change("int", Change.NULLED, Reason.DESTINATION_SERIALIZATION_ERROR),
),
changes
)
}
)
}
private fun f(type: AirbyteType) = FieldType(type, nullable = true)
}

View File

@@ -50,6 +50,7 @@ import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.test.util.NoopNameMapper
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.util.deserializeToNode
import io.airbyte.cdk.load.util.serializeToString
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import java.math.BigDecimal
@@ -88,6 +89,56 @@ data class StronglyTyped(
data object Untyped : AllTypesBehavior
/**
* Destinations may choose to handle nested objects/arrays in a few different ways.
*
* Note that this is _not_ the same as
* [BasicFunctionalityIntegrationTest.stringifySchemalessObjects]. This enum is only used for
* objects with an explicit, non-empty list of properties.
*/
enum class SchematizedNestedValueBehavior {
/**
* Nested objects are written without modification: undeclared fields are retained; values not
* matching the schema are retained.
*/
PASS_THROUGH,
/**
* Nested objects are written as structs: undeclared fields are dropped, and values not matching
* the schema are nulled.
*/
STRONGLY_TYPE,
/**
* Nested objects/arrays are JSON-serialized and written as strings. Similar to [PASS_THROUGH],
* objects are written without modification.
*/
STRINGIFY,
}
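
Illustrative values (not from the diff), assuming a schema of `{id: integer}` and an incoming object `{"id": 1, "extra": 2}`:

// PASS_THROUGH:  value written unmodified
val passThrough = mapOf("id" to 1, "extra" to 2)
// STRONGLY_TYPE: undeclared "extra" is dropped
val stronglyTyped = mapOf("id" to 1)
// STRINGIFY:     the full object, serialized as a JSON string
val stringified = """{"id":1,"extra":2}"""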
enum class UnionBehavior {
/**
* Values corresponding to union fields are passed through, regardless of whether they actually
* match any of the union options.
*/
PASS_THROUGH,
/**
* Union fields are turned into objects, with a `type` field indicating the selected union
* option. For example, the value `42` in a union with an Integer option would be represented as
* `{"type": "integer", "integer": 42}`.
*
* Values which do not match any union options are nulled.
*/
PROMOTE_TO_OBJECT,
/**
* Union fields are JSON-serialized and written as strings. Similar to the [PASS_THROUGH]
* option, no validation is performed.
*/
STRINGIFY,
}
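
And for the integer 42 in a union that includes an integer option (again illustrative, following the docs above):

// PASS_THROUGH:      the raw value
val passThrough = 42
// PROMOTE_TO_OBJECT: wrapped with a type discriminator
val promoted = mapOf("type" to "integer", "integer" to 42)
// STRINGIFY:         JSON-serialized, no validation
val stringified = "42"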
abstract class BasicFunctionalityIntegrationTest(
/** The config to pass into the connector, as a serialized JSON blob */
val configContents: String,
@@ -112,7 +163,9 @@ abstract class BasicFunctionalityIntegrationTest(
val isStreamSchemaRetroactive: Boolean,
val supportsDedup: Boolean,
val stringifySchemalessObjects: Boolean,
val promoteUnionToObject: Boolean,
val schematizedObjectBehavior: SchematizedNestedValueBehavior,
val schematizedArrayBehavior: SchematizedNestedValueBehavior,
val unionBehavior: UnionBehavior,
val preserveUndeclaredFields: Boolean,
val supportFileTransfer: Boolean,
/**
@@ -1753,7 +1806,7 @@ abstract class BasicFunctionalityIntegrationTest(
data =
mapOf(
"id" to 4,
"struct" to mapOf("foo" to nestedFloat),
"struct" to schematizedObject(linkedMapOf("foo" to nestedFloat)),
"number" to topLevelFloat,
"integer" to bigInt,
),
@@ -1821,7 +1874,7 @@ abstract class BasicFunctionalityIntegrationTest(
"""
{
"id": 1,
"schematized_object": { "id": 1, "name": "Joe" },
"schematized_object": { "id": 1, "name": "Joe", "undeclared": 42 },
"empty_object": {},
"schemaless_object": { "uuid": "38F52396-736D-4B23-B5B4-F504D8894B97", "probability": 1.5 },
"schematized_array": [10, null],
@@ -1868,7 +1921,11 @@ abstract class BasicFunctionalityIntegrationTest(
data =
mapOf(
"id" to 1,
"schematized_object" to mapOf("id" to 1, "name" to "Joe"),
"schematized_object" to
schematizedObject(
linkedMapOf("id" to 1, "name" to "Joe", "undeclared" to 42),
linkedMapOf("id" to 1, "name" to "Joe"),
),
"empty_object" to
if (stringifySchemalessObjects) "{}" else emptyMap<Any, Any>(),
"schemaless_object" to
@@ -1880,7 +1937,12 @@ abstract class BasicFunctionalityIntegrationTest(
"probability" to 1.5
)
},
"schematized_array" to listOf(10, null),
"schematized_array" to
when (schematizedArrayBehavior) {
SchematizedNestedValueBehavior.PASS_THROUGH -> listOf(10, null)
SchematizedNestedValueBehavior.STRONGLY_TYPE -> listOf(10, null)
SchematizedNestedValueBehavior.STRINGIFY -> "[10,null]"
},
"schemaless_array" to
if (stringifySchemalessObjects) {
"""[10,"foo",null,{"bar":"qua"}]"""
@@ -1896,7 +1958,8 @@ abstract class BasicFunctionalityIntegrationTest(
data =
mapOf(
"id" to 2,
"schematized_object" to mapOf("id" to 2, "name" to "Jane"),
"schematized_object" to
schematizedObject(linkedMapOf("id" to 2, "name" to "Jane")),
"empty_object" to
if (stringifySchemalessObjects) {
"""{"extra":"stuff"}"""
@@ -1916,7 +1979,13 @@ abstract class BasicFunctionalityIntegrationTest(
"flags" to listOf(true, false, false)
)
},
"schematized_array" to emptyList<Long>(),
"schematized_array" to
when (schematizedArrayBehavior) {
SchematizedNestedValueBehavior.PASS_THROUGH -> emptyList<Long>()
SchematizedNestedValueBehavior.STRONGLY_TYPE ->
emptyList<Long>()
SchematizedNestedValueBehavior.STRINGIFY -> "[]"
},
"schemaless_array" to
if (stringifySchemalessObjects) {
"[]"
@@ -2195,14 +2264,20 @@ abstract class BasicFunctionalityIntegrationTest(
)
)
fun maybePromote(typeName: String, value: Any?) =
if (promoteUnionToObject) {
mapOf(
"type" to typeName,
typeName to value,
)
} else {
value
fun unionValue(typeName: String, value: Any?, skipSerialize: Boolean = false) =
when (unionBehavior) {
UnionBehavior.PASS_THROUGH -> value
UnionBehavior.PROMOTE_TO_OBJECT ->
mapOf(
"type" to typeName,
typeName to value,
)
UnionBehavior.STRINGIFY ->
if (value is String && skipSerialize) {
StringValue(value)
} else {
StringValue(value.serializeToString())
}
}
val expectedRecords: List<OutputRecord> =
listOf(
@@ -2212,28 +2287,49 @@ abstract class BasicFunctionalityIntegrationTest(
data =
mapOf(
"id" to 1,
"combined_type" to maybePromote("string", "string1"),
"combined_type" to unionValue("string", "string1"),
"union_of_string_and_schemaless_type" to
maybePromote(
unionValue(
"object",
if (stringifySchemalessObjects) {
"""{"foo":"bar"}"""
} else {
mapOf("foo" to "bar")
}
schematizedObject(linkedMapOf("foo" to "bar"))
},
// Don't double-serialize the object.
skipSerialize = stringifySchemalessObjects,
),
"union_of_objects_with_properties_identical" to
mapOf("id" to 10, "name" to "Joe"),
schematizedObject(linkedMapOf("id" to 10, "name" to "Joe")),
"union_of_objects_with_properties_overlapping" to
mapOf("id" to 20, "name" to "Jane", "flagged" to true),
schematizedObject(
linkedMapOf("id" to 20, "name" to "Jane", "flagged" to true)
),
"union_of_objects_with_properties_contradicting" to
mapOf("id" to maybePromote("integer", 1), "name" to "Jenny"),
// can't just call schematizedObject(... unionValue) - there are some
// nontrivial interactions here
when (schematizedObjectBehavior) {
// these two cases are simple
SchematizedNestedValueBehavior.PASS_THROUGH,
SchematizedNestedValueBehavior.STRONGLY_TYPE ->
linkedMapOf(
"id" to unionValue("integer", 1),
"name" to "Jenny"
)
// If we stringify, then the nested union value is _not_
// processed
// (note that `id` is mapped to 1 and not "1")
SchematizedNestedValueBehavior.STRINGIFY ->
"""{"id":1,"name":"Jenny"}"""
},
"union_of_objects_with_properties_nonoverlapping" to
mapOf(
"id" to 30,
"name" to "Phil",
"flagged" to false,
"description" to "Very Phil",
schematizedObject(
linkedMapOf(
"id" to 30,
"name" to "Phil",
"flagged" to false,
"description" to "Very Phil",
)
)
),
airbyteMeta = OutputRecord.Meta(syncId = 42),
@@ -2244,18 +2340,30 @@ abstract class BasicFunctionalityIntegrationTest(
data =
mapOf(
"id" to 2,
"combined_type" to maybePromote("integer", 20),
"combined_type" to unionValue("integer", 20),
"union_of_objects_with_properties_identical" to
emptyMap<String, Any?>(),
schematizedObject(linkedMapOf()),
"union_of_objects_with_properties_nonoverlapping" to
emptyMap<String, Any?>(),
schematizedObject(linkedMapOf()),
"union_of_objects_with_properties_overlapping" to
emptyMap<String, Any?>(),
schematizedObject(linkedMapOf()),
"union_of_objects_with_properties_contradicting" to
mapOf(
"id" to maybePromote("string", "seal-one-hippity"),
"name" to "James"
)
// similar to the previous record - need to handle this branch
// manually
when (schematizedObjectBehavior) {
// these two cases are simple
SchematizedNestedValueBehavior.PASS_THROUGH,
SchematizedNestedValueBehavior.STRONGLY_TYPE ->
linkedMapOf(
"id" to unionValue("string", "seal-one-hippity"),
"name" to "James"
)
// If we stringify, then the nested union value is _not_
// processed
// (note that `id` stays the raw string "seal-one-hippity")
SchematizedNestedValueBehavior.STRINGIFY ->
"""{"id":"seal-one-hippity","name":"James"}"""
}
),
airbyteMeta = OutputRecord.Meta(syncId = 42),
),
@@ -2331,6 +2439,23 @@ abstract class BasicFunctionalityIntegrationTest(
)
}
private fun schematizedObject(
fullObject: LinkedHashMap<String, Any?>,
coercedObject: LinkedHashMap<String, Any?> = fullObject
): AirbyteValue =
schematizedObject(ObjectValue.from(fullObject), ObjectValue.from(coercedObject))
private fun schematizedObject(
fullObject: ObjectValue,
coercedObject: ObjectValue = fullObject
): AirbyteValue {
return when (schematizedObjectBehavior) {
SchematizedNestedValueBehavior.PASS_THROUGH -> fullObject
SchematizedNestedValueBehavior.STRONGLY_TYPE -> coercedObject
SchematizedNestedValueBehavior.STRINGIFY -> StringValue(fullObject.serializeToString())
}
}
companion object {
private val intType = FieldType(IntegerType, nullable = true)
private val numberType = FieldType(NumberType, nullable = true)

View File

@@ -23,7 +23,12 @@ class AvroMapperPipelineFactory : MapperPipelineFactory {
listOf(
FailOnAllUnknownTypesExceptNull() to AirbyteValueNoopMapper(),
MergeUnions() to AirbyteValueNoopMapper(),
AirbyteSchemaNoopMapper() to AirbyteValueDeepCoercingMapper(),
AirbyteSchemaNoopMapper() to
AirbyteValueDeepCoercingMapper(
recurseIntoObjects = true,
recurseIntoArrays = true,
recurseIntoUnions = true,
),
// We need to maintain the original ObjectWithNoProperties/etc type.
// For example, if a stream declares no columns, we will (correctly) recognize
// the root schema as ObjectTypeWithEmptySchema.

View File

@@ -21,6 +21,7 @@ import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.data.UnknownType
import io.airbyte.cdk.load.message.Meta
import java.util.UUID
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Type
@@ -29,31 +30,35 @@ import org.apache.iceberg.types.Types.NestedField
class AirbyteTypeToIcebergSchema {
fun convert(airbyteSchema: AirbyteType): Type {
fun convert(airbyteSchema: AirbyteType, stringifyObjects: Boolean): Type {
return when (airbyteSchema) {
is ObjectType -> {
Types.StructType.of(
*airbyteSchema.properties.entries
.map { (name, field) ->
if (field.nullable) {
NestedField.optional(
UUID.randomUUID().hashCode(),
name,
convert(field.type)
)
} else {
NestedField.required(
UUID.randomUUID().hashCode(),
name,
convert(field.type)
)
if (stringifyObjects) {
Types.StringType.get()
} else {
Types.StructType.of(
*airbyteSchema.properties.entries
.map { (name, field) ->
if (field.nullable) {
NestedField.optional(
UUID.randomUUID().hashCode(),
name,
convert(field.type, stringifyObjects)
)
} else {
NestedField.required(
UUID.randomUUID().hashCode(),
name,
convert(field.type, stringifyObjects)
)
}
}
}
.toTypedArray()
)
.toTypedArray()
)
}
}
is ArrayType -> {
val convert = convert(airbyteSchema.items.type)
val convert = convert(airbyteSchema.items.type, stringifyObjects)
if (airbyteSchema.items.nullable) {
return Types.ListType.ofOptional(UUID.randomUUID().hashCode(), convert)
}
@@ -73,17 +78,17 @@ class AirbyteTypeToIcebergSchema {
is TimestampTypeWithTimezone -> Types.TimestampType.withZone()
is TimestampTypeWithoutTimezone -> Types.TimestampType.withoutZone()
is UnionType -> {
// We should never get a trivial union, b/c the AirbyteType parser already handles
// this case.
// but it costs nothing to have this check here
if (airbyteSchema.options.size == 1) {
return Types.ListType.ofOptional(
UUID.randomUUID().hashCode(),
convert(airbyteSchema.options.first())
convert(airbyteSchema.options.first(), stringifyObjects)
)
}
// Iceberg doesn't support a UNION data type
return Types.ListType.ofOptional(
UUID.randomUUID().hashCode(),
Types.StringType.get()
)
// We stringify nontrivial unions
return Types.StringType.get()
}
is UnknownType -> Types.StringType.get()
}
@@ -99,12 +104,15 @@ fun ObjectType.toIcebergSchema(primaryKeys: List<List<String>>): Schema {
val id = generatedSchemaFieldId()
val isPrimaryKey = identifierFieldNames.contains(name)
val isOptional = !isPrimaryKey && field.nullable
// There's no _airbyte_data field, because we flatten the fields.
// But we should leave the _airbyte_meta field as an actual object.
val stringifyObjects = name != Meta.COLUMN_NAME_AB_META
fields.add(
NestedField.of(
id,
isOptional,
name,
icebergTypeConverter.convert(field.type),
icebergTypeConverter.convert(field.type, stringifyObjects = stringifyObjects),
),
)
if (isPrimaryKey) {

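Net effect on column types, sketched under the assumption that the stream schema already carries the meta column (identifiers from the code above):

// user column of ObjectType    -> Types.StringType.get()  (stringifyObjects = true)
// nontrivial UnionType column  -> Types.StringType.get()  (unions are stringified)
// _airbyte_meta (ObjectType)   -> Types.StructType        (kept as a real struct)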
View File

@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.load.data.iceberg.parquet
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.AirbyteValueIdentityMapper
import io.airbyte.cdk.load.data.NullOutOfRangeIntegers
import java.math.BigInteger
/**
 * Iceberg wants to write objects/unions as JSON strings, but arrays as strongly-typed values.
 * Therefore, we need to handle top-level ints and arrays of ints, but should ignore ints inside
 * objects and ints inside unions.
*/
class IcebergNullOutOfRangeIntegers(
minValue: BigInteger = Long.MIN_VALUE.toBigInteger(),
maxValue: BigInteger = Long.MAX_VALUE.toBigInteger()
) :
AirbyteValueIdentityMapper(
recurseIntoObjects = false,
recurseIntoArrays = true,
recurseIntoUnions = false,
) {
private val delegate = NullOutOfRangeIntegers(minValue = minValue, maxValue = maxValue)
override fun mapInteger(value: AirbyteValue, context: Context): Pair<AirbyteValue, Context> {
return delegate.mapInteger(value, context)
}
}
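
A sketch of the resulting asymmetry (the value below is hypothetical; map(value, schema) follows the AirbyteValueMapper shape shown earlier):

val mapper = IcebergNullOutOfRangeIntegers()
val huge = IntegerValue(BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE))
// At the root (or inside an array), `huge` is out of Long range and gets
// nulled out. Inside an object or union it passes through untouched, because
// that container is later stringified wholesale by IcebergStringifyComplexTypes.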

View File

@@ -11,10 +11,6 @@ import io.airbyte.cdk.load.data.AirbyteValueNoopMapper
import io.airbyte.cdk.load.data.MapperPipeline
import io.airbyte.cdk.load.data.MapperPipelineFactory
import io.airbyte.cdk.load.data.MergeUnions
import io.airbyte.cdk.load.data.NullOutOfRangeIntegers
import io.airbyte.cdk.load.data.SchemalessValuesToJsonString
import io.airbyte.cdk.load.data.UnionTypeToDisjointRecord
import io.airbyte.cdk.load.data.UnionValueToDisjointRecord
class IcebergParquetPipelineFactory : MapperPipelineFactory {
override fun create(stream: DestinationStream): MapperPipeline =
@@ -22,17 +18,15 @@ class IcebergParquetPipelineFactory : MapperPipelineFactory {
stream.schema,
listOf(
MergeUnions() to AirbyteValueNoopMapper(),
AirbyteSchemaNoopMapper() to AirbyteValueDeepCoercingMapper(),
// We need to maintain the original ObjectWithNoProperties/etc type.
// For example, if a stream declares no columns, we will (correctly) recognize
// the root schema as ObjectTypeWithEmptySchema.
// If we then map that root schema to StringType, then
// AirbyteTypeToAirbyteTypeWithMeta will crash on it.
// Furthermore, in UnionTypeToDisjointRecord, this enables us to write these fields
// as "object" rather than as "string".
AirbyteSchemaNoopMapper() to SchemalessValuesToJsonString(),
AirbyteSchemaNoopMapper() to NullOutOfRangeIntegers(),
UnionTypeToDisjointRecord() to UnionValueToDisjointRecord(),
AirbyteSchemaNoopMapper() to
AirbyteValueDeepCoercingMapper(
// See IcebergNullOutOfRangeIntegers for explanation.
recurseIntoObjects = false,
recurseIntoArrays = true,
recurseIntoUnions = false,
),
AirbyteSchemaNoopMapper() to IcebergStringifyComplexTypes(),
AirbyteSchemaNoopMapper() to IcebergNullOutOfRangeIntegers(),
),
)
}
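
Rough data flow for one record through this pipeline (values illustrative):

// {"a": {"x": 1}, "big": 9223372036854775808}
//   MergeUnions                    -> schema-side union cleanup only
//   AirbyteValueDeepCoercingMapper -> coerces root-level fields, but does not
//                                     recurse into objects or unions
//   IcebergStringifyComplexTypes   -> "a" becomes the string "{\"x\":1}"
//   IcebergNullOutOfRangeIntegers  -> "big" exceeds the Long range and is nulled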

View File

@@ -0,0 +1,62 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.load.data.iceberg.parquet
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.AirbyteValueIdentityMapper
import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.data.json.toJson
import io.airbyte.cdk.load.util.serializeToString
class IcebergStringifyComplexTypes :
AirbyteValueIdentityMapper(recurseIntoObjects = false, recurseIntoUnions = false) {
override fun mapObject(
value: AirbyteValue,
schema: ObjectType,
context: Context
): Pair<AirbyteValue, Context> {
if (context.path.isEmpty()) {
return super.mapObject(value, schema, context)
}
return StringValue(value.serializeToString()) to context
}
override fun mapUnion(
value: AirbyteValue,
schema: UnionType,
context: Context
): Pair<AirbyteValue, Context> {
return StringValue(value.serializeToString()) to context
}
// These were copied out of SchemalessValuesToJsonString.
// We can't directly use that class, because it recurses into objects,
// which means it nulls out invalid values / prunes undeclared fields.
override fun mapObjectWithoutSchema(
value: AirbyteValue,
schema: ObjectTypeWithoutSchema,
context: Context
): Pair<AirbyteValue, Context> =
value.toJson().serializeToString().let(::StringValue) to context
override fun mapObjectWithEmptySchema(
value: AirbyteValue,
schema: ObjectTypeWithEmptySchema,
context: Context
): Pair<AirbyteValue, Context> =
value.toJson().serializeToString().let(::StringValue) to context
override fun mapArrayWithoutSchema(
value: AirbyteValue,
schema: ArrayTypeWithoutSchema,
context: Context
): Pair<AirbyteValue, Context> =
value.toJson().serializeToString().let(::StringValue) to context
override fun mapUnknown(value: AirbyteValue, context: Context): Pair<AirbyteValue, Context> =
value.toJson().serializeToString().let(::StringValue) to context
}
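
A minimal sketch of the root-vs-nested split, reusing the map(value, schema) call shape from the tests in this change:

val mapper = IcebergStringifyComplexTypes()
val (mapped, _) =
    mapper.map(
        ObjectValue(linkedMapOf("profile" to ObjectValue(linkedMapOf("age" to IntegerValue(1))))),
        ObjectType(
            linkedMapOf(
                "profile" to
                    FieldType(
                        ObjectType(linkedMapOf("age" to FieldType(IntegerType, nullable = true))),
                        nullable = true,
                    )
            )
        ),
    )
// The root object stays a struct, but "profile" is now
// StringValue("""{"age":1}""") because context.path is non-empty there.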

View File

@@ -26,7 +26,7 @@ class AirbyteTypeToIcebergSchemaTest {
"name" to FieldType(StringType, true),
),
)
val result = converter.convert(objectType) as Types.StructType
val result = converter.convert(objectType, stringifyObjects = false) as Types.StructType
assertEquals(2, result.fields().size)
val idField = result.field("id")
@@ -41,10 +41,24 @@ class AirbyteTypeToIcebergSchemaTest {
assertEquals(Types.StringType.get(), nameField.type())
}
@Test
fun `convert handles stringifying ObjectType`() {
val objectType =
ObjectType(
linkedMapOf(
"id" to FieldType(IntegerType, false),
"name" to FieldType(StringType, true),
),
)
val result = converter.convert(objectType, stringifyObjects = true)
assertEquals(Types.StringType.get(), result)
}
@Test
fun `convert handles ArrayType`() {
val arrayType = ArrayType(FieldType(IntegerType, false))
val result = converter.convert(arrayType) as Types.ListType
val result = converter.convert(arrayType, stringifyObjects = false) as Types.ListType
assertEquals(Types.LongType.get(), result.elementType())
assertFalse(result.isElementOptional)
@@ -53,7 +67,7 @@ class AirbyteTypeToIcebergSchemaTest {
@Test
fun `convert handles ArrayType with nullable items`() {
val arrayType = ArrayType(FieldType(StringType, true))
val result = converter.convert(arrayType) as Types.ListType
val result = converter.convert(arrayType, stringifyObjects = false) as Types.ListType
assertEquals(Types.StringType.get(), result.elementType())
assertTrue(result.isElementOptional)
@@ -61,71 +75,95 @@ class AirbyteTypeToIcebergSchemaTest {
@Test
fun `convert throws exception for ArrayTypeWithoutSchema`() {
assertThrows<IllegalArgumentException> { converter.convert(ArrayTypeWithoutSchema) }
assertThrows<IllegalArgumentException> {
converter.convert(ArrayTypeWithoutSchema, stringifyObjects = false)
}
}
@Test
fun `convert handles BooleanType`() {
assertEquals(Types.BooleanType.get(), converter.convert(BooleanType))
assertEquals(
Types.BooleanType.get(),
converter.convert(BooleanType, stringifyObjects = false)
)
}
@Test
fun `convert handles DateType`() {
assertEquals(Types.DateType.get(), converter.convert(DateType))
assertEquals(Types.DateType.get(), converter.convert(DateType, stringifyObjects = false))
}
@Test
fun `convert handles IntegerType`() {
assertEquals(Types.LongType.get(), converter.convert(IntegerType))
assertEquals(Types.LongType.get(), converter.convert(IntegerType, stringifyObjects = false))
}
@Test
fun `convert handles NumberType`() {
assertEquals(Types.DoubleType.get(), converter.convert(NumberType))
assertEquals(
Types.DoubleType.get(),
converter.convert(NumberType, stringifyObjects = false)
)
}
@Test
fun `convert throws exception for ObjectTypeWithEmptySchema`() {
assertThrows<IllegalArgumentException> { converter.convert(ObjectTypeWithEmptySchema) }
assertThrows<IllegalArgumentException> {
converter.convert(ObjectTypeWithEmptySchema, stringifyObjects = false)
}
}
@Test
fun `convert throws exception for ObjectTypeWithoutSchema`() {
assertThrows<IllegalArgumentException> { converter.convert(ObjectTypeWithoutSchema) }
assertThrows<IllegalArgumentException> {
converter.convert(ObjectTypeWithoutSchema, stringifyObjects = false)
}
}
@Test
fun `convert handles StringType`() {
assertEquals(Types.StringType.get(), converter.convert(StringType))
assertEquals(
Types.StringType.get(),
converter.convert(StringType, stringifyObjects = false)
)
}
@Test
fun `convert handles TimeTypeWithTimezone`() {
assertEquals(Types.TimeType.get(), converter.convert(TimeTypeWithTimezone))
assertEquals(
Types.TimeType.get(),
converter.convert(TimeTypeWithTimezone, stringifyObjects = false)
)
}
@Test
fun `convert handles TimeTypeWithoutTimezone`() {
assertEquals(Types.TimeType.get(), converter.convert(TimeTypeWithoutTimezone))
assertEquals(
Types.TimeType.get(),
converter.convert(TimeTypeWithoutTimezone, stringifyObjects = false)
)
}
@Test
fun `convert handles TimestampTypeWithTimezone`() {
assertEquals(Types.TimestampType.withZone(), converter.convert(TimestampTypeWithTimezone))
assertEquals(
Types.TimestampType.withZone(),
converter.convert(TimestampTypeWithTimezone, stringifyObjects = false)
)
}
@Test
fun `convert handles TimestampTypeWithoutTimezone`() {
assertEquals(
Types.TimestampType.withoutZone(),
converter.convert(TimestampTypeWithoutTimezone)
converter.convert(TimestampTypeWithoutTimezone, stringifyObjects = false)
)
}
@Test
fun `convert handles UnionType with single option`() {
val unionType = UnionType(setOf(IntegerType))
val result = converter.convert(unionType) as Types.ListType
val result = converter.convert(unionType, stringifyObjects = false) as Types.ListType
assertEquals(Types.LongType.get(), result.elementType())
assertTrue(result.isElementOptional)
@@ -134,7 +172,7 @@ class AirbyteTypeToIcebergSchemaTest {
@Test
fun `convert handles UnionType with multiple options`() {
val unionType = UnionType(setOf(StringType, IntegerType))
val result = converter.convert(unionType) as Types.ListType
val result = converter.convert(unionType, stringifyObjects = false) as Types.ListType
assertEquals(Types.StringType.get(), result.elementType())
assertTrue(result.isElementOptional)
@@ -142,7 +180,10 @@ class AirbyteTypeToIcebergSchemaTest {
@Test
fun `convert handles UnknownType`() {
assertEquals(Types.StringType.get(), converter.convert(UnknownType(Jsons.emptyObject())))
assertEquals(
Types.StringType.get(),
converter.convert(UnknownType(Jsons.emptyObject()), stringifyObjects = false)
)
}
@Test

View File

@@ -25,7 +25,12 @@ class ParquetMapperPipelineFactory : MapperPipelineFactory {
listOf(
FailOnAllUnknownTypesExceptNull() to AirbyteValueNoopMapper(),
MergeUnions() to AirbyteValueNoopMapper(),
AirbyteSchemaNoopMapper() to AirbyteValueDeepCoercingMapper(),
AirbyteSchemaNoopMapper() to
AirbyteValueDeepCoercingMapper(
recurseIntoObjects = true,
recurseIntoArrays = true,
recurseIntoUnions = true,
),
// We need to maintain the original ObjectWithNoProperties/etc type.
// For example, if a stream declares no columns, we will (correctly) recognize
// the root schema as ObjectTypeWithEmptySchema.

View File

@@ -7,6 +7,8 @@ package io.airbyte.integrations.destination.dev_null
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior
import io.airbyte.cdk.load.write.UnionBehavior
import io.airbyte.cdk.load.write.Untyped
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
@@ -22,7 +24,9 @@ class DevNullBasicFunctionalityIntegrationTest :
isStreamSchemaRetroactive = false,
supportsDedup = false,
stringifySchemalessObjects = false,
promoteUnionToObject = false,
unionBehavior = UnionBehavior.PASS_THROUGH,
schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
schematizedArrayBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
preserveUndeclaredFields = false,
commitDataIncrementally = false,
allTypesBehavior = Untyped,

View File

@@ -26,7 +26,7 @@ data:
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 716ca874-520b-4902-9f80-9fad66754b89
dockerImageTag: 0.2.17
dockerImageTag: 0.2.18
dockerRepository: airbyte/destination-s3-data-lake
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
githubIssueLabel: destination-s3-data-lake

View File

@@ -9,7 +9,9 @@ import com.fasterxml.jackson.databind.ObjectMapper
import io.airbyte.cdk.load.test.util.DestinationCleaner
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior
import io.airbyte.cdk.load.write.StronglyTyped
import io.airbyte.cdk.load.write.UnionBehavior
import java.nio.file.Files
import java.util.Base64
import okhttp3.FormBody
@@ -33,12 +35,19 @@ abstract class S3DataLakeWriteTest(
isStreamSchemaRetroactive = true,
supportsDedup = true,
stringifySchemalessObjects = true,
promoteUnionToObject = true,
schematizedObjectBehavior = SchematizedNestedValueBehavior.STRINGIFY,
schematizedArrayBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
unionBehavior = UnionBehavior.STRINGIFY,
preserveUndeclaredFields = false,
commitDataIncrementally = false,
supportFileTransfer = false,
envVars = envVars,
allTypesBehavior = StronglyTyped(integerCanBeLarge = false),
allTypesBehavior =
StronglyTyped(
integerCanBeLarge = false,
// we stringify objects, so nested floats stay exact
nestedFloatLosesPrecision = false
),
nullUnknownTypes = true,
nullEqualsUnset = true,
) {
@@ -92,7 +101,12 @@ class GlueWriteTest :
S3DataLakeTestUtil.getAWSSystemCredentials()
)
)
)
) {
@Test
override fun testUnions() {
super.testUnions()
}
}
class GlueAssumeRoleWriteTest :
S3DataLakeWriteTest(

View File

@@ -10,7 +10,9 @@ import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.UncoercedExpectedRecordMapper
import io.airbyte.cdk.load.write.AllTypesBehavior
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior
import io.airbyte.cdk.load.write.StronglyTyped
import io.airbyte.cdk.load.write.UnionBehavior
import io.airbyte.cdk.load.write.Untyped
import java.util.concurrent.TimeUnit
import org.junit.jupiter.api.Disabled
@@ -22,7 +24,9 @@ abstract class S3V2WriteTest(
path: String,
expectedRecordMapper: ExpectedRecordMapper,
stringifySchemalessObjects: Boolean,
promoteUnionToObject: Boolean,
schematizedObjectBehavior: SchematizedNestedValueBehavior,
schematizedArrayBehavior: SchematizedNestedValueBehavior,
unionBehavior: UnionBehavior,
preserveUndeclaredFields: Boolean,
/** This is false for staging mode, and true for non-staging mode. */
commitDataIncrementally: Boolean = true,
@@ -40,7 +44,9 @@ abstract class S3V2WriteTest(
isStreamSchemaRetroactive = false,
supportsDedup = false,
stringifySchemalessObjects = stringifySchemalessObjects,
promoteUnionToObject = promoteUnionToObject,
schematizedObjectBehavior = schematizedObjectBehavior,
schematizedArrayBehavior = schematizedArrayBehavior,
unionBehavior = unionBehavior,
preserveUndeclaredFields = preserveUndeclaredFields,
commitDataIncrementally = commitDataIncrementally,
allTypesBehavior = allTypesBehavior,
@@ -67,7 +73,9 @@ class S3V2WriteTestJsonUncompressed :
S3V2TestUtils.JSON_UNCOMPRESSED_CONFIG_PATH,
UncoercedExpectedRecordMapper,
stringifySchemalessObjects = false,
promoteUnionToObject = false,
unionBehavior = UnionBehavior.PASS_THROUGH,
schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
schematizedArrayBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
preserveUndeclaredFields = true,
allTypesBehavior = Untyped,
)
@@ -77,7 +85,9 @@ class S3V2WriteTestJsonRootLevelFlattening :
S3V2TestUtils.JSON_ROOT_LEVEL_FLATTENING_CONFIG_PATH,
UncoercedExpectedRecordMapper,
stringifySchemalessObjects = false,
promoteUnionToObject = false,
unionBehavior = UnionBehavior.PASS_THROUGH,
schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
schematizedArrayBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
preserveUndeclaredFields = true,
allTypesBehavior = Untyped,
)
@@ -88,7 +98,9 @@ class S3V2WriteTestJsonStaging :
S3V2TestUtils.JSON_STAGING_CONFIG_PATH,
UncoercedExpectedRecordMapper,
stringifySchemalessObjects = false,
promoteUnionToObject = false,
unionBehavior = UnionBehavior.PASS_THROUGH,
schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
schematizedArrayBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
preserveUndeclaredFields = true,
allTypesBehavior = Untyped,
commitDataIncrementally = false
@@ -103,7 +115,9 @@ class S3V2WriteTestJsonGzip :
S3V2TestUtils.JSON_GZIP_CONFIG_PATH,
UncoercedExpectedRecordMapper,
stringifySchemalessObjects = false,
promoteUnionToObject = false,
unionBehavior = UnionBehavior.PASS_THROUGH,
schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
schematizedArrayBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
preserveUndeclaredFields = true,
allTypesBehavior = Untyped,
)
@@ -113,7 +127,9 @@ class S3V2WriteTestCsvUncompressed :
S3V2TestUtils.CSV_UNCOMPRESSED_CONFIG_PATH,
UncoercedExpectedRecordMapper,
stringifySchemalessObjects = false,
promoteUnionToObject = false,
unionBehavior = UnionBehavior.PASS_THROUGH,
schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
schematizedArrayBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
preserveUndeclaredFields = true,
allTypesBehavior = Untyped,
) {
@@ -128,7 +144,9 @@ class S3V2WriteTestCsvRootLevelFlattening :
S3V2TestUtils.CSV_ROOT_LEVEL_FLATTENING_CONFIG_PATH,
UncoercedExpectedRecordMapper,
stringifySchemalessObjects = false,
promoteUnionToObject = false,
unionBehavior = UnionBehavior.PASS_THROUGH,
schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
schematizedArrayBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
preserveUndeclaredFields = false,
allTypesBehavior = Untyped,
nullEqualsUnset =
@@ -140,7 +158,9 @@ class S3V2WriteTestCsvGzip :
S3V2TestUtils.CSV_GZIP_CONFIG_PATH,
UncoercedExpectedRecordMapper,
stringifySchemalessObjects = false,
promoteUnionToObject = false,
unionBehavior = UnionBehavior.PASS_THROUGH,
schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
schematizedArrayBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
preserveUndeclaredFields = true,
allTypesBehavior = Untyped,
)
@@ -150,7 +170,9 @@ class S3V2WriteTestAvroUncompressed :
S3V2TestUtils.AVRO_UNCOMPRESSED_CONFIG_PATH,
AvroExpectedRecordMapper,
stringifySchemalessObjects = true,
promoteUnionToObject = false,
unionBehavior = UnionBehavior.PASS_THROUGH,
schematizedObjectBehavior = SchematizedNestedValueBehavior.STRONGLY_TYPE,
schematizedArrayBehavior = SchematizedNestedValueBehavior.STRONGLY_TYPE,
preserveUndeclaredFields = false,
allTypesBehavior = StronglyTyped(integerCanBeLarge = false),
nullEqualsUnset = true,
@@ -167,7 +189,9 @@ class S3V2WriteTestAvroBzip2 :
S3V2TestUtils.AVRO_BZIP2_CONFIG_PATH,
AvroExpectedRecordMapper,
stringifySchemalessObjects = true,
promoteUnionToObject = false,
unionBehavior = UnionBehavior.PASS_THROUGH,
schematizedObjectBehavior = SchematizedNestedValueBehavior.STRONGLY_TYPE,
schematizedArrayBehavior = SchematizedNestedValueBehavior.STRONGLY_TYPE,
preserveUndeclaredFields = false,
allTypesBehavior = StronglyTyped(integerCanBeLarge = false),
nullEqualsUnset = true,
@@ -179,7 +203,9 @@ class S3V2WriteTestParquetUncompressed :
S3V2TestUtils.PARQUET_UNCOMPRESSED_CONFIG_PATH,
AvroExpectedRecordMapper,
stringifySchemalessObjects = true,
promoteUnionToObject = true,
unionBehavior = UnionBehavior.PROMOTE_TO_OBJECT,
schematizedObjectBehavior = SchematizedNestedValueBehavior.STRONGLY_TYPE,
schematizedArrayBehavior = SchematizedNestedValueBehavior.STRONGLY_TYPE,
preserveUndeclaredFields = false,
allTypesBehavior = StronglyTyped(integerCanBeLarge = false),
nullEqualsUnset = true,
@@ -191,7 +217,9 @@ class S3V2WriteTestParquetSnappy :
S3V2TestUtils.PARQUET_SNAPPY_CONFIG_PATH,
AvroExpectedRecordMapper,
stringifySchemalessObjects = true,
promoteUnionToObject = true,
unionBehavior = UnionBehavior.PROMOTE_TO_OBJECT,
schematizedObjectBehavior = SchematizedNestedValueBehavior.STRONGLY_TYPE,
schematizedArrayBehavior = SchematizedNestedValueBehavior.STRONGLY_TYPE,
preserveUndeclaredFields = false,
allTypesBehavior = StronglyTyped(integerCanBeLarge = false),
nullEqualsUnset = true,
@@ -204,7 +232,9 @@ class S3V2WriteTestEndpointURL :
// this test is writing to CSV
UncoercedExpectedRecordMapper,
stringifySchemalessObjects = false,
promoteUnionToObject = false,
unionBehavior = UnionBehavior.PASS_THROUGH,
schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
schematizedArrayBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
preserveUndeclaredFields = false,
allTypesBehavior = Untyped,
nullEqualsUnset = true,
@@ -216,7 +246,9 @@ class S3V2AmbiguousFilepath :
// this test is writing to CSV
UncoercedExpectedRecordMapper,
stringifySchemalessObjects = false,
promoteUnionToObject = false,
unionBehavior = UnionBehavior.PASS_THROUGH,
schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
schematizedArrayBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
preserveUndeclaredFields = true,
allTypesBehavior = Untyped,
)

View File

@@ -17,6 +17,7 @@ for more information.
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------|
| 0.2.18 | 2025-01-15 | [\#51042](https://github.com/airbytehq/airbyte/pull/51042) | Write structs as JSON strings instead of Iceberg structs. |
| 0.2.17 | 2025-01-14 | [\#51542](https://github.com/airbytehq/airbyte/pull/51542) | New identifier fields should be marked as required. |
| 0.2.16 | 2025-01-14 | [\#51538](https://github.com/airbytehq/airbyte/pull/51538) | Update identifier fields if incoming fields are different than existing ones |
| 0.2.15 | 2025-01-14 | [\#51530](https://github.com/airbytehq/airbyte/pull/51530) | Set AWS region for S3 bucket for nessie catalog |