1
0
mirror of synced 2025-12-25 02:09:19 -05:00

mysql source : implement support for snapshot of new tables in cdc mode (#16954)

* mysql source : implement support for snapshot of new tables in cdc mode

* undo unwanted changes

* add more assertions

* format

* fix build

* fix build

* revert acceptance test changes

* bump version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
Subodh Kant Chaturvedi
2022-09-27 00:14:07 +05:30
committed by GitHub
parent cf0c082e47
commit 99746082a4
32 changed files with 367 additions and 282 deletions

View File

@@ -16,5 +16,5 @@ ENV APPLICATION source-mysql
COPY --from=build /airbyte /airbyte
LABEL io.airbyte.version=0.6.13
LABEL io.airbyte.version=0.6.14
LABEL io.airbyte.name=airbyte/source-mysql

View File

@@ -1,15 +0,0 @@
#!/usr/bin/env sh
# Build latest connector image
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2):dev
# Pull latest acctest image
docker pull airbyte/source-acceptance-test:latest
# Run
docker run --rm -it \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /tmp:/tmp \
-v $(pwd):/test_input \
airbyte/source-acceptance-test \
--acceptance-test-config /test_input

View File

@@ -3,7 +3,6 @@ plugins {
id 'airbyte-docker'
id 'airbyte-integration-test-java'
id 'airbyte-performance-test-java'
id 'airbyte-source-acceptance-test'
}
application {

View File

@@ -17,14 +17,25 @@ import io.airbyte.integrations.source.jdbc.AbstractJdbcSource.SslMode;
import java.net.URI;
import java.nio.file.Path;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySqlCdcProperties {
final static private Logger LOGGER = LoggerFactory.getLogger(MySqlCdcProperties.class);
/**
 * Builds the Debezium properties used for the regular CDC sync.
 *
 * <p>Starts from the properties returned by {@code commonProperties(database)} and sets
 * {@code snapshot.mode} on top of them.
 *
 * @param database database whose source config is inspected for a {@code snapshot_mode} override
 * @return the common properties with {@code snapshot.mode} set
 */
static Properties getDebeziumProperties(final JdbcDatabase database) {
  final JsonNode config = database.getSourceConfig();
  final Properties debeziumProps = commonProperties(database);

  // snapshot config
  // The parameter `snapshot_mode` is passed in test to simulate reading the binlog directly and skip
  // the initial snapshot. When it is absent, fall back to `when_needed`:
  // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-mode
  final String snapshotMode = config.has("snapshot_mode")
      ? config.get("snapshot_mode").asText()
      : "when_needed";
  debeziumProps.setProperty("snapshot.mode", snapshotMode);

  return debeziumProps;
}
private static Properties commonProperties(final JdbcDatabase database) {
final Properties props = new Properties();
final JsonNode sourceConfig = database.getSourceConfig();
final JsonNode dbConfig = database.getDatabaseConfig();
@@ -41,26 +52,6 @@ public class MySqlCdcProperties {
props.setProperty("boolean.type", "io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter");
props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter");
// snapshot config
if (sourceConfig.has("snapshot_mode")) {
// The parameter `snapshot_mode` is passed in test to simulate reading the binlog directly and skip
// initial snapshot
props.setProperty("snapshot.mode", sourceConfig.get("snapshot_mode").asText());
} else {
// https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-mode
props.setProperty("snapshot.mode", "when_needed");
}
// https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-locking-mode
// This is to make sure other database clients are allowed to write to a table while Airbyte is
// taking a snapshot. There is a risk involved that
// if any database client makes a schema change then the sync might break
props.setProperty("snapshot.locking.mode", "none");
// https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-include-schema-changes
props.setProperty("include.schema.changes", "false");
// This is to make sure that binary data is represented as a base64-encoded String.
// https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-binary-handling-mode
props.setProperty("binary.handling.mode", "base64");
props.setProperty("database.include.list", sourceConfig.get("database").asText());
// Check params for SSL connection in config and add properties for CDC SSL connection
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-ssl-mode
if (!sourceConfig.has(JdbcUtils.SSL_KEY) || sourceConfig.get(JdbcUtils.SSL_KEY).asBoolean()) {
@@ -106,6 +97,26 @@ public class MySqlCdcProperties {
props.setProperty("database.ssl.mode", "required");
}
}
// https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-locking-mode
// This is to make sure other database clients are allowed to write to a table while Airbyte is
// taking a snapshot. There is a risk involved that
// if any database client makes a schema change then the sync might break
props.setProperty("snapshot.locking.mode", "none");
// https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-include-schema-changes
props.setProperty("include.schema.changes", "false");
// This is to make sure that binary data is represented as a base64-encoded String.
// https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-binary-handling-mode
props.setProperty("binary.handling.mode", "base64");
props.setProperty("database.include.list", sourceConfig.get("database").asText());
return props;
}
/**
 * Builds the Debezium properties used when taking a snapshot: the properties returned by
 * {@code commonProperties(database)} with {@code snapshot.mode} set to {@code initial_only}.
 *
 * @param database database passed through to {@code commonProperties}
 * @return the common properties with {@code snapshot.mode=initial_only}
 */
static Properties getSnapshotProperties(final JdbcDatabase database) {
  final Properties snapshotProps = commonProperties(database);
  snapshotProps.setProperty("snapshot.mode", "initial_only");
  return snapshotProps;
}

View File

@@ -53,7 +53,13 @@ public class MySqlCdcStateHandler implements CdcStateHandler {
@Override
public AirbyteMessage saveStateAfterCompletionOfSnapshotOfNewStreams() {
throw new RuntimeException("Snapshot of individual tables currently not supported in MySQL");
LOGGER.info("Snapshot of new tables is complete, saving state");
/*
* Namespace pair is ignored by global state manager, but is needed to satisfy the API contract.
* Therefore, provide an empty optional.
*/
final AirbyteStateMessage stateMessage = stateManager.emit(Optional.empty());
return new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
}
}

View File

@@ -19,6 +19,7 @@ import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
@@ -42,15 +43,18 @@ import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -233,14 +237,32 @@ public class MySqlSource extends AbstractJdbcSource<MysqlType> implements Source
final AirbyteDebeziumHandler handler =
new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true, Duration.ofMinutes(5));
final MySqlCdcStateHandler mySqlCdcStateHandler = new MySqlCdcStateHandler(stateManager);
final MySqlCdcConnectorMetadataInjector mySqlCdcConnectorMetadataInjector = new MySqlCdcConnectorMetadataInjector();
final List<ConfiguredAirbyteStream> streamsToSnapshot = identifyStreamsToSnapshot(catalog, stateManager);
final Optional<CdcState> cdcState = Optional.ofNullable(stateManager.getCdcStateManager().getCdcState());
final MySqlCdcSavedInfoFetcher fetcher = new MySqlCdcSavedInfoFetcher(cdcState.orElse(null));
return Collections.singletonList(handler.getIncrementalIterators(catalog,
fetcher,
final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(catalog,
new MySqlCdcSavedInfoFetcher(cdcState.orElse(null)),
new MySqlCdcStateHandler(stateManager),
new MySqlCdcConnectorMetadataInjector(),
MySqlCdcProperties.getDebeziumProperties(database),
emittedAt));
emittedAt);
if (streamsToSnapshot.isEmpty()) {
return Collections.singletonList(incrementalIteratorSupplier.get());
}
final AutoCloseableIterator<AirbyteMessage> snapshotIterator = handler.getSnapshotIterators(
new ConfiguredAirbyteCatalog().withStreams(streamsToSnapshot),
mySqlCdcConnectorMetadataInjector,
MySqlCdcProperties.getSnapshotProperties(database),
mySqlCdcStateHandler,
emittedAt);
return Collections.singletonList(
AutoCloseableIterators.concatWithEagerClose(snapshotIterator, AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier)));
} else {
LOGGER.info("using CDC: {}", false);
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager,

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql;
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import com.fasterxml.jackson.databind.JsonNode;
import com.mysql.cj.MysqlType;

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql;
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
@@ -12,7 +12,6 @@ import io.airbyte.db.MySqlUtils;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import java.io.IOException;
import org.jooq.DSLContext;

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql;
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql;
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql;
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql;
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv;
import static io.airbyte.protocol.models.SyncMode.INCREMENTAL;

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql;
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import static io.airbyte.protocol.models.SyncMode.INCREMENTAL;
import static org.junit.jupiter.api.Assertions.assertEquals;

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql;
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv;

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql;
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql;
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import com.google.common.collect.ImmutableMap;
import io.airbyte.db.MySqlUtils;

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql;
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import com.google.common.collect.ImmutableMap;
import io.airbyte.db.MySqlUtils;

View File

@@ -2,9 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql;
import static io.airbyte.integrations.source.mysql.MySqlSource.SSL_PARAMETERS;
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql;
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import java.nio.file.Path;

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql;
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import java.nio.file.Path;

View File

@@ -33,19 +33,13 @@ import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.debezium.CdcSourceTest;
import io.airbyte.integrations.debezium.CdcTargetPosition;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import javax.sql.DataSource;
import org.jooq.SQLDialect;
@@ -131,12 +125,12 @@ public class CdcMysqlSourceTest extends CdcSourceTest {
}
@Override
protected CdcTargetPosition extractPosition(JsonNode record) {
protected CdcTargetPosition extractPosition(final JsonNode record) {
return new MySqlCdcTargetPosition(record.get(CDC_LOG_FILE).asText(), record.get(CDC_LOG_POS).asInt());
}
@Override
protected void assertNullCdcMetaData(JsonNode data) {
protected void assertNullCdcMetaData(final JsonNode data) {
assertNull(data.get(CDC_LOG_FILE));
assertNull(data.get(CDC_LOG_POS));
assertNull(data.get(CDC_UPDATED_AT));
@@ -144,7 +138,7 @@ public class CdcMysqlSourceTest extends CdcSourceTest {
}
@Override
protected void assertCdcMetaData(JsonNode data, boolean deletedAtNull) {
protected void assertCdcMetaData(final JsonNode data, final boolean deletedAtNull) {
assertNotNull(data.get(CDC_LOG_FILE));
assertNotNull(data.get(CDC_LOG_POS));
assertNotNull(data.get(CDC_UPDATED_AT));
@@ -156,7 +150,7 @@ public class CdcMysqlSourceTest extends CdcSourceTest {
}
@Override
protected void removeCDCColumns(ObjectNode data) {
protected void removeCDCColumns(final ObjectNode data) {
data.remove(CDC_LOG_FILE);
data.remove(CDC_LOG_POS);
data.remove(CDC_UPDATED_AT);
@@ -164,9 +158,9 @@ public class CdcMysqlSourceTest extends CdcSourceTest {
}
@Override
protected void addCdcMetadataColumns(AirbyteStream stream) {
ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema();
ObjectNode properties = (ObjectNode) jsonSchema.get("properties");
protected void addCdcMetadataColumns(final AirbyteStream stream) {
final ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema();
final ObjectNode properties = (ObjectNode) jsonSchema.get("properties");
final JsonNode numberType = Jsons.jsonNode(ImmutableMap.of("type", "number"));
@@ -193,38 +187,16 @@ public class CdcMysqlSourceTest extends CdcSourceTest {
}
@Override
public void assertExpectedStateMessages(List<AirbyteStateMessage> stateMessages) {
for (AirbyteStateMessage stateMessage : stateMessages) {
public void assertExpectedStateMessages(final List<AirbyteStateMessage> stateMessages) {
for (final AirbyteStateMessage stateMessage : stateMessages) {
assertNotNull(stateMessage.getData().get("cdc_state").get("state").get(MYSQL_CDC_OFFSET));
assertNotNull(stateMessage.getData().get("cdc_state").get("state").get(MYSQL_DB_HISTORY));
}
}
@Override
protected AirbyteCatalog expectedCatalogForDiscover() {
final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG);
createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2",
columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.empty()));
List<AirbyteStream> streams = expectedCatalog.getStreams();
// stream with PK
streams.get(0).setSourceDefinedCursor(true);
addCdcMetadataColumns(streams.get(0));
AirbyteStream streamWithoutPK = CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_2",
MODELS_SCHEMA,
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_MAKE_ID, JsonSchemaType.INTEGER),
Field.of(COL_MODEL, JsonSchemaType.STRING));
streamWithoutPK.setSourceDefinedPrimaryKey(Collections.emptyList());
streamWithoutPK.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH));
addCdcMetadataColumns(streamWithoutPK);
streams.add(streamWithoutPK);
expectedCatalog.withStreams(streams);
return expectedCatalog;
protected String randomTableSchema() {
return MODELS_SCHEMA;
}
@Test

View File

@@ -41,7 +41,7 @@ public class MySqlSourceOperationsTest {
private Database database;
@BeforeEach
private void init() {
public void init() {
container = new MySQLContainer<>("mysql:8.0");
container.start();
database = new Database(DSLContextFactory.create(