Query table metadata per schema (#69184)
This commit is contained in:
@@ -1,3 +1,7 @@
|
||||
## Version 0.1.65
|
||||
|
||||
extract cdk: fix bug when getting table metadata that cause timeout
|
||||
|
||||
## Version 0.1.64
|
||||
|
||||
extract cdk: add table filtering to jdbc connectors
|
||||
|
||||
@@ -23,6 +23,7 @@ import java.sql.ResultSet
|
||||
import java.sql.ResultSetMetaData
|
||||
import java.sql.SQLException
|
||||
import java.sql.Statement
|
||||
import kotlin.collections.isNotEmpty
|
||||
|
||||
/** Default implementation of [MetadataQuerier]. */
|
||||
class JdbcMetadataQuerier(
|
||||
@@ -62,7 +63,10 @@ class JdbcMetadataQuerier(
|
||||
return null
|
||||
}
|
||||
|
||||
val tableFilters = config.tableFilters
|
||||
val tableFiltersBySchema: Map<String, List<String>> =
|
||||
config.tableFilters
|
||||
.groupBy { it.schemaName }
|
||||
.mapValues { (_, filters) -> filters.flatMap { it.patterns } }
|
||||
|
||||
val memoizedTableNames: List<TableName> by lazy {
|
||||
log.info { "Querying table names for catalog discovery." }
|
||||
@@ -93,21 +97,13 @@ class JdbcMetadataQuerier(
|
||||
NamespaceKind.CATALOG_AND_SCHEMA -> namespace to namespace
|
||||
}
|
||||
|
||||
if (tableFilters.isEmpty()) {
|
||||
addTablesFromQuery(catalog, schema, null)
|
||||
} else {
|
||||
val filtersForSchema =
|
||||
tableFilters.filter { it.schemaName.equals(schema, ignoreCase = true) }
|
||||
|
||||
if (filtersForSchema.isEmpty()) {
|
||||
addTablesFromQuery(catalog, schema, null)
|
||||
} else {
|
||||
for (filter in filtersForSchema) {
|
||||
for (pattern in filter.patterns) {
|
||||
addTablesFromQuery(catalog, filter.schemaName, pattern)
|
||||
}
|
||||
}
|
||||
val patterns = tableFiltersBySchema[namespace]
|
||||
if (patterns != null && patterns.isNotEmpty()) {
|
||||
for (pattern in patterns) {
|
||||
addTablesFromQuery(catalog, schema, pattern)
|
||||
}
|
||||
} else {
|
||||
addTablesFromQuery(catalog, schema, null)
|
||||
}
|
||||
}
|
||||
log.info { "Discovered ${allTables.size} table(s) in namespaces ${config.namespaces}." }
|
||||
@@ -127,28 +123,45 @@ class JdbcMetadataQuerier(
|
||||
log.info { "Querying column names for catalog discovery." }
|
||||
try {
|
||||
val dbmd: DatabaseMetaData = conn.metaData
|
||||
memoizedTableNames
|
||||
.filter { it.namespace() != null }
|
||||
.forEach { table ->
|
||||
dbmd.getPseudoColumns(table.catalog, table.schema, table.name, null).use {
|
||||
rs: ResultSet ->
|
||||
while (rs.next()) {
|
||||
val (tableName: TableName, metadata: ColumnMetadata) =
|
||||
columnMetadataFromResultSet(rs, isPseudoColumn = true)
|
||||
val joinedTableName: TableName = joinMap[tableName] ?: continue
|
||||
results.add(joinedTableName to metadata)
|
||||
}
|
||||
}
|
||||
dbmd.getColumns(table.catalog, table.schema, table.name, null).use {
|
||||
rs: ResultSet ->
|
||||
while (rs.next()) {
|
||||
val (tableName: TableName, metadata: ColumnMetadata) =
|
||||
columnMetadataFromResultSet(rs, isPseudoColumn = false)
|
||||
val joinedTableName: TableName = joinMap[tableName] ?: continue
|
||||
results.add(joinedTableName to metadata)
|
||||
}
|
||||
|
||||
fun addColumnsFromQuery(
|
||||
catalog: String?,
|
||||
schema: String?,
|
||||
tablePattern: String?,
|
||||
isPseudoColumn: Boolean
|
||||
) {
|
||||
val rsMethod = if (isPseudoColumn) dbmd::getPseudoColumns else dbmd::getColumns
|
||||
rsMethod(catalog, schema, tablePattern, null).use { rs: ResultSet ->
|
||||
while (rs.next()) {
|
||||
val (tableName: TableName, metadata: ColumnMetadata) =
|
||||
columnMetadataFromResultSet(rs, isPseudoColumn)
|
||||
val joinedTableName: TableName = joinMap[tableName] ?: continue
|
||||
results.add(joinedTableName to metadata)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Query columns using the same pattern as table discovery:
|
||||
// - If schema has filters, query per filter pattern
|
||||
// - If no filters, query entire schema at once
|
||||
for (namespace in config.namespaces + config.namespaces.map { it.uppercase() }) {
|
||||
val (catalog: String?, schema: String?) =
|
||||
when (constants.namespaceKind) {
|
||||
NamespaceKind.CATALOG -> namespace to null
|
||||
NamespaceKind.SCHEMA -> null to namespace
|
||||
NamespaceKind.CATALOG_AND_SCHEMA -> namespace to namespace
|
||||
}
|
||||
|
||||
val patterns = tableFiltersBySchema[namespace]
|
||||
if (patterns != null && patterns.isNotEmpty()) {
|
||||
for (pattern in patterns) {
|
||||
addColumnsFromQuery(catalog, schema, pattern, isPseudoColumn = true)
|
||||
addColumnsFromQuery(catalog, schema, pattern, isPseudoColumn = false)
|
||||
}
|
||||
} else {
|
||||
addColumnsFromQuery(catalog, schema, null, isPseudoColumn = true)
|
||||
addColumnsFromQuery(catalog, schema, null, isPseudoColumn = false)
|
||||
}
|
||||
}
|
||||
log.info { "Discovered ${results.size} column(s) and pseudo-column(s)." }
|
||||
} catch (e: Exception) {
|
||||
throw RuntimeException("Column name discovery query failed: ${e.message}", e)
|
||||
|
||||
@@ -1 +1 @@
|
||||
version=0.1.64
|
||||
version=0.1.65
|
||||
|
||||
Reference in New Issue
Block a user