1
0
mirror of synced 2025-12-25 02:09:19 -05:00

source-mysql: adopt bulk-cdk-toolkit-extract-cdc API changes (#52039)

This commit is contained in:
Marius Posta
2025-01-30 16:06:08 -05:00
committed by GitHub
parent a7225939dc
commit 8c2ed63cf4
6 changed files with 103 additions and 120 deletions

View File

@@ -9,7 +9,7 @@ application {
airbyteBulkConnector {
core = 'extract'
toolkits = ['extract-jdbc', 'extract-cdc']
cdk = '0.277'
cdk = '0.300'
}
dependencies {

View File

@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.11.0
dockerImageTag: 3.11.1
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql

View File

@@ -20,16 +20,18 @@ import io.airbyte.cdk.jdbc.JdbcConnectionFactory
import io.airbyte.cdk.jdbc.LongFieldType
import io.airbyte.cdk.jdbc.StringFieldType
import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.read.cdc.CdcPartitionsCreator.OffsetInvalidNeedsResyncIllegalStateException
import io.airbyte.cdk.read.cdc.DebeziumInput
import io.airbyte.cdk.read.cdc.AbortDebeziumWarmStartState
import io.airbyte.cdk.read.cdc.DebeziumOffset
import io.airbyte.cdk.read.cdc.DebeziumOperations
import io.airbyte.cdk.read.cdc.DebeziumPropertiesBuilder
import io.airbyte.cdk.read.cdc.DebeziumRecordKey
import io.airbyte.cdk.read.cdc.DebeziumRecordValue
import io.airbyte.cdk.read.cdc.DebeziumSchemaHistory
import io.airbyte.cdk.read.cdc.DebeziumState
import io.airbyte.cdk.read.cdc.DebeziumWarmStartState
import io.airbyte.cdk.read.cdc.DeserializedRecord
import io.airbyte.cdk.read.cdc.InvalidDebeziumWarmStartState
import io.airbyte.cdk.read.cdc.ResetDebeziumWarmStartState
import io.airbyte.cdk.read.cdc.ValidDebeziumWarmStartState
import io.airbyte.cdk.ssh.TunnelSession
import io.airbyte.cdk.util.Jsons
import io.debezium.connector.mysql.MySqlConnector
@@ -63,8 +65,11 @@ class MySqlSourceDebeziumOperations(
random: Random = Random.Default,
) : DebeziumOperations<MySqlSourceCdcPosition> {
private val log = KotlinLogging.logger {}
private val cdcIncrementalConfiguration: CdcIncrementalConfiguration by lazy {
configuration.incrementalConfiguration as CdcIncrementalConfiguration
}
override fun deserialize(
override fun deserializeRecord(
key: DebeziumRecordKey,
value: DebeziumRecordValue,
stream: Stream,
@@ -130,29 +135,42 @@ class MySqlSourceDebeziumOperations(
override fun findStreamName(key: DebeziumRecordKey, value: DebeziumRecordValue): String? =
value.source["table"]?.asText()
override fun deserializeState(
opaqueStateValue: OpaqueStateValue,
): DebeziumWarmStartState {
val debeziumState: UnvalidatedDeserializedState =
try {
deserializeStateUnvalidated(opaqueStateValue)
} catch (e: Exception) {
log.error(e) { "Error deserializing incumbent state value." }
return AbortDebeziumWarmStartState(
"Error deserializing incumbent state value: ${e.message}"
)
}
return validate(debeziumState)
}
/**
* Checks if GTIDs from previously saved state (debeziumInput) are still valid on DB. And also
* check if binlog exists or not.
*
* Validate is not supposed to perform on synthetic state.
*/
private fun validate(debeziumState: DebeziumState): CdcStateValidateResult {
private fun validate(debeziumState: UnvalidatedDeserializedState): DebeziumWarmStartState {
val savedStateOffset: SavedOffset = parseSavedOffset(debeziumState)
val (_: MySqlSourceCdcPosition, gtidSet: String?) = queryPositionAndGtids()
if (gtidSet.isNullOrEmpty() && !savedStateOffset.gtidSet.isNullOrEmpty()) {
log.info {
return abortCdcSync(
"Connector used GTIDs previously, but MySQL server does not know of any GTIDs or they are not enabled"
}
return abortCdcSync()
)
}
val savedGtidSet = MySqlGtidSet(savedStateOffset.gtidSet)
val availableGtidSet = MySqlGtidSet(gtidSet)
if (!savedGtidSet.isContainedWithin(availableGtidSet)) {
log.info {
return abortCdcSync(
"Connector last known GTIDs are $savedGtidSet, but MySQL server only has $availableGtidSet"
}
return abortCdcSync()
)
}
// newGtidSet is gtids from server that hasn't been seen by this connector yet. If the set
@@ -161,49 +179,42 @@ class MySqlSourceDebeziumOperations(
if (!newGtidSet.isEmpty) {
val purgedGtidSet = queryPurgedIds()
if (!purgedGtidSet.isEmpty && !newGtidSet.subtract(purgedGtidSet).equals(newGtidSet)) {
log.info {
return abortCdcSync(
"Connector has not seen GTIDs $newGtidSet, but MySQL server has purged $purgedGtidSet"
}
return abortCdcSync()
)
}
}
if (!savedGtidSet.isEmpty) {
// If the connector has saved GTID set, we will use that to validate and skip
// binlog validation. GTID and binlog works in an independent way to ensure data
// integrity where GTID is for storing transactions and binlog is for storing changes
// in DB.
return CdcStateValidateResult.VALID
}
val existingLogFiles: List<String> = getBinaryLogFileNames()
val found = existingLogFiles.contains(savedStateOffset.position.fileName)
if (!found) {
log.info {
"Connector last known binlog file ${savedStateOffset.position.fileName} is " +
"not found in the server. Server has $existingLogFiles"
// If the connector has saved GTID set, we will use that to validate and skip
// binlog validation. GTID and binlog works in an independent way to ensure data
// integrity where GTID is for storing transactions and binlog is for storing changes
// in DB.
if (savedGtidSet.isEmpty) {
val existingLogFiles: List<String> = getBinaryLogFileNames()
val found = existingLogFiles.contains(savedStateOffset.position.fileName)
if (!found) {
return abortCdcSync(
"Connector last known binlog file ${savedStateOffset.position.fileName} is not found in the server. Server has $existingLogFiles"
)
}
return abortCdcSync()
}
return CdcStateValidateResult.VALID
return ValidDebeziumWarmStartState(debeziumState.offset, debeziumState.schemaHistory)
}
private fun abortCdcSync(): CdcStateValidateResult {
val cdcIncrementalConfiguration: CdcIncrementalConfiguration =
configuration.incrementalConfiguration as CdcIncrementalConfiguration
return when (cdcIncrementalConfiguration.invalidCdcCursorPositionBehavior) {
InvalidCdcCursorPositionBehavior.FAIL_SYNC -> {
log.warn { "Saved offset no longer present on the server. aborting sync." }
CdcStateValidateResult.INVALID_ABORT
}
InvalidCdcCursorPositionBehavior.RESET_SYNC -> {
log.warn {
"Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch."
}
CdcStateValidateResult.INVALID_RESET
}
private fun abortCdcSync(reason: String): InvalidDebeziumWarmStartState =
when (cdcIncrementalConfiguration.invalidCdcCursorPositionBehavior) {
InvalidCdcCursorPositionBehavior.FAIL_SYNC ->
AbortDebeziumWarmStartState(
"Saved offset no longer present on the server, please reset the connection, " +
"and then increase binlog retention and/or increase sync frequency. " +
"$reason."
)
InvalidCdcCursorPositionBehavior.RESET_SYNC ->
ResetDebeziumWarmStartState(
"Saved offset no longer present on the server. $reason."
)
}
}
private fun parseSavedOffset(debeziumState: DebeziumState): SavedOffset {
private fun parseSavedOffset(debeziumState: UnvalidatedDeserializedState): SavedOffset {
val position: MySqlSourceCdcPosition = position(debeziumState.offset)
val gtidSet: String? = debeziumState.offset.wrapped.values.first()["gtids"]?.asText()
return SavedOffset(position, gtidSet)
@@ -233,7 +244,7 @@ class MySqlSourceDebeziumOperations(
return MySqlSourceCdcPosition(file.toString(), pos)
}
override fun synthesize(): DebeziumInput {
override fun generateColdStartOffset(): DebeziumOffset {
val (mySqlSourceCdcPosition: MySqlSourceCdcPosition, gtidSet: String?) =
queryPositionAndGtids()
val topicPrefixName: String = DebeziumPropertiesBuilder.sanitizeTopicPrefix(databaseName)
@@ -254,8 +265,7 @@ class MySqlSourceDebeziumOperations(
}
val offset = DebeziumOffset(mapOf(key to value))
log.info { "Constructed synthetic $offset." }
val state = DebeziumState(offset, schemaHistory = null)
return DebeziumInput(syntheticProperties, state, isSynthetic = true)
return offset
}
private fun queryPositionAndGtids(): Pair<MySqlSourceCdcPosition, String?> {
@@ -319,49 +329,37 @@ class MySqlSourceDebeziumOperations(
}
}
override fun deserialize(
opaqueStateValue: OpaqueStateValue,
streams: List<Stream>
): DebeziumInput {
val debeziumState: DebeziumState =
try {
deserializeDebeziumState(opaqueStateValue)
} catch (e: Exception) {
throw ConfigErrorException("Error deserializing $opaqueStateValue", e)
}
val cdcValidationResult = validate(debeziumState)
if (cdcValidationResult != CdcStateValidateResult.VALID) {
if (cdcValidationResult == CdcStateValidateResult.INVALID_ABORT) {
throw ConfigErrorException(
"Saved offset no longer present on the server. Please reset the connection, and then increase binlog retention and/or increase sync frequency."
)
}
if (cdcValidationResult == CdcStateValidateResult.INVALID_RESET) {
throw OffsetInvalidNeedsResyncIllegalStateException()
}
return synthesize()
}
override fun generateColdStartProperties(): Map<String, String> =
DebeziumPropertiesBuilder()
.with(commonProperties)
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode
// We use the recovery property cause using this mode will instruct Debezium to
// construct the db schema history. Note that we used to use schema_only_recovery mode
// instead, but this mode has been deprecated.
.with("snapshot.mode", "recovery")
.withStreams(listOf())
.buildMap()
val properties: Map<String, String> =
DebeziumPropertiesBuilder().with(commonProperties).withStreams(streams).buildMap()
return DebeziumInput(properties, debeziumState, isSynthetic = false)
}
override fun generateWarmStartProperties(streams: List<Stream>): Map<String, String> =
DebeziumPropertiesBuilder().with(commonProperties).withStreams(streams).buildMap()
override fun serialize(debeziumState: DebeziumState): OpaqueStateValue {
override fun serializeState(
offset: DebeziumOffset,
schemaHistory: DebeziumSchemaHistory?
): OpaqueStateValue {
val stateNode: ObjectNode = Jsons.objectNode()
// Serialize offset.
val offsetNode: JsonNode =
Jsons.objectNode().apply {
for ((k, v) in debeziumState.offset.wrapped) {
for ((k, v) in offset.wrapped) {
put(Jsons.writeValueAsString(k), Jsons.writeValueAsString(v))
}
}
stateNode.set<JsonNode>(MYSQL_CDC_OFFSET, offsetNode)
// Serialize schema history.
val schemaHistory: List<HistoryRecord>? = debeziumState.schemaHistory?.wrapped
if (schemaHistory != null) {
val uncompressedString: String =
schemaHistory.joinToString(separator = "\n") {
schemaHistory.wrapped.joinToString(separator = "\n") {
DocumentWriter.defaultWriter().write(it.document())
}
if (uncompressedString.length <= MAX_UNCOMPRESSED_LENGTH) {
@@ -427,24 +425,11 @@ class MySqlSourceDebeziumOperations(
MySqlSourceCdcTemporalConverter::class
)
val serverTimezone: String? =
(configuration.incrementalConfiguration as CdcIncrementalConfiguration).serverTimezone
if (!serverTimezone.isNullOrBlank()) {
dbzPropertiesBuilder.with("database.connectionTimezone", serverTimezone)
}
dbzPropertiesBuilder.buildMap()
}
cdcIncrementalConfiguration.serverTimezone
?.takeUnless { it.isBlank() }
?.let { dbzPropertiesBuilder.withDatabase("connectionTimezone", it) }
val syntheticProperties: Map<String, String> by lazy {
DebeziumPropertiesBuilder()
.with(commonProperties)
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode
// We use the recovery property cause using this mode will instruct Debezium to
// construct the db schema history. Note that we used to use schema_only_recovery mode
// instead, but this mode has been deprecated.
.with("snapshot.mode", "recovery")
.withStreams(listOf())
.buildMap()
dbzPropertiesBuilder.buildMap()
}
companion object {
@@ -470,7 +455,9 @@ class MySqlSourceDebeziumOperations(
val INTERNAL_CONVERTER_CONFIG: Map<String, String> =
java.util.Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false.toString())
internal fun deserializeDebeziumState(opaqueStateValue: OpaqueStateValue): DebeziumState {
internal fun deserializeStateUnvalidated(
opaqueStateValue: OpaqueStateValue
): UnvalidatedDeserializedState {
val stateNode: ObjectNode = opaqueStateValue[STATE] as ObjectNode
// Deserialize offset.
val offsetNode: ObjectNode = stateNode[MYSQL_CDC_OFFSET] as ObjectNode
@@ -486,7 +473,7 @@ class MySqlSourceDebeziumOperations(
val offset = DebeziumOffset(offsetMap)
// Deserialize schema history.
val schemaNode: JsonNode =
stateNode[MYSQL_DB_HISTORY] ?: return DebeziumState(offset, schemaHistory = null)
stateNode[MYSQL_DB_HISTORY] ?: return UnvalidatedDeserializedState(offset)
val isCompressed: Boolean = stateNode[IS_COMPRESSED]?.asBoolean() ?: false
val uncompressedString: String =
if (isCompressed) {
@@ -494,7 +481,6 @@ class MySqlSourceDebeziumOperations(
val compressedBytes: ByteArray =
textValue.substring(1, textValue.length - 1).toByteArray(Charsets.UTF_8)
val decoded = Base64.decodeBase64(compressedBytes)
GZIPInputStream(ByteArrayInputStream(decoded)).reader(Charsets.UTF_8).readText()
} else {
schemaNode.textValue()
@@ -504,9 +490,14 @@ class MySqlSourceDebeziumOperations(
.lines()
.filter { it.isNotBlank() }
.map { HistoryRecord(DocumentReader.defaultReader().read(it)) }
return DebeziumState(offset, DebeziumSchemaHistory(schemaHistoryList))
return UnvalidatedDeserializedState(offset, DebeziumSchemaHistory(schemaHistoryList))
}
data class UnvalidatedDeserializedState(
val offset: DebeziumOffset,
val schemaHistory: DebeziumSchemaHistory? = null,
)
internal fun position(offset: DebeziumOffset): MySqlSourceCdcPosition {
if (offset.wrapped.size != 1) {
throw ConfigErrorException("Expected exactly 1 key in $offset")

View File

@@ -63,7 +63,7 @@ import io.airbyte.cdk.read.Where
import io.airbyte.cdk.read.WhereClauseLeafNode
import io.airbyte.cdk.read.WhereClauseNode
import io.airbyte.cdk.read.WhereNode
import io.airbyte.cdk.read.cdc.DebeziumState
import io.airbyte.cdk.read.cdc.DebeziumOffset
import io.airbyte.cdk.util.Jsons
import io.micronaut.context.annotation.Primary
import jakarta.inject.Singleton
@@ -102,10 +102,9 @@ class MySqlSourceOperations :
if (globalStateValue == null) {
return
}
val debeziumState: DebeziumState =
MySqlSourceDebeziumOperations.deserializeDebeziumState(globalStateValue)
val position: MySqlSourceCdcPosition =
MySqlSourceDebeziumOperations.position(debeziumState.offset)
val offset: DebeziumOffset =
MySqlSourceDebeziumOperations.deserializeStateUnvalidated(globalStateValue).offset
val position: MySqlSourceCdcPosition = MySqlSourceDebeziumOperations.position(offset)
recordData.set<JsonNode>(
MySqlSourceCdcMetaFields.CDC_LOG_FILE.id,
CdcStringMetaFieldType.jsonEncoder.encode(position.fileName),

View File

@@ -21,9 +21,8 @@ import io.airbyte.cdk.output.BufferingOutputConsumer
import io.airbyte.cdk.read.ConcurrencyResource
import io.airbyte.cdk.read.ConfiguredSyncMode
import io.airbyte.cdk.read.DefaultJdbcSharedState
import io.airbyte.cdk.read.Feed
import io.airbyte.cdk.read.SelectQuerier
import io.airbyte.cdk.read.StateQuerier
import io.airbyte.cdk.read.StateManager
import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.read.StreamFeedBootstrap
import io.airbyte.cdk.util.Jsons
@@ -152,15 +151,8 @@ class MySqlSourceJdbcPartitionFactoryTest {
recordData: ObjectNode
) {}
},
stateQuerier =
object : StateQuerier {
override val feeds: List<Feed> = listOf(stream)
override fun current(feed: Feed): OpaqueStateValue? =
if (feed == stream) incumbentStateValue else null
override fun resetFeedStates() {
/* no-op */
}
},
stateManager =
StateManager(initialStreamStates = mapOf(stream to incumbentStateValue)),
stream,
)
}