diff --git a/airbyte-cdk/bulk/changelog.md b/airbyte-cdk/bulk/changelog.md index 8a2c3483983..8dcb8ea8332 100644 --- a/airbyte-cdk/bulk/changelog.md +++ b/airbyte-cdk/bulk/changelog.md @@ -1,3 +1,7 @@ +## Version 0.1.65 + +extract cdk: fix bug when getting table metadata that cause timeout + ## Version 0.1.64 extract cdk: add table filtering to jdbc connectors diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerier.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerier.kt index 15e9052dc00..0b1148e2d94 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerier.kt +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerier.kt @@ -23,6 +23,7 @@ import java.sql.ResultSet import java.sql.ResultSetMetaData import java.sql.SQLException import java.sql.Statement +import kotlin.collections.isNotEmpty /** Default implementation of [MetadataQuerier]. */ class JdbcMetadataQuerier( @@ -62,7 +63,10 @@ class JdbcMetadataQuerier( return null } - val tableFilters = config.tableFilters + val tableFiltersBySchema: Map> = + config.tableFilters + .groupBy { it.schemaName } + .mapValues { (_, filters) -> filters.flatMap { it.patterns } } val memoizedTableNames: List by lazy { log.info { "Querying table names for catalog discovery." } @@ -93,21 +97,13 @@ class JdbcMetadataQuerier( NamespaceKind.CATALOG_AND_SCHEMA -> namespace to namespace } - if (tableFilters.isEmpty()) { - addTablesFromQuery(catalog, schema, null) - } else { - val filtersForSchema = - tableFilters.filter { it.schemaName.equals(schema, ignoreCase = true) } - - if (filtersForSchema.isEmpty()) { - addTablesFromQuery(catalog, schema, null) - } else { - for (filter in filtersForSchema) { - for (pattern in filter.patterns) { - addTablesFromQuery(catalog, filter.schemaName, pattern) - } - } + val patterns = tableFiltersBySchema[namespace] + if (patterns != null && patterns.isNotEmpty()) { + for (pattern in patterns) { + addTablesFromQuery(catalog, schema, pattern) } + } else { + addTablesFromQuery(catalog, schema, null) } } log.info { "Discovered ${allTables.size} table(s) in namespaces ${config.namespaces}." } @@ -127,28 +123,45 @@ class JdbcMetadataQuerier( log.info { "Querying column names for catalog discovery." } try { val dbmd: DatabaseMetaData = conn.metaData - memoizedTableNames - .filter { it.namespace() != null } - .forEach { table -> - dbmd.getPseudoColumns(table.catalog, table.schema, table.name, null).use { - rs: ResultSet -> - while (rs.next()) { - val (tableName: TableName, metadata: ColumnMetadata) = - columnMetadataFromResultSet(rs, isPseudoColumn = true) - val joinedTableName: TableName = joinMap[tableName] ?: continue - results.add(joinedTableName to metadata) - } - } - dbmd.getColumns(table.catalog, table.schema, table.name, null).use { - rs: ResultSet -> - while (rs.next()) { - val (tableName: TableName, metadata: ColumnMetadata) = - columnMetadataFromResultSet(rs, isPseudoColumn = false) - val joinedTableName: TableName = joinMap[tableName] ?: continue - results.add(joinedTableName to metadata) - } + + fun addColumnsFromQuery( + catalog: String?, + schema: String?, + tablePattern: String?, + isPseudoColumn: Boolean + ) { + val rsMethod = if (isPseudoColumn) dbmd::getPseudoColumns else dbmd::getColumns + rsMethod(catalog, schema, tablePattern, null).use { rs: ResultSet -> + while (rs.next()) { + val (tableName: TableName, metadata: ColumnMetadata) = + columnMetadataFromResultSet(rs, isPseudoColumn) + val joinedTableName: TableName = joinMap[tableName] ?: continue + results.add(joinedTableName to metadata) } } + } + // Query columns using the same pattern as table discovery: + // - If schema has filters, query per filter pattern + // - If no filters, query entire schema at once + for (namespace in config.namespaces + config.namespaces.map { it.uppercase() }) { + val (catalog: String?, schema: String?) = + when (constants.namespaceKind) { + NamespaceKind.CATALOG -> namespace to null + NamespaceKind.SCHEMA -> null to namespace + NamespaceKind.CATALOG_AND_SCHEMA -> namespace to namespace + } + + val patterns = tableFiltersBySchema[namespace] + if (patterns != null && patterns.isNotEmpty()) { + for (pattern in patterns) { + addColumnsFromQuery(catalog, schema, pattern, isPseudoColumn = true) + addColumnsFromQuery(catalog, schema, pattern, isPseudoColumn = false) + } + } else { + addColumnsFromQuery(catalog, schema, null, isPseudoColumn = true) + addColumnsFromQuery(catalog, schema, null, isPseudoColumn = false) + } + } log.info { "Discovered ${results.size} column(s) and pseudo-column(s)." } } catch (e: Exception) { throw RuntimeException("Column name discovery query failed: ${e.message}", e) diff --git a/airbyte-cdk/bulk/version.properties b/airbyte-cdk/bulk/version.properties index fd90ee21aa9..5a2a041c7f4 100644 --- a/airbyte-cdk/bulk/version.properties +++ b/airbyte-cdk/bulk/version.properties @@ -1 +1 @@ -version=0.1.64 +version=0.1.65