1
0
mirror of synced 2025-12-25 02:09:19 -05:00

[Source-MySQL] Enhanced Standard sync with PK initial load -> Cursor based switch over (#30270)

This commit is contained in:
Duy Nguyen
2023-09-18 15:17:00 -07:00
committed by GitHub
parent fd742448b9
commit f226503bd1
21 changed files with 1801 additions and 203 deletions

View File

@@ -9,8 +9,13 @@ import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.source.mysql.internal.models.CursorBasedStatus;
import io.airbyte.integrations.source.mysql.internal.models.InternalModels.StateType;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.sql.SQLException;
@@ -19,7 +24,9 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +54,11 @@ public class MySqlQueryUtils {
public static final String SHOW_TABLE_QUERY =
"""
SHOW TABLE STATUS;
SHOW TABLE STATUS;
""";
public static final String MAX_CURSOR_VALUE_QUERY =
"""
SELECT %s FROM %s WHERE %s = (SELECT MAX(%s) FROM %s);
""";
public static final String MAX_PK_COL = "max_pk";
@@ -135,19 +146,91 @@ public class MySqlQueryUtils {
return tableSizeInfoMap;
}
private static List<JsonNode> getTableEstimate(final JdbcDatabase database, final String namespace, final String name) {
try {
// Construct the table estimate query.
final String tableEstimateQuery =
String.format(TABLE_ESTIMATE_QUERY, TABLE_SIZE_BYTES_COL, AVG_ROW_LENGTH, namespace, name);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(tableEstimateQuery),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
return jsonNodes.size() > 0 ? jsonNodes : Collections.emptyList();
} catch (final Exception e) {
LOGGER.warn("Error occurred while attempting to estimate table size", e);
/**
* Iterates through each stream and find the max cursor value and the record count which has that
* value based on each cursor field provided by the customer per stream This information is saved in
* a Hashmap with the mapping being the AirbyteStreamNameNamespacepair -> CursorBasedStatus
*
* @param database the source db
* @param streams streams to be synced
* @param stateManager stream stateManager
* @return Map of streams to statuses
*/
public static Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair, CursorBasedStatus> getCursorBasedSyncStatusForStreams(final JdbcDatabase database,
final List<ConfiguredAirbyteStream> streams,
final StateManager stateManager,
final String quoteString) {
final Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair, CursorBasedStatus> cursorBasedStatusMap = new HashMap<>();
streams.forEach(stream -> {
try {
final String name = stream.getStream().getName();
final String namespace = stream.getStream().getNamespace();
final String fullTableName =
getFullyQualifiedTableNameWithQuoting(namespace, name, quoteString);
final Optional<CursorInfo> cursorInfoOptional =
stateManager.getCursorInfo(new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(name, namespace));
if (cursorInfoOptional.isEmpty()) {
throw new RuntimeException(String.format("Stream %s was not provided with an appropriate cursor", stream.getStream().getName()));
}
LOGGER.info("Querying max cursor value for {}.{}", namespace, name);
final String cursorField = cursorInfoOptional.get().getCursorField();
final String quotedCursorField = getIdentifierWithQuoting(cursorField, quoteString);
final String cursorBasedSyncStatusQuery = String.format(MAX_CURSOR_VALUE_QUERY,
quotedCursorField,
fullTableName,
quotedCursorField,
quotedCursorField,
fullTableName);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(cursorBasedSyncStatusQuery).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
final CursorBasedStatus cursorBasedStatus = new CursorBasedStatus();
cursorBasedStatus.setStateType(StateType.CURSOR_BASED);
cursorBasedStatus.setVersion(2L);
cursorBasedStatus.setStreamName(name);
cursorBasedStatus.setStreamNamespace(namespace);
cursorBasedStatus.setCursorField(ImmutableList.of(cursorField));
if (!jsonNodes.isEmpty()) {
final JsonNode result = jsonNodes.get(0);
cursorBasedStatus.setCursor(result.get(cursorField).asText());
cursorBasedStatus.setCursorRecordCount((long) jsonNodes.size());
}
cursorBasedStatusMap.put(new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(name, namespace), cursorBasedStatus);
} catch (final SQLException e) {
throw new RuntimeException(e);
}
});
return cursorBasedStatusMap;
}
private static List<JsonNode> getTableEstimate(final JdbcDatabase database, final String namespace, final String name)
throws SQLException {
// Construct the table estimate query.
final String tableEstimateQuery =
String.format(TABLE_ESTIMATE_QUERY, TABLE_SIZE_BYTES_COL, AVG_ROW_LENGTH, namespace, name);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(tableEstimateQuery),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
return jsonNodes;
}
public static void logStreamSyncStatus(final List<ConfiguredAirbyteStream> streams, final String syncType) {
if (streams.isEmpty()) {
LOGGER.info("No Streams will be synced via {}.", syncType);
} else {
LOGGER.info("Streams to be synced via {} : {}", syncType, streams.size());
LOGGER.info("Streams: {}", prettyPrintConfiguredAirbyteStreamList(streams));
}
return Collections.emptyList();
}
public static String prettyPrintConfiguredAirbyteStreamList(final List<ConfiguredAirbyteStream> streamList) {
return streamList.stream().map(s -> "%s.%s".formatted(s.getStream().getNamespace(), s.getStream().getName())).collect(Collectors.joining(", "));
}
}

View File

@@ -5,12 +5,18 @@
package io.airbyte.integrations.source.mysql;
import static io.airbyte.db.jdbc.JdbcUtils.EQUALS;
import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC;
import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.isAnyStreamIncrementalSyncMode;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils.DEFAULT_JDBC_PARAMETERS_DELIMITER;
import static io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils.assertCustomParametersDontOverwriteDefaultParameters;
import static io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SSL_MODE;
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.getCursorBasedSyncStatusForStreams;
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.getTableSizeInfoForStreams;
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.logStreamSyncStatus;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.convertNameNamespacePairFromV0;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.initPairToPrimaryKeyInfoMap;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.streamsForInitialPrimaryKeyLoad;
import static java.util.stream.Collectors.toList;
import com.fasterxml.jackson.databind.JsonNode;
@@ -40,8 +46,15 @@ import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils;
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils;
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
import io.airbyte.integrations.source.mysql.cursor_based.MySqlCursorBasedStateManager;
import io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper;
import io.airbyte.integrations.source.mysql.initialsync.MySqlFeatureFlags;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadHandler;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadStreamStateManager;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.CursorBasedStreams;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.InitialLoadStreams;
import io.airbyte.integrations.source.mysql.internal.models.CursorBasedStatus;
import io.airbyte.integrations.source.relationaldb.DbSourceDiscoverUtil;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils;
@@ -53,6 +66,7 @@ import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
@@ -333,15 +347,55 @@ public class MySqlSource extends AbstractJdbcSource<MysqlType> implements Source
final Map<String, TableInfo<CommonField<MysqlType>>> tableNameToTable,
final StateManager stateManager,
final Instant emittedAt) {
final JsonNode sourceConfig = database.getSourceConfig();
if (isCdc(sourceConfig) && shouldUseCDC(catalog)) {
final MySqlFeatureFlags mySqlFeatureFlags = new MySqlFeatureFlags(sourceConfig);
if (isCdc(sourceConfig) && isAnyStreamIncrementalSyncMode(catalog)) {
LOGGER.info("Using PK + CDC");
return MySqlInitialReadUtil.getCdcReadIterators(database, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString());
} else {
LOGGER.info("using CDC: {}", false);
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager,
emittedAt);
if (isAnyStreamIncrementalSyncMode(catalog)) {
if (mySqlFeatureFlags.isStandardInitialSyncViaPkEnabled()) {
LOGGER.info("Syncing via Primary Key");
final MySqlCursorBasedStateManager cursorBasedStateManager = new MySqlCursorBasedStateManager(stateManager.getRawStateMessages(), catalog);
final InitialLoadStreams initialLoadStreams = streamsForInitialPrimaryKeyLoad(cursorBasedStateManager, catalog);
final Map<AirbyteStreamNameNamespacePair, CursorBasedStatus> pairToCursorBasedStatus =
getCursorBasedSyncStatusForStreams(database, initialLoadStreams.streamsForInitialLoad(), stateManager, quoteString);
final CursorBasedStreams cursorBasedStreams =
new CursorBasedStreams(MySqlInitialReadUtil.identifyStreamsForCursorBased(catalog, initialLoadStreams.streamsForInitialLoad()),
pairToCursorBasedStatus);
logStreamSyncStatus(initialLoadStreams.streamsForInitialLoad(), "Primary Key");
logStreamSyncStatus(cursorBasedStreams.streamsForCursorBased(), "Cursor");
final MySqlInitialLoadStreamStateManager mySqlInitialLoadStreamStateManager =
new MySqlInitialLoadStreamStateManager(catalog, initialLoadStreams,
initPairToPrimaryKeyInfoMap(database, initialLoadStreams, tableNameToTable, quoteString));
final MySqlInitialLoadHandler initialLoadHandler =
new MySqlInitialLoadHandler(sourceConfig, database, new MySqlSourceOperations(), getQuoteString(), mySqlInitialLoadStreamStateManager,
namespacePair -> Jsons.jsonNode(pairToCursorBasedStatus.get(convertNameNamespacePairFromV0(namespacePair))),
getTableSizeInfoForStreams(database, catalog.getStreams(), getQuoteString()));
final List<AutoCloseableIterator<AirbyteMessage>> initialLoadIterator = new ArrayList<>(initialLoadHandler.getIncrementalIterators(
new ConfiguredAirbyteCatalog().withStreams(initialLoadStreams.streamsForInitialLoad()),
tableNameToTable,
emittedAt));
// Build Cursor based iterator
final List<AutoCloseableIterator<AirbyteMessage>> cursorBasedIterator =
new ArrayList<>(super.getIncrementalIterators(database,
new ConfiguredAirbyteCatalog().withStreams(
cursorBasedStreams.streamsForCursorBased()),
tableNameToTable,
cursorBasedStateManager, emittedAt));
return Stream.of(initialLoadIterator, cursorBasedIterator).flatMap(Collection::stream).collect(Collectors.toList());
}
}
}
LOGGER.info("using CDC: {}", false);
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager,
emittedAt);
}
@Override

View File

@@ -0,0 +1,81 @@
package io.airbyte.integrations.source.mysql.cursor_based;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mysql.internal.models.CursorBasedStatus;
import io.airbyte.integrations.source.mysql.internal.models.InternalModels.StateType;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.state.StreamStateManager;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteStreamState;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySqlCursorBasedStateManager extends StreamStateManager {
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlCursorBasedStateManager.class);
public MySqlCursorBasedStateManager(final List<AirbyteStateMessage> airbyteStateMessages, final ConfiguredAirbyteCatalog catalog) {
super(airbyteStateMessages, catalog);
}
@Override
public AirbyteStateMessage toState(final Optional<AirbyteStreamNameNamespacePair> pair) {
if (pair.isPresent()) {
final Map<AirbyteStreamNameNamespacePair, CursorInfo> pairToCursorInfoMap = getPairToCursorInfoMap();
final Optional<CursorInfo> cursorInfo = Optional.ofNullable(pairToCursorInfoMap.get(pair.get()));
if (cursorInfo.isPresent()) {
LOGGER.debug("Generating state message for {}...", pair);
return new AirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
// Temporarily include legacy state for backwards compatibility with the platform
.withStream(generateStreamState(pair.get(), cursorInfo.get()));
} else {
LOGGER.warn("Cursor information could not be located in state for stream {}. Returning a new, empty state message...", pair);
return new AirbyteStateMessage().withType(AirbyteStateType.STREAM).withStream(new AirbyteStreamState());
}
} else {
LOGGER.warn("Stream not provided. Returning a new, empty state message...");
return new AirbyteStateMessage().withType(AirbyteStateType.STREAM).withStream(new AirbyteStreamState());
}
}
/**
* Generates the stream state for the given stream and cursor information.
*
* @param airbyteStreamNameNamespacePair The stream.
* @param cursorInfo The current cursor.
* @return The {@link AirbyteStreamState} representing the current state of the stream.
*/
private AirbyteStreamState generateStreamState(final AirbyteStreamNameNamespacePair airbyteStreamNameNamespacePair,
final CursorInfo cursorInfo) {
return new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor().withName(airbyteStreamNameNamespacePair.getName()).withNamespace(airbyteStreamNameNamespacePair.getNamespace()))
.withStreamState(Jsons.jsonNode(generateDbStreamState(airbyteStreamNameNamespacePair, cursorInfo)));
}
private CursorBasedStatus generateDbStreamState(final AirbyteStreamNameNamespacePair airbyteStreamNameNamespacePair,
final CursorInfo cursorInfo) {
final CursorBasedStatus state = new CursorBasedStatus();
state.setStateType(StateType.CURSOR_BASED);
state.setVersion(2L);
state.setStreamName(airbyteStreamNameNamespacePair.getName());
state.setStreamNamespace(airbyteStreamNameNamespacePair.getNamespace());
state.setCursorField(cursorInfo.getCursorField() == null ? Collections.emptyList() : Lists.newArrayList(cursorInfo.getCursorField()));
state.setCursor(cursorInfo.getCursor());
if (cursorInfo.getCursorRecordCount() > 0L) {
state.setCursorRecordCount(cursorInfo.getCursorRecordCount());
}
return state;
}
}

View File

@@ -9,12 +9,17 @@ import com.fasterxml.jackson.databind.JsonNode;
// Feature flags to gate new primary key load features.
public class MySqlFeatureFlags {
public static final String STANDARD_VIA_PK = "standard_via_pk";
private final JsonNode sourceConfig;
public MySqlFeatureFlags(final JsonNode sourceConfig) {
this.sourceConfig = sourceConfig;
}
public boolean isStandardInitialSyncViaPkEnabled() {
return getFlagValue(STANDARD_VIA_PK);
}
private boolean getFlagValue(final String flag) {
return sourceConfig.has(flag) && sourceConfig.get(flag).asBoolean();
}

View File

@@ -21,7 +21,6 @@ import io.airbyte.protocol.models.v0.StreamDescriptor;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -40,12 +39,12 @@ public class MySqlInitialLoadGlobalStateManager implements MySqlInitialLoadState
// have completed the snapshot.
private final Set<AirbyteStreamNameNamespacePair> streamsThatHaveCompletedSnapshot;
MySqlInitialLoadGlobalStateManager(final InitialLoadStreams initialLoadStreams,
final Map<AirbyteStreamNameNamespacePair, PrimaryKeyInfo> pairToPrimaryKeyInfo,
final CdcState cdcState,
final ConfiguredAirbyteCatalog catalog) {
public MySqlInitialLoadGlobalStateManager(final InitialLoadStreams initialLoadStreams,
final Map<AirbyteStreamNameNamespacePair, PrimaryKeyInfo> pairToPrimaryKeyInfo,
final CdcState cdcState,
final ConfiguredAirbyteCatalog catalog) {
this.cdcState = cdcState;
this.pairToPrimaryKeyLoadStatus = initPairToPrimaryKeyLoadStatusMap(initialLoadStreams.pairToInitialLoadStatus());
this.pairToPrimaryKeyLoadStatus = MySqlInitialLoadStateManager.initPairToPrimaryKeyLoadStatusMap(initialLoadStreams.pairToInitialLoadStatus());
this.pairToPrimaryKeyInfo = pairToPrimaryKeyInfo;
this.streamsThatHaveCompletedSnapshot = initStreamsCompletedSnapshot(initialLoadStreams, catalog);
}
@@ -63,16 +62,7 @@ public class MySqlInitialLoadGlobalStateManager implements MySqlInitialLoadState
return streamsThatHaveCompletedSnapshot;
}
private static Map<AirbyteStreamNameNamespacePair, PrimaryKeyLoadStatus> initPairToPrimaryKeyLoadStatusMap(
final Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair, PrimaryKeyLoadStatus> pairToPkStatus) {
final Map<AirbyteStreamNameNamespacePair, PrimaryKeyLoadStatus> map = new HashMap<>();
pairToPkStatus.forEach((pair, pkStatus) -> {
final AirbyteStreamNameNamespacePair updatedPair = new AirbyteStreamNameNamespacePair(pair.getName(), pair.getNamespace());
map.put(updatedPair, pkStatus);
});
return map;
}
@Override
public AirbyteStateMessage createIntermediateStateMessage(final AirbyteStreamNameNamespacePair pair, final PrimaryKeyLoadStatus pkLoadStatus) {
final List<AirbyteStreamState> streamStates = new ArrayList<>();
streamsThatHaveCompletedSnapshot.forEach(stream -> {
@@ -95,6 +85,7 @@ public class MySqlInitialLoadGlobalStateManager implements MySqlInitialLoadState
pairToPrimaryKeyLoadStatus.put(pair, pkLoadStatus);
}
@Override
public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) {
streamsThatHaveCompletedSnapshot.add(pair);
final List<AirbyteStreamState> streamStates = new ArrayList<>();

View File

@@ -13,8 +13,10 @@ import com.mysql.cj.MysqlType;
import io.airbyte.commons.stream.AirbyteStreamUtils;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.JdbcCompatibleSourceOperations;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.source.mysql.MySqlQueryUtils.TableSizeInfo;
import io.airbyte.integrations.source.mysql.MySqlSourceOperations;
import io.airbyte.integrations.source.mysql.internal.models.PrimaryKeyLoadStatus;
import io.airbyte.integrations.source.relationaldb.DbSourceDiscoverUtil;
import io.airbyte.integrations.source.relationaldb.TableInfo;
@@ -36,6 +38,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +49,7 @@ public class MySqlInitialLoadHandler {
private static final long RECORD_LOGGING_SAMPLE_RATE = 1_000_000;
private final JsonNode config;
private final JdbcDatabase database;
private final MySqlInitialLoadSourceOperations sourceOperations;
private final JdbcCompatibleSourceOperations sourceOperations;
private final String quoteString;
private final MySqlInitialLoadStateManager initialLoadStateManager;
private final Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier;
@@ -57,7 +60,7 @@ public class MySqlInitialLoadHandler {
public MySqlInitialLoadHandler(final JsonNode config,
final JdbcDatabase database,
final MySqlInitialLoadSourceOperations sourceOperations,
final MySqlSourceOperations sourceOperations,
final String quoteString,
final MySqlInitialLoadStateManager initialLoadStateManager,
final Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier,
@@ -80,6 +83,7 @@ public class MySqlInitialLoadHandler {
final AirbyteStream stream = airbyteStream.getStream();
final String streamName = stream.getName();
final String namespace = stream.getNamespace();
final List<String> primaryKeys = stream.getSourceDefinedPrimaryKey().stream().flatMap(pk -> Stream.of(pk.get(0))).toList();
final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamName, namespace);
final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(namespace, streamName);
if (!tableNameToTable.containsKey(fullyQualifiedTableName)) {
@@ -95,6 +99,15 @@ public class MySqlInitialLoadHandler {
.map(CommonField::getName)
.filter(CatalogHelpers.getTopLevelFieldNames(airbyteStream)::contains)
.collect(Collectors.toList());
// This is to handle the case if the user de-selects the PK column
// Necessary to query the data via pk but won't be added to the final record
primaryKeys.forEach(pk -> {
if (!selectedDatabaseFields.contains(pk)) {
selectedDatabaseFields.add(0, pk);
}
});
final AutoCloseableIterator<JsonNode> queryStream =
new MySqlInitialLoadRecordIterator(database, sourceOperations, quoteString, initialLoadStateManager, selectedDatabaseFields, pair,
calculateChunkSize(tableSizeInfoMap.get(pair), pair), isCompositePrimaryKey(airbyteStream));

View File

@@ -12,6 +12,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.mysql.cj.MysqlType;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.JdbcCompatibleSourceOperations;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.PrimaryKeyInfo;
import io.airbyte.integrations.source.mysql.internal.models.PrimaryKeyLoadStatus;
@@ -43,7 +44,7 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator<JsonNode>
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlInitialLoadRecordIterator.class);
private final MySqlInitialLoadSourceOperations sourceOperations;
private final JdbcCompatibleSourceOperations sourceOperations;
private final String quoteString;
private final MySqlInitialLoadStateManager initialLoadStateManager;
@@ -59,7 +60,7 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator<JsonNode>
MySqlInitialLoadRecordIterator(
final JdbcDatabase database,
final MySqlInitialLoadSourceOperations sourceOperations,
final JdbcCompatibleSourceOperations<MysqlType> sourceOperations,
final String quoteString,
final MySqlInitialLoadStateManager initialLoadStateManager,
final List<String> columnNames,

View File

@@ -9,6 +9,8 @@ import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.Pri
import io.airbyte.integrations.source.mysql.internal.models.PrimaryKeyLoadStatus;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import java.util.HashMap;
import java.util.Map;
public interface MySqlInitialLoadStateManager {
@@ -33,4 +35,14 @@ public interface MySqlInitialLoadStateManager {
// the column name associated with the stream.
PrimaryKeyInfo getPrimaryKeyInfo(final AirbyteStreamNameNamespacePair pair);
static Map<AirbyteStreamNameNamespacePair, PrimaryKeyLoadStatus> initPairToPrimaryKeyLoadStatusMap(
final Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair, PrimaryKeyLoadStatus> pairToPkStatus) {
final Map<AirbyteStreamNameNamespacePair, PrimaryKeyLoadStatus> map = new HashMap<>();
pairToPkStatus.forEach((pair, pkStatus) -> {
final AirbyteStreamNameNamespacePair updatedPair = new AirbyteStreamNameNamespacePair(pair.getName(), pair.getNamespace());
map.put(updatedPair, pkStatus);
});
return map;
}
}

View File

@@ -0,0 +1,87 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql.initialsync;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.InitialLoadStreams;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.PrimaryKeyInfo;
import io.airbyte.integrations.source.mysql.internal.models.PrimaryKeyLoadStatus;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStreamState;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.Map;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This state manager extends the StreamStateManager to enable writing the state_type and version
* keys to the stream state when they're going through the iterator Once we have verified that
* expanding StreamStateManager itself to include this functionality, this class will be removed
*/
public class MySqlInitialLoadStreamStateManager implements MySqlInitialLoadStateManager {
private final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, PrimaryKeyLoadStatus> pairToPrimaryKeyLoadStatus;
// Map of pair to the primary key info (field name & data type) associated with it.
private final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, PrimaryKeyInfo> pairToPrimaryKeyInfo;
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlInitialLoadStreamStateManager.class);
public MySqlInitialLoadStreamStateManager(final ConfiguredAirbyteCatalog catalog,
final InitialLoadStreams initialLoadStreams,
final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, PrimaryKeyInfo> pairToPrimaryKeyInfo) {
this.pairToPrimaryKeyInfo = pairToPrimaryKeyInfo;
this.pairToPrimaryKeyLoadStatus = MySqlInitialLoadStateManager.initPairToPrimaryKeyLoadStatusMap(initialLoadStreams.pairToInitialLoadStatus());
}
@Override
public void updatePrimaryKeyLoadState(final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair pair,
final PrimaryKeyLoadStatus pkLoadStatus) {
pairToPrimaryKeyLoadStatus.put(pair, pkLoadStatus);
}
@Override
public AirbyteStateMessage createFinalStateMessage(final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair pair,
final JsonNode streamStateForIncrementalRun) {
return new AirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
.withStream(getAirbyteStreamState(pair, (streamStateForIncrementalRun)));
}
@Override
public PrimaryKeyInfo getPrimaryKeyInfo(final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair pair) {
return pairToPrimaryKeyInfo.get(pair);
}
@Override
public PrimaryKeyLoadStatus getPrimaryKeyLoadStatus(final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair pair) {
return pairToPrimaryKeyLoadStatus.get(pair);
}
@Override
public AirbyteStateMessage createIntermediateStateMessage(final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair pair,
final PrimaryKeyLoadStatus pkLoadStatus) {
return new AirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
.withStream(getAirbyteStreamState(pair, Jsons.jsonNode(pkLoadStatus)));
}
private AirbyteStreamState getAirbyteStreamState(final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair pair, final JsonNode stateData) {
LOGGER.info("STATE DATA FOR {}: {}", pair.getNamespace().concat("_").concat(pair.getName()), stateData);
assert Objects.nonNull(pair.getName());
assert Objects.nonNull(pair.getNamespace());
return new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor().withName(pair.getName()).withNamespace(pair.getNamespace()))
.withStreamState(stateData);
}
}

View File

@@ -6,6 +6,7 @@ package io.airbyte.integrations.source.mysql.initialsync;
import static io.airbyte.integrations.debezium.internals.mysql.MysqlCdcStateConstants.MYSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.getTableSizeInfoForStreams;
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.prettyPrintConfiguredAirbyteStreamList;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadGlobalStateManager.STATE_TYPE_KEY;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadStateManager.PRIMARY_KEY_STATE_TYPE;
@@ -30,6 +31,7 @@ import io.airbyte.integrations.source.mysql.MySqlCdcSavedInfoFetcher;
import io.airbyte.integrations.source.mysql.MySqlCdcStateHandler;
import io.airbyte.integrations.source.mysql.MySqlQueryUtils;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadSourceOperations.CdcMetadataInjector;
import io.airbyte.integrations.source.mysql.internal.models.CursorBasedStatus;
import io.airbyte.integrations.source.mysql.internal.models.PrimaryKeyLoadStatus;
import io.airbyte.integrations.source.relationaldb.CdcStateManager;
import io.airbyte.integrations.source.relationaldb.DbSourceDiscoverUtil;
@@ -40,6 +42,7 @@ import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteStreamState;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.StreamDescriptor;
@@ -105,10 +108,10 @@ public class MySqlInitialReadUtil {
savedOffset.isPresent() && mySqlDebeziumStateUtil.savedOffsetStillPresentOnServer(database, savedOffset.get());
if (!savedOffsetStillPresentOnServer) {
LOGGER.warn("Saved offset no longer present on the server, Airbtye is going to trigger a sync from scratch");
LOGGER.warn("Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch");
}
final InitialLoadStreams initialLoadStreams = streamsForInitialPrimaryKeyLoad(stateManager.getCdcStateManager(), catalog,
final InitialLoadStreams initialLoadStreams = cdcStreamsForInitialPrimaryKeyLoad(stateManager.getCdcStateManager(), catalog,
savedOffsetStillPresentOnServer);
final CdcState stateToBeUsed = (!savedOffsetStillPresentOnServer || (stateManager.getCdcStateManager().getCdcState() == null
@@ -172,12 +175,12 @@ public class MySqlInitialReadUtil {
}
/**
* Determines the streams to sync for initial primary key load. These include streams that are (i)
* currently in primary key load (ii) newly added incremental streams.
* CDC specific: Determines the streams to sync for initial primary key load. These include streams
* that are (i) currently in primary key load (ii) newly added incremental streams.
*/
public static InitialLoadStreams streamsForInitialPrimaryKeyLoad(final CdcStateManager stateManager,
final ConfiguredAirbyteCatalog fullCatalog,
final boolean savedOffsetStillPresentOnServer) {
public static InitialLoadStreams cdcStreamsForInitialPrimaryKeyLoad(final CdcStateManager stateManager,
final ConfiguredAirbyteCatalog fullCatalog,
final boolean savedOffsetStillPresentOnServer) {
if (!savedOffsetStillPresentOnServer) {
return new InitialLoadStreams(
fullCatalog.getStreams()
@@ -224,23 +227,95 @@ public class MySqlInitialReadUtil {
return new InitialLoadStreams(streamsForPkSync, pairToInitialLoadStatus);
}
private static List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final ConfiguredAirbyteCatalog catalog,
final Set<AirbyteStreamNameNamespacePair> alreadySyncedStreams) {
/**
* Determines the streams to sync for initial primary key load. These include streams that are (i)
* currently in primary key load (ii) newly added incremental streams.
*/
public static InitialLoadStreams streamsForInitialPrimaryKeyLoad(final StateManager stateManager,
final ConfiguredAirbyteCatalog fullCatalog) {
final List<AirbyteStateMessage> rawStateMessages = stateManager.getRawStateMessages();
final Set<AirbyteStreamNameNamespacePair> streamsStillInPkSync = new HashSet<>();
final Set<AirbyteStreamNameNamespacePair> alreadySeenStreamPairs = new HashSet<>();
// Build a map of stream <-> initial load status for streams that currently have an initial primary
// key load in progress.
final Map<AirbyteStreamNameNamespacePair, PrimaryKeyLoadStatus> pairToInitialLoadStatus = new HashMap<>();
if (rawStateMessages != null) {
rawStateMessages.forEach(stateMessage -> {
final AirbyteStreamState stream = stateMessage.getStream();
final JsonNode streamState = stream.getStreamState();
final StreamDescriptor streamDescriptor = stateMessage.getStream().getStreamDescriptor();
if (streamState == null || streamDescriptor == null) {
return;
}
final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamDescriptor.getName(),
streamDescriptor.getNamespace());
// Build a map of stream <-> initial load status for streams that currently have an initial primary
// key load in progress.
if (streamState.has(STATE_TYPE_KEY)) {
if (streamState.get(STATE_TYPE_KEY).asText().equalsIgnoreCase(PRIMARY_KEY_STATE_TYPE)) {
final PrimaryKeyLoadStatus primaryKeyLoadStatus = Jsons.object(streamState, PrimaryKeyLoadStatus.class);
pairToInitialLoadStatus.put(pair, primaryKeyLoadStatus);
streamsStillInPkSync.add(pair);
}
}
alreadySeenStreamPairs.add(new AirbyteStreamNameNamespacePair(streamDescriptor.getName(), streamDescriptor.getNamespace()));
});
}
final List<ConfiguredAirbyteStream> streamsForPkSync = new ArrayList<>();
fullCatalog.getStreams().stream()
.filter(stream -> streamsStillInPkSync.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream())))
.map(Jsons::clone)
.forEach(streamsForPkSync::add);
final List<ConfiguredAirbyteStream> newlyAddedStreams = identifyStreamsToSnapshot(fullCatalog,
Collections.unmodifiableSet(alreadySeenStreamPairs));
streamsForPkSync.addAll(newlyAddedStreams);
return new InitialLoadStreams(streamsForPkSync.stream().filter(MySqlInitialReadUtil::streamHasPrimaryKey).collect(Collectors.toList()),
pairToInitialLoadStatus);
}
private static boolean streamHasPrimaryKey(final ConfiguredAirbyteStream stream) {
return stream.getStream().getSourceDefinedPrimaryKey().size() > 0;
}
public static List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final ConfiguredAirbyteCatalog catalog,
final Set<AirbyteStreamNameNamespacePair> alreadySyncedStreams) {
final Set<AirbyteStreamNameNamespacePair> allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog);
final Set<AirbyteStreamNameNamespacePair> newlyAddedStreams = new HashSet<>(Sets.difference(allStreams, alreadySyncedStreams));
return catalog.getStreams().stream()
.filter(c -> c.getSyncMode() == SyncMode.INCREMENTAL)
.filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream()))).map(Jsons::clone)
.filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream())))
.map(Jsons::clone)
.collect(Collectors.toList());
}
public static List<ConfiguredAirbyteStream> identifyStreamsForCursorBased(final ConfiguredAirbyteCatalog catalog,
final List<ConfiguredAirbyteStream> streamsForInitialLoad) {
final Set<AirbyteStreamNameNamespacePair> initialLoadStreamsNamespacePairs =
streamsForInitialLoad.stream().map(stream -> AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream()))
.collect(
Collectors.toSet());
return catalog.getStreams().stream()
.filter(c -> c.getSyncMode() == SyncMode.INCREMENTAL)
.filter(stream -> !initialLoadStreamsNamespacePairs.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream())))
.map(Jsons::clone)
.collect(Collectors.toList());
}
// Build a map of stream <-> primary key info (primary key field name + datatype) for all streams
// currently undergoing initial primary key syncs.
private static Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, PrimaryKeyInfo> initPairToPrimaryKeyInfoMap(
final JdbcDatabase database,
final InitialLoadStreams initialLoadStreams,
final Map<String, TableInfo<CommonField<MysqlType>>> tableNameToTable,
final String quoteString) {
public static Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, PrimaryKeyInfo> initPairToPrimaryKeyInfoMap(
final JdbcDatabase database,
final InitialLoadStreams initialLoadStreams,
final Map<String, TableInfo<CommonField<MysqlType>>> tableNameToTable,
final String quoteString) {
final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, PrimaryKeyInfo> pairToPkInfoMap = new HashMap<>();
// For every stream that is in primary initial key sync, we want to maintain information about the
// current primary key info associated with the
@@ -278,15 +353,20 @@ public class MySqlInitialReadUtil {
return new PrimaryKeyInfo(pkFieldName, pkFieldType, pkMaxValue);
}
public static String prettyPrintConfiguredAirbyteStreamList(final List<ConfiguredAirbyteStream> streamList) {
return streamList.stream().map(s -> "%s.%s".formatted(s.getStream().getNamespace(), s.getStream().getName())).collect(Collectors.joining(", "));
}
public record InitialLoadStreams(List<ConfiguredAirbyteStream> streamsForInitialLoad,
Map<AirbyteStreamNameNamespacePair, PrimaryKeyLoadStatus> pairToInitialLoadStatus) {
}
public record CursorBasedStreams(List<ConfiguredAirbyteStream> streamsForCursorBased,
Map<AirbyteStreamNameNamespacePair, CursorBasedStatus> pairToCursorBasedStatus) {
}
public record PrimaryKeyInfo(String pkFieldName, MysqlType fieldType, String pkMaxValue) {}
public static AirbyteStreamNameNamespacePair convertNameNamespacePairFromV0(final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair v1NameNamespacePair) {
return new AirbyteStreamNameNamespacePair(v1NameNamespacePair.getName(), v1NameNamespacePair.getNamespace());
}
}

View File

@@ -21,23 +21,17 @@ import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.integrations.source.relationaldb.models.DbStreamState;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import io.airbyte.protocol.models.v0.SyncMode;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
@@ -240,64 +234,6 @@ class MySqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
}
@Override
protected void incrementalDateCheck() throws Exception {
incrementalCursorCheck(
COL_UPDATED_AT,
"2005-10-18",
"2006-10-19",
List.of(getTestMessages().get(1), getTestMessages().get(2)));
}
@Override
protected List<AirbyteMessage> getTestMessages() {
return List.of(
new AirbyteMessage().withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace())
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_1,
COL_NAME, "picard",
COL_UPDATED_AT, "2004-10-19")))),
new AirbyteMessage().withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace())
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_2,
COL_NAME, "crusher",
COL_UPDATED_AT,
"2005-10-19")))),
new AirbyteMessage().withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace())
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_3,
COL_NAME, "vash",
COL_UPDATED_AT, "2006-10-19")))));
}
@Override
protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(final String namespace) {
final List<AirbyteMessage> expectedMessages = new ArrayList<>();
expectedMessages.add(new AirbyteMessage().withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace)
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_4,
COL_NAME, "riker",
COL_UPDATED_AT, "2006-10-19")))));
expectedMessages.add(new AirbyteMessage().withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace)
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_5,
COL_NAME, "data",
COL_UPDATED_AT, "2006-10-19")))));
final DbStreamState state = new DbStreamState()
.withStreamName(streamName)
.withStreamNamespace(namespace)
.withCursorField(List.of(COL_ID))
.withCursor("5")
.withCursorRecordCount(1L);
expectedMessages.addAll(createExpectedTestMessages(List.of(state)));
return expectedMessages;
}
@Override
protected boolean supportsPerStream() {
return true;

View File

@@ -0,0 +1,613 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql;
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadStateManager.STATE_TYPE_KEY;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.mysql.cj.MysqlType;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.integrations.source.mysql.internal.models.CursorBasedStatus;
import io.airbyte.integrations.source.mysql.internal.models.InternalModels.StateType;
import io.airbyte.integrations.source.relationaldb.models.DbStreamState;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.AirbyteStreamState;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import io.airbyte.protocol.models.v0.SyncMode;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.MySQLContainer;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
@ExtendWith(SystemStubsExtension.class)
class MySqlPkJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
@SystemStub
private EnvironmentVariables environmentVariables;
protected static final String USERNAME_WITHOUT_PERMISSION = "new_user";
protected static final String PASSWORD_WITHOUT_PERMISSION = "new_password";
protected static final String TEST_USER = "test";
protected static final Callable<String> TEST_PASSWORD = () -> "test";
protected static MySQLContainer<?> container;
protected Database database;
protected DSLContext dslContext;
@BeforeAll
static void init() throws Exception {
container = new MySQLContainer<>("mysql:8.0")
.withUsername(TEST_USER)
.withPassword(TEST_PASSWORD.call())
.withEnv("MYSQL_ROOT_HOST", "%")
.withEnv("MYSQL_ROOT_PASSWORD", TEST_PASSWORD.call());
container.start();
final Connection connection = DriverManager.getConnection(container.getJdbcUrl(), "root", TEST_PASSWORD.call());
connection.createStatement().execute("GRANT ALL PRIVILEGES ON *.* TO '" + TEST_USER + "'@'%';\n");
}
@BeforeEach
public void setup() throws Exception {
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
.put(JdbcUtils.DATABASE_KEY, Strings.addRandomSuffix("db", "_", 10))
.put(JdbcUtils.USERNAME_KEY, TEST_USER)
.put(JdbcUtils.PASSWORD_KEY, TEST_PASSWORD.call())
.put("standard_via_pk", true)
.build());
dslContext = DSLContextFactory.create(
config.get(JdbcUtils.USERNAME_KEY).asText(),
config.get(JdbcUtils.PASSWORD_KEY).asText(),
DatabaseDriver.MYSQL.getDriverClassName(),
String.format("jdbc:mysql://%s:%s",
config.get(JdbcUtils.HOST_KEY).asText(),
config.get(JdbcUtils.PORT_KEY).asText()),
SQLDialect.MYSQL);
database = new Database(dslContext);
database.query(ctx -> {
ctx.fetch("CREATE DATABASE " + getDefaultNamespace());
return null;
});
super.setup();
}
@AfterEach
void tearDownMySql() throws Exception {
dslContext.close();
super.tearDown();
}
@AfterAll
static void cleanUp() {
container.close();
}
// MySql does not support schemas in the way most dbs do. Instead we namespace by db name.
@Override
public boolean supportsSchemas() {
return false;
}
@Override
public AbstractJdbcSource<MysqlType> getJdbcSource() {
return new MySqlSource();
}
@Override
public String getDriverClass() {
return MySqlSource.DRIVER_CLASS;
}
@Override
public JsonNode getConfig() {
return Jsons.clone(config);
}
@Test
void testReadMultipleTablesIncrementally() throws Exception {
((ObjectNode) config).put("sync_checkpoint_records", 1);
final String namespace = getDefaultNamespace();
final String streamOneName = TABLE_NAME + "one";
// Create a fresh first table
database.query(connection -> {
connection.fetch(String.format("USE %s;", getDefaultNamespace()));
connection.fetch(String.format("CREATE TABLE %s (\n"
+ " id int PRIMARY KEY,\n"
+ " name VARCHAR(200) NOT NULL,\n"
+ " updated_at VARCHAR(200) NOT NULL\n"
+ ");", streamOneName));
connection.execute(
String.format(
"INSERT INTO %s(id, name, updated_at) VALUES (1,'picard', '2004-10-19')",
getFullyQualifiedTableName(streamOneName)));
connection.execute(
String.format(
"INSERT INTO %s(id, name, updated_at) VALUES (2, 'crusher', '2005-10-19')",
getFullyQualifiedTableName(streamOneName)));
connection.execute(
String.format(
"INSERT INTO %s(id, name, updated_at) VALUES (3, 'vash', '2006-10-19')",
getFullyQualifiedTableName(streamOneName)));
return null;
});
// Create a fresh second table
final String streamTwoName = TABLE_NAME + "two";
final String streamTwoFullyQualifiedName = getFullyQualifiedTableName(streamTwoName);
// Insert records into second table
database.query(ctx -> {
ctx.fetch(String.format("CREATE TABLE %s (\n"
+ " id int PRIMARY KEY,\n"
+ " name VARCHAR(200) NOT NULL,\n"
+ " updated_at DATE NOT NULL\n"
+ ");", streamTwoName));
ctx.execute(
String.format("INSERT INTO %s(id, name, updated_at)"
+ "VALUES (40,'Jean Luc','2006-10-19')",
streamTwoFullyQualifiedName));
ctx.execute(
String.format("INSERT INTO %s(id, name, updated_at)"
+ "VALUES (41, 'Groot', '2006-10-19')",
streamTwoFullyQualifiedName));
ctx.execute(
String.format("INSERT INTO %s(id, name, updated_at)"
+ "VALUES (42, 'Thanos','2006-10-19')",
streamTwoFullyQualifiedName));
return null;
});
// Create records list that we expect to see in the state message
final List<AirbyteMessage> streamTwoExpectedRecords = Arrays.asList(
createRecord(streamTwoName, namespace, ImmutableMap.of(
COL_ID, 40,
COL_NAME, "Jean Luc",
COL_UPDATED_AT, "2006-10-19")),
createRecord(streamTwoName, namespace, ImmutableMap.of(
COL_ID, 41,
COL_NAME, "Groot",
COL_UPDATED_AT, "2006-10-19")),
createRecord(streamTwoName, namespace, ImmutableMap.of(
COL_ID, 42,
COL_NAME, "Thanos",
COL_UPDATED_AT, "2006-10-19")));
// Prep and create a configured catalog to perform sync
final AirbyteStream streamOne = getAirbyteStream(streamOneName, namespace);
final AirbyteStream streamTwo = getAirbyteStream(streamTwoName, namespace);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(
new AirbyteCatalog().withStreams(List.of(streamOne, streamTwo)));
configuredCatalog.getStreams().forEach(airbyteStream -> {
airbyteStream.setSyncMode(SyncMode.INCREMENTAL);
airbyteStream.setCursorField(List.of(COL_ID));
airbyteStream.setDestinationSyncMode(DestinationSyncMode.APPEND);
airbyteStream.withPrimaryKey(List.of(List.of(COL_ID)));
});
// Perform initial sync
final List<AirbyteMessage> messagesFromFirstSync = MoreIterators
.toList(source.read(config, configuredCatalog, null));
final List<AirbyteMessage> recordsFromFirstSync = filterRecords(messagesFromFirstSync);
setEmittedAtToNull(messagesFromFirstSync);
// All records in the 2 configured streams should be present
assertThat(filterRecords(recordsFromFirstSync)).containsExactlyElementsOf(
Stream.concat(getTestMessages(streamOneName).stream().parallel(),
streamTwoExpectedRecords.stream().parallel()).collect(toList()));
final List<AirbyteStateMessage> actualFirstSyncState = extractStateMessage(messagesFromFirstSync);
// Since we are emitting a state message after each record, we should have 1 state for each record -
// 3 from stream1 and 3 from stream2
assertEquals(6, actualFirstSyncState.size());
// The expected state type should be 2 primaryKey's and the last one being standard
final List<String> expectedStateTypesFromFirstSync = List.of("primary_key", "primary_key", "cursor_based");
final List<String> stateTypeOfStreamOneStatesFromFirstSync =
extractSpecificFieldFromCombinedMessages(messagesFromFirstSync, streamOneName, STATE_TYPE_KEY);
final List<String> stateTypeOfStreamTwoStatesFromFirstSync =
extractSpecificFieldFromCombinedMessages(messagesFromFirstSync, streamTwoName, STATE_TYPE_KEY);
// It should be the same for stream1 and stream2
assertEquals(stateTypeOfStreamOneStatesFromFirstSync, expectedStateTypesFromFirstSync);
assertEquals(stateTypeOfStreamTwoStatesFromFirstSync, expectedStateTypesFromFirstSync);
// Create the expected primaryKeys that we should see
final List<String> expectedPrimaryKeysFromFirstSync = List.of("1", "2");
final List<String> primaryKeyFromStreamOneStatesFromFirstSync =
extractSpecificFieldFromCombinedMessages(messagesFromFirstSync, streamOneName, "pk_val");
final List<String> primaryKeyFromStreamTwoStatesFromFirstSync =
extractSpecificFieldFromCombinedMessages(messagesFromFirstSync, streamOneName, "pk_val");
// Verifying each element and its index to match.
// Only checking the first 2 elements since we have verified that the last state_type is
// "cursor_based"
assertEquals(primaryKeyFromStreamOneStatesFromFirstSync.get(0), expectedPrimaryKeysFromFirstSync.get(0));
assertEquals(primaryKeyFromStreamOneStatesFromFirstSync.get(1), expectedPrimaryKeysFromFirstSync.get(1));
assertEquals(primaryKeyFromStreamTwoStatesFromFirstSync.get(0), expectedPrimaryKeysFromFirstSync.get(0));
assertEquals(primaryKeyFromStreamTwoStatesFromFirstSync.get(1), expectedPrimaryKeysFromFirstSync.get(1));
// Extract only state messages for each stream
final List<AirbyteStateMessage> streamOneStateMessagesFromFirstSync = extractStateMessage(messagesFromFirstSync, streamOneName);
final List<AirbyteStateMessage> streamTwoStateMessagesFromFirstSync = extractStateMessage(messagesFromFirstSync, streamTwoName);
// Extract the incremental states of each stream's first and second state message
final List<JsonNode> streamOneIncrementalStatesFromFirstSync =
List.of(streamOneStateMessagesFromFirstSync.get(0).getStream().getStreamState().get("incremental_state"),
streamOneStateMessagesFromFirstSync.get(1).getStream().getStreamState().get("incremental_state"));
final JsonNode streamOneFinalStreamStateFromFirstSync = streamOneStateMessagesFromFirstSync.get(2).getStream().getStreamState();
final List<JsonNode> streamTwoIncrementalStatesFromFirstSync =
List.of(streamTwoStateMessagesFromFirstSync.get(0).getStream().getStreamState().get("incremental_state"),
streamTwoStateMessagesFromFirstSync.get(1).getStream().getStreamState().get("incremental_state"));
final JsonNode streamTwoFinalStreamStateFromFirstSync = streamTwoStateMessagesFromFirstSync.get(2).getStream().getStreamState();
// The incremental_state of each stream's first and second incremental states is expected
// to be identical to the stream_state of the final state message for each stream
assertEquals(streamOneIncrementalStatesFromFirstSync.get(0), streamOneFinalStreamStateFromFirstSync);
assertEquals(streamOneIncrementalStatesFromFirstSync.get(1), streamOneFinalStreamStateFromFirstSync);
assertEquals(streamTwoIncrementalStatesFromFirstSync.get(0), streamTwoFinalStreamStateFromFirstSync);
assertEquals(streamTwoIncrementalStatesFromFirstSync.get(1), streamTwoFinalStreamStateFromFirstSync);
// Sync should work with a primaryKey state AND a cursor-based state from each stream
// Forcing a sync with
// - stream one state still being the first record read via Primary Key.
// - stream two state being the Primary Key state before the final emitted state before the cursor switch
final List<AirbyteMessage> messagesFromSecondSyncWithMixedStates = MoreIterators
.toList(source.read(config, configuredCatalog,
Jsons.jsonNode(List.of(streamOneStateMessagesFromFirstSync.get(0),
streamTwoStateMessagesFromFirstSync.get(1)))));
// Extract only state messages for each stream after second sync
final List<AirbyteStateMessage> streamOneStateMessagesFromSecondSync =
extractStateMessage(messagesFromSecondSyncWithMixedStates, streamOneName);
final List<String> stateTypeOfStreamOneStatesFromSecondSync =
extractSpecificFieldFromCombinedMessages(messagesFromSecondSyncWithMixedStates, streamOneName, STATE_TYPE_KEY);
final List<AirbyteStateMessage> streamTwoStateMessagesFromSecondSync =
extractStateMessage(messagesFromSecondSyncWithMixedStates, streamTwoName);
final List<String> stateTypeOfStreamTwoStatesFromSecondSync =
extractSpecificFieldFromCombinedMessages(messagesFromSecondSyncWithMixedStates, streamTwoName, STATE_TYPE_KEY);
// Stream One states after the second sync are expected to have 2 stream states
// - 1 with PrimaryKey state_type and 1 state that is of cursorBased state type
assertEquals(2, streamOneStateMessagesFromSecondSync.size());
assertEquals(List.of("primary_key", "cursor_based"), stateTypeOfStreamOneStatesFromSecondSync);
// Stream Two states after the second sync are expected to have 1 stream state
// - The state that is of cursorBased state type
assertEquals(1, streamTwoStateMessagesFromSecondSync.size());
assertEquals(List.of("cursor_based"), stateTypeOfStreamTwoStatesFromSecondSync);
// Add some data to each table and perform a third read.
// Expect to see all records be synced via cursorBased method and not primaryKey
database.query(ctx -> {
ctx.execute(
String.format("INSERT INTO %s(id, name, updated_at)"
+ "VALUES (4,'Hooper','2006-10-19')",
getFullyQualifiedTableName(streamOneName)));
ctx.execute(
String.format("INSERT INTO %s(id, name, updated_at)"
+ "VALUES (43, 'Iron Man', '2006-10-19')",
streamTwoFullyQualifiedName));
return null;
});
final List<AirbyteMessage> messagesFromThirdSync = MoreIterators
.toList(source.read(config, configuredCatalog,
Jsons.jsonNode(List.of(streamOneStateMessagesFromSecondSync.get(1),
streamTwoStateMessagesFromSecondSync.get(0)))));
// Extract only state messages, state type, and cursor for each stream after second sync
final List<AirbyteStateMessage> streamOneStateMessagesFromThirdSync =
extractStateMessage(messagesFromThirdSync, streamOneName);
final List<String> stateTypeOfStreamOneStatesFromThirdSync =
extractSpecificFieldFromCombinedMessages(messagesFromThirdSync, streamOneName, STATE_TYPE_KEY);
final List<String> cursorOfStreamOneStatesFromThirdSync =
extractSpecificFieldFromCombinedMessages(messagesFromThirdSync, streamOneName, "cursor");
final List<AirbyteStateMessage> streamTwoStateMessagesFromThirdSync =
extractStateMessage(messagesFromThirdSync, streamTwoName);
final List<String> stateTypeOfStreamTwoStatesFromThirdSync =
extractSpecificFieldFromCombinedMessages(messagesFromThirdSync, streamTwoName, STATE_TYPE_KEY);
final List<String> cursorOfStreamTwoStatesFromThirdSync =
extractSpecificFieldFromCombinedMessages(messagesFromThirdSync, streamTwoName, "cursor");
// Both streams should now be synced via standard cursor and have updated max cursor values
// cursor: 4 for stream one
// cursor: 43 for stream two
assertEquals(1, streamOneStateMessagesFromThirdSync.size());
assertEquals(List.of("cursor_based"), stateTypeOfStreamOneStatesFromThirdSync);
assertEquals(List.of("4"), cursorOfStreamOneStatesFromThirdSync);
assertEquals(1, streamTwoStateMessagesFromThirdSync.size());
assertEquals(List.of("cursor_based"), stateTypeOfStreamTwoStatesFromThirdSync);
assertEquals(List.of("43"), cursorOfStreamTwoStatesFromThirdSync);
}
@Test
void testSpec() throws Exception {
final ConnectorSpecification actual = source.spec();
final ConnectorSpecification expected = Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class);
assertEquals(expected, actual);
}
/**
* MySQL Error Codes:
* <p>
* https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-error-sqlstates.html
* </p>
*
* @throws Exception
*/
@Test
void testCheckIncorrectPasswordFailure() throws Exception {
((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, "fake");
final AirbyteConnectionStatus status = source.check(config);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
assertTrue(status.getMessage().contains("State code: 08001;"));
}
@Test
public void testCheckIncorrectUsernameFailure() throws Exception {
((ObjectNode) config).put(JdbcUtils.USERNAME_KEY, "fake");
final AirbyteConnectionStatus status = source.check(config);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
// do not test for message since there seems to be flakiness where sometimes the test will get the
// message with
// State code: 08001 or State code: 28000
}
@Test
public void testCheckIncorrectHostFailure() throws Exception {
((ObjectNode) config).put(JdbcUtils.HOST_KEY, "localhost2");
final AirbyteConnectionStatus status = source.check(config);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
assertTrue(status.getMessage().contains("State code: 08S01;"));
}
@Test
public void testCheckIncorrectPortFailure() throws Exception {
((ObjectNode) config).put(JdbcUtils.PORT_KEY, "0000");
final AirbyteConnectionStatus status = source.check(config);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
assertTrue(status.getMessage().contains("State code: 08S01;"));
}
@Test
public void testCheckIncorrectDataBaseFailure() throws Exception {
((ObjectNode) config).put(JdbcUtils.DATABASE_KEY, "wrongdatabase");
final AirbyteConnectionStatus status = source.check(config);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
assertTrue(status.getMessage().contains("State code: 42000; Error code: 1049;"));
}
@Test
public void testUserHasNoPermissionToDataBase() throws Exception {
final Connection connection = DriverManager.getConnection(container.getJdbcUrl(), "root", TEST_PASSWORD.call());
connection.createStatement()
.execute("create user '" + USERNAME_WITHOUT_PERMISSION + "'@'%' IDENTIFIED BY '" + PASSWORD_WITHOUT_PERMISSION + "';\n");
((ObjectNode) config).put(JdbcUtils.USERNAME_KEY, USERNAME_WITHOUT_PERMISSION);
((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, PASSWORD_WITHOUT_PERMISSION);
final AirbyteConnectionStatus status = source.check(config);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
assertTrue(status.getMessage().contains("State code: 08001;"));
}
@Override
protected DbStreamState buildStreamState(final ConfiguredAirbyteStream configuredAirbyteStream,
final String cursorField,
final String cursorValue) {
return new CursorBasedStatus().withStateType(StateType.CURSOR_BASED).withVersion(2L)
.withStreamName(configuredAirbyteStream.getStream().getName())
.withStreamNamespace(configuredAirbyteStream.getStream().getNamespace())
.withCursorField(List.of(cursorField))
.withCursor(cursorValue)
.withCursorRecordCount(1L);
}
@Override
protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(final String namespace) {
final List<AirbyteMessage> expectedMessages = new ArrayList<>();
expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace)
.withData(Jsons.jsonNode(ImmutableMap
.of(COL_ID, ID_VALUE_4,
COL_NAME, "riker",
COL_UPDATED_AT, "2006-10-19")))));
expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace)
.withData(Jsons.jsonNode(ImmutableMap
.of(COL_ID, ID_VALUE_5,
COL_NAME, "data",
COL_UPDATED_AT, "2006-10-19")))));
final DbStreamState state = new CursorBasedStatus()
.withStateType(StateType.CURSOR_BASED)
.withVersion(2L)
.withStreamName(streamName)
.withStreamNamespace(namespace)
.withCursorField(ImmutableList.of(COL_ID))
.withCursor("5")
.withCursorRecordCount(1L);
expectedMessages.addAll(createExpectedTestMessages(List.of(state)));
return expectedMessages;
}
@Override
protected boolean supportsPerStream() {
return true;
}
@Override
protected List<AirbyteMessage> getTestMessages() {
return getTestMessages(streamName);
}
protected List<AirbyteMessage> getTestMessages(final String streamName) {
return List.of(
new AirbyteMessage().withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace())
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_1,
COL_NAME, "picard",
COL_UPDATED_AT, "2004-10-19")))),
new AirbyteMessage().withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace())
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_2,
COL_NAME, "crusher",
COL_UPDATED_AT,
"2005-10-19")))),
new AirbyteMessage().withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace())
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_3,
COL_NAME, "vash",
COL_UPDATED_AT, "2006-10-19")))));
}
private AirbyteStream getAirbyteStream(final String tableName, final String namespace) {
return CatalogHelpers.createAirbyteStream(
tableName,
namespace,
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID)));
}
@Override
protected AirbyteCatalog getCatalog(final String defaultNamespace) {
return new AirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createAirbyteStream(
TABLE_NAME,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(Collections.emptyList()),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_COMPOSITE_PK,
defaultNamespace,
Field.of(COL_FIRST_NAME, JsonSchemaType.STRING),
Field.of(COL_LAST_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
}
// Override from parent class as we're no longer including the legacy Data field.
@Override
protected List<AirbyteMessage> createExpectedTestMessages(final List<DbStreamState> states) {
return supportsPerStream()
? states.stream()
.map(s -> new AirbyteMessage().withType(Type.STATE)
.withState(
new AirbyteStateMessage().withType(AirbyteStateType.STREAM)
.withStream(new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor().withNamespace(s.getStreamNamespace()).withName(s.getStreamName()))
.withStreamState(Jsons.jsonNode(s)))))
.collect(
Collectors.toList())
: List.of(new AirbyteMessage().withType(Type.STATE).withState(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY)));
}
@Override
protected List<AirbyteStateMessage> createState(final List<DbStreamState> states) {
return supportsPerStream()
? states.stream()
.map(s -> new AirbyteStateMessage().withType(AirbyteStateType.STREAM)
.withStream(new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor().withNamespace(s.getStreamNamespace()).withName(s.getStreamName()))
.withStreamState(Jsons.jsonNode(s))))
.collect(
Collectors.toList())
: List.of(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY));
}
@Override
protected JsonNode getStateData(final AirbyteMessage airbyteMessage, final String streamName) {
final JsonNode streamState = airbyteMessage.getState().getStream().getStreamState();
if (streamState.get("stream_name").asText().equals(streamName)) {
return streamState;
}
throw new IllegalArgumentException("Stream not found in state message: " + streamName);
}
}