feat: identify and update iceberg schema based on the incoming schema (#50413)
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: Francis Genet <francis.genet@airbyte.io>
parent d5be4b75f3
commit a33a02bcae
@@ -26,7 +26,7 @@ data:
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 716ca874-520b-4902-9f80-9fad66754b89
dockerImageTag: 0.2.13
dockerImageTag: 0.2.14
dockerRepository: airbyte/destination-s3-data-lake
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
githubIssueLabel: destination-s3-data-lake
@@ -0,0 +1,138 @@
/*
 * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.s3_data_lake

import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeTypesComparator.Companion.PARENT_CHILD_SEPARATOR
import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeTypesComparator.Companion.splitIntoParentAndLeaf
import jakarta.inject.Singleton
import org.apache.iceberg.Schema
import org.apache.iceberg.Table
import org.apache.iceberg.UpdateSchema
import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Type.PrimitiveType

/**
 * Applies schema changes to an Iceberg [Table], including nested columns (struct fields).
 *
 * Supports:
 * - Adding new columns (possibly nested).
 * - Removing top-level columns.
 * - Updating types (finding a supertype).
 * - Marking columns newly optional.
 *
 * @property comparator Used to compare schemas and find differences.
 * @property superTypeFinder Used to find a common supertype when data types differ.
 */
@Singleton
class S3DataLakeTableSynchronizer(
    private val comparator: S3DataLakeTypesComparator,
    private val superTypeFinder: S3DataLakeSuperTypeFinder,
) {

    /**
     * Compare [table]'s current schema with [incomingSchema] and apply changes as needed:
     *
     * 1. Remove columns that are no longer in the incoming schema.
     * 2. Update column types to a common supertype if they differ.
     * 3. Mark columns newly optional if changed from required.
     * 4. Add columns that don't exist in the existing schema.
     *
     * @param table The Iceberg table to update.
     * @param incomingSchema The schema describing incoming data.
     * @return The updated [Schema], after changes have been applied and committed.
     */
    fun applySchemaChanges(table: Table, incomingSchema: Schema): Schema {
        val existingSchema = table.schema()
        val diff = comparator.compareSchemas(incomingSchema, existingSchema)

        if (!diff.hasChanges()) {
            // If no differences, return the existing schema as-is.
            return existingSchema
        }

        val update: UpdateSchema = table.updateSchema().allowIncompatibleChanges()

        // 1) Remove columns that no longer exist in the incoming schema
        diff.removedColumns.forEach { removedColumn -> update.deleteColumn(removedColumn) }

        // 2) Update types => find a supertype for each changed column
        diff.updatedDataTypes.forEach { columnName ->
            val existingField =
                existingSchema.findField(columnName)
                    ?: error("Field \"$columnName\" not found in the existing schema!")
            val incomingField =
                incomingSchema.findField(columnName)
                    ?: error("Field \"$columnName\" not found in the incoming schema!")

            val superType: Type =
                superTypeFinder.findSuperType(
                    existingType = existingField.type(),
                    incomingType = incomingField.type(),
                    columnName = columnName
                )
            require(superType is PrimitiveType) {
                "Currently only primitive type updates are supported. Attempted type: $superType"
            }

            // Update the column to the supertype
            update.updateColumn(columnName, superType)
        }

        // 3) Mark columns newly optional
        diff.newlyOptionalColumns.forEach { columnName -> update.makeColumnOptional(columnName) }

        // 4) Add new columns, sorted by nesting depth (so that parents are created before children)
        val sortedNewColumns =
            diff.newColumns.sortedBy { it.count { char -> char == PARENT_CHILD_SEPARATOR } }

        for (newColumnFqn in sortedNewColumns) {
            val (parentPath, leafName) = splitIntoParentAndLeaf(newColumnFqn)

            // Only 1-level nesting is supported
            if (parentPath.count { it == PARENT_CHILD_SEPARATOR } > 0) {
                throw IllegalArgumentException(
                    "Adding nested columns more than 1 level deep is not supported: $newColumnFqn"
                )
            }

            // Locate the appropriate incoming field
            val incomingField =
                if (parentPath.isEmpty()) {
                    // Top-level column
                    incomingSchema.findField(leafName)
                        ?: error("Field \"$leafName\" not found in the incoming schema.")
                } else {
                    // 1-level nested column: "structFieldName~childField"
                    val parentField =
                        incomingSchema.findField(parentPath)
                            ?: error(
                                "Parent field \"$parentPath\" not found in the incoming schema."
                            )

                    require(parentField.type().isStructType) {
                        "Attempting to add a sub-field to a non-struct parent field: $parentPath"
                    }

                    parentField.type().asStructType().asSchema().findField(leafName)
                        ?: error(
                            "Sub-field \"$leafName\" not found in the schema under \"$parentPath\"."
                        )
                }

            // Add the column via the Iceberg API
            if (parentPath.isEmpty()) {
                update.addColumn(null, leafName, incomingField.type())
            } else {
                update.addColumn(parentPath, leafName, incomingField.type())
            }
        }

        // Commit all changes and refresh the table schema
        update.commit()
        table.refresh()

        return table.schema()
    }
}
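Note: S3DataLakeTypesComparator.ColumnDiff is used throughout this file (and in the tests below) but its definition is not part of this commit. Judging only from how it is accessed here — hasChanges(), removedColumns, updatedDataTypes, newlyOptionalColumns, newColumns — it presumably looks roughly like the following hypothetical sketch:

// Hypothetical reconstruction, inferred purely from usage in this commit; not the committed definition.
data class ColumnDiff(
    val newColumns: MutableList<String> = mutableListOf(),
    val updatedDataTypes: MutableList<String> = mutableListOf(),
    val removedColumns: MutableList<String> = mutableListOf(),
    val newlyOptionalColumns: MutableList<String> = mutableListOf(),
) {
    // True if any of the four change lists is non-empty.
    fun hasChanges(): Boolean =
        newColumns.isNotEmpty() ||
            updatedDataTypes.isNotEmpty() ||
            removedColumns.isNotEmpty() ||
            newlyOptionalColumns.isNotEmpty()
}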
@@ -17,23 +17,30 @@ import org.apache.iceberg.Schema

class S3DataLakeWriter(
    private val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory,
    private val icebergConfiguration: S3DataLakeConfiguration,
    private val s3DataLakeUtil: S3DataLakeUtil
    private val s3DataLakeUtil: S3DataLakeUtil,
    private val s3DataLakeTableSynchronizer: S3DataLakeTableSynchronizer
) : DestinationWriter {

    override fun createStreamLoader(stream: DestinationStream): StreamLoader {
        val properties = s3DataLakeUtil.toCatalogProperties(config = icebergConfiguration)
        val catalog = s3DataLakeUtil.createCatalog(DEFAULT_CATALOG_NAME, properties)
        val pipeline = IcebergParquetPipelineFactory().create(stream)
        val schema = s3DataLakeUtil.toIcebergSchema(stream = stream, pipeline = pipeline)
        val incomingSchema = s3DataLakeUtil.toIcebergSchema(stream = stream, pipeline = pipeline)
        val table =
            s3DataLakeUtil.createTable(
                streamDescriptor = stream.descriptor,
                catalog = catalog,
                schema = schema,
                schema = incomingSchema,
                properties = properties
            )

        existingAndIncomingSchemaShouldBeSame(catalogSchema = schema, tableSchema = table.schema())
        // TODO : See if the identifier fields are allowed to change
        identifierFieldsShouldNotChange(
            incomingSchema = incomingSchema,
            existingSchema = table.schema()
        )

        s3DataLakeTableSynchronizer.applySchemaChanges(table, incomingSchema)

        return S3DataLakeStreamLoader(
            stream = stream,

@@ -46,38 +53,9 @@ class S3DataLakeWriter(
        )
    }

    private fun existingAndIncomingSchemaShouldBeSame(catalogSchema: Schema, tableSchema: Schema) {
        val incomingFieldSet =
            catalogSchema
                .asStruct()
                .fields()
                .map { Triple(it.name(), it.type().typeId(), it.isOptional) }
                .toSet()
        val existingFieldSet =
            tableSchema
                .asStruct()
                .fields()
                .map { Triple(it.name(), it.type().typeId(), it.isOptional) }
                .toSet()

        val missingInIncoming = existingFieldSet - incomingFieldSet
        val extraInIncoming = incomingFieldSet - existingFieldSet

        if (missingInIncoming.isNotEmpty() || extraInIncoming.isNotEmpty()) {
            val errorMessage = buildString {
                append("Table schema fields are different than catalog schema:\n")
                if (missingInIncoming.isNotEmpty()) {
                    append("Fields missing in incoming schema: $missingInIncoming\n")
                }
                if (extraInIncoming.isNotEmpty()) {
                    append("Extra fields in incoming schema: $extraInIncoming\n")
                }
            }
            throw IllegalArgumentException(errorMessage)
        }

        val incomingIdentifierFields = catalogSchema.identifierFieldNames()
        val existingIdentifierFieldNames = tableSchema.identifierFieldNames()
    private fun identifierFieldsShouldNotChange(incomingSchema: Schema, existingSchema: Schema) {
        val incomingIdentifierFields = incomingSchema.identifierFieldNames()
        val existingIdentifierFieldNames = existingSchema.identifierFieldNames()

        val identifiersMissingInIncoming = existingIdentifierFieldNames - incomingIdentifierFields
        val identifiersExtraInIncoming = incomingIdentifierFields - existingIdentifierFieldNames
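The hunk is truncated here, before the check that consumes the two sets above. A plausible completion — hypothetical, mirroring the error-message pattern of the removed existingAndIncomingSchemaShouldBeSame helper — might look like this:

        // Hypothetical continuation (not shown in this excerpt): reject identifier-field changes.
        if (identifiersMissingInIncoming.isNotEmpty() || identifiersExtraInIncoming.isNotEmpty()) {
            val errorMessage = buildString {
                append("Identifier fields are different:\n")
                if (identifiersMissingInIncoming.isNotEmpty()) {
                    append("Identifier fields missing in incoming schema: $identifiersMissingInIncoming\n")
                }
                if (identifiersExtraInIncoming.isNotEmpty()) {
                    append("Extra identifier fields in incoming schema: $identifiersExtraInIncoming\n")
                }
            }
            throw IllegalArgumentException(errorMessage)
        }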
@@ -0,0 +1,301 @@
/*
 * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.s3_data_lake

import io.mockk.every
import io.mockk.just
import io.mockk.mockk
import io.mockk.runs
import io.mockk.spyk
import io.mockk.verify
import org.apache.iceberg.Schema
import org.apache.iceberg.Table
import org.apache.iceberg.UpdateSchema
import org.apache.iceberg.types.Type.PrimitiveType
import org.apache.iceberg.types.Types
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test

/**
 * Tests for [S3DataLakeTableSynchronizer].
 *
 * We use a mocked [Table] and [UpdateSchema] to verify that the right calls are made based on the
 * computed [S3DataLakeTypesComparator.ColumnDiff].
 */
class S3DataLakeTableSynchronizerTest {

    // Mocks
    private lateinit var mockTable: Table
    private lateinit var mockUpdateSchema: UpdateSchema

    // Collaborators under test
    private val comparator = spyk(S3DataLakeTypesComparator())
    private val superTypeFinder = spyk(S3DataLakeSuperTypeFinder(comparator))
    private val synchronizer = S3DataLakeTableSynchronizer(comparator, superTypeFinder)

    @BeforeEach
    fun setUp() {
        // Prepare the mocks before each test
        mockTable = mockk(relaxed = true)
        mockUpdateSchema = mockk(relaxed = true)

        // By default, let table.schema() return an empty schema. Tests can override this as needed.
        every { mockTable.schema() } returns Schema(listOf())

        // Table.updateSchema() should return the mock UpdateSchema
        every { mockTable.updateSchema().allowIncompatibleChanges() } returns mockUpdateSchema

        // No-op for the commit call unless specifically tested for. We'll verify calls later.
        every { mockUpdateSchema.commit() } just runs

        // Similarly for refresh.
        every { mockTable.refresh() } just runs
    }

    /** Helper to build a schema with [Types.NestedField]s. */
    private fun buildSchema(vararg fields: Types.NestedField): Schema {
        return Schema(fields.toList())
    }

    @Test
    fun `test no changes - should do nothing`() {
        // The existing schema is the same as incoming => no diffs
        val existingSchema =
            buildSchema(Types.NestedField.required(1, "id", Types.IntegerType.get()))
        val incomingSchema =
            buildSchema(Types.NestedField.required(1, "id", Types.IntegerType.get()))

        every { mockTable.schema() } returns existingSchema
        // The comparator will see no changes
        every { comparator.compareSchemas(incomingSchema, existingSchema) } answers
            {
                S3DataLakeTypesComparator.ColumnDiff()
            }

        val result = synchronizer.applySchemaChanges(mockTable, incomingSchema)

        // We expect the original schema to be returned
        assertThat(result).isSameAs(existingSchema)

        // Verify that no calls to updateSchema() manipulation were made
        verify(exactly = 0) { mockUpdateSchema.deleteColumn(any()) }
        verify(exactly = 0) { mockUpdateSchema.updateColumn(any(), any<PrimitiveType>()) }
        verify(exactly = 0) { mockUpdateSchema.makeColumnOptional(any()) }
        verify(exactly = 0) { mockUpdateSchema.addColumn(any<String>(), any<String>(), any()) }
        verify(exactly = 0) { mockUpdateSchema.commit() }
        verify(exactly = 0) { mockTable.refresh() }
    }

    @Test
    fun `test remove columns`() {
        val existingSchema =
            buildSchema(Types.NestedField.optional(1, "remove_me", Types.StringType.get()))
        val incomingSchema = buildSchema() // empty => remove everything

        every { mockTable.schema() } returns existingSchema

        val result = synchronizer.applySchemaChanges(mockTable, incomingSchema)

        // The result is a new schema after changes, but we can only verify calls on the mock
        // Here we expect remove_me to be deleted.
        verify { mockUpdateSchema.deleteColumn("remove_me") }
        verify { mockUpdateSchema.commit() }
        verify { mockTable.refresh() }

        // The final returned schema is the table's schema after refresh
        // Since we aren't actually applying changes, just assert that it's whatever the mock
        // returns
        assertThat(result).isEqualTo(mockTable.schema())
    }

    @Test
    fun `test update data type to supertype`() {
        val existingSchema =
            buildSchema(Types.NestedField.optional(2, "age", Types.IntegerType.get()))
        val incomingSchema = buildSchema(Types.NestedField.optional(2, "age", Types.LongType.get()))

        every { mockTable.schema() } returns existingSchema

        // Apply changes
        synchronizer.applySchemaChanges(mockTable, incomingSchema)

        // Verify that "age" is updated to LONG
        verify { mockUpdateSchema.updateColumn("age", Types.LongType.get()) }
        // And that changes are committed
        verify { mockUpdateSchema.commit() }
        verify { mockTable.refresh() }
    }

    @Test
    fun `test newly optional columns`() {
        val existingSchema =
            buildSchema(Types.NestedField.required(3, "make_optional", Types.StringType.get()))
        val incomingSchema =
            buildSchema(Types.NestedField.optional(3, "make_optional", Types.StringType.get()))

        every { mockTable.schema() } returns existingSchema

        synchronizer.applySchemaChanges(mockTable, incomingSchema)

        // We expect makeColumnOptional("make_optional") to be called
        verify { mockUpdateSchema.makeColumnOptional("make_optional") }
        verify { mockUpdateSchema.commit() }
        verify { mockTable.refresh() }
    }

    @Test
    fun `test add new columns - top level`() {
        val existingSchema =
            buildSchema(Types.NestedField.required(1, "id", Types.IntegerType.get()))
        val incomingSchema =
            buildSchema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.optional(2, "new_col", Types.StringType.get()),
            )

        every { mockTable.schema() } returns existingSchema

        synchronizer.applySchemaChanges(mockTable, incomingSchema)

        verify { mockUpdateSchema.addColumn(null, "new_col", Types.StringType.get()) }
        verify { mockUpdateSchema.commit() }
        verify { mockTable.refresh() }
    }

    @Test
    fun `test add new columns - nested one-level`() {
        val existingSchema =
            buildSchema(
                Types.NestedField.required(
                    1,
                    "user_info",
                    Types.StructType.of(
                        Types.NestedField.required(2, "nested_id", Types.IntegerType.get())
                    )
                )
            )
        val incomingSchema =
            buildSchema(
                Types.NestedField.required(
                    1,
                    "user_info",
                    Types.StructType.of(
                        Types.NestedField.required(2, "nested_id", Types.IntegerType.get()),
                        // new subfield
                        Types.NestedField.optional(3, "nested_name", Types.StringType.get()),
                    )
                )
            )

        every { mockTable.schema() } returns existingSchema

        // For the newly added leaf "nested_name"
        // We'll also ensure that the subfield is found
        val userInfoStruct = incomingSchema.findField("user_info")!!.type().asStructType()
        val nestedNameField = userInfoStruct.asSchema().findField("nested_name")
        assertThat(nestedNameField).isNotNull // Just a sanity check in the test

        synchronizer.applySchemaChanges(mockTable, incomingSchema)

        verify { mockUpdateSchema.addColumn("user_info", "nested_name", Types.StringType.get()) }
        verify { mockUpdateSchema.commit() }
        verify { mockTable.refresh() }
    }

    @Test
    fun `test add new columns - more than one-level nesting throws`() {
        // e.g. "outer~inner~leaf" is two levels
        val existingSchema = buildSchema()
        val incomingSchema = buildSchema() // Not too relevant, since we expect an exception

        every { mockTable.schema() } returns existingSchema
        val diff =
            S3DataLakeTypesComparator.ColumnDiff(newColumns = mutableListOf("outer~inner~leaf"))
        every { comparator.compareSchemas(incomingSchema, existingSchema) } returns diff

        assertThatThrownBy { synchronizer.applySchemaChanges(mockTable, incomingSchema) }
            .isInstanceOf(IllegalArgumentException::class.java)
            .hasMessageContaining("Adding nested columns more than 1 level deep is not supported")

        // No calls to commit
        verify(exactly = 0) { mockUpdateSchema.commit() }
        verify(exactly = 0) { mockTable.refresh() }
    }

    @Test
    fun `test update with non-primitive supertype throws`() {
        // Suppose the comparator says that "complex_col" has an updated data type
        // but the superTypeFinder returns a struct (non-primitive).
        val existingSchema =
            buildSchema(Types.NestedField.required(10, "complex_col", Types.StructType.of()))
        val incomingSchema =
            buildSchema(Types.NestedField.required(10, "complex_col", Types.StructType.of()))

        every { mockTable.schema() } returns existingSchema
        val diff =
            S3DataLakeTypesComparator.ColumnDiff(updatedDataTypes = mutableListOf("complex_col"))
        every { comparator.compareSchemas(incomingSchema, existingSchema) } returns diff

        // Let superTypeFinder return a struct type
        val structType =
            Types.StructType.of(Types.NestedField.optional(1, "field", Types.StringType.get()))
        every { superTypeFinder.findSuperType(any(), any(), "complex_col") } returns structType

        assertThatThrownBy { synchronizer.applySchemaChanges(mockTable, incomingSchema) }
            .isInstanceOf(IllegalArgumentException::class.java)
            .hasMessageContaining("Currently only primitive type updates are supported.")

        // No updates or commits
        verify(exactly = 0) { mockUpdateSchema.updateColumn(any(), any<PrimitiveType>()) }
        verify(exactly = 0) { mockUpdateSchema.commit() }
        verify(exactly = 0) { mockTable.refresh() }
    }

    @Test
    fun `test multiple operations in one pass`() {
        val existingSchema =
            buildSchema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.optional(2, "remove_me", Types.StringType.get()),
                Types.NestedField.required(3, "make_optional", Types.IntegerType.get()),
                Types.NestedField.required(4, "upgrade_int", Types.IntegerType.get())
            )
        val incomingSchema =
            buildSchema(
                // "remove_me" is gone -> removal
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                // make_optional -> newly optional
                Types.NestedField.optional(3, "make_optional", Types.IntegerType.get()),
                // upgrade_int -> changed to long
                Types.NestedField.required(4, "upgrade_int", Types.LongType.get()),
                // brand_new -> new column
                Types.NestedField.optional(5, "brand_new", Types.FloatType.get())
            )

        every { mockTable.schema() } returns existingSchema

        // Suppose superTypeFinder says int->long is valid
        every {
            superTypeFinder.findSuperType(
                Types.IntegerType.get(),
                Types.LongType.get(),
                "upgrade_int"
            )
        } returns Types.LongType.get()

        synchronizer.applySchemaChanges(mockTable, incomingSchema)

        // Verify calls, in any order
        verify { mockUpdateSchema.deleteColumn("remove_me") }
        verify { mockUpdateSchema.updateColumn("upgrade_int", Types.LongType.get()) }
        verify { mockUpdateSchema.makeColumnOptional("make_optional") }
        verify { mockUpdateSchema.addColumn(null, "brand_new", Types.FloatType.get()) }

        verify { mockUpdateSchema.commit() }
        verify { mockTable.refresh() }
    }
}
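Outside of tests, the wiring is the same as in the S3DataLakeWriterTest changes below: construct the synchronizer from a real comparator and supertype finder, then hand it a loaded table. A minimal sketch, assuming a configured Iceberg Catalog and an already-built incoming Schema (the namespace and table name are placeholders, not from this commit):

import org.apache.iceberg.Schema
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.catalog.TableIdentifier

fun syncTableSchema(catalog: Catalog, incomingSchema: Schema): Schema {
    val synchronizer =
        S3DataLakeTableSynchronizer(
            S3DataLakeTypesComparator(),
            S3DataLakeSuperTypeFinder(S3DataLakeTypesComparator()),
        )
    // Load the live table, reconcile its schema with the incoming one, and return the refreshed schema.
    val table = catalog.loadTable(TableIdentifier.of("my_namespace", "my_table"))
    return synchronizer.applySchemaChanges(table, incomingSchema)
}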
@@ -26,10 +26,15 @@ import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_RAW_ID

import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableWriterFactory
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil
import io.mockk.every
import io.mockk.just
import io.mockk.mockk
import io.mockk.runs
import io.mockk.verify
import org.apache.iceberg.Schema
import org.apache.iceberg.Table
import org.apache.iceberg.UpdateSchema
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.types.Type.PrimitiveType
import org.apache.iceberg.types.Types
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertTrue

@@ -50,7 +55,7 @@ internal class S3DataLakeWriterTest {

linkedMapOf(
"id" to FieldType(IntegerType, nullable = true),
"name" to FieldType(StringType, nullable = true),
)
),
),
generationId = 1,
minimumGenerationId = 1,

@@ -80,18 +85,18 @@ internal class S3DataLakeWriterTest {

10,
false,
"change",
Types.StringType.get()
Types.StringType.get(),
),
Types.NestedField.of(
11,
false,
"reason",
Types.StringType.get()
)
)
)
)
)
Types.StringType.get(),
),
),
),
),
),
),
Types.NestedField.of(12, false, COLUMN_NAME_AB_GENERATION_ID, Types.LongType.get()),
)

@@ -134,6 +139,11 @@ internal class S3DataLakeWriterTest {

s3DataLakeTableWriterFactory = s3DataLakeTableWriterFactory,
icebergConfiguration = icebergConfiguration,
s3DataLakeUtil = s3DataLakeUtil,
s3DataLakeTableSynchronizer =
    S3DataLakeTableSynchronizer(
        S3DataLakeTypesComparator(),
        S3DataLakeSuperTypeFinder(S3DataLakeTypesComparator()),
    )
)
val streamLoader = s3DataLakeWriter.createStreamLoader(stream = stream)
assertNotNull(streamLoader)

@@ -151,7 +161,7 @@ internal class S3DataLakeWriterTest {

linkedMapOf(
"id" to FieldType(IntegerType, nullable = true),
"name" to FieldType(StringType, nullable = true),
)
),
),
generationId = 1,
minimumGenerationId = 1,

@@ -184,6 +194,23 @@ internal class S3DataLakeWriterTest {

}
val catalog: Catalog = mockk()
val table: Table = mockk { every { schema() } returns icebergSchema }
val updateSchema: UpdateSchema = mockk()
every { table.updateSchema().allowIncompatibleChanges() } returns updateSchema
every {
    updateSchema.updateColumn(
        any<String>(),
        any<PrimitiveType>(),
    )
} returns updateSchema
every {
    updateSchema.addColumn(
        any<String>(),
        any<String>(),
        any<PrimitiveType>(),
    )
} returns updateSchema
every { updateSchema.commit() } just runs
every { table.refresh() } just runs
val s3DataLakeUtil: S3DataLakeUtil = mockk {
    every { createCatalog(any(), any()) } returns catalog
    every { createTable(any(), any(), any(), any()) } returns table

@@ -199,14 +226,24 @@ internal class S3DataLakeWriterTest {

s3DataLakeTableWriterFactory = s3DataLakeTableWriterFactory,
icebergConfiguration = icebergConfiguration,
s3DataLakeUtil = s3DataLakeUtil,
s3DataLakeTableSynchronizer =
    S3DataLakeTableSynchronizer(
        S3DataLakeTypesComparator(),
        S3DataLakeSuperTypeFinder(S3DataLakeTypesComparator()),
    ),
)
val e =
    assertThrows<IllegalArgumentException> {
        s3DataLakeWriter.createStreamLoader(stream = stream)
    }
assertTrue(
    e.message?.startsWith("Table schema fields are different than catalog schema") ?: false
)
s3DataLakeWriter.createStreamLoader(stream = stream)

verify(exactly = 0) { updateSchema.deleteColumn(any()) }
verify(exactly = 0) { updateSchema.updateColumn(any(), any<PrimitiveType>()) }
verify(exactly = 0) { updateSchema.makeColumnOptional(any()) }
verify { updateSchema.addColumn(null, "_airbyte_raw_id", Types.StringType.get()) }
verify { updateSchema.addColumn(null, "id", Types.LongType.get()) }
verify { updateSchema.addColumn(null, "_airbyte_meta", any()) }
verify { updateSchema.addColumn(null, "_airbyte_generation_id", Types.LongType.get()) }
verify { updateSchema.addColumn(null, "id", Types.LongType.get()) }
verify { updateSchema.commit() }
verify { table.refresh() }
}

@Test

@@ -222,7 +259,7 @@ internal class S3DataLakeWriterTest {

linkedMapOf(
"id" to FieldType(IntegerType, nullable = false),
"name" to FieldType(StringType, nullable = true),
)
),
),
generationId = 1,
minimumGenerationId = 1,

@@ -252,18 +289,18 @@ internal class S3DataLakeWriterTest {

10,
false,
"change",
Types.StringType.get()
Types.StringType.get(),
),
Types.NestedField.of(
11,
false,
"reason",
Types.StringType.get()
)
)
)
)
)
Types.StringType.get(),
),
),
),
),
),
),
Types.NestedField.of(12, false, COLUMN_NAME_AB_GENERATION_ID, Types.LongType.get()),
)

@@ -306,6 +343,11 @@ internal class S3DataLakeWriterTest {

s3DataLakeTableWriterFactory = s3DataLakeTableWriterFactory,
icebergConfiguration = icebergConfiguration,
s3DataLakeUtil = s3DataLakeUtil,
s3DataLakeTableSynchronizer =
    S3DataLakeTableSynchronizer(
        S3DataLakeTypesComparator(),
        S3DataLakeSuperTypeFinder(S3DataLakeTypesComparator()),
    ),
)
val e =
    assertThrows<IllegalArgumentException> {
@@ -15,15 +15,16 @@ for more information.

<details>
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------|
| 0.2.13 | 2025-01-14 | [\#50412](https://github.com/airbytehq/airbyte/pull/50412) | Implement logic to determine super types between iceberg types |
| 0.2.12 | 2025-01-10 | [\#50876](https://github.com/airbytehq/airbyte/pull/50876) | Add support for AWS instance profile auth |
| 0.2.11 | 2025-01-10 | [\#50971](https://github.com/airbytehq/airbyte/pull/50971) | Internal refactor in AWS auth flow |
| 0.2.10 | 2025-01-09 | [\#50400](https://github.com/airbytehq/airbyte/pull/50400) | Add S3DataLakeTypesComparator |
| 0.2.9 | 2025-01-09 | [\#51022](https://github.com/airbytehq/airbyte/pull/51022) | Rename all classes and files from Iceberg V2 |
| 0.2.8 | 2025-01-09 | [\#51012](https://github.com/airbytehq/airbyte/pull/51012) | Rename/Cleanup package from Iceberg V2 |
| 0.2.7 | 2025-01-09 | [\#50957](https://github.com/airbytehq/airbyte/pull/50957) | Add support for GLUE RBAC (Assume role) |
| 0.2.6 | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. |

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------|
| 0.2.14 | 2025-01-14 | [\#50413](https://github.com/airbytehq/airbyte/pull/50413) | Update existing table schema based on the incoming schema |
| 0.2.13 | 2025-01-14 | [\#50412](https://github.com/airbytehq/airbyte/pull/50412) | Implement logic to determine super types between iceberg types |
| 0.2.12 | 2025-01-10 | [\#50876](https://github.com/airbytehq/airbyte/pull/50876) | Add support for AWS instance profile auth |
| 0.2.11 | 2025-01-10 | [\#50971](https://github.com/airbytehq/airbyte/pull/50971) | Internal refactor in AWS auth flow |
| 0.2.10 | 2025-01-09 | [\#50400](https://github.com/airbytehq/airbyte/pull/50400) | Add S3DataLakeTypesComparator |
| 0.2.9 | 2025-01-09 | [\#51022](https://github.com/airbytehq/airbyte/pull/51022) | Rename all classes and files from Iceberg V2 |
| 0.2.8 | 2025-01-09 | [\#51012](https://github.com/airbytehq/airbyte/pull/51012) | Rename/Cleanup package from Iceberg V2 |
| 0.2.7 | 2025-01-09 | [\#50957](https://github.com/airbytehq/airbyte/pull/50957) | Add support for GLUE RBAC (Assume role) |
| 0.2.6 | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. |

</details>