1
0
mirror of synced 2025-12-25 02:09:19 -05:00

[source-mysql] Fix fetching binlog status for version >=8.4 (#55237)

Co-authored-by: matt.bayley <matt.bayley@matt.bayley--MacBook-Pro---G4WV0GW6J3>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: yue.li@airbyte.io <yue.li@airbyte.io>
This commit is contained in:
Matt Bayley
2025-03-06 15:01:24 -08:00
committed by GitHub
parent 3cf3c3d2fa
commit 43858a9b08
7 changed files with 48 additions and 34 deletions

View File

@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.11.5
dockerImageTag: 3.11.6
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql

View File

@@ -46,6 +46,7 @@ import java.io.ByteArrayOutputStream
import java.math.BigDecimal
import java.sql.Connection
import java.sql.ResultSet
import java.sql.SQLSyntaxErrorException
import java.sql.Statement
import java.time.Instant
import java.time.OffsetDateTime
@@ -269,41 +270,53 @@ class MySqlSourceDebeziumOperations(
}
private fun queryPositionAndGtids(): Pair<MySqlSourceCdcPosition, String?> {
val file = Field("File", StringFieldType)
val pos = Field("Position", LongFieldType)
val gtids = Field("Executed_Gtid_Set", StringFieldType)
jdbcConnectionFactory.get().use { connection: Connection ->
connection.createStatement().use { stmt: Statement ->
val sql = "SHOW MASTER STATUS"
stmt.executeQuery(sql).use { rs: ResultSet ->
if (!rs.next()) throw ConfigErrorException("No results for query: $sql")
val mySqlSourceCdcPosition =
MySqlSourceCdcPosition(
fileName = rs.getString(file.id)?.takeUnless { rs.wasNull() }
?: throw ConfigErrorException(
"No value for ${file.id} in: $sql",
),
position = rs.getLong(pos.id).takeUnless { rs.wasNull() || it <= 0 }
?: throw ConfigErrorException(
"No value for ${pos.id} in: $sql",
),
)
if (rs.metaData.columnCount <= 4) {
// This value exists only in MySQL 5.6.5 or later.
return mySqlSourceCdcPosition to null
}
val gtidSet: String? =
rs.getString(gtids.id)
?.takeUnless { rs.wasNull() || it.isBlank() }
?.trim()
?.replace("\n", "")
?.replace("\r", "")
return mySqlSourceCdcPosition to gtidSet
try {
// Syntax for MySQL version < 8.4
return parseBinaryLogStatus(stmt, "SHOW MASTER STATUS")
} catch (e: SQLSyntaxErrorException) {
// Syntax for MySQL version >= 8.4
return parseBinaryLogStatus(stmt, "SHOW BINARY LOG STATUS")
}
}
}
}
private fun parseBinaryLogStatus(
stmt: Statement,
query: String
): Pair<MySqlSourceCdcPosition, String?> {
stmt.executeQuery(query).use { rs: ResultSet ->
if (!rs.next()) throw ConfigErrorException("No results for query: {{$query}}")
val file = Field("File", StringFieldType)
val pos = Field("Position", LongFieldType)
val gtids = Field("Executed_Gtid_Set", StringFieldType)
val mySqlSourceCdcPosition =
MySqlSourceCdcPosition(
fileName = rs.getString(file.id)?.takeUnless { rs.wasNull() }
?: throw ConfigErrorException(
"No value for ${file.id} in: {{$query}}",
),
position = rs.getLong(pos.id).takeUnless { rs.wasNull() || it <= 0 }
?: throw ConfigErrorException(
"No value for ${pos.id} in: {{$query}}",
),
)
if (rs.metaData.columnCount <= 4) {
// This value exists only in MySQL 5.6.5 or later.
return mySqlSourceCdcPosition to null
}
val gtidSet: String? =
rs.getString(gtids.id)
?.takeUnless { rs.wasNull() || it.isBlank() }
?.trim()
?.replace("\n", "")
?.replace("\r", "")
return mySqlSourceCdcPosition to gtidSet
}
}
private fun queryPurgedIds(): MySqlGtidSet {
val purgedGtidField = Field("@@global.gtid_purged", StringFieldType)
jdbcConnectionFactory.get().use { connection: Connection ->

View File

@@ -9,7 +9,7 @@ import org.testcontainers.containers.Network
import org.testcontainers.utility.DockerImageName
object MySqlContainerFactory {
const val COMPATIBLE_NAME = "mysql:8.0"
const val COMPATIBLE_NAME = "mysql:9.2.0"
private val log = KotlinLogging.logger {}
init {

View File

@@ -44,7 +44,7 @@ class MySqlSourceCdcIntegrationTest {
)
MySqlContainerFactory.exclusive(
imageName = "mysql:8.0",
imageName = "mysql:9.2.0",
MySqlContainerFactory.WithCdcOff,
)
.use { nonCdcDbContainer ->
@@ -143,7 +143,7 @@ class MySqlSourceCdcIntegrationTest {
fun startAndProvisionTestContainer() {
dbContainer =
MySqlContainerFactory.exclusive(
imageName = "mysql:8.0",
imageName = "mysql:9.2.0",
MySqlContainerFactory.WithNetwork,
)
provisionTestContainer(dbContainer, connectionFactory)

View File

@@ -179,7 +179,7 @@ class MySqlSourceCursorBasedIntegrationTest {
companion object {
val log = KotlinLogging.logger {}
val dbContainer: MySQLContainer<*> = MySqlContainerFactory.shared(imageName = "mysql:8.0")
val dbContainer: MySQLContainer<*> = MySqlContainerFactory.shared(imageName = "mysql:9.2.0")
val config: MySqlSourceConfigurationSpecification =
MySqlContainerFactory.config(dbContainer)

View File

@@ -34,7 +34,7 @@ class MySqlSourceDatatypeIntegrationTest {
@BeforeAll
@Timeout(value = 300)
fun startAndProvisionTestContainer() {
dbContainer = MySqlContainerFactory.shared("mysql:8.0", MySqlContainerFactory.WithCdc)
dbContainer = MySqlContainerFactory.shared("mysql:9.2.0", MySqlContainerFactory.WithCdc)
}
}
}