1
0
mirror of synced 2025-12-25 02:09:19 -05:00

source-mysql: upgrade debezium to 2.5.4 (#39144)

This patch attempts to fix the following issues:

airbytehq/oncall#5332
#32348
This commit is contained in:
Yue Li
2024-06-12 16:25:21 -07:00
committed by GitHub
parent fd8baf9237
commit 30cd6f1a3d
4 changed files with 9 additions and 7 deletions

View File

@@ -24,8 +24,8 @@ application {
dependencies {
implementation 'mysql:mysql-connector-java:8.0.30'
implementation 'io.debezium:debezium-embedded:2.4.0.Final'
implementation 'io.debezium:debezium-connector-mysql:2.4.0.Final'
implementation 'io.debezium:debezium-embedded:2.5.4.Final'
implementation 'io.debezium:debezium-connector-mysql:2.5.4.Final'
testFixturesImplementation 'org.testcontainers:mysql:1.19.0'

View File

@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.4.7
dockerImageTag: 3.4.8
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql

View File

@@ -28,6 +28,7 @@ import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlOffsetContext.Loader;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.strategy.mysql.MySqlGtidSet;
import io.debezium.engine.ChangeEvent;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
@@ -63,9 +64,9 @@ public class MySqlDebeziumStateUtil implements DebeziumStateUtil {
LOGGER.info("Connector used GTIDs previously, but MySQL server does not know of any GTIDs or they are not enabled");
return false;
}
final GtidSet gtidSetFromSavedState = new GtidSet(savedState.gtidSet().get());
final MySqlGtidSet gtidSetFromSavedState = new MySqlGtidSet(savedState.gtidSet().get());
// Get the GTID set that is available in the server
final GtidSet availableGtidSet = new GtidSet(availableGtidStr.get());
final MySqlGtidSet availableGtidSet = new MySqlGtidSet(availableGtidStr.get());
if (gtidSetFromSavedState.isContainedWithin(availableGtidSet)) {
LOGGER.info("MySQL server current GTID set {} does contain the GTID set required by the connector {}", availableGtidSet,
gtidSetFromSavedState);
@@ -122,7 +123,7 @@ public class MySqlDebeziumStateUtil implements DebeziumStateUtil {
ps.setString(2, set2.toString());
return ps.executeQuery();
},
resultSet -> new GtidSet(resultSet.getString(1)))) {
resultSet -> new MySqlGtidSet(resultSet.getString(1)))) {
final List<GtidSet> gtidSets = stream.toList();
if (gtidSets.isEmpty()) {
return Optional.empty();
@@ -143,7 +144,7 @@ public class MySqlDebeziumStateUtil implements DebeziumStateUtil {
if (resultSet.getMetaData().getColumnCount() > 0) {
String string = resultSet.getString(1);
if (string != null && !string.isEmpty()) {
return Optional.of(new GtidSet(string));
return Optional.of(new MySqlGtidSet(string));
}
}
return Optional.empty();