Destination Bigquery / bulk load CDK: validate catalog at startup (#61700)
This commit is contained in:
@@ -4,6 +4,7 @@
|
||||
|
||||
package io.airbyte.cdk.load.command
|
||||
|
||||
import io.airbyte.cdk.ConfigErrorException
|
||||
import io.airbyte.cdk.Operation
|
||||
import io.airbyte.cdk.load.config.CHECK_STREAM_NAMESPACE
|
||||
import io.airbyte.cdk.load.data.FieldType
|
||||
@@ -35,6 +36,16 @@ data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()
|
||||
"Catalog must have at least one stream: check that files are in the correct location."
|
||||
)
|
||||
}
|
||||
|
||||
val duplicateStreamDescriptors =
|
||||
streams.groupingBy { it.mappedDescriptor }.eachCount().filter { it.value > 1 }.keys
|
||||
if (duplicateStreamDescriptors.isNotEmpty()) {
|
||||
throw ConfigErrorException(
|
||||
"Some streams appeared multiple times: ${duplicateStreamDescriptors.map { it.toPrettyString() }}"
|
||||
)
|
||||
}
|
||||
throwIfInvalidDedupConfig()
|
||||
|
||||
log.info { "Destination catalog initialized: $streams" }
|
||||
}
|
||||
|
||||
@@ -53,6 +64,31 @@ data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()
|
||||
ConfiguredAirbyteCatalog().withStreams(streams.map { it.asProtocolObject() })
|
||||
|
||||
/** Returns the number of streams held by this catalog. */
fun size(): Int {
    return streams.size
}
|
||||
|
||||
/**
 * Validates the dedup configuration of every stream in this catalog.
 *
 * Streams whose import type is not [Dedupe] are ignored. For a [Dedupe] stream, the first
 * element of each non-empty primary key path, and then the first element of a non-empty
 * cursor path, must be a key of `stream.schema.asColumns()`. Only the first element of a
 * path is checked; deeper path elements are not inspected here.
 *
 * @throws ConfigErrorException for the first missing primary key or cursor column found.
 */
internal fun throwIfInvalidDedupConfig() {
    for (stream in streams) {
        val dedupe = stream.importType as? Dedupe ?: continue

        // Primary key paths are checked first, in declaration order.
        for (keyPath in dedupe.primaryKey) {
            if (keyPath.isEmpty()) continue
            val keyColumn = keyPath.first()
            if (stream.schema.asColumns().containsKey(keyColumn)) continue
            throw ConfigErrorException(
                "For stream ${stream.mappedDescriptor.toPrettyString()}: A primary key column does not exist in the schema: $keyColumn"
            )
        }

        // An empty cursor path means there is nothing to validate for this stream.
        val cursorPath = dedupe.cursor
        if (cursorPath.isEmpty()) continue
        val cursorColumn = cursorPath.first()
        if (stream.schema.asColumns().containsKey(cursorColumn)) continue
        throw ConfigErrorException(
            "For stream ${stream.mappedDescriptor.toPrettyString()}: The cursor does not exist in the schema: $cursorColumn"
        )
    }
}
|
||||
}
|
||||
|
||||
interface DestinationCatalogFactory {
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
package io.airbyte.cdk.load.command
|
||||
|
||||
import io.airbyte.cdk.ConfigErrorException
|
||||
import io.airbyte.cdk.load.data.AirbyteValueProxy.FieldAccessor
|
||||
import io.airbyte.cdk.load.data.BooleanType
|
||||
import io.airbyte.cdk.load.data.FieldType
|
||||
@@ -18,6 +19,7 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
|
||||
import io.airbyte.protocol.models.v0.DestinationSyncMode
|
||||
import kotlin.test.assertEquals
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.api.assertThrows
|
||||
|
||||
class DestinationCatalogTest {
|
||||
private val originalCatalog =
|
||||
@@ -45,7 +47,9 @@ class DestinationCatalogTest {
|
||||
.withIncludeFiles(true)
|
||||
.withStream(
|
||||
AirbyteStream()
|
||||
.withJsonSchema("""{"type": "object"}""".deserializeToNode())
|
||||
.withJsonSchema(
|
||||
"""{"type": "object", "properties": {"id1": {"type": "integer"}, "id2": {"type": "integer"}, "cursor": {"type": "integer"}}, "additionalProperties": false}""".deserializeToNode()
|
||||
)
|
||||
.withNamespace("namespace2")
|
||||
.withName("name2")
|
||||
.withIsFileBased(true)
|
||||
@@ -73,7 +77,9 @@ class DestinationCatalogTest {
|
||||
.withIncludeFiles(false)
|
||||
.withStream(
|
||||
AirbyteStream()
|
||||
.withJsonSchema("""{"type": "object"}""".deserializeToNode())
|
||||
.withJsonSchema(
|
||||
"""{"type": "object", "properties": {"id1": {"type": "integer"}, "id2": {"type": "integer"}, "cursor": {"type": "integer"}}, "additionalProperties": false}""".deserializeToNode()
|
||||
)
|
||||
.withNamespace("namespace4")
|
||||
.withName("name4")
|
||||
.withIsFileBased(true)
|
||||
@@ -135,4 +141,98 @@ class DestinationCatalogTest {
|
||||
stream.airbyteValueProxyFieldAccessors.toList()
|
||||
)
|
||||
}
|
||||
|
||||
/** A catalog containing two streams both named "foo" must be rejected at construction. */
@Test
fun throwOnDuplicateStreams() {
    // Each call builds a new, identically configured append-mode stream named "foo".
    fun fooStream() =
        DestinationStream(
            unmappedNamespace = null,
            unmappedName = "foo",
            importType = Append,
            generationId = 1,
            minimumGenerationId = 0,
            syncId = 1,
            includeFiles = false,
            schema = ObjectType(linkedMapOf()),
            namespaceMapper = NamespaceMapper(),
        )

    val thrown =
        assertThrows<ConfigErrorException> {
            DestinationCatalog(listOf(fooStream(), fooStream()))
        }

    assertEquals("Some streams appeared multiple times: [foo]", thrown.message)
}
|
||||
|
||||
/** A dedupe stream whose primary key column is absent from the schema must be rejected. */
@Test
fun validatePkExists() {
    val expectedMessage = "For stream foo: A primary key column does not exist in the schema: id"

    // Dedupe on "id" against an empty object schema, with no cursor configured.
    fun streamWithMissingPk() =
        DestinationStream(
            unmappedNamespace = null,
            unmappedName = "foo",
            importType = Dedupe(primaryKey = listOf(listOf("id")), cursor = emptyList()),
            generationId = 1,
            minimumGenerationId = 0,
            syncId = 1,
            includeFiles = false,
            schema = ObjectType(linkedMapOf()),
            namespaceMapper = NamespaceMapper(),
        )

    val thrown =
        assertThrows<ConfigErrorException> {
            DestinationCatalog(listOf(streamWithMissingPk()))
        }

    assertEquals(expectedMessage, thrown.message)
}
|
||||
|
||||
/** A dedupe stream whose cursor column is absent from the schema must be rejected. */
@Test
fun validateCursorExists() {
    val expectedMessage = "For stream foo: The cursor does not exist in the schema: updated_at"

    // The schema declares only "id", so the primary key resolves but the cursor does not.
    fun streamWithMissingCursor() =
        DestinationStream(
            unmappedNamespace = null,
            unmappedName = "foo",
            importType =
                Dedupe(
                    primaryKey = listOf(listOf("id")),
                    cursor = listOf("updated_at"),
                ),
            generationId = 1,
            minimumGenerationId = 0,
            syncId = 1,
            includeFiles = false,
            schema = ObjectType(linkedMapOf("id" to FieldType(IntegerType, nullable = true))),
            namespaceMapper = NamespaceMapper(),
        )

    val thrown =
        assertThrows<ConfigErrorException> {
            DestinationCatalog(listOf(streamWithMissingCursor()))
        }

    assertEquals(expectedMessage, thrown.message)
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ data:
|
||||
connectorSubtype: database
|
||||
connectorType: destination
|
||||
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
|
||||
dockerImageTag: 2.12.4-rc.1
|
||||
dockerImageTag: 2.12.4-rc.2
|
||||
dockerRepository: airbyte/destination-bigquery
|
||||
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
|
||||
githubIssueLabel: destination-bigquery
|
||||
|
||||
@@ -16,11 +16,15 @@ class BigqueryCheckCleaner : CheckCleaner<BigqueryConfiguration> {
|
||||
/**
 * Deletes the raw table and the final table associated with [stream].
 *
 * Table names are resolved from `stream.mappedDescriptor`. A table is deleted only when
 * `getTable` returns a non-null result; a null lookup is skipped silently.
 *
 * @param config connector configuration used to build the BigQuery client and name generators
 * @param stream the stream whose tables should be removed
 */
override fun cleanup(config: BigqueryConfiguration, stream: DestinationStream) {
    val bq = BigqueryBeansFactory().getBigqueryClient(config)

    // Raw table first, then the final table.
    bq.getTable(
            BigqueryRawTableNameGenerator(config)
                .getTableName(stream.mappedDescriptor)
                .toTableId()
        )
        ?.delete()
    bq.getTable(
            BigqueryFinalTableNameGenerator(config)
                .getTableName(stream.mappedDescriptor)
                .toTableId()
        )
        ?.delete()
}
|
||||
|
||||
@@ -49,7 +49,7 @@ object BigqueryRawTableDataDumper : DestinationDataDumper {
|
||||
val bigquery = BigqueryBeansFactory().getBigqueryClient(config)
|
||||
|
||||
val (_, rawTableName) =
|
||||
BigqueryRawTableNameGenerator(config).getTableName(stream.descriptor)
|
||||
BigqueryRawTableNameGenerator(config).getTableName(stream.mappedDescriptor)
|
||||
|
||||
return bigquery.getTable(TableId.of(config.rawTableDataset, rawTableName))?.let { table ->
|
||||
val bigquerySchema = table.getDefinition<StandardTableDefinition>().schema!!
|
||||
@@ -95,7 +95,7 @@ object BigqueryFinalTableDataDumper : DestinationDataDumper {
|
||||
val bigquery = BigqueryBeansFactory().getBigqueryClient(config)
|
||||
|
||||
val (datasetName, finalTableName) =
|
||||
BigqueryFinalTableNameGenerator(config).getTableName(stream.descriptor)
|
||||
BigqueryFinalTableNameGenerator(config).getTableName(stream.mappedDescriptor)
|
||||
|
||||
return bigquery.getTable(TableId.of(datasetName, finalTableName))?.let { table ->
|
||||
val bigquerySchema = table.getDefinition<StandardTableDefinition>().schema!!
|
||||
|
||||
@@ -213,6 +213,7 @@ tutorials:
|
||||
|
||||
| Version | Date | Pull Request | Subject |
|
||||
|:------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| 2.12.4-rc.2 | 2025-06-18 | [61700](https://github.com/airbytehq/airbyte/pull/61700) | 2.12.4 RC 2 (throw more informative error on invalid catalog) |
|
||||
| 2.12.4-rc.1 | 2025-06-16 | [61637](https://github.com/airbytehq/airbyte/pull/61637) | 2.12.4 RC 1 (theoretically equivalent to 2.12.0, but with fixed global state handling in CDK) |
|
||||
| 2.12.3 | 2025-06-16 | [61648](https://github.com/airbytehq/airbyte/pull/61648) | This is not the release you are looking for. identical to 2.10.2 |
|
||||
| 2.12.1 | 2025-06-13 | [61588](https://github.com/airbytehq/airbyte/pull/61588) | ~~Publish version to account for possible duplicate publishing in pipeline. Noop change.~~ WARNING: THIS HAS A BUG. DO NOT USE. |
|
||||
|
||||
Reference in New Issue
Block a user