|
|
|
|
@@ -23,12 +23,11 @@ import io.airbyte.commons.json.Jsons;
|
|
|
|
|
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
|
|
|
|
|
import io.debezium.config.Configuration;
|
|
|
|
|
import io.debezium.connector.common.OffsetReader;
|
|
|
|
|
import io.debezium.connector.mysql.GtidSet;
|
|
|
|
|
import io.debezium.connector.mysql.MySqlConnectorConfig;
|
|
|
|
|
import io.debezium.connector.mysql.MySqlOffsetContext;
|
|
|
|
|
import io.debezium.connector.mysql.MySqlOffsetContext.Loader;
|
|
|
|
|
import io.debezium.connector.mysql.MySqlPartition;
|
|
|
|
|
import io.debezium.connector.mysql.strategy.mysql.MySqlGtidSet;
|
|
|
|
|
import io.debezium.connector.mysql.gtid.MySqlGtidSet;
|
|
|
|
|
import io.debezium.engine.ChangeEvent;
|
|
|
|
|
import io.debezium.pipeline.spi.Offsets;
|
|
|
|
|
import io.debezium.pipeline.spi.Partition;
|
|
|
|
|
@@ -71,12 +70,12 @@ public class MySqlDebeziumStateUtil implements DebeziumStateUtil {
|
|
|
|
|
if (gtidSetFromSavedState.isContainedWithin(availableGtidSet)) {
|
|
|
|
|
LOGGER.info("MySQL server current GTID set {} does contain the GTID set required by the connector {}", availableGtidSet,
|
|
|
|
|
gtidSetFromSavedState);
|
|
|
|
|
final Optional<GtidSet> gtidSetToReplicate = subtractGtidSet(availableGtidSet, gtidSetFromSavedState, database);
|
|
|
|
|
final Optional<MySqlGtidSet> gtidSetToReplicate = subtractGtidSet(availableGtidSet, gtidSetFromSavedState, database);
|
|
|
|
|
if (gtidSetToReplicate.isPresent()) {
|
|
|
|
|
final Optional<GtidSet> purgedGtidSet = purgedGtidSet(database);
|
|
|
|
|
final Optional<MySqlGtidSet> purgedGtidSet = purgedGtidSet(database);
|
|
|
|
|
if (purgedGtidSet.isPresent()) {
|
|
|
|
|
LOGGER.info("MySQL server has already purged {} GTIDs", purgedGtidSet.get());
|
|
|
|
|
final Optional<GtidSet> nonPurgedGtidSetToReplicate = subtractGtidSet(gtidSetToReplicate.get(), purgedGtidSet.get(), database);
|
|
|
|
|
final Optional<MySqlGtidSet> nonPurgedGtidSetToReplicate = subtractGtidSet(gtidSetToReplicate.get(), purgedGtidSet.get(), database);
|
|
|
|
|
if (nonPurgedGtidSetToReplicate.isPresent()) {
|
|
|
|
|
LOGGER.info("GTIDs known by the MySQL server but not processed yet {}, for replication are available only {}", gtidSetToReplicate,
|
|
|
|
|
nonPurgedGtidSetToReplicate);
|
|
|
|
|
@@ -116,8 +115,8 @@ public class MySqlDebeziumStateUtil implements DebeziumStateUtil {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Optional<GtidSet> subtractGtidSet(final GtidSet set1, final GtidSet set2, final JdbcDatabase database) {
|
|
|
|
|
try (final Stream<GtidSet> stream = database.unsafeResultSetQuery(
|
|
|
|
|
private Optional<MySqlGtidSet> subtractGtidSet(final MySqlGtidSet set1, final MySqlGtidSet set2, final JdbcDatabase database) {
|
|
|
|
|
try (final Stream<MySqlGtidSet> stream = database.unsafeResultSetQuery(
|
|
|
|
|
connection -> {
|
|
|
|
|
final PreparedStatement ps = connection.prepareStatement("SELECT GTID_SUBTRACT(?, ?)");
|
|
|
|
|
ps.setString(1, set1.toString());
|
|
|
|
|
@@ -125,7 +124,7 @@ public class MySqlDebeziumStateUtil implements DebeziumStateUtil {
|
|
|
|
|
return ps.executeQuery();
|
|
|
|
|
},
|
|
|
|
|
resultSet -> new MySqlGtidSet(resultSet.getString(1)))) {
|
|
|
|
|
final List<GtidSet> gtidSets = stream.toList();
|
|
|
|
|
final List<MySqlGtidSet> gtidSets = stream.toList();
|
|
|
|
|
if (gtidSets.isEmpty()) {
|
|
|
|
|
return Optional.empty();
|
|
|
|
|
} else if (gtidSets.size() == 1) {
|
|
|
|
|
@@ -138,8 +137,8 @@ public class MySqlDebeziumStateUtil implements DebeziumStateUtil {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Optional<GtidSet> purgedGtidSet(final JdbcDatabase database) {
|
|
|
|
|
try (final Stream<Optional<GtidSet>> stream = database.unsafeResultSetQuery(
|
|
|
|
|
private Optional<MySqlGtidSet> purgedGtidSet(final JdbcDatabase database) {
|
|
|
|
|
try (final Stream<Optional<MySqlGtidSet>> stream = database.unsafeResultSetQuery(
|
|
|
|
|
connection -> connection.createStatement().executeQuery("SELECT @@global.gtid_purged"),
|
|
|
|
|
resultSet -> {
|
|
|
|
|
if (resultSet.getMetaData().getColumnCount() > 0) {
|
|
|
|
|
@@ -150,7 +149,7 @@ public class MySqlDebeziumStateUtil implements DebeziumStateUtil {
|
|
|
|
|
}
|
|
|
|
|
return Optional.empty();
|
|
|
|
|
})) {
|
|
|
|
|
final List<Optional<GtidSet>> gtidSet = stream.toList();
|
|
|
|
|
final List<Optional<MySqlGtidSet>> gtidSet = stream.toList();
|
|
|
|
|
if (gtidSet.isEmpty()) {
|
|
|
|
|
return Optional.empty();
|
|
|
|
|
} else if (gtidSet.size() == 1) {
|
|
|
|
|
@@ -237,9 +236,10 @@ public class MySqlDebeziumStateUtil implements DebeziumStateUtil {
|
|
|
|
|
final ConfiguredAirbyteCatalog catalog,
|
|
|
|
|
final JdbcDatabase database) {
|
|
|
|
|
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode
|
|
|
|
|
// We use the schema_only_recovery property cause using this mode will instruct Debezium to
|
|
|
|
|
// We use the recovery property cause using this mode will instruct Debezium to
|
|
|
|
|
// construct the db schema history.
|
|
|
|
|
properties.setProperty("snapshot.mode", "schema_only_recovery");
|
|
|
|
|
// Note that we used to use schema_only_recovery mode, but this mode has been deprecated.
|
|
|
|
|
properties.setProperty("snapshot.mode", "recovery");
|
|
|
|
|
final String dbName = database.getSourceConfig().get(JdbcUtils.DATABASE_KEY).asText();
|
|
|
|
|
// Topic.prefix is sanitized version of database name. At this stage properties does not have this
|
|
|
|
|
// value - it's set in RelationalDbDebeziumPropertiesManager.
|
|
|
|
|
|