Make efficient table discovery during read (#52556)
This commit is contained in:
committed by
GitHub
parent
ae01f283a2
commit
d146b27b05
@@ -174,6 +174,7 @@ corresponds to that version.
|
||||
|
||||
| Version | Date | Pull Request | Subject |
|
||||
|:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| 0.48.7 | 2025-01-26 | [\#51596](https://github.com/airbytehq/airbyte/pull/51596) | Make efficient table discovery during read |
|
||||
| 0.48.6 | 2025-01-26 | [\#51596](https://github.com/airbytehq/airbyte/pull/51596) | Fix flaky source mssql tests |
|
||||
| 0.48.5 | 2025-01-16 | [\#51583](https://github.com/airbytehq/airbyte/pull/51583) | Also save SSL key to /tmp in destination-postgres |
|
||||
| 0.48.4 | 2024-12-24 | [\#50410](https://github.com/airbytehq/airbyte/pull/50410) | Save SSL key to /tmp |
|
||||
|
||||
@@ -1 +1 @@
|
||||
version=0.48.6
|
||||
version=0.48.7
|
||||
|
||||
@@ -389,31 +389,7 @@ abstract class AbstractJdbcSource<Datatype>(
|
||||
)
|
||||
}
|
||||
.values
|
||||
.map { fields: List<JsonNode> ->
|
||||
TableInfo<CommonField<Datatype>>(
|
||||
nameSpace = fields[0].get(INTERNAL_SCHEMA_NAME).asText(),
|
||||
name = fields[0].get(INTERNAL_TABLE_NAME).asText(),
|
||||
fields =
|
||||
fields
|
||||
// read the column metadata Json object, and determine its
|
||||
// type
|
||||
.map { f: JsonNode ->
|
||||
val datatype = sourceOperations.getDatabaseFieldType(f)
|
||||
val jsonType = getAirbyteType(datatype)
|
||||
LOGGER.debug {
|
||||
"Table ${fields[0].get(INTERNAL_TABLE_NAME).asText()} column ${f.get(INTERNAL_COLUMN_NAME).asText()}" +
|
||||
"(type ${f.get(INTERNAL_COLUMN_TYPE_NAME).asText()}[${f.get(INTERNAL_COLUMN_SIZE).asInt()}], " +
|
||||
"nullable ${f.get(INTERNAL_IS_NULLABLE).asBoolean()}) -> $jsonType"
|
||||
}
|
||||
object :
|
||||
CommonField<Datatype>(
|
||||
f.get(INTERNAL_COLUMN_NAME).asText(),
|
||||
datatype
|
||||
) {}
|
||||
},
|
||||
cursorFields = extractCursorFields(fields)
|
||||
)
|
||||
}
|
||||
.map { fields: List<JsonNode> -> jsonFieldListToTableInfo(fields) }
|
||||
}
|
||||
|
||||
private fun extractCursorFields(fields: List<JsonNode>): List<String> {
|
||||
@@ -579,6 +555,53 @@ abstract class AbstractJdbcSource<Datatype>(
|
||||
)
|
||||
}
|
||||
|
||||
override fun discoverTable(
|
||||
database: JdbcDatabase,
|
||||
schema: String,
|
||||
tableName: String
|
||||
): TableInfo<CommonField<Datatype>>? {
|
||||
LOGGER.info { "Discover table: $schema.$tableName" }
|
||||
return database
|
||||
.bufferedResultSetQuery<JsonNode>(
|
||||
{ connection: Connection ->
|
||||
connection.metaData.getColumns(getCatalog(database), schema, tableName, null)
|
||||
},
|
||||
{ resultSet: ResultSet -> this.getColumnMetadata(resultSet) }
|
||||
)
|
||||
.groupBy { t: JsonNode ->
|
||||
ImmutablePair.of<String, String>(
|
||||
t.get(INTERNAL_SCHEMA_NAME).asText(),
|
||||
t.get(INTERNAL_TABLE_NAME).asText()
|
||||
)
|
||||
}
|
||||
.values
|
||||
.map { fields: List<JsonNode> -> jsonFieldListToTableInfo(fields) }
|
||||
.firstOrNull()
|
||||
}
|
||||
|
||||
private fun jsonFieldListToTableInfo(fields: List<JsonNode>): TableInfo<CommonField<Datatype>> {
|
||||
return TableInfo<CommonField<Datatype>>(
|
||||
nameSpace = fields[0].get(INTERNAL_SCHEMA_NAME).asText(),
|
||||
name = fields[0].get(INTERNAL_TABLE_NAME).asText(),
|
||||
fields =
|
||||
fields
|
||||
// read the column metadata Json object, and determine its
|
||||
// type
|
||||
.map { f: JsonNode ->
|
||||
val datatype = sourceOperations.getDatabaseFieldType(f)
|
||||
val jsonType = getAirbyteType(datatype)
|
||||
LOGGER.debug {
|
||||
"Table ${fields[0].get(INTERNAL_TABLE_NAME).asText()} column ${f.get(INTERNAL_COLUMN_NAME).asText()}" +
|
||||
"(type ${f.get(INTERNAL_COLUMN_TYPE_NAME).asText()}[${f.get(INTERNAL_COLUMN_SIZE).asInt()}], " +
|
||||
"nullable ${f.get(INTERNAL_IS_NULLABLE).asBoolean()}) -> $jsonType"
|
||||
}
|
||||
object :
|
||||
CommonField<Datatype>(f.get(INTERNAL_COLUMN_NAME).asText(), datatype) {}
|
||||
},
|
||||
cursorFields = extractCursorFields(fields)
|
||||
)
|
||||
}
|
||||
|
||||
public override fun isCursorType(type: Datatype): Boolean {
|
||||
return sourceOperations.isCursorType(type)
|
||||
}
|
||||
|
||||
@@ -131,7 +131,7 @@ protected constructor(driverClassName: String) :
|
||||
logPreSyncDebugData(database, catalog)
|
||||
|
||||
val fullyQualifiedTableNameToInfo =
|
||||
discoverWithoutSystemTables(database).associateBy {
|
||||
discoverWithoutSystemTables(database, catalog).associateBy {
|
||||
String.format("%s.%s", it.nameSpace, it.name)
|
||||
}
|
||||
|
||||
@@ -289,6 +289,22 @@ protected constructor(driverClassName: String) :
|
||||
/* no-op */
|
||||
}
|
||||
|
||||
@Throws(Exception::class)
|
||||
protected fun discoverWithoutSystemTables(
|
||||
database: Database,
|
||||
catalog: ConfiguredAirbyteCatalog,
|
||||
): List<TableInfo<CommonField<DataType>>> {
|
||||
var result = mutableListOf<TableInfo<CommonField<DataType>>>()
|
||||
catalog.streams.forEach { airbyteStream: ConfiguredAirbyteStream ->
|
||||
val stream = airbyteStream.stream
|
||||
discoverTable(database, stream.namespace, stream.name)?.let {
|
||||
LOGGER.info { "Discovered table: ${it.nameSpace}.${it.name}: $it" }
|
||||
result.add(it)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
@Throws(Exception::class)
|
||||
protected fun discoverWithoutSystemTables(
|
||||
database: Database
|
||||
@@ -723,6 +739,23 @@ protected constructor(driverClassName: String) :
|
||||
tableInfos: List<TableInfo<CommonField<DataType>>>
|
||||
): Map<String, MutableList<String>>
|
||||
|
||||
/**
|
||||
* Discovers a table in the source database.
|
||||
*
|
||||
* @param database
|
||||
* - source database
|
||||
* @param schema
|
||||
* - source schema
|
||||
* @param tableName
|
||||
* - source table name
|
||||
* @return table information
|
||||
*/
|
||||
protected abstract fun discoverTable(
|
||||
database: Database,
|
||||
schema: String,
|
||||
tableName: String
|
||||
): TableInfo<CommonField<DataType>>?
|
||||
|
||||
protected abstract val quoteString: String?
|
||||
|
||||
/**
|
||||
|
||||
@@ -3,7 +3,7 @@ plugins {
|
||||
}
|
||||
|
||||
airbyteJavaConnector {
|
||||
cdkVersionRequired = '0.48.6'
|
||||
cdkVersionRequired = '0.48.7'
|
||||
features = ['db-sources']
|
||||
useLocalCdk = false
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ data:
|
||||
connectorSubtype: database
|
||||
connectorType: source
|
||||
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
|
||||
dockerImageTag: 4.1.19
|
||||
dockerImageTag: 4.1.20
|
||||
dockerRepository: airbyte/source-mssql
|
||||
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
|
||||
githubIssueLabel: source-mssql
|
||||
|
||||
@@ -424,6 +424,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura
|
||||
|
||||
| Version | Date | Pull Request | Subject |
|
||||
|:--------|:-----------| :---------------------------------------------------------------------------------------------------------------- |:------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| 4.1.20 | 2025-01-26 | [52556](https://github.com/airbytehq/airbyte/pull/52556) | Improve tables discovery during read. |
|
||||
| 4.1.19 | 2025-01-16 | [51596](https://github.com/airbytehq/airbyte/pull/51596) | Bump driver versions to latest (jdbc, debezium, cdk) |
|
||||
| 4.1.18 | 2025-01-06 | [50943](https://github.com/airbytehq/airbyte/pull/50943) | Use airbyte/java-connector-base:2.0.0. This makes the image rootless. The connector will be incompatible with Airbyte < 0.64. |
|
||||
| 4.1.17 | 2024-12-17 | [49840](https://github.com/airbytehq/airbyte/pull/49840) | Use a base image: airbyte/java-connector-base:1.0.0 |
|
||||
|
||||
Reference in New Issue
Block a user