source-mysql : Initial load. Handle composite pk case w/multiple repeated values (#29634)
Co-authored-by: subodh <subodh1810@gmail.com>
@@ -334,7 +334,7 @@ public class MySqlSource extends AbstractJdbcSource<MysqlType> implements Source
     final JsonNode sourceConfig = database.getSourceConfig();
     final MySqlFeatureFlags featureFlags = new MySqlFeatureFlags(sourceConfig);
     if (isCdc(sourceConfig) && shouldUseCDC(catalog)) {
-      if (featureFlags.isCdcSyncEnabled()) {
+      if (featureFlags.isCdcInitialSyncViaPkEnabled()) {
         LOGGER.info("Using PK + CDC");
         return MySqlInitialReadUtil.getCdcReadIterators(database, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString());
       }
@@ -16,7 +16,7 @@ public class MySqlFeatureFlags {
     this.sourceConfig = sourceConfig;
   }

-  public boolean isCdcSyncEnabled() {
+  public boolean isCdcInitialSyncViaPkEnabled() {
     return getFlagValue(CDC_VIA_PK);
   }

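The renamed flag gates the new PK-based initial sync path off the connector's source config. As a rough illustration only, here is one possible shape for such a config-driven flag, assuming Jackson's JsonNode and a hypothetical "initial_load_via_pk" field name; the field name and the getFlagValue body below are assumptions, not the connector's actual schema or implementation.

import com.fasterxml.jackson.databind.JsonNode;

// Sketch of a config-driven feature flag; mirrors the shape of MySqlFeatureFlags but is not its implementation.
public class FeatureFlagSketch {

  private final JsonNode sourceConfig;

  public FeatureFlagSketch(final JsonNode sourceConfig) {
    this.sourceConfig = sourceConfig;
  }

  // Treat a missing field as "off" so the new code path stays disabled by default.
  private boolean getFlagValue(final String flagName) {
    return sourceConfig.has(flagName) && sourceConfig.get(flagName).asBoolean();
  }

  public boolean isCdcInitialSyncViaPkEnabled() {
    return getFlagValue("initial_load_via_pk");
  }
}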
@@ -89,7 +89,7 @@ public class MySqlInitialLoadHandler {
         .collect(Collectors.toList());
     final AutoCloseableIterator<JsonNode> queryStream =
         new MySqlInitialLoadRecordIterator(database, sourceOperations, quoteString, initialLoadStateManager, selectedDatabaseFields, pair,
-            calculateChunkSize(tableSizeInfoMap.get(pair), pair));
+            calculateChunkSize(tableSizeInfoMap.get(pair), pair), isCompositePrimaryKey(airbyteStream));
     final AutoCloseableIterator<AirbyteMessage> recordIterator =
         getRecordIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
     final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, pair);
@@ -101,6 +101,10 @@ public class MySqlInitialLoadHandler {
     return iteratorList;
   }

+  private static boolean isCompositePrimaryKey(final ConfiguredAirbyteStream stream) {
+    return stream.getStream().getSourceDefinedPrimaryKey().size() > 1;
+  }
+
   // Calculates the number of rows to fetch per query.
   @VisibleForTesting
   public static long calculateChunkSize(final TableSizeInfo tableSizeInfo, final AirbyteStreamNameNamespacePair pair) {
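The new helper keys off the protocol's representation of a source-defined primary key, which is a list of field paths (List<List<String>>); more than one entry means a composite key. A self-contained sketch of the same check follows; the "id"/"make_id" column names are borrowed from the test further down and used purely as sample data.

import java.util.List;

public class CompositeKeyCheckSketch {

  // Same predicate as isCompositePrimaryKey above, applied directly to the key paths.
  static boolean isCompositePrimaryKey(final List<List<String>> sourceDefinedPrimaryKey) {
    return sourceDefinedPrimaryKey.size() > 1;
  }

  public static void main(final String[] args) {
    final List<List<String>> singleKey = List.of(List.of("id"));
    final List<List<String>> compositeKey = List.of(List.of("id"), List.of("make_id"));
    System.out.println(isCompositePrimaryKey(singleKey));    // false
    System.out.println(isCompositePrimaryKey(compositeKey)); // true
  }
}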
@@ -47,6 +47,7 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator<JsonNode>
   // Represents the number of rows to get with each query.
   private final long chunkSize;
   private final PrimaryKeyInfo pkInfo;
+  private final boolean isCompositeKeyLoad;
   private int numSubqueries = 0;
   private AutoCloseableIterator<JsonNode> currentIterator;

@@ -57,7 +58,8 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator<JsonNode>
                                        final MySqlInitialLoadStateManager initialLoadStateManager,
                                        final List<String> columnNames,
                                        final AirbyteStreamNameNamespacePair pair,
-                                       final long chunkSize) {
+                                       final long chunkSize,
+                                       final boolean isCompositeKeyLoad) {
     this.database = database;
     this.sourceOperations = sourceOperations;
     this.quoteString = quoteString;
@@ -66,6 +68,7 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator<JsonNode>
     this.pair = pair;
     this.chunkSize = chunkSize;
     this.pkInfo = initialLoadStateManager.getPrimaryKeyInfo(pair);
+    this.isCompositeKeyLoad = isCompositeKeyLoad;
   }

   @CheckForNull
@@ -73,14 +76,20 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator<JsonNode>
   protected JsonNode computeNext() {
     if (shouldBuildNextSubquery()) {
       try {
-        LOGGER.info("Subquery number : {}", numSubqueries);
-        final Stream<JsonNode> stream = database.unsafeQuery(
-            connection -> getPkPreparedStatement(connection), sourceOperations::rowToJson);
-
+        // We will only issue one query for a composite key load. If we have already processed all the data associated with this
+        // query, we should indicate that we are done processing for the given stream.
+        if (isCompositeKeyLoad && numSubqueries >= 1) {
+          return endOfData();
+        }
         // Previous stream (and connection) must be manually closed in this iterator.
         if (currentIterator != null) {
           currentIterator.close();
         }

+        LOGGER.info("Subquery number : {}", numSubqueries);
+        final Stream<JsonNode> stream = database.unsafeQuery(
+            this::getPkPreparedStatement, sourceOperations::rowToJson);
+
         currentIterator = AutoCloseableIterators.fromStream(stream, pair);
         numSubqueries++;
         // If the current subquery has no records associated with it, the entire stream has been read.
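The early endOfData() return exists because chunked paging cannot resume safely when the ordering column repeats values: a chunk boundary can fall inside a run of duplicates, and a strict "greater than last seen" resume would skip the rest of that run. A small sketch of the failure mode with invented rows (not the connector's code) follows.

import java.util.List;
import java.util.stream.Collectors;

public class DuplicateCursorSketch {

  // Hypothetical rows whose first PK column is not unique: two rows share id = 2.
  record Row(int id, String makeId) {}

  public static void main(final String[] args) {
    final List<Row> table = List.of(
        new Row(1, "A"), new Row(2, "A"), new Row(2, "B"), new Row(3, "A"));

    // Suppose a chunk ended on (2, "A") and we resume with the single-key strategy "id > lastSeen".
    final int lastSeen = 2;
    final List<Row> nextChunk = table.stream()
        .filter(r -> r.id() > lastSeen)
        .collect(Collectors.toList());

    // Prints only the id = 3 row; (2, "B") is silently skipped, which is why the
    // composite-key path issues a single ordered query (and a >= resume) instead.
    System.out.println(nextChunk);
  }
}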
@@ -96,7 +105,7 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator<JsonNode>

   private boolean shouldBuildNextSubquery() {
     // The next sub-query should be built if (i) it is the first subquery in the sequence. (ii) the previous subquery has finished.
-    return currentIterator == null || !currentIterator.hasNext();
+    return (currentIterator == null || !currentIterator.hasNext());
   }

   private PreparedStatement getPkPreparedStatement(final Connection connection) {
@@ -114,17 +123,31 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator<JsonNode>
     if (pkLoadStatus == null) {
       LOGGER.info("pkLoadStatus is null");
       final String quotedCursorField = enquoteIdentifier(pkInfo.pkFieldName(), quoteString);
-      final String sql = String.format("SELECT %s FROM %s ORDER BY %s LIMIT %s", wrappedColumnNames, fullTableName,
-          quotedCursorField, chunkSize);
+      final String sql;
+      // We cannot load in chunks for a composite key load, since each field might not have distinct values.
+      if (isCompositeKeyLoad) {
+        sql = String.format("SELECT %s FROM %s ORDER BY %s", wrappedColumnNames, fullTableName,
+            quotedCursorField);
+      } else {
+        sql = String.format("SELECT %s FROM %s ORDER BY %s LIMIT %s", wrappedColumnNames, fullTableName,
+            quotedCursorField, chunkSize);
+      }
       final PreparedStatement preparedStatement = connection.prepareStatement(sql);
       LOGGER.info("Executing query for table {}: {}", tableName, preparedStatement);
       return preparedStatement;
     } else {
       LOGGER.info("pkLoadStatus value is : {}", pkLoadStatus.getPkVal());
       final String quotedCursorField = enquoteIdentifier(pkLoadStatus.getPkName(), quoteString);
-      // Since a pk is unique, we can issue a > query instead of a >=, as there cannot be two records with the same pk.
-      final String sql = String.format("SELECT %s FROM %s WHERE %s > ? ORDER BY %s LIMIT %s", wrappedColumnNames, fullTableName,
-          quotedCursorField, quotedCursorField, chunkSize);
+      final String sql;
+      // We cannot load in chunks for a composite key load, since each field might not have distinct values. Furthermore, we have to issue a >=
+      // query since we may not have processed all of the data associated with the last saved primary key value.
+      if (isCompositeKeyLoad) {
+        sql = String.format("SELECT %s FROM %s WHERE %s >= ? ORDER BY %s", wrappedColumnNames, fullTableName,
+            quotedCursorField, quotedCursorField);
+      } else {
+        sql = String.format("SELECT %s FROM %s WHERE %s > ? ORDER BY %s LIMIT %s", wrappedColumnNames, fullTableName,
+            quotedCursorField, quotedCursorField, chunkSize);
+      }
       final PreparedStatement preparedStatement = connection.prepareStatement(sql);
       final MysqlType cursorFieldType = pkInfo.fieldType();
       sourceOperations.setCursorField(preparedStatement, 1, cursorFieldType, pkLoadStatus.getPkVal());

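For the two branches above, the rendered SQL differs only in the comparison operator and the presence of LIMIT. A sketch with invented identifiers follows; the real values come from the stream's table metadata, the quoting helpers, and the saved PK state.

public class QueryShapeSketch {

  public static void main(final String[] args) {
    final String wrappedColumnNames = "`id`, `make_id`, `model`";
    final String fullTableName = "`models_schema`.`models`";
    final String quotedCursorField = "`id`";
    final long chunkSize = 1_000_000;

    // Single-column PK: chunked, strictly-greater-than resume (a unique PK cannot repeat).
    System.out.println(String.format("SELECT %s FROM %s WHERE %s > ? ORDER BY %s LIMIT %s",
        wrappedColumnNames, fullTableName, quotedCursorField, quotedCursorField, chunkSize));

    // Composite PK: one unbounded query, resumed with >= so rows sharing the saved
    // cursor value are not dropped.
    System.out.println(String.format("SELECT %s FROM %s WHERE %s >= ? ORDER BY %s",
        wrappedColumnNames, fullTableName, quotedCursorField, quotedCursorField));
  }
}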
@@ -238,6 +238,9 @@ public class MySqlInitialReadUtil {
   private static PrimaryKeyInfo getPrimaryKeyInfo(final ConfiguredAirbyteStream stream, final Map<String, TableInfo<CommonField<MysqlType>>> tableNameToTable) {
     // For cursor-based syncs, we cannot always assume a primary key field exists. We need to handle the case where it does not exist when we support
     // cursor-based syncs.
+    if (stream.getStream().getSourceDefinedPrimaryKey().size() > 1) {
+      LOGGER.info("Composite primary key detected for {namespace, stream} : {}, {}", stream.getStream().getNamespace(), stream.getStream().getName());
+    }
     final String pkFieldName = stream.getStream().getSourceDefinedPrimaryKey().get(0).get(0);
     final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getStream().getNamespace(), (stream.getStream().getName()));
     final TableInfo<CommonField<MysqlType>> table = tableNameToTable

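Even when a composite key is detected, getPrimaryKeyInfo keeps only the first key column as the ordering cursor; the new check above just logs that a composite key was seen. A minimal sketch of that selection on hypothetical key paths, mirroring the get(0).get(0) access:

import java.util.List;

public class PkCursorSelectionSketch {

  public static void main(final String[] args) {
    // A composite source-defined primary key: one field path per key column.
    final List<List<String>> sourceDefinedPrimaryKey = List.of(List.of("id"), List.of("make_id"));

    // Only the first column is used to order the initial load, as in getPrimaryKeyInfo above.
    final String pkFieldName = sourceDefinedPrimaryKey.get(0).get(0);
    System.out.println(pkFieldName); // prints "id"
  }
}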
@@ -172,6 +172,40 @@ public class InitialPkLoadEnabledCdcMysqlSourceTest extends CdcMysqlSourceTest {
     assertNotNull(stateMessageEmittedAfterSecondSyncCompletion.getData());
   }

+  @Test
+  public void testCompositeIndexInitialLoad() throws Exception {
+    // Simulate adding a composite index by modifying the catalog.
+    final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(CONFIGURED_CATALOG);
+    final List<List<String>> primaryKeys = configuredCatalog.getStreams().get(0).getStream().getSourceDefinedPrimaryKey();
+    primaryKeys.add(List.of("make_id"));
+
+    final AutoCloseableIterator<AirbyteMessage> read1 = getSource()
+        .read(getConfig(), configuredCatalog, null);
+
+    final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
+
+    final Set<AirbyteRecordMessage> recordMessages1 = extractRecordMessages(actualRecords1);
+    final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
+    assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordMessages1);
+    assertExpectedStateMessages(stateMessages1);
+
+    // Re-run the sync with state associated with record w/ id = 15 (second to last record).
+    // We expect to read 2 records, since in the case of a composite PK we issue a >= query.
+    // We also expect 3 state records. One associated with the pk state, one to signify end of initial load, and
+    // the last one indicating the cdc position we have synced until.
+    final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateMessages1.get(4)));
+    final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
+        .read(getConfig(), configuredCatalog, state);
+
+    final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
+    final Set<AirbyteRecordMessage> recordMessages2 = extractRecordMessages(actualRecords2);
+    final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);
+
+    assertExpectedRecords(new HashSet<>(MODEL_RECORDS.subList(4, 6)), recordMessages2);
+    assertEquals(3, stateMessages2.size());
+    assertStateTypes(stateMessages2, 0);
+  }
+
   @Test
   public void testTwoStreamSync() throws Exception {
     // Add another stream models_2 and read that one as well.