1
0
mirror of synced 2025-12-25 02:09:19 -05:00

destination-postgres: bump CDK (#43331)

Refactored `PostgresDestination`, `PostgresSqlOperations`, and `PostgresDestinationHandler` to support the new `PostgresGenerationHandler` class for handling generation IDs.
This commit is contained in:
Stephane Geneix
2024-08-07 16:57:31 -07:00
committed by GitHub
parent 634c4941c2
commit d8c5a93c8f
7 changed files with 66 additions and 51 deletions

View File

@@ -3,7 +3,7 @@ plugins {
}
airbyteJavaConnector {
cdkVersionRequired = '0.44.2'
cdkVersionRequired = '0.44.3'
features = ['db-destinations', 'datastore-postgres', 'typing-deduping']
useLocalCdk = false
}

View File

@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 2.3.1
dockerImageTag: 2.3.2
dockerRepository: airbyte/destination-postgres
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres

View File

@@ -15,7 +15,6 @@ import io.airbyte.cdk.integrations.base.IntegrationRunner
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination
import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination
import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator
import io.airbyte.cdk.integrations.util.PostgresSslConnectionUtils
@@ -147,9 +146,14 @@ class PostgresDestination :
override fun getSqlGenerator(config: JsonNode): JdbcSqlGenerator {
return PostgresSqlGenerator(PostgresSQLNameTransformer(), hasDropCascadeMode(config))
}
override fun getSqlOperations(config: JsonNode): SqlOperations {
override fun getSqlOperations(config: JsonNode): PostgresSqlOperations {
return PostgresSqlOperations(hasDropCascadeMode(config))
}
override fun getGenerationHandler(): PostgresGenerationHandler {
return PostgresGenerationHandler()
}
private fun hasDropCascadeMode(config: JsonNode): Boolean {
val dropCascadeNode = config[DROP_CASCADE_OPTION]
return dropCascadeNode != null && dropCascadeNode.asBoolean()
@@ -165,7 +169,7 @@ class PostgresDestination :
databaseName,
database,
rawTableSchema,
getSqlOperations(config)
getGenerationHandler(),
)
}

View File

@@ -0,0 +1,53 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.postgres
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.destination.jdbc.JdbcGenerationHandler
class PostgresGenerationHandler : JdbcGenerationHandler {
override fun getGenerationIdInTable(
database: JdbcDatabase,
namespace: String,
name: String
): Long? {
val selectTableResultSet =
database
.unsafeQuery(
"""SELECT 1
| FROM pg_catalog.pg_namespace n
| JOIN pg_catalog.pg_class c
| ON c.relnamespace=n.oid
| JOIN pg_catalog.pg_attribute a
| ON a.attrelid = c.oid
| WHERE n.nspname=?
| AND c.relkind='r'
| AND c.relname=?
| AND a.attname=?
| LIMIT 1
""".trimMargin(),
namespace,
name,
"_airbyte_generation_id"
)
.use { it.toList() }
if (selectTableResultSet.isEmpty()) {
return null
} else {
val selectGenIdResultSet =
database
.unsafeQuery("SELECT _airbyte_generation_id FROM $namespace.$name LIMIT 1;")
.use { it.toList() }
if (selectGenIdResultSet.isEmpty()) {
return null
} else {
val genIdInTable =
selectGenIdResultSet.first().get("_airbyte_generation_id")?.asLong()
LOGGER.info { "found generationId in table $namespace.$name: $genIdInTable" }
return genIdInTable ?: -1L
}
}
}
}

View File

@@ -146,47 +146,4 @@ class PostgresSqlOperations(useDropCascade: Boolean) : JdbcSqlOperations() {
)
)
}
override fun getGenerationIdInTable(
database: JdbcDatabase,
namespace: String,
name: String
): Long? {
val selectTableResultSet =
database
.unsafeQuery(
"""SELECT 1
| FROM pg_catalog.pg_namespace n
| JOIN pg_catalog.pg_class c
| ON c.relnamespace=n.oid
| JOIN pg_catalog.pg_attribute a
| ON a.attrelid = c.oid
| WHERE n.nspname=?
| AND c.relkind='r'
| AND c.relname=?
| AND a.attname=?
| LIMIT 1
""".trimMargin(),
namespace,
name,
"_airbyte_generation_id"
)
.use { it.toList() }
if (selectTableResultSet.isEmpty()) {
return null
} else {
val selectGenIdResultSet =
database
.unsafeQuery("SELECT _airbyte_generation_id FROM $namespace.$name LIMIT 1;")
.use { it.toList() }
if (selectGenIdResultSet.isEmpty()) {
return null
} else {
val genIdInTable =
selectGenIdResultSet.first().get("_airbyte_generation_id")?.asLong()
LOGGER.info { "found generationId in table $namespace.$name: $genIdInTable" }
return genIdInTable ?: -1L
}
}
}
}

View File

@@ -5,7 +5,6 @@ package io.airbyte.integrations.destination.postgres.typing_deduping
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler
import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
@@ -15,20 +14,21 @@ import io.airbyte.integrations.base.destination.typing_deduping.Sql
import io.airbyte.integrations.base.destination.typing_deduping.Struct
import io.airbyte.integrations.base.destination.typing_deduping.Union
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
import io.airbyte.integrations.destination.postgres.PostgresGenerationHandler
import org.jooq.SQLDialect
class PostgresDestinationHandler(
databaseName: String?,
jdbcDatabase: JdbcDatabase,
rawTableSchema: String,
sqlOperations: SqlOperations,
generationHandler: PostgresGenerationHandler,
) :
JdbcDestinationHandler<PostgresState>(
databaseName,
jdbcDatabase,
rawTableSchema,
SQLDialect.POSTGRES,
sqlOperations = sqlOperations,
generationHandler = generationHandler
) {
override fun toJdbcTypeName(airbyteType: AirbyteType): String {
// This is mostly identical to the postgres implementation, but swaps jsonb to super