1
0
mirror of synced 2025-12-19 18:14:56 -05:00
This commit is contained in:
Rodi Reich Zilberman
2025-12-17 23:00:57 -08:00
parent f3299c9abc
commit b9e9116edf
3 changed files with 12 additions and 16 deletions

View File

@@ -210,7 +210,8 @@ class JdbcSelectQuerier(
*
* @param jdbcConnectionFactory Factory for creating JDBC connections
* @param query SQL query string to execute (should return exactly one row)
* @param bindParameters Optional lambda to bind parameters to the PreparedStatement before execution
* @param bindParameters Optional lambda to bind parameters to the PreparedStatement before
* execution
* @param withResultSet Lambda function to process the ResultSet and extract the desired value
* @return The value extracted from the single result row using the withRS function
*/

View File

@@ -184,9 +184,7 @@ open class PostgresSourceJdbcPartitionFactory(
null
} else {
// Snapshot ongoing
if (
fileNodeChange !in listOf(FILENODE_NO_CHANGE, FILENODE_NOT_FOUND)
) {
if (fileNodeChange !in listOf(FILENODE_NO_CHANGE, FILENODE_NOT_FOUND)) {
handler.accept(InvalidPrimaryKey(stream.id, listOf(ctidField.id)))
streamState.reset()
coldStart(streamState, filenode)
@@ -335,14 +333,17 @@ open class PostgresSourceJdbcPartitionFactory(
jdbcConnectionFactory: JdbcConnectionFactory
): Filenode? {
log.info { "Querying filenode for stream ${streamState.stream.id}" }
val sql = "SELECT pg_relation_filenode(?::regclass)"
val sql = "SELECT pg_relation_filenode(?::regclass)"
val jdbcFieldType: JdbcFieldType<*> = LongFieldType
val filenode: Any? =
querySingleValue(
jdbcConnectionFactory,
sql,
{ stmt ->
stmt.setString(1, """"${streamState.stream.namespace}"."${streamState.stream.name}"""")
stmt.setString(
1,
""""${streamState.stream.namespace}"."${streamState.stream.name}""""
)
},
{ rs -> jdbcFieldType.jdbcGetter.get(rs, 1) }
)
@@ -427,14 +428,11 @@ open class PostgresSourceJdbcPartitionFactory(
}
private fun relationSize(stream: Stream): Long {
val sql =
"SELECT pg_relation_size(?)"
val sql = "SELECT pg_relation_size(?)"
return querySingleValue(
JdbcConnectionFactory(sharedState.configuration),
sql,
{ stmt ->
stmt.setString(1,toQualifiedTableName(stream.namespace, stream.name))
},
{ stmt -> stmt.setString(1, toQualifiedTableName(stream.namespace, stream.name)) },
{ rs ->
return@querySingleValue rs.getLong(1)
}

View File

@@ -146,14 +146,11 @@ class PostgresSourceJdbcConcurrentPartitionsCreator<
/** Get total relation size in bytes for a given table - this includes toast data. */
private fun totalRelationSize(stream: Stream): Long {
val sql =
"SELECT pg_total_relation_size(?)"
val sql = "SELECT pg_total_relation_size(?)"
return querySingleValue(
JdbcConnectionFactory(sharedState.configuration),
sql,
{ stmt ->
stmt.setString(1,toQualifiedTableName(stream.namespace, stream.name))
},
{ stmt -> stmt.setString(1, toQualifiedTableName(stream.namespace, stream.name)) },
{ rs ->
return@querySingleValue rs.getLong(1)
}