1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Enable MySQL CDC + PK 100% (#30213)

This commit is contained in:
Akash Kulkarni
2023-09-07 13:32:29 -07:00
committed by GitHub
parent d1e3ebefe1
commit 09df4f4875
15 changed files with 401 additions and 805 deletions

View File

@@ -24,6 +24,6 @@ ENV APPLICATION source-mysql
COPY --from=build /airbyte /airbyte
LABEL io.airbyte.version=3.0.3
LABEL io.airbyte.version=3.0.4
LABEL io.airbyte.name=airbyte/source-mysql

View File

@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.0.3
dockerImageTag: 3.0.4
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql

View File

@@ -115,12 +115,6 @@ public class MySqlCdcProperties {
return props;
}
/**
 * Builds the Debezium properties for a snapshot run.
 *
 * Starts from {@code commonProperties(database)} and sets {@code snapshot.mode} to
 * {@code initial_only}.
 *
 * @param database the database passed through to {@code commonProperties}
 * @return the resulting properties object
 */
static Properties getSnapshotProperties(final JdbcDatabase database) {
  final Properties snapshotProps = commonProperties(database);
  snapshotProps.setProperty("snapshot.mode", "initial_only");
  return snapshotProps;
}
private static int generateServerID() {
final int min = 5400;
final int max = 6400;

View File

@@ -27,31 +27,23 @@ import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.jdbc.StreamingJdbcDatabase;
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.ssh.SshWrappedSource;
import io.airbyte.integrations.debezium.AirbyteDebeziumHandler;
import io.airbyte.integrations.debezium.internals.DebeziumPropertiesManager;
import io.airbyte.integrations.debezium.internals.FirstRecordWaitTimeUtil;
import io.airbyte.integrations.debezium.internals.mysql.MySqlCdcPosition;
import io.airbyte.integrations.debezium.internals.mysql.MySqlCdcTargetPosition;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils;
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils;
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
import io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper;
import io.airbyte.integrations.source.mysql.initialsync.MySqlFeatureFlags;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil;
import io.airbyte.integrations.source.relationaldb.DbSourceDiscoverUtil;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory;
@@ -65,21 +57,17 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.sql.DataSource;
@@ -346,47 +334,9 @@ public class MySqlSource extends AbstractJdbcSource<MysqlType> implements Source
final StateManager stateManager,
final Instant emittedAt) {
final JsonNode sourceConfig = database.getSourceConfig();
final MySqlFeatureFlags featureFlags = new MySqlFeatureFlags(sourceConfig);
if (isCdc(sourceConfig) && shouldUseCDC(catalog)) {
if (featureFlags.isCdcInitialSyncViaPkEnabled()) {
LOGGER.info("Using PK + CDC");
return MySqlInitialReadUtil.getCdcReadIterators(database, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString());
}
final Duration firstRecordWaitTime = FirstRecordWaitTimeUtil.getFirstRecordWaitTime(sourceConfig);
LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds());
final AirbyteDebeziumHandler<MySqlCdcPosition> handler =
new AirbyteDebeziumHandler<>(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true, firstRecordWaitTime, OptionalInt.empty());
final MySqlCdcStateHandler mySqlCdcStateHandler = new MySqlCdcStateHandler(stateManager);
final MySqlCdcConnectorMetadataInjector mySqlCdcConnectorMetadataInjector = MySqlCdcConnectorMetadataInjector.getInstance(emittedAt);
final List<ConfiguredAirbyteStream> streamsToSnapshot = identifyStreamsToSnapshot(catalog, stateManager);
final Optional<CdcState> cdcState = Optional.ofNullable(stateManager.getCdcStateManager().getCdcState());
final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(catalog,
new MySqlCdcSavedInfoFetcher(cdcState.orElse(null)),
new MySqlCdcStateHandler(stateManager),
mySqlCdcConnectorMetadataInjector,
MySqlCdcProperties.getDebeziumProperties(database),
DebeziumPropertiesManager.DebeziumConnectorType.RELATIONALDB,
emittedAt,
false);
if (streamsToSnapshot.isEmpty()) {
return Collections.singletonList(incrementalIteratorSupplier.get());
}
final AutoCloseableIterator<AirbyteMessage> snapshotIterator = handler.getSnapshotIterators(
new ConfiguredAirbyteCatalog().withStreams(streamsToSnapshot),
mySqlCdcConnectorMetadataInjector,
MySqlCdcProperties.getSnapshotProperties(database),
mySqlCdcStateHandler,
DebeziumPropertiesManager.DebeziumConnectorType.RELATIONALDB,
emittedAt);
return Collections.singletonList(
AutoCloseableIterators.concatWithEagerClose(AirbyteTraceMessageUtility::emitStreamStatusTrace, snapshotIterator,
AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)));
LOGGER.info("Using PK + CDC");
return MySqlInitialReadUtil.getCdcReadIterators(database, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString());
} else {
LOGGER.info("using CDC: {}", false);
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager,

View File

@@ -9,17 +9,12 @@ import com.fasterxml.jackson.databind.JsonNode;
// Feature flags to gate new primary key load features.
public class MySqlFeatureFlags {
public static final String CDC_VIA_PK = "cdc_via_pk";
private final JsonNode sourceConfig;
/**
 * Stores the given source config so individual flags can be looked up from it later.
 *
 * @param sourceConfig the JSON config the flag lookups read from
 */
public MySqlFeatureFlags(final JsonNode sourceConfig) {
this.sourceConfig = sourceConfig;
}
/**
 * Reports whether the {@code cdc_via_pk} flag is switched on in the source config.
 *
 * @return the result of looking up {@link #CDC_VIA_PK} via {@code getFlagValue}
 */
public boolean isCdcInitialSyncViaPkEnabled() {
  final boolean cdcViaPkEnabled = getFlagValue(CDC_VIA_PK);
  return cdcViaPkEnabled;
}
/**
 * Looks up a boolean flag in the source config.
 *
 * @param flag the field name to look up
 * @return {@code false} when the config has no such field; otherwise the field's
 *         {@code asBoolean()} value
 */
private boolean getFlagValue(final String flag) {
  if (!sourceConfig.has(flag)) {
    return false;
  }
  return sourceConfig.get(flag).asBoolean();
}

View File

@@ -8,6 +8,8 @@ import static io.airbyte.integrations.io.airbyte.integration_tests.sources.utils
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
@@ -23,10 +25,18 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.util.List;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.MySQLContainer;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
@ExtendWith(SystemStubsExtension.class)
public class CdcBinlogsMySqlSourceDatatypeTest extends AbstractMySqlSourceDatatypeTest {
@SystemStub
private EnvironmentVariables environmentVariables;
private DSLContext dslContext;
private JsonNode stateAfterFirstSync;
@@ -60,14 +70,8 @@ public class CdcBinlogsMySqlSourceDatatypeTest extends AbstractMySqlSourceDataty
catalog.getStreams().add(dummyTableWithData);
final List<AirbyteMessage> allMessages = super.runRead(catalog);
if (allMessages.size() != 2) {
throw new RuntimeException("First sync should only generate 2 records");
}
final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessages(allMessages);
if (stateAfterFirstBatch == null || stateAfterFirstBatch.isEmpty()) {
throw new RuntimeException("stateAfterFirstBatch should not be null or empty");
}
stateAfterFirstSync = Jsons.jsonNode(stateAfterFirstBatch);
stateAfterFirstSync = Jsons.jsonNode(List.of(Iterables.getLast(stateAfterFirstBatch)));
if (stateAfterFirstSync == null) {
throw new RuntimeException("stateAfterFirstSync should not be null");
}
@@ -83,6 +87,7 @@ public class CdcBinlogsMySqlSourceDatatypeTest extends AbstractMySqlSourceDataty
protected Database setupDatabase() throws Exception {
container = new MySQLContainer<>("mysql:8.0");
container.start();
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.put("initial_waiting_seconds", INITIAL_CDC_WAITING_SECONDS)

View File

@@ -8,6 +8,7 @@ import static io.airbyte.integrations.io.airbyte.integration_tests.sources.utils
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
@@ -16,10 +17,18 @@ import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.MySQLContainer;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
@ExtendWith(SystemStubsExtension.class)
public class CdcInitialSnapshotMySqlSourceDatatypeTest extends AbstractMySqlSourceDatatypeTest {
@SystemStub
private EnvironmentVariables environmentVariables;
private DSLContext dslContext;
@Override
@@ -32,6 +41,7 @@ public class CdcInitialSnapshotMySqlSourceDatatypeTest extends AbstractMySqlSour
protected Database setupDatabase() throws Exception {
container = new MySQLContainer<>("mysql:8.0");
container.start();
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.put("initial_waiting_seconds", INITIAL_CDC_WAITING_SECONDS)

View File

@@ -49,12 +49,12 @@ import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest {
@SystemStub
private EnvironmentVariables environmentVariables;
protected EnvironmentVariables environmentVariables;
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private MySQLContainer<?> container;
private JsonNode config;
protected static final String STREAM_NAME = "id_and_name";
protected static final String STREAM_NAME2 = "starships";
protected MySQLContainer<?> container;
protected JsonNode config;
@Override
protected String getImageName() {
@@ -100,41 +100,13 @@ public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest {
Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL)))));
}
/**
 * Builds a catalog of two incremental/append streams that each declare only one column:
 * {@code STREAM_NAME} declares just {@code id}, {@code STREAM_NAME2} declares just {@code name}.
 * Both streams still list {@code id} as the source-defined primary key.
 *
 * @return the configured catalog containing both partial-column streams
 */
protected ConfiguredAirbyteCatalog getConfiguredCatalogWithPartialColumns() {
  // STREAM_NAME: the "name" field is intentionally omitted.
  final ConfiguredAirbyteStream idOnlyStream = new ConfiguredAirbyteStream()
      .withSyncMode(INCREMENTAL)
      .withDestinationSyncMode(DestinationSyncMode.APPEND)
      .withStream(CatalogHelpers.createAirbyteStream(
          String.format("%s", STREAM_NAME),
          String.format("%s", config.get(JdbcUtils.DATABASE_KEY).asText()),
          Field.of("id", JsonSchemaType.NUMBER))
          .withSourceDefinedCursor(true)
          .withSourceDefinedPrimaryKey(List.of(List.of("id")))
          .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL)));

  // STREAM_NAME2: the "id" field is intentionally omitted.
  final ConfiguredAirbyteStream nameOnlyStream = new ConfiguredAirbyteStream()
      .withSyncMode(INCREMENTAL)
      .withDestinationSyncMode(DestinationSyncMode.APPEND)
      .withStream(CatalogHelpers.createAirbyteStream(
          String.format("%s", STREAM_NAME2),
          String.format("%s", config.get(JdbcUtils.DATABASE_KEY).asText()),
          Field.of("name", JsonSchemaType.STRING))
          .withSourceDefinedCursor(true)
          .withSourceDefinedPrimaryKey(List.of(List.of("id")))
          .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL)));

  return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(idOnlyStream, nameOnlyStream));
}
/**
 * Supplies the state handed to the read under test; this test always supplies none.
 *
 * @return always {@code null}
 */
@Override
protected JsonNode getState() {
return null;
}
@Override
protected void setupEnvironment(final TestDestinationEnv environment) {
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
container = new MySQLContainer<>("mysql:8.0");
container.start();
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
@@ -157,7 +129,7 @@ public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest {
createAndPopulateTables();
}
private void createAndPopulateTables() {
protected void createAndPopulateTables() {
executeQuery("CREATE TABLE id_and_name(id INTEGER PRIMARY KEY, name VARCHAR(200));");
executeQuery(
"INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');");
@@ -166,17 +138,17 @@ public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest {
"INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');");
}
private void revokeAllPermissions() {
protected void revokeAllPermissions() {
executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + container.getUsername() + "@'%';");
}
private void grantCorrectPermissions() {
protected void grantCorrectPermissions() {
executeQuery(
"GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "
+ container.getUsername() + "@'%';");
}
private void executeQuery(final String query) {
protected void executeQuery(final String query) {
try (final DSLContext dslContext = DSLContextFactory.create(
"root",
"test",
@@ -227,11 +199,6 @@ public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest {
assertEquals(6, filterRecords(runRead(configuredCatalog, latestState)).size());
}
/**
 * Declares per-stream support for this test.
 *
 * @return always {@code true}
 */
@Override
protected boolean supportsPerStream() {
return true;
}
@Test
public void testIncrementalReadSelectedColumns() throws Exception {
final ConfiguredAirbyteCatalog catalog = getConfiguredCatalogWithPartialColumns();
@@ -240,7 +207,36 @@ public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest {
final List<AirbyteRecordMessage> records = filterRecords(allMessages);
assertFalse(records.isEmpty(), "Expected a incremental sync to produce records");
verifyFieldNotExist(records, STREAM_NAME, "name");
verifyFieldNotExist(records, STREAM_NAME2, "id");
verifyFieldNotExist(records, STREAM_NAME2, "name");
}
/**
 * Builds a catalog of two incremental/append streams, {@code STREAM_NAME} and
 * {@code STREAM_NAME2}, that each declare only the {@code id} column.
 *
 * @return the configured catalog containing both partial-column streams
 */
private ConfiguredAirbyteCatalog getConfiguredCatalogWithPartialColumns() {
  // We cannot strip the primary key field as that is required for a successful CDC sync,
  // so both streams keep "id" and drop "name".
  final ConfiguredAirbyteStream firstStream = new ConfiguredAirbyteStream()
      .withSyncMode(INCREMENTAL)
      .withDestinationSyncMode(DestinationSyncMode.APPEND)
      .withStream(CatalogHelpers.createAirbyteStream(
          String.format("%s", STREAM_NAME),
          String.format("%s", config.get(JdbcUtils.DATABASE_KEY).asText()),
          Field.of("id", JsonSchemaType.NUMBER))
          .withSourceDefinedCursor(true)
          .withSourceDefinedPrimaryKey(List.of(List.of("id")))
          .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL)));

  final ConfiguredAirbyteStream secondStream = new ConfiguredAirbyteStream()
      .withSyncMode(INCREMENTAL)
      .withDestinationSyncMode(DestinationSyncMode.APPEND)
      .withStream(CatalogHelpers.createAirbyteStream(
          String.format("%s", STREAM_NAME2),
          String.format("%s", config.get(JdbcUtils.DATABASE_KEY).asText()),
          Field.of("id", JsonSchemaType.NUMBER))
          .withSourceDefinedCursor(true)
          .withSourceDefinedPrimaryKey(List.of(List.of("id")))
          .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL)));

  return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(firstStream, secondStream));
}
private void verifyFieldNotExist(final List<AirbyteRecordMessage> records, final String stream, final String field) {

View File

@@ -5,102 +5,25 @@
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import static io.airbyte.integrations.io.airbyte.integration_tests.sources.utils.TestConstants.INITIAL_CDC_WAITING_SECONDS;
import static io.airbyte.protocol.models.v0.SyncMode.INCREMENTAL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.MySqlUtils;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.ssh.SshHelpers;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;
public class CdcMySqlSslCaCertificateSourceAcceptanceTest extends SourceAcceptanceTest {
public class CdcMySqlSslCaCertificateSourceAcceptanceTest extends CdcMySqlSourceAcceptanceTest {
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private MySQLContainer<?> container;
private JsonNode config;
private static MySqlUtils.Certificate certs;
/**
 * Names the connector image this acceptance test runs against.
 *
 * @return the fixed image reference {@code airbyte/source-mysql:dev}
 */
@Override
protected String getImageName() {
return "airbyte/source-mysql:dev";
}
/**
 * Supplies the connector specification for this test.
 *
 * @return whatever {@code SshHelpers.getSpecAndInjectSsh()} produces
 * @throws Exception if the helper fails
 */
@Override
protected ConnectorSpecification getSpec() throws Exception {
return SshHelpers.getSpecAndInjectSsh();
}
/**
 * Exposes the connector config held in the {@code config} field.
 *
 * @return the current value of {@code config}
 */
@Override
protected JsonNode getConfig() {
return config;
}
/**
 * Builds the catalog used by this test: two incremental/append streams,
 * {@code STREAM_NAME} and {@code STREAM_NAME2}, each declaring {@code id} (number) and
 * {@code name} (string) with {@code id} as the source-defined primary key.
 *
 * @return the configured catalog containing both streams
 */
@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
  final ConfiguredAirbyteStream firstStream = new ConfiguredAirbyteStream()
      .withSyncMode(INCREMENTAL)
      .withDestinationSyncMode(DestinationSyncMode.APPEND)
      .withStream(CatalogHelpers.createAirbyteStream(
          String.format("%s", STREAM_NAME),
          String.format("%s", config.get(JdbcUtils.DATABASE_KEY).asText()),
          Field.of("id", JsonSchemaType.NUMBER),
          Field.of("name", JsonSchemaType.STRING))
          .withSourceDefinedCursor(true)
          .withSourceDefinedPrimaryKey(List.of(List.of("id")))
          .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL)));

  final ConfiguredAirbyteStream secondStream = new ConfiguredAirbyteStream()
      .withSyncMode(INCREMENTAL)
      .withDestinationSyncMode(DestinationSyncMode.APPEND)
      .withStream(CatalogHelpers.createAirbyteStream(
          String.format("%s", STREAM_NAME2),
          String.format("%s", config.get(JdbcUtils.DATABASE_KEY).asText()),
          Field.of("id", JsonSchemaType.NUMBER),
          Field.of("name", JsonSchemaType.STRING))
          .withSourceDefinedCursor(true)
          .withSourceDefinedPrimaryKey(List.of(List.of("id")))
          .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL)));

  return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(firstStream, secondStream));
}
/**
 * Supplies the state handed to the read under test; this test always supplies none.
 *
 * @return always {@code null}
 */
@Override
protected JsonNode getState() {
return null;
}
@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
container = new MySQLContainer<>("mysql:8.0");
container.start();
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
certs = MySqlUtils.getCertificate(container, true);
final var sslMode = ImmutableMap.builder()
@@ -131,80 +54,4 @@ public class CdcMySqlSslCaCertificateSourceAcceptanceTest extends SourceAcceptan
grantCorrectPermissions();
createAndPopulateTables();
}
/**
 * Creates the {@code id_and_name} and {@code starships} tables and inserts three rows into
 * each, executing the statements in order through {@code executeQuery}.
 */
private void createAndPopulateTables() {
  final List<String> setupStatements = List.of(
      "CREATE TABLE id_and_name(id INTEGER PRIMARY KEY, name VARCHAR(200));",
      "INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');",
      "CREATE TABLE starships(id INTEGER PRIMARY KEY, name VARCHAR(200));",
      "INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');");
  for (final String statement : setupStatements) {
    executeQuery(statement);
  }
}
/**
 * Issues a {@code REVOKE ALL PRIVILEGES, GRANT OPTION} statement for the container's user
 * at host {@code '%'}.
 */
private void revokeAllPermissions() {
  final String user = container.getUsername();
  final String revokeSql = "REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + user + "@'%';";
  executeQuery(revokeSql);
}
/**
 * Grants the container's user (at host {@code '%'}) SELECT, RELOAD, SHOW DATABASES,
 * REPLICATION SLAVE and REPLICATION CLIENT on all databases.
 */
private void grantCorrectPermissions() {
  final String user = container.getUsername();
  final String grantSql =
      "GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "
          + user + "@'%';";
  executeQuery(grantSql);
}
/**
 * Executes one SQL statement against the test container.
 *
 * A fresh {@code DSLContext} is opened for the call (created with the literal arguments
 * {@code "root"} and {@code "test"}, presumably user and password) and closed afterwards.
 *
 * @param query the SQL text to execute
 * @throws RuntimeException wrapping any exception raised while connecting or executing
 */
private void executeQuery(final String query) {
  try (final DSLContext rootContext = DSLContextFactory.create(
      "root",
      "test",
      DatabaseDriver.MYSQL.getDriverClassName(),
      String.format(
          DatabaseDriver.MYSQL.getUrlFormatString(),
          container.getHost(),
          container.getFirstMappedPort(),
          container.getDatabaseName()),
      SQLDialect.MYSQL)) {
    new Database(rootContext).query(jooq -> jooq.execute(query));
  } catch (final Exception failure) {
    throw new RuntimeException(failure);
  }
}
/**
 * Closes the MySQL test container after the test.
 *
 * @param testEnv unused here
 */
@Override
protected void tearDown(final TestDestinationEnv testEnv) {
container.close();
}
/**
 * Verifies that a second incremental sync still succeeds after the binary logs have been
 * removed with {@code RESET MASTER}.
 *
 * The first read runs with {@code getState()}; its state messages are then fed into a
 * second read, which must yield exactly 6 records.
 */
@Test
public void testIncrementalSyncShouldNotFailIfBinlogIsDeleted() throws Exception {
final ConfiguredAirbyteCatalog configuredCatalog = withSourceDefinedCursors(getConfiguredCatalog());
// only sync incremental streams
configuredCatalog.setStreams(
configuredCatalog.getStreams().stream().filter(s -> s.getSyncMode() == INCREMENTAL).collect(Collectors.toList()));
final List<AirbyteMessage> airbyteMessages = runRead(configuredCatalog, getState());
final List<AirbyteRecordMessage> recordMessages = filterRecords(airbyteMessages);
// Keep only the STATE messages emitted by the first read.
final List<AirbyteStateMessage> stateMessages = airbyteMessages
.stream()
.filter(m -> m.getType() == AirbyteMessage.Type.STATE)
.map(AirbyteMessage::getState)
.collect(Collectors.toList());
assertFalse(recordMessages.isEmpty(), "Expected the first incremental sync to produce records");
assertFalse(stateMessages.isEmpty(), "Expected incremental sync to produce STATE messages");
// Build the input state for the second sync: every state message when per-stream state is
// supported, otherwise only the last one.
final JsonNode latestState = Jsons.jsonNode(supportsPerStream() ? stateMessages : List.of(Iterables.getLast(stateMessages)));
// RESET MASTER removes all binary log files that are listed in the index file,
// leaving only a single, empty binary log file with a numeric suffix of .000001
executeQuery("RESET MASTER;");
// With the saved binlog position gone, the second read is expected to emit 6 records
// NOTE(review): presumably a re-read of the 3 rows in each of the 2 tables — confirm.
assertEquals(6, filterRecords(runRead(configuredCatalog, latestState)).size());
}
/**
 * Declares per-stream support; when true, the binlog-deletion test passes every state
 * message (not just the last) into the follow-up read.
 *
 * @return always {@code true}
 */
@Override
protected boolean supportsPerStream() {
return true;
}
}

View File

@@ -5,100 +5,22 @@
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import static io.airbyte.integrations.io.airbyte.integration_tests.sources.utils.TestConstants.INITIAL_CDC_WAITING_SECONDS;
import static io.airbyte.protocol.models.v0.SyncMode.INCREMENTAL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.ssh.SshHelpers;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;
public class CdcMySqlSslRequiredSourceAcceptanceTest extends SourceAcceptanceTest {
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private MySQLContainer<?> container;
private JsonNode config;
public class CdcMySqlSslRequiredSourceAcceptanceTest extends CdcMySqlSourceAcceptanceTest {
/**
 * Names the connector image this acceptance test runs against.
 *
 * @return the fixed image reference {@code airbyte/source-mysql:dev}
 */
@Override
protected String getImageName() {
return "airbyte/source-mysql:dev";
}
/**
 * Supplies the connector specification for this test.
 *
 * @return whatever {@code SshHelpers.getSpecAndInjectSsh()} produces
 * @throws Exception if the helper fails
 */
@Override
protected ConnectorSpecification getSpec() throws Exception {
return SshHelpers.getSpecAndInjectSsh();
}
/**
 * Exposes the connector config held in the {@code config} field.
 *
 * @return the current value of {@code config}
 */
@Override
protected JsonNode getConfig() {
return config;
}
/**
 * Builds the catalog used by this test: two incremental/append streams,
 * {@code STREAM_NAME} and {@code STREAM_NAME2}, each declaring {@code id} (number) and
 * {@code name} (string) with {@code id} as the source-defined primary key.
 *
 * @return the configured catalog containing both streams
 */
@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
  final ConfiguredAirbyteStream idAndNameStream = new ConfiguredAirbyteStream()
      .withSyncMode(INCREMENTAL)
      .withDestinationSyncMode(DestinationSyncMode.APPEND)
      .withStream(CatalogHelpers.createAirbyteStream(
          String.format("%s", STREAM_NAME),
          String.format("%s", config.get(JdbcUtils.DATABASE_KEY).asText()),
          Field.of("id", JsonSchemaType.NUMBER),
          Field.of("name", JsonSchemaType.STRING))
          .withSourceDefinedCursor(true)
          .withSourceDefinedPrimaryKey(List.of(List.of("id")))
          .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL)));

  final ConfiguredAirbyteStream starshipsStream = new ConfiguredAirbyteStream()
      .withSyncMode(INCREMENTAL)
      .withDestinationSyncMode(DestinationSyncMode.APPEND)
      .withStream(CatalogHelpers.createAirbyteStream(
          String.format("%s", STREAM_NAME2),
          String.format("%s", config.get(JdbcUtils.DATABASE_KEY).asText()),
          Field.of("id", JsonSchemaType.NUMBER),
          Field.of("name", JsonSchemaType.STRING))
          .withSourceDefinedCursor(true)
          .withSourceDefinedPrimaryKey(List.of(List.of("id")))
          .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL)));

  return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(idAndNameStream, starshipsStream));
}
/**
 * Supplies the state handed to the read under test; this test always supplies none.
 *
 * @return always {@code null}
 */
@Override
protected JsonNode getState() {
return null;
}
@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
protected void setupEnvironment(final TestDestinationEnv environment) {
container = new MySQLContainer<>("mysql:8.0");
container.start();
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
final var sslMode = ImmutableMap.builder()
.put(JdbcUtils.MODE_KEY, "required")
@@ -129,80 +51,4 @@ public class CdcMySqlSslRequiredSourceAcceptanceTest extends SourceAcceptanceTes
/**
 * Alters the container's user so that it is marked {@code REQUIRE SSL}.
 */
private void alterUserRequireSsl() {
  final String user = container.getUsername();
  final String alterSql = "ALTER USER " + user + " REQUIRE SSL;";
  executeQuery(alterSql);
}
/**
 * Creates the {@code id_and_name} and {@code starships} tables and inserts three rows into
 * each, executing the statements in order through {@code executeQuery}.
 */
private void createAndPopulateTables() {
  final String[] setupStatements = {
      "CREATE TABLE id_and_name(id INTEGER PRIMARY KEY, name VARCHAR(200));",
      "INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');",
      "CREATE TABLE starships(id INTEGER PRIMARY KEY, name VARCHAR(200));",
      "INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"
  };
  for (final String statement : setupStatements) {
    executeQuery(statement);
  }
}
/**
 * Issues a {@code REVOKE ALL PRIVILEGES, GRANT OPTION} statement for the container's user
 * at host {@code '%'}.
 */
private void revokeAllPermissions() {
  final String user = container.getUsername();
  final String revokeSql = "REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + user + "@'%';";
  executeQuery(revokeSql);
}
/**
 * Grants the container's user (at host {@code '%'}) SELECT, RELOAD, SHOW DATABASES,
 * REPLICATION SLAVE and REPLICATION CLIENT on all databases.
 */
private void grantCorrectPermissions() {
  final String user = container.getUsername();
  final String grantSql =
      "GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "
          + user + "@'%';";
  executeQuery(grantSql);
}
/**
 * Executes one SQL statement against the test container.
 *
 * A fresh {@code DSLContext} is opened for the call (created with the literal arguments
 * {@code "root"} and {@code "test"}, presumably user and password) and closed afterwards.
 *
 * @param query the SQL text to execute
 * @throws RuntimeException wrapping any exception raised while connecting or executing
 */
private void executeQuery(final String query) {
  try (final DSLContext adminContext = DSLContextFactory.create(
      "root",
      "test",
      DatabaseDriver.MYSQL.getDriverClassName(),
      String.format(
          DatabaseDriver.MYSQL.getUrlFormatString(),
          container.getHost(),
          container.getFirstMappedPort(),
          container.getDatabaseName()),
      SQLDialect.MYSQL)) {
    final Database adminDatabase = new Database(adminContext);
    adminDatabase.query(jooq -> jooq.execute(query));
  } catch (final Exception cause) {
    throw new RuntimeException(cause);
  }
}
/**
 * Closes the MySQL test container after the test.
 *
 * @param testEnv unused here
 */
@Override
protected void tearDown(final TestDestinationEnv testEnv) {
container.close();
}
/**
 * Verifies that a second incremental sync still succeeds after the binary logs have been
 * removed with {@code RESET MASTER}.
 *
 * The first read runs with {@code getState()}; its state messages are then fed into a
 * second read, which must yield exactly 6 records.
 */
@Test
public void testIncrementalSyncShouldNotFailIfBinlogIsDeleted() throws Exception {
final ConfiguredAirbyteCatalog configuredCatalog = withSourceDefinedCursors(getConfiguredCatalog());
// only sync incremental streams
configuredCatalog.setStreams(
configuredCatalog.getStreams().stream().filter(s -> s.getSyncMode() == INCREMENTAL).collect(Collectors.toList()));
final List<AirbyteMessage> airbyteMessages = runRead(configuredCatalog, getState());
final List<AirbyteRecordMessage> recordMessages = filterRecords(airbyteMessages);
// Keep only the STATE messages emitted by the first read.
final List<AirbyteStateMessage> stateMessages = airbyteMessages
.stream()
.filter(m -> m.getType() == AirbyteMessage.Type.STATE)
.map(AirbyteMessage::getState)
.collect(Collectors.toList());
assertFalse(recordMessages.isEmpty(), "Expected the first incremental sync to produce records");
assertFalse(stateMessages.isEmpty(), "Expected incremental sync to produce STATE messages");
// Build the input state for the second sync: every state message when per-stream state is
// supported, otherwise only the last one.
final JsonNode latestState = Jsons.jsonNode(supportsPerStream() ? stateMessages : List.of(Iterables.getLast(stateMessages)));
// RESET MASTER removes all binary log files that are listed in the index file,
// leaving only a single, empty binary log file with a numeric suffix of .000001
executeQuery("RESET MASTER;");
// With the saved binlog position gone, the second read is expected to emit 6 records
// NOTE(review): presumably a re-read of the 3 rows in each of the 2 tables — confirm.
assertEquals(6, filterRecords(runRead(configuredCatalog, latestState)).size());
}
/**
 * Declares per-stream support; when true, the binlog-deletion test passes every state
 * message (not just the last) into the follow-up read.
 *
 * @return always {@code true}
 */
@Override
protected boolean supportsPerStream() {
return true;
}
}

View File

@@ -12,8 +12,11 @@ import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_DEFAULT_CURSO
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_FILE;
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_POS;
import static io.airbyte.integrations.source.mysql.MySqlSource.DRIVER_CLASS;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadStateManager.PRIMARY_KEY_STATE_TYPE;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadStateManager.STATE_TYPE_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -22,6 +25,8 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
@@ -34,17 +39,30 @@ import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.debezium.CdcSourceTest;
import io.airbyte.integrations.debezium.internals.mysql.MySqlCdcTargetPosition;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteGlobalState;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.AirbyteStreamState;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import io.airbyte.protocol.models.v0.SyncMode;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.AfterEach;
@@ -221,16 +239,6 @@ public class CdcMysqlSourceTest extends CdcSourceTest {
return database;
}
/**
 * Asserts that exactly one state message was emitted and that its data carries, under
 * {@code cdc_state.state}, both the {@code MYSQL_CDC_OFFSET} and {@code MYSQL_DB_HISTORY} entries.
 *
 * @param stateMessages state messages emitted by the sync under test
 */
@Override
protected void assertExpectedStateMessages(final List<AirbyteStateMessage> stateMessages) {
  assertEquals(1, stateMessages.size());
  assertNotNull(stateMessages.get(0).getData());
  stateMessages.forEach(message -> {
    // Offset first, then schema history; each lookup walks the data tree from the root.
    assertNotNull(message.getData().get("cdc_state").get("state").get(MYSQL_CDC_OFFSET));
    assertNotNull(message.getData().get("cdc_state").get("state").get(MYSQL_DB_HISTORY));
  });
}
@Override
protected String randomTableSchema() {
return MODELS_SCHEMA;
@@ -345,8 +353,313 @@ public class CdcMysqlSourceTest extends CdcSourceTest {
assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states.");
}
protected void assertStateForSyncShouldHandlePurgedLogsGracefully(final List<AirbyteStateMessage> stateMessages, final int syncNumber) {
assertExpectedStateMessages(stateMessages);
/**
 * Asserts that seven state messages were emitted and delegates to {@code assertStateTypes}, which requires
 * the messages at indices 0 through 4 to be primary-key states and the remaining ones not to be.
 *
 * @param stateMessages state messages emitted by the sync under test, in emission order
 */
@Override
protected void assertExpectedStateMessages(final List<AirbyteStateMessage> stateMessages) {
  final int expectedStateCount = 7;
  final int lastPkStateIndex = 4;
  assertEquals(expectedStateCount, stateMessages.size());
  assertStateTypes(stateMessages, lastPkStateIndex);
}
/**
 * Asserts that an incremental sync emitted exactly one state message and that its data carries, under
 * {@code cdc_state.state}, both the {@code MYSQL_CDC_OFFSET} and {@code MYSQL_DB_HISTORY} entries.
 *
 * @param stateMessages state messages emitted by the incremental sync
 */
@Override
protected void assertExpectedStateMessagesFromIncrementalSync(final List<AirbyteStateMessage> stateMessages) {
  assertEquals(1, stateMessages.size());
  assertNotNull(stateMessages.get(0).getData());
  stateMessages.forEach(message -> {
    // Offset first, then schema history; each lookup walks the data tree from the root.
    assertNotNull(message.getData().get("cdc_state").get("state").get(MYSQL_CDC_OFFSET));
    assertNotNull(message.getData().get("cdc_state").get("state").get(MYSQL_DB_HISTORY));
  });
}
/**
 * Checks the state messages of one of the two syncs run by the purged-binlog test.
 *
 * @param stateMessages state messages emitted by the sync, in emission order
 * @param syncNumber 1 for the first sync, 2 for the sync run after the binary logs were purged
 * @throws RuntimeException if {@code syncNumber} is neither 1 nor 2
 */
private void assertStateForSyncShouldHandlePurgedLogsGracefully(final List<AirbyteStateMessage> stateMessages, final int syncNumber) {
  switch (syncNumber) {
    case 1:
      assertExpectedStateMessagesForRecordsProducedDuringAndAfterSync(stateMessages);
      break;
    case 2:
      // Sync number 2 starts from the state of sync number 1, but the binary logs are purged before it
      // is triggered. The validation of the logs present on the server therefore fails and a sync from
      // scratch is triggered instead.
      assertEquals(47, stateMessages.size());
      assertStateTypes(stateMessages, 44);
      break;
    default:
      throw new RuntimeException("Unknown sync number");
  }
}
/**
 * Asserts that 27 state messages were emitted and delegates to {@code assertStateTypes}, which requires
 * the messages at indices 0 through 24 to be primary-key states and the remaining ones not to be.
 *
 * @param stateAfterFirstBatch state messages emitted by the first batch, in emission order
 */
@Override
protected void assertExpectedStateMessagesForRecordsProducedDuringAndAfterSync(final List<AirbyteStateMessage> stateAfterFirstBatch) {
  final int expectedStateCount = 27;
  final int lastPkStateIndex = 24;
  assertEquals(expectedStateCount, stateAfterFirstBatch.size());
  assertStateTypes(stateAfterFirstBatch, lastPkStateIndex);
}
/**
 * Asserts that exactly two state messages were emitted.
 * NOTE(review): judging by the method name this is invoked by the base-class test that syncs a source
 * with no data - confirm against CdcSourceTest.
 *
 * @param stateMessages state messages emitted by the sync under test
 */
@Override
protected void assertExpectedStateMessagesForNoData(final List<AirbyteStateMessage> stateMessages) {
assertEquals(2, stateMessages.size());
}
/**
 * Walks the state messages in order and asserts that each one is a GLOBAL state with a non-null shared
 * state identical to that of the first message and with exactly one stream state. Stream states at
 * positions up to and including {@code indexTillWhichExpectPkState} must be tagged with
 * {@code PRIMARY_KEY_STATE_TYPE}; later ones must not carry {@code STATE_TYPE_KEY} at all.
 *
 * @param stateMessages state messages in emission order
 * @param indexTillWhichExpectPkState last index (inclusive) at which a primary-key state is expected
 */
private void assertStateTypes(final List<AirbyteStateMessage> stateMessages, final int indexTillWhichExpectPkState) {
  JsonNode expectedSharedState = null;
  int position = 0;
  for (final AirbyteStateMessage message : stateMessages) {
    assertEquals(AirbyteStateType.GLOBAL, message.getType());
    final AirbyteGlobalState globalState = message.getGlobal();
    final JsonNode currentSharedState = globalState.getSharedState();
    assertNotNull(currentSharedState);
    if (expectedSharedState == null) {
      // The first message defines the shared state every later message must repeat.
      expectedSharedState = currentSharedState;
    } else {
      assertEquals(expectedSharedState, currentSharedState);
    }

    final List<AirbyteStreamState> streamStates = globalState.getStreamStates();
    assertEquals(1, streamStates.size());
    final JsonNode onlyStreamState = streamStates.get(0).getStreamState();
    final boolean pkStateExpected = position <= indexTillWhichExpectPkState;
    if (!pkStateExpected) {
      assertFalse(onlyStreamState.has(STATE_TYPE_KEY));
    } else {
      assertTrue(onlyStreamState.has(STATE_TYPE_KEY));
      assertEquals(PRIMARY_KEY_STATE_TYPE, onlyStreamState.get(STATE_TYPE_KEY).asText());
    }
    position++;
  }
}
/**
 * Validates the seven state messages emitted by the second sync of the new-table snapshot test.
 *
 * <ul>
 * <li>Indices 0-4: the shared state still equals that of the first sync; the {@code _random} stream carries
 * a primary-key state while the original models stream carries none.</li>
 * <li>Index 5: the shared state is still unchanged; neither stream carries a primary-key state.</li>
 * <li>Index 6: the shared state differs from the first sync, both streams are present and
 * {@code getData()} is non-null.</li>
 * </ul>
 *
 * @param stateMessages state messages emitted by the second sync, in emission order
 * @param stateMessageEmittedAfterFirstSyncCompletion final state message of the first sync, used as the
 *        baseline for the shared-state comparisons
 */
@Override
protected void assertStateMessagesForNewTableSnapshotTest(final List<AirbyteStateMessage> stateMessages,
                                                          final AirbyteStateMessage stateMessageEmittedAfterFirstSyncCompletion) {
  assertEquals(7, stateMessages.size());
  final StreamDescriptor randomStream = new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomTableSchema());
  final StreamDescriptor modelsStream = new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA);

  for (int i = 0; i <= 4; i++) {
    final AirbyteStateMessage stateMessage = stateMessages.get(i);
    assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessage.getType());
    assertEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
        stateMessage.getGlobal().getSharedState());
    final Set<StreamDescriptor> streamsInSnapshotState = streamDescriptorsIn(stateMessage);
    assertEquals(2, streamsInSnapshotState.size());
    assertTrue(streamsInSnapshotState.contains(randomStream));
    assertTrue(streamsInSnapshotState.contains(modelsStream));

    stateMessage.getGlobal().getStreamStates().forEach(s -> {
      final JsonNode streamState = s.getStreamState();
      if (s.getStreamDescriptor().equals(randomStream)) {
        assertEquals(PRIMARY_KEY_STATE_TYPE, streamState.get(STATE_TYPE_KEY).asText());
      } else if (s.getStreamDescriptor().equals(modelsStream)) {
        assertFalse(streamState.has(STATE_TYPE_KEY));
      } else {
        throw new RuntimeException("Unknown stream");
      }
    });
  }

  final AirbyteStateMessage secondLastStateMessage = stateMessages.get(5);
  assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, secondLastStateMessage.getType());
  assertEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
      secondLastStateMessage.getGlobal().getSharedState());
  final Set<StreamDescriptor> streamsInSecondLastState = streamDescriptorsIn(secondLastStateMessage);
  assertEquals(2, streamsInSecondLastState.size());
  assertTrue(streamsInSecondLastState.contains(randomStream));
  assertTrue(streamsInSecondLastState.contains(modelsStream));
  secondLastStateMessage.getGlobal().getStreamStates()
      .forEach(s -> assertFalse(s.getStreamState().has(STATE_TYPE_KEY)));

  final AirbyteStateMessage stateMessageEmittedAfterSecondSyncCompletion = stateMessages.get(6);
  assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterSecondSyncCompletion.getType());
  assertNotEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
      stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getSharedState());
  final Set<StreamDescriptor> streamsInSyncCompletionState = streamDescriptorsIn(stateMessageEmittedAfterSecondSyncCompletion);
  // Check the completion message's own stream set; the size used to be asserted on the set built from
  // the previous message, which left this message's stream count unverified.
  assertEquals(2, streamsInSyncCompletionState.size());
  assertTrue(streamsInSyncCompletionState.contains(randomStream));
  assertTrue(streamsInSyncCompletionState.contains(modelsStream));
  assertNotNull(stateMessageEmittedAfterSecondSyncCompletion.getData());
}

/** Collects the stream descriptors of every stream state carried by the given global state message. */
private static Set<StreamDescriptor> streamDescriptorsIn(final AirbyteStateMessage stateMessage) {
  return stateMessage.getGlobal().getStreamStates()
      .stream()
      .map(AirbyteStreamState::getStreamDescriptor)
      .collect(Collectors.toSet());
}
/**
 * Runs an initial sync against a catalog whose first stream has a second primary-key column appended,
 * then resumes from the state message at index 4 and checks the records and state messages that the
 * resumed sync emits.
 */
@Test
public void testCompositeIndexInitialLoad() throws Exception {
  // Simulate adding a composite index by appending a second key column in a copy of the catalog.
  final ConfiguredAirbyteCatalog catalogWithCompositePk = Jsons.clone(CONFIGURED_CATALOG);
  catalogWithCompositePk.getStreams().get(0).getStream().getSourceDefinedPrimaryKey().add(List.of("make_id"));

  final List<AirbyteMessage> firstSyncMessages =
      AutoCloseableIterators.toListAndClose(getSource().read(getConfig(), catalogWithCompositePk, null));
  final Set<AirbyteRecordMessage> firstSyncRecords = extractRecordMessages(firstSyncMessages);
  final List<AirbyteStateMessage> firstSyncStates = extractStateMessages(firstSyncMessages);
  assertExpectedRecords(new HashSet<>(MODEL_RECORDS), firstSyncRecords);
  assertExpectedStateMessages(firstSyncStates);

  // Resume from the state associated with the record with id = 15 (the second to last record).
  // With a composite PK the resume query is a >= query, so two records are expected. Three state
  // messages are expected as well: one pk state, one marking the end of the initial load, and a final
  // one carrying the cdc position synced up to.
  final JsonNode resumeState = Jsons.jsonNode(Collections.singletonList(firstSyncStates.get(4)));
  final List<AirbyteMessage> secondSyncMessages =
      AutoCloseableIterators.toListAndClose(getSource().read(getConfig(), catalogWithCompositePk, resumeState));
  final Set<AirbyteRecordMessage> secondSyncRecords = extractRecordMessages(secondSyncMessages);
  final List<AirbyteStateMessage> secondSyncStates = extractStateMessages(secondSyncMessages);
  assertExpectedRecords(new HashSet<>(MODEL_RECORDS.subList(4, 6)), secondSyncRecords);
  assertEquals(3, secondSyncStates.size());
  assertStateTypes(secondSyncStates, 0);
}
/**
 * Runs an initial sync over two streams (the default models stream plus a freshly created
 * {@code MODELS_STREAM_NAME + "_2"} table) and checks the sequence of 13 global state messages. It then
 * resumes from the state message at index 6 and checks the 6 state messages and 5 records emitted by the
 * second sync.
 */
@Test
public void testTwoStreamSync() throws Exception {
// Add another stream models_2 and read that one as well.
final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(CONFIGURED_CATALOG);
final List<JsonNode> MODEL_RECORDS_2 = ImmutableList.of(
Jsons.jsonNode(ImmutableMap.of(COL_ID, 110, COL_MAKE_ID, 1, COL_MODEL, "Fiesta-2")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 120, COL_MAKE_ID, 1, COL_MODEL, "Focus-2")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 130, COL_MAKE_ID, 1, COL_MODEL, "Ranger-2")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 140, COL_MAKE_ID, 2, COL_MODEL, "GLA-2")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 150, COL_MAKE_ID, 2, COL_MODEL, "A 220-2")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 160, COL_MAKE_ID, 2, COL_MODEL, "E 350-2")));
createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2",
columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.of(COL_ID)));
for (final JsonNode recordJson : MODEL_RECORDS_2) {
writeRecords(recordJson, MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", COL_ID,
COL_MAKE_ID, COL_MODEL);
}
// Register the new table in the catalog as an incremental stream keyed on COL_ID.
final ConfiguredAirbyteStream airbyteStream = new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_2",
MODELS_SCHEMA,
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_MAKE_ID, JsonSchemaType.INTEGER),
Field.of(COL_MODEL, JsonSchemaType.STRING))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))));
airbyteStream.setSyncMode(SyncMode.INCREMENTAL);
final List<ConfiguredAirbyteStream> streams = configuredCatalog.getStreams();
streams.add(airbyteStream);
configuredCatalog.withStreams(streams);
// First sync: no input state.
final AutoCloseableIterator<AirbyteMessage> read1 = getSource()
.read(getConfig(), configuredCatalog, null);
final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
final Set<AirbyteRecordMessage> recordMessages1 = extractRecordMessages(actualRecords1);
final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
assertEquals(13, stateMessages1.size());
JsonNode sharedState = null;
StreamDescriptor firstStreamInState = null;
for (int i = 0; i < stateMessages1.size(); i++) {
final AirbyteStateMessage stateMessage = stateMessages1.get(i);
assertEquals(AirbyteStateType.GLOBAL, stateMessage.getType());
final AirbyteGlobalState global = stateMessage.getGlobal();
assertNotNull(global.getSharedState());
// Every message must repeat the shared state carried by the first one.
if (Objects.isNull(sharedState)) {
sharedState = global.getSharedState();
} else {
assertEquals(sharedState, global.getSharedState());
}
// Remember which stream appears in the very first state message.
if (Objects.isNull(firstStreamInState)) {
assertEquals(1, global.getStreamStates().size());
firstStreamInState = global.getStreamStates().get(0).getStreamDescriptor();
}
if (i <= 4) {
// The first 5 state messages (indices 0-4) are pk state for the first stream
assertEquals(1, global.getStreamStates().size());
final AirbyteStreamState streamState = global.getStreamStates().get(0);
assertTrue(streamState.getStreamState().has(STATE_TYPE_KEY));
assertEquals(PRIMARY_KEY_STATE_TYPE, streamState.getStreamState().get(STATE_TYPE_KEY).asText());
} else if (i == 5) {
// The 6th state message (index 5) is the final state message emitted for the first stream
assertEquals(1, global.getStreamStates().size());
final AirbyteStreamState streamState = global.getStreamStates().get(0);
assertFalse(streamState.getStreamState().has(STATE_TYPE_KEY));
} else if (i <= 10) {
// The 7th to 11th state messages (indices 6-10) carry primary_key state for the 2nd stream and the
// final state for the 1st stream
assertEquals(2, global.getStreamStates().size());
final StreamDescriptor finalFirstStreamInState = firstStreamInState;
global.getStreamStates().forEach(c -> {
if (c.getStreamDescriptor().equals(finalFirstStreamInState)) {
assertFalse(c.getStreamState().has(STATE_TYPE_KEY));
} else {
assertTrue(c.getStreamState().has(STATE_TYPE_KEY));
assertEquals(PRIMARY_KEY_STATE_TYPE, c.getStreamState().get(STATE_TYPE_KEY).asText());
}
});
} else {
// last 2 state messages (indices 11-12) don't contain primary_key info cause primary_key sync should be
// complete
assertEquals(2, global.getStreamStates().size());
global.getStreamStates().forEach(c -> assertFalse(c.getStreamState().has(STATE_TYPE_KEY)));
}
}
final Set<String> names = new HashSet<>(STREAM_NAMES);
names.add(MODELS_STREAM_NAME + "_2");
assertExpectedRecords(Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream())
.collect(Collectors.toSet()),
recordMessages1,
names,
names,
MODELS_SCHEMA);
assertEquals(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA), firstStreamInState);
// Triggering a sync with a primary_key state for 1 stream and complete state for other stream
final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
.read(getConfig(), configuredCatalog, Jsons.jsonNode(Collections.singletonList(stateMessages1.get(6))));
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);
assertEquals(6, stateMessages2.size());
for (int i = 0; i < stateMessages2.size(); i++) {
final AirbyteStateMessage stateMessage = stateMessages2.get(i);
assertEquals(AirbyteStateType.GLOBAL, stateMessage.getType());
final AirbyteGlobalState global = stateMessage.getGlobal();
assertNotNull(global.getSharedState());
assertEquals(2, global.getStreamStates().size());
if (i <= 3) {
final StreamDescriptor finalFirstStreamInState = firstStreamInState;
global.getStreamStates().forEach(c -> {
// First 4 state messages (indices 0-3) are primary_key state for the stream that didn't complete
// primary_key sync the first time
if (c.getStreamDescriptor().equals(finalFirstStreamInState)) {
assertFalse(c.getStreamState().has(STATE_TYPE_KEY));
} else {
assertTrue(c.getStreamState().has(STATE_TYPE_KEY));
assertEquals(PRIMARY_KEY_STATE_TYPE, c.getStreamState().get(STATE_TYPE_KEY).asText());
}
});
} else {
// last 2 state messages (indices 4-5) don't contain primary_key info cause primary_key sync should be
// complete
global.getStreamStates().forEach(c -> assertFalse(c.getStreamState().has(STATE_TYPE_KEY)));
}
}
// The resumed sync is expected to emit the second stream's records from index 1 onwards.
final Set<AirbyteRecordMessage> recordMessages2 = extractRecordMessages(actualRecords2);
assertEquals(5, recordMessages2.size());
assertExpectedRecords(new HashSet<>(MODEL_RECORDS_2.subList(1, MODEL_RECORDS_2.size())),
recordMessages2,
names,
names,
MODELS_SCHEMA);
}
}

View File

@@ -1,361 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadStateManager.PRIMARY_KEY_STATE_TYPE;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadStateManager.STATE_TYPE_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.source.mysql.initialsync.MySqlFeatureFlags;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteGlobalState;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStreamState;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
/**
 * Runs the {@link CdcMysqlSourceTest} suite with the {@code MySqlFeatureFlags.CDC_VIA_PK} config flag set
 * to {@code true} and overrides the state-message expectations to match the primary-key initial load.
 */
public class InitialPkLoadEnabledCdcMysqlSourceTest extends CdcMysqlSourceTest {

  /** Returns the parent config with the {@code CDC_VIA_PK} flag switched on. */
  @Override
  protected JsonNode getConfig() {
    final JsonNode config = super.getConfig();
    ((ObjectNode) config).put(MySqlFeatureFlags.CDC_VIA_PK, true);
    return config;
  }

  /** Expects seven state messages, of which those at indices 0 through 4 are primary-key states. */
  @Override
  protected void assertExpectedStateMessages(final List<AirbyteStateMessage> stateMessages) {
    assertEquals(7, stateMessages.size());
    assertStateTypes(stateMessages, 4);
  }

  /** Incremental syncs are checked with the parent class's {@code assertExpectedStateMessages}. */
  @Override
  protected void assertExpectedStateMessagesFromIncrementalSync(final List<AirbyteStateMessage> stateMessages) {
    super.assertExpectedStateMessages(stateMessages);
  }

  /**
   * Checks the state messages of one of the two syncs run by the purged-binlog test.
   *
   * @param stateMessages state messages emitted by the sync, in emission order
   * @param syncNumber 1 for the first sync, 2 for the sync run after the binary logs were purged
   * @throws RuntimeException if {@code syncNumber} is neither 1 nor 2
   */
  @Override
  protected void assertStateForSyncShouldHandlePurgedLogsGracefully(final List<AirbyteStateMessage> stateMessages, final int syncNumber) {
    if (syncNumber == 1) {
      assertExpectedStateMessagesForRecordsProducedDuringAndAfterSync(stateMessages);
    } else if (syncNumber == 2) {
      // Sync number 2 starts from the state of sync number 1, but the binary logs are purged before it
      // is triggered. The validation of the logs present on the server therefore fails and a sync from
      // scratch is triggered instead.
      assertEquals(47, stateMessages.size());
      assertStateTypes(stateMessages, 44);
    } else {
      throw new RuntimeException("Unknown sync number");
    }
  }

  /** Expects 27 state messages, of which those at indices 0 through 24 are primary-key states. */
  @Override
  protected void assertExpectedStateMessagesForRecordsProducedDuringAndAfterSync(final List<AirbyteStateMessage> stateAfterFirstBatch) {
    assertEquals(27, stateAfterFirstBatch.size());
    assertStateTypes(stateAfterFirstBatch, 24);
  }

  /** Expects exactly two state messages. */
  @Override
  protected void assertExpectedStateMessagesForNoData(final List<AirbyteStateMessage> stateMessages) {
    assertEquals(2, stateMessages.size());
  }

  /**
   * Asserts that every message is a GLOBAL state with a non-null shared state identical to that of the
   * first message and with exactly one stream state. Stream states at indices up to and including
   * {@code indexTillWhichExpectPkState} must be tagged with {@code PRIMARY_KEY_STATE_TYPE}; later ones
   * must not carry {@code STATE_TYPE_KEY} at all.
   *
   * @param stateMessages state messages in emission order
   * @param indexTillWhichExpectPkState last index (inclusive) at which a primary-key state is expected
   */
  private void assertStateTypes(final List<AirbyteStateMessage> stateMessages, final int indexTillWhichExpectPkState) {
    JsonNode sharedState = null;
    for (int i = 0; i < stateMessages.size(); i++) {
      final AirbyteStateMessage stateMessage = stateMessages.get(i);
      assertEquals(AirbyteStateType.GLOBAL, stateMessage.getType());
      final AirbyteGlobalState global = stateMessage.getGlobal();
      assertNotNull(global.getSharedState());
      if (Objects.isNull(sharedState)) {
        // The first message defines the shared state every later message must repeat.
        sharedState = global.getSharedState();
      } else {
        assertEquals(sharedState, global.getSharedState());
      }
      assertEquals(1, global.getStreamStates().size());
      final AirbyteStreamState streamState = global.getStreamStates().get(0);
      if (i <= indexTillWhichExpectPkState) {
        assertTrue(streamState.getStreamState().has(STATE_TYPE_KEY));
        assertEquals(PRIMARY_KEY_STATE_TYPE, streamState.getStreamState().get(STATE_TYPE_KEY).asText());
      } else {
        assertFalse(streamState.getStreamState().has(STATE_TYPE_KEY));
      }
    }
  }

  /**
   * Validates the seven state messages emitted by the second sync of the new-table snapshot test.
   *
   * <ul>
   * <li>Indices 0-4: the shared state still equals that of the first sync; the {@code _random} stream
   * carries a primary-key state while the original models stream carries none.</li>
   * <li>Index 5: the shared state is still unchanged; neither stream carries a primary-key state.</li>
   * <li>Index 6: the shared state differs from the first sync, both streams are present and
   * {@code getData()} is non-null.</li>
   * </ul>
   *
   * @param stateMessages state messages emitted by the second sync, in emission order
   * @param stateMessageEmittedAfterFirstSyncCompletion final state message of the first sync, used as
   *        the baseline for the shared-state comparisons
   */
  @Override
  protected void assertStateMessagesForNewTableSnapshotTest(final List<AirbyteStateMessage> stateMessages,
                                                            final AirbyteStateMessage stateMessageEmittedAfterFirstSyncCompletion) {
    assertEquals(7, stateMessages.size());
    final StreamDescriptor randomStream = new StreamDescriptor().withName(MODELS_STREAM_NAME + "_random").withNamespace(randomTableSchema());
    final StreamDescriptor modelsStream = new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA);

    for (int i = 0; i <= 4; i++) {
      final AirbyteStateMessage stateMessage = stateMessages.get(i);
      assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessage.getType());
      assertEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
          stateMessage.getGlobal().getSharedState());
      final Set<StreamDescriptor> streamsInSnapshotState = streamDescriptorsIn(stateMessage);
      assertEquals(2, streamsInSnapshotState.size());
      assertTrue(streamsInSnapshotState.contains(randomStream));
      assertTrue(streamsInSnapshotState.contains(modelsStream));

      stateMessage.getGlobal().getStreamStates().forEach(s -> {
        final JsonNode streamState = s.getStreamState();
        if (s.getStreamDescriptor().equals(randomStream)) {
          assertEquals(PRIMARY_KEY_STATE_TYPE, streamState.get(STATE_TYPE_KEY).asText());
        } else if (s.getStreamDescriptor().equals(modelsStream)) {
          assertFalse(streamState.has(STATE_TYPE_KEY));
        } else {
          throw new RuntimeException("Unknown stream");
        }
      });
    }

    final AirbyteStateMessage secondLastStateMessage = stateMessages.get(5);
    assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, secondLastStateMessage.getType());
    assertEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
        secondLastStateMessage.getGlobal().getSharedState());
    final Set<StreamDescriptor> streamsInSecondLastState = streamDescriptorsIn(secondLastStateMessage);
    assertEquals(2, streamsInSecondLastState.size());
    assertTrue(streamsInSecondLastState.contains(randomStream));
    assertTrue(streamsInSecondLastState.contains(modelsStream));
    secondLastStateMessage.getGlobal().getStreamStates()
        .forEach(s -> assertFalse(s.getStreamState().has(STATE_TYPE_KEY)));

    final AirbyteStateMessage stateMessageEmittedAfterSecondSyncCompletion = stateMessages.get(6);
    assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterSecondSyncCompletion.getType());
    assertNotEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
        stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getSharedState());
    final Set<StreamDescriptor> streamsInSyncCompletionState = streamDescriptorsIn(stateMessageEmittedAfterSecondSyncCompletion);
    // Check the completion message's own stream set; the size used to be asserted on the set built from
    // the previous message, which left this message's stream count unverified.
    assertEquals(2, streamsInSyncCompletionState.size());
    assertTrue(streamsInSyncCompletionState.contains(randomStream));
    assertTrue(streamsInSyncCompletionState.contains(modelsStream));
    assertNotNull(stateMessageEmittedAfterSecondSyncCompletion.getData());
  }

  /** Collects the stream descriptors of every stream state carried by the given global state message. */
  private static Set<StreamDescriptor> streamDescriptorsIn(final AirbyteStateMessage stateMessage) {
    return stateMessage.getGlobal().getStreamStates()
        .stream()
        .map(AirbyteStreamState::getStreamDescriptor)
        .collect(Collectors.toSet());
  }

  /**
   * Runs an initial sync against a catalog whose first stream has a second primary-key column appended,
   * then resumes from the state message at index 4 and checks the records and state messages that the
   * resumed sync emits.
   */
  @Test
  public void testCompositeIndexInitialLoad() throws Exception {
    // Simulate adding a composite index by modifying the catalog.
    final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(CONFIGURED_CATALOG);
    final List<List<String>> primaryKeys = configuredCatalog.getStreams().get(0).getStream().getSourceDefinedPrimaryKey();
    primaryKeys.add(List.of("make_id"));

    final AutoCloseableIterator<AirbyteMessage> read1 = getSource()
        .read(getConfig(), configuredCatalog, null);
    final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
    final Set<AirbyteRecordMessage> recordMessages1 = extractRecordMessages(actualRecords1);
    final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
    assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordMessages1);
    assertExpectedStateMessages(stateMessages1);

    // Re-run the sync with the state associated with the record with id = 15 (second to last record).
    // We expect to read 2 records, since in the case of a composite PK we issue a >= query.
    // We also expect 3 state records: one associated with the pk state, one to signify the end of the
    // initial load, and a last one indicating the cdc position we have synced until.
    final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateMessages1.get(4)));
    final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
        .read(getConfig(), configuredCatalog, state);
    final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
    final Set<AirbyteRecordMessage> recordMessages2 = extractRecordMessages(actualRecords2);
    final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);
    assertExpectedRecords(new HashSet<>(MODEL_RECORDS.subList(4, 6)), recordMessages2);
    assertEquals(3, stateMessages2.size());
    assertStateTypes(stateMessages2, 0);
  }

  /**
   * Runs an initial sync over two streams (the default models stream plus a freshly created
   * {@code MODELS_STREAM_NAME + "_2"} table) and checks the sequence of 13 global state messages. It then
   * resumes from the state message at index 6 and checks the 6 state messages and 5 records emitted by
   * the second sync.
   */
  @Test
  public void testTwoStreamSync() throws Exception {
    // Add another stream models_2 and read that one as well.
    final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(CONFIGURED_CATALOG);

    final List<JsonNode> MODEL_RECORDS_2 = ImmutableList.of(
        Jsons.jsonNode(ImmutableMap.of(COL_ID, 110, COL_MAKE_ID, 1, COL_MODEL, "Fiesta-2")),
        Jsons.jsonNode(ImmutableMap.of(COL_ID, 120, COL_MAKE_ID, 1, COL_MODEL, "Focus-2")),
        Jsons.jsonNode(ImmutableMap.of(COL_ID, 130, COL_MAKE_ID, 1, COL_MODEL, "Ranger-2")),
        Jsons.jsonNode(ImmutableMap.of(COL_ID, 140, COL_MAKE_ID, 2, COL_MODEL, "GLA-2")),
        Jsons.jsonNode(ImmutableMap.of(COL_ID, 150, COL_MAKE_ID, 2, COL_MODEL, "A 220-2")),
        Jsons.jsonNode(ImmutableMap.of(COL_ID, 160, COL_MAKE_ID, 2, COL_MODEL, "E 350-2")));

    createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2",
        columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.of(COL_ID)));

    for (final JsonNode recordJson : MODEL_RECORDS_2) {
      writeRecords(recordJson, MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", COL_ID,
          COL_MAKE_ID, COL_MODEL);
    }

    // Register the new table in the catalog as an incremental stream keyed on COL_ID.
    final ConfiguredAirbyteStream airbyteStream = new ConfiguredAirbyteStream()
        .withStream(CatalogHelpers.createAirbyteStream(
            MODELS_STREAM_NAME + "_2",
            MODELS_SCHEMA,
            Field.of(COL_ID, JsonSchemaType.INTEGER),
            Field.of(COL_MAKE_ID, JsonSchemaType.INTEGER),
            Field.of(COL_MODEL, JsonSchemaType.STRING))
            .withSupportedSyncModes(
                Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
            .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))));
    airbyteStream.setSyncMode(SyncMode.INCREMENTAL);

    final List<ConfiguredAirbyteStream> streams = configuredCatalog.getStreams();
    streams.add(airbyteStream);
    configuredCatalog.withStreams(streams);

    // First sync: no input state.
    final AutoCloseableIterator<AirbyteMessage> read1 = getSource()
        .read(getConfig(), configuredCatalog, null);
    final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);

    final Set<AirbyteRecordMessage> recordMessages1 = extractRecordMessages(actualRecords1);
    final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
    assertEquals(13, stateMessages1.size());
    JsonNode sharedState = null;
    StreamDescriptor firstStreamInState = null;
    for (int i = 0; i < stateMessages1.size(); i++) {
      final AirbyteStateMessage stateMessage = stateMessages1.get(i);
      assertEquals(AirbyteStateType.GLOBAL, stateMessage.getType());
      final AirbyteGlobalState global = stateMessage.getGlobal();
      assertNotNull(global.getSharedState());
      // Every message must repeat the shared state carried by the first one.
      if (Objects.isNull(sharedState)) {
        sharedState = global.getSharedState();
      } else {
        assertEquals(sharedState, global.getSharedState());
      }

      // Remember which stream appears in the very first state message.
      if (Objects.isNull(firstStreamInState)) {
        assertEquals(1, global.getStreamStates().size());
        firstStreamInState = global.getStreamStates().get(0).getStreamDescriptor();
      }

      if (i <= 4) {
        // The first 5 state messages (indices 0-4) are pk state for the first stream
        assertEquals(1, global.getStreamStates().size());
        final AirbyteStreamState streamState = global.getStreamStates().get(0);
        assertTrue(streamState.getStreamState().has(STATE_TYPE_KEY));
        assertEquals(PRIMARY_KEY_STATE_TYPE, streamState.getStreamState().get(STATE_TYPE_KEY).asText());
      } else if (i == 5) {
        // The 6th state message (index 5) is the final state message emitted for the first stream
        assertEquals(1, global.getStreamStates().size());
        final AirbyteStreamState streamState = global.getStreamStates().get(0);
        assertFalse(streamState.getStreamState().has(STATE_TYPE_KEY));
      } else if (i <= 10) {
        // The 7th to 11th state messages (indices 6-10) carry primary_key state for the 2nd stream and
        // the final state for the 1st stream
        assertEquals(2, global.getStreamStates().size());
        final StreamDescriptor finalFirstStreamInState = firstStreamInState;
        global.getStreamStates().forEach(c -> {
          if (c.getStreamDescriptor().equals(finalFirstStreamInState)) {
            assertFalse(c.getStreamState().has(STATE_TYPE_KEY));
          } else {
            assertTrue(c.getStreamState().has(STATE_TYPE_KEY));
            assertEquals(PRIMARY_KEY_STATE_TYPE, c.getStreamState().get(STATE_TYPE_KEY).asText());
          }
        });
      } else {
        // The last 2 state messages (indices 11-12) don't contain primary_key info because the
        // primary_key sync should be complete
        assertEquals(2, global.getStreamStates().size());
        global.getStreamStates().forEach(c -> assertFalse(c.getStreamState().has(STATE_TYPE_KEY)));
      }
    }

    final Set<String> names = new HashSet<>(STREAM_NAMES);
    names.add(MODELS_STREAM_NAME + "_2");
    assertExpectedRecords(Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream())
        .collect(Collectors.toSet()),
        recordMessages1,
        names,
        names,
        MODELS_SCHEMA);

    assertEquals(new StreamDescriptor().withName(MODELS_STREAM_NAME).withNamespace(MODELS_SCHEMA), firstStreamInState);

    // Triggering a sync with a primary_key state for 1 stream and complete state for other stream
    final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
        .read(getConfig(), configuredCatalog, Jsons.jsonNode(Collections.singletonList(stateMessages1.get(6))));
    final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);

    final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);

    assertEquals(6, stateMessages2.size());
    for (int i = 0; i < stateMessages2.size(); i++) {
      final AirbyteStateMessage stateMessage = stateMessages2.get(i);
      assertEquals(AirbyteStateType.GLOBAL, stateMessage.getType());
      final AirbyteGlobalState global = stateMessage.getGlobal();
      assertNotNull(global.getSharedState());
      assertEquals(2, global.getStreamStates().size());

      if (i <= 3) {
        final StreamDescriptor finalFirstStreamInState = firstStreamInState;
        global.getStreamStates().forEach(c -> {
          // The first 4 state messages (indices 0-3) are primary_key state for the stream that didn't
          // complete its primary_key sync the first time
          if (c.getStreamDescriptor().equals(finalFirstStreamInState)) {
            assertFalse(c.getStreamState().has(STATE_TYPE_KEY));
          } else {
            assertTrue(c.getStreamState().has(STATE_TYPE_KEY));
            assertEquals(PRIMARY_KEY_STATE_TYPE, c.getStreamState().get(STATE_TYPE_KEY).asText());
          }
        });
      } else {
        // The last 2 state messages (indices 4-5) don't contain primary_key info because the
        // primary_key sync should be complete
        global.getStreamStates().forEach(c -> assertFalse(c.getStreamState().has(STATE_TYPE_KEY)));
      }
    }

    // The resumed sync is expected to emit the second stream's records from index 1 onwards.
    final Set<AirbyteRecordMessage> recordMessages2 = extractRecordMessages(actualRecords2);
    assertEquals(5, recordMessages2.size());
    assertExpectedRecords(new HashSet<>(MODEL_RECORDS_2.subList(1, MODEL_RECORDS_2.size())),
        recordMessages2,
        names,
        names,
        MODELS_SCHEMA);
  }

}