1
0
mirror of synced 2025-12-23 21:03:15 -05:00

source-mysql : chunking queries impl (#29109)

This commit is contained in:
Akash Kulkarni
2023-08-18 12:26:40 -07:00
committed by GitHub
parent 528127a3e4
commit 2b18864f3b
9 changed files with 306 additions and 92 deletions

View File

@@ -0,0 +1,73 @@
package io.airbyte.integrations.source.mysql;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySqlQueryUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlQueryUtils.class);
public record TableSizeInfo(Long tableSize, Long avgRowLength) { }
public static final String TABLE_ESTIMATE_QUERY =
"""
SELECT
(data_length + index_length) as %s,
AVG_ROW_LENGTH as %s
FROM
information_schema.tables
WHERE
table_schema = '%s' AND table_name = '%s';
""";
public static final String TABLE_SIZE_BYTES_COL = "TotalSizeBytes";
public static final String AVG_ROW_LENGTH = "AVG_ROW_LENGTH";
public static Map<AirbyteStreamNameNamespacePair, TableSizeInfo> getTableSizeInfoForStreams(final JdbcDatabase database,
final List<ConfiguredAirbyteStream> streams,
final String quoteString) {
final Map<AirbyteStreamNameNamespacePair, TableSizeInfo> tableSizeInfoMap = new HashMap<>();
streams.forEach(stream -> {
try {
final String name = stream.getStream().getName();
final String namespace = stream.getStream().getNamespace();
final String fullTableName =
getFullyQualifiedTableNameWithQuoting(name, namespace, quoteString);
final List<JsonNode> tableEstimateResult = getTableEstimate(database, namespace, name);
Preconditions.checkState(tableEstimateResult.size() == 1);
final long tableEstimateBytes = tableEstimateResult.get(0).get(TABLE_SIZE_BYTES_COL).asLong();
final long avgTableRowSizeBytes = tableEstimateResult.get(0).get(AVG_ROW_LENGTH).asLong();
LOGGER.info("Stream {} size estimate is {}, average row size estimate is {}", fullTableName, tableEstimateBytes, avgTableRowSizeBytes);
final TableSizeInfo tableSizeInfo = new TableSizeInfo(tableEstimateBytes, avgTableRowSizeBytes);
final AirbyteStreamNameNamespacePair namespacePair =
new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace());
tableSizeInfoMap.put(namespacePair, tableSizeInfo);
} catch (final SQLException e) {
LOGGER.warn("Error occurred while attempting to estimate sync size", e);
}
});
return tableSizeInfoMap;
}
private static List<JsonNode> getTableEstimate(final JdbcDatabase database, final String namespace, final String name)
throws SQLException {
// Construct the table estimate query.
final String tableEstimateQuery =
String.format(TABLE_ESTIMATE_QUERY, TABLE_SIZE_BYTES_COL, AVG_ROW_LENGTH, namespace, name);
LOGGER.info("table estimate query: {}", tableEstimateQuery);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(tableEstimateQuery),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
return jsonNodes;
}
}

View File

@@ -35,6 +35,7 @@ public class MySqlInitialLoadGlobalStateManager implements MySqlInitialLoadState
// have completed the snapshot.
private final Set<AirbyteStreamNameNamespacePair> streamsThatHaveCompletedSnapshot;
MySqlInitialLoadGlobalStateManager(final InitialLoadStreams initialLoadStreams,
final Map<AirbyteStreamNameNamespacePair, PrimaryKeyInfo> pairToPrimaryKeyInfo,
final CdcState cdcState, final ConfiguredAirbyteCatalog catalog) {
@@ -55,6 +56,7 @@ public class MySqlInitialLoadGlobalStateManager implements MySqlInitialLoadState
return streamsThatHaveCompletedSnapshot;
}
private static Map<AirbyteStreamNameNamespacePair, PrimaryKeyLoadStatus> initPairToPrimaryKeyLoadStatusMap(
final Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair, PrimaryKeyLoadStatus> pairToPkStatus) {
final Map<AirbyteStreamNameNamespacePair, PrimaryKeyLoadStatus> map = new HashMap<>();
@@ -82,6 +84,11 @@ public class MySqlInitialLoadGlobalStateManager implements MySqlInitialLoadState
.withGlobal(globalState);
}
@Override
public void updatePrimaryKeyLoadState(final AirbyteStreamNameNamespacePair pair, final PrimaryKeyLoadStatus pkLoadStatus) {
pairToPrimaryKeyLoadStatus.put(pair, pkLoadStatus);
}
public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) {
streamsThatHaveCompletedSnapshot.add(pair);
final List<AirbyteStreamState> streamStates = new ArrayList<>();

View File

@@ -1,18 +1,15 @@
package io.airbyte.integrations.source.mysql.initialsync;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.mysql.cj.MysqlType;
import io.airbyte.commons.stream.AirbyteStreamUtils;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.PrimaryKeyInfo;
import io.airbyte.integrations.source.mysql.MySqlQueryUtils.TableSizeInfo;
import io.airbyte.integrations.source.mysql.internal.models.PrimaryKeyLoadStatus;
import io.airbyte.integrations.source.relationaldb.DbSourceDiscoverUtil;
import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.CommonField;
@@ -24,9 +21,6 @@ import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -35,7 +29,6 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,18 +43,24 @@ public class MySqlInitialLoadHandler {
private final MySqlInitialLoadStateManager initialLoadStateManager;
private final Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier;
private static final long QUERY_TARGET_SIZE_GB = 1_073_741_824;
private static final long DEFAULT_CHUNK_SIZE = 1_000_000;
final Map<AirbyteStreamNameNamespacePair, TableSizeInfo> tableSizeInfoMap;
public MySqlInitialLoadHandler(final JsonNode config,
final JdbcDatabase database,
final MySqlInitialLoadSourceOperations sourceOperations,
final String quoteString,
final MySqlInitialLoadStateManager initialLoadStateManager,
final Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier) {
final Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier,
final Map<AirbyteStreamNameNamespacePair, TableSizeInfo> tableSizeInfoMap) {
this.config = config;
this.database = database;
this.sourceOperations = sourceOperations;
this.quoteString = quoteString;
this.initialLoadStateManager = initialLoadStateManager;
this.streamStateForIncrementalRunSupplier = streamStateForIncrementalRunSupplier;
this.tableSizeInfoMap = tableSizeInfoMap;
}
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
@@ -88,7 +87,9 @@ public class MySqlInitialLoadHandler {
.map(CommonField::getName)
.filter(CatalogHelpers.getTopLevelFieldNames(airbyteStream)::contains)
.collect(Collectors.toList());
final AutoCloseableIterator<JsonNode> queryStream = queryTablePk(selectedDatabaseFields, table.getNameSpace(), table.getName());
final AutoCloseableIterator<JsonNode> queryStream =
new MySqlInitialLoadRecordIterator(database, sourceOperations, quoteString, initialLoadStateManager, selectedDatabaseFields, pair,
calculateChunkSize(tableSizeInfoMap.get(pair), pair));
final AutoCloseableIterator<AirbyteMessage> recordIterator =
getRecordIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, pair);
@@ -100,75 +101,18 @@ public class MySqlInitialLoadHandler {
return iteratorList;
}
private AutoCloseableIterator<JsonNode> queryTablePk(
final List<String> columnNames,
final String schemaName,
final String tableName) {
LOGGER.info("Queueing query for table: {}", tableName);
final AirbyteStreamNameNamespacePair airbyteStream =
AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName);
return AutoCloseableIterators.lazyIterator(() -> {
try {
final Stream<JsonNode> stream = database.unsafeQuery(
connection -> createPkQueryStatement(connection, columnNames, schemaName, tableName, airbyteStream),
sourceOperations::rowToJson);
return AutoCloseableIterators.fromStream(stream, airbyteStream);
} catch (final SQLException e) {
throw new RuntimeException(e);
}
}, airbyteStream);
}
private PreparedStatement createPkQueryStatement(
final Connection connection,
final List<String> columnNames,
final String schemaName,
final String tableName,
final AirbyteStreamNameNamespacePair pair) {
try {
LOGGER.info("Preparing query for table: {}", tableName);
final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName,
quoteString);
final String wrappedColumnNames = RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString);
final PrimaryKeyLoadStatus pkLoadStatus = initialLoadStateManager.getPrimaryKeyLoadStatus(pair);
final PrimaryKeyInfo pkInfo = initialLoadStateManager.getPrimaryKeyInfo(pair);
final PreparedStatement preparedStatement =
getPkPreparedStatement(connection, wrappedColumnNames, fullTableName, pkLoadStatus, pkInfo);
LOGGER.info("Executing query for table {}: {}", tableName, preparedStatement);
return preparedStatement;
} catch (final SQLException e) {
throw new RuntimeException(e);
}
}
private PreparedStatement getPkPreparedStatement(final Connection connection,
final String wrappedColumnNames,
final String fullTableName,
final PrimaryKeyLoadStatus pkLoadStatus,
final PrimaryKeyInfo pkInfo)
throws SQLException {
if (pkLoadStatus == null) {
final String quotedCursorField = enquoteIdentifier(pkInfo.pkFieldName(), quoteString);
final String sql = String.format("SELECT %s FROM %s ORDER BY %s", wrappedColumnNames, fullTableName,
quotedCursorField, quotedCursorField);
final PreparedStatement preparedStatement = connection.prepareStatement(sql);
return preparedStatement;
} else {
final String quotedCursorField = enquoteIdentifier(pkLoadStatus.getPkName(), quoteString);
// Since a pk is unique, we can issue a > query instead of a >=, as there cannot be two records with the same pk.
final String sql = String.format("SELECT %s FROM %s WHERE %s > ? ORDER BY %s", wrappedColumnNames, fullTableName,
quotedCursorField, quotedCursorField);
final PreparedStatement preparedStatement = connection.prepareStatement(sql);
final MysqlType cursorFieldType = pkInfo.fieldType();
sourceOperations.setCursorField(preparedStatement, 1, cursorFieldType, pkLoadStatus.getPkVal());
return preparedStatement;
// Calculates the number of rows to fetch per query.
@VisibleForTesting
public static long calculateChunkSize(final TableSizeInfo tableSizeInfo, final AirbyteStreamNameNamespacePair pair) {
// If table size info could not be calculated, a default chunk size will be provided.
if (tableSizeInfo == null || tableSizeInfo.tableSize() == 0 || tableSizeInfo.avgRowLength() == 0) {
LOGGER.info("Chunk size could not be determined for pair: {}, defaulting to {} rows", pair, DEFAULT_CHUNK_SIZE);
return DEFAULT_CHUNK_SIZE;
}
final long avgRowLength = tableSizeInfo.avgRowLength();
final long chunkSize = QUERY_TARGET_SIZE_GB / avgRowLength;
LOGGER.info("Chunk size determined for pair: {}, is {}", pair, chunkSize);
return chunkSize;
}
// Transforms the given iterator to create an {@link AirbyteRecordMessage}

View File

@@ -0,0 +1,145 @@
package io.airbyte.integrations.source.mysql.initialsync;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
import autovalue.shaded.com.google.common.collect.AbstractIterator;
import com.fasterxml.jackson.databind.JsonNode;
import com.mysql.cj.MysqlType;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.PrimaryKeyInfo;
import io.airbyte.integrations.source.mysql.internal.models.PrimaryKeyLoadStatus;
import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Stream;
import javax.annotation.CheckForNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This record iterator operates over a single stream. It continuously reads data from a table via multiple queries with the configured
* chunk size until the entire table is processed. The next query uses the highest watermark of the primary key seen in the previous
* subquery. Consider a table with chunk size = 1,000,000, and 3,500,000 records. The series of queries executed are :
* Query 1 : select * from table order by pk limit 1,800,000, pk_max = pk_max_1
* Query 2 : select * from table where pk > pk_max_1 order by pk limit 1,800,000, pk_max = pk_max_2
* Query 3 : select * from table where pk > pk_max_2 order by pk limit 1,800,000, pk_max = pk_max_3
* Query 4 : select * from table where pk > pk_max_3 order by pk limit 1,800,000, pk_max = pk_max_4
* Query 5 : select * from table where pk > pk_max_4 order by pk limit 1,800,000. Final query, since there are zero records processed here.
*/
public class MySqlInitialLoadRecordIterator extends AbstractIterator<JsonNode>
implements AutoCloseableIterator<JsonNode> {
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlInitialLoadRecordIterator.class);
private final MySqlInitialLoadSourceOperations sourceOperations;
private final String quoteString;
private final MySqlInitialLoadStateManager initialLoadStateManager;
private final List<String> columnNames;
private final AirbyteStreamNameNamespacePair pair;
private final JdbcDatabase database;
// Represents the number of rows to get with each query.
private final long chunkSize;
private final PrimaryKeyInfo pkInfo;
private int numSubqueries = 0;
private AutoCloseableIterator<JsonNode> currentIterator;
MySqlInitialLoadRecordIterator(
final JdbcDatabase database,
final MySqlInitialLoadSourceOperations sourceOperations,
final String quoteString,
final MySqlInitialLoadStateManager initialLoadStateManager,
final List<String> columnNames,
final AirbyteStreamNameNamespacePair pair,
final long chunkSize) {
this.database = database;
this.sourceOperations = sourceOperations;
this.quoteString = quoteString;
this.initialLoadStateManager = initialLoadStateManager;
this.columnNames = columnNames;
this.pair = pair;
this.chunkSize = chunkSize;
this.pkInfo = initialLoadStateManager.getPrimaryKeyInfo(pair);
}
@CheckForNull
@Override
protected JsonNode computeNext() {
if (shouldBuildNextSubquery()) {
try {
LOGGER.info("Subquery number : {}", numSubqueries);
final Stream<JsonNode> stream = database.unsafeQuery(
connection -> getPkPreparedStatement(connection), sourceOperations::rowToJson);
// Previous stream (and connection) must be manually closed in this iterator.
if (currentIterator != null) {
currentIterator.close();
}
currentIterator = AutoCloseableIterators.fromStream(stream, pair);
numSubqueries++;
// If the current subquery has no records associated with it, the entire stream has been read.
if (!currentIterator.hasNext()) {
return endOfData();
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
return currentIterator.next();
}
private boolean shouldBuildNextSubquery() {
// The next sub-query should be built if (i) it is the first subquery in the sequence. (ii) the previous subquery has finished.
return currentIterator == null || !currentIterator.hasNext();
}
private PreparedStatement getPkPreparedStatement(final Connection connection) {
try {
final String tableName = pair.getName();
final String schemaName = pair.getNamespace();
LOGGER.info("Preparing query for table: {}", tableName);
final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName,
quoteString);
final String wrappedColumnNames = RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString);
final PrimaryKeyLoadStatus pkLoadStatus = initialLoadStateManager.getPrimaryKeyLoadStatus(pair);
if (pkLoadStatus == null) {
LOGGER.info("pkLoadStatus is null");
final String quotedCursorField = enquoteIdentifier(pkInfo.pkFieldName(), quoteString);
final String sql = String.format("SELECT %s FROM %s ORDER BY %s LIMIT %s", wrappedColumnNames, fullTableName,
quotedCursorField, chunkSize);
final PreparedStatement preparedStatement = connection.prepareStatement(sql);
LOGGER.info("Executing query for table {}: {}", tableName, preparedStatement);
return preparedStatement;
} else {
LOGGER.info("pkLoadStatus value is : {}", pkLoadStatus.getPkVal());
final String quotedCursorField = enquoteIdentifier(pkLoadStatus.getPkName(), quoteString);
// Since a pk is unique, we can issue a > query instead of a >=, as there cannot be two records with the same pk.
final String sql = String.format("SELECT %s FROM %s WHERE %s > ? ORDER BY %s LIMIT %s", wrappedColumnNames, fullTableName,
quotedCursorField, quotedCursorField, chunkSize);
final PreparedStatement preparedStatement = connection.prepareStatement(sql);
final MysqlType cursorFieldType = pkInfo.fieldType();
sourceOperations.setCursorField(preparedStatement, 1, cursorFieldType, pkLoadStatus.getPkVal());
LOGGER.info("Executing query for table {}: {}", tableName, preparedStatement);
return preparedStatement;
}
} catch (final SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() throws Exception {
if (currentIterator != null) {
currentIterator.close();
}
}
}

View File

@@ -29,6 +29,14 @@ public class MySqlInitialLoadSourceOperations extends MySqlSourceOperations {
final int columnCount = metadata.getColumnCount();
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());
for (int i = 1; i <= columnCount; i++) {
// attempt to access the column. this allows us to know if it is null before we do type-specific
// parsing. if it is null, we can move on. while awkward, this seems to be the agreed upon way of
// checking for null values with jdbc.
queryContext.getObject(i);
if (queryContext.wasNull()) {
continue;
}
// convert to java types that will convert into reasonable json.
copyToJsonField(queryContext, i, jsonNode);
}

View File

@@ -14,10 +14,13 @@ public interface MySqlInitialLoadStateManager {
// Returns an intermediate state message for the initial sync.
AirbyteStateMessage createIntermediateStateMessage(final AirbyteStreamNameNamespacePair pair, final PrimaryKeyLoadStatus pkLoadStatus);
// Updates the {@link PrimaryKeyLoadStatus} for the state associated with the given pair
void updatePrimaryKeyLoadState(final AirbyteStreamNameNamespacePair pair, final PrimaryKeyLoadStatus pkLoadStatus);
// Returns the final state message for the initial sync.
AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun);
// Returns the previous state, represented as a {@link PrimaryKeyLoadStatus} associated with the stream.
// Returns the previous state emitted, represented as a {@link PrimaryKeyLoadStatus} associated with the stream.
PrimaryKeyLoadStatus getPrimaryKeyLoadStatus(final AirbyteStreamNameNamespacePair pair);
// Returns the current {@PrimaryKeyInfo}, associated with the stream. This includes the data type & the column name associated with the stream.

View File

@@ -1,5 +1,6 @@
package io.airbyte.integrations.source.mysql.initialsync;
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.getTableSizeInfoForStreams;
import static io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil.MYSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadGlobalStateManager.STATE_TYPE_KEY;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadStateManager.PRIMARY_KEY_STATE_TYPE;
@@ -120,12 +121,12 @@ public class MySqlInitialReadUtil {
final MySqlInitialLoadSourceOperations sourceOperations =
new MySqlInitialLoadSourceOperations(
Optional.of(new CdcMetadataInjector(emittedAt.toString(), stateAttributes, metadataInjector)));
final MySqlInitialLoadHandler initialLoadHandler = new MySqlInitialLoadHandler(sourceConfig, database,
sourceOperations,
quoteString,
initialLoadStateManager,
namespacePair -> Jsons.emptyObject());
namespacePair -> Jsons.emptyObject(),
getTableSizeInfoForStreams(database, catalog.getStreams(), quoteString));
initialLoadIterator.addAll(initialLoadHandler.getIncrementalIterators(
new ConfiguredAirbyteCatalog().withStreams(initialLoadStreams.streamsForInitialLoad()),

View File

@@ -23,12 +23,12 @@ public class MySqlInitialSyncStateIterator extends AbstractIterator<AirbyteMessa
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlInitialSyncStateIterator.class);
public static final Duration SYNC_CHECKPOINT_DURATION = Duration.ofMinutes(15);
public static final Integer SYNC_CHECKPOINT_RECORDS = 10_000;
public static final Integer SYNC_CHECKPOINT_RECORDS = 100_000;
private final Iterator<AirbyteMessage> messageIterator;
private final AirbyteStreamNameNamespacePair pair;
private boolean hasEmittedFinalState = false;
private String lastPk;
private PrimaryKeyLoadStatus pkStatus;
private final JsonNode streamStateForIncrementalRun;
private final MySqlInitialLoadStateManager stateManager;
private long recordCount = 0L;
@@ -50,6 +50,7 @@ public class MySqlInitialSyncStateIterator extends AbstractIterator<AirbyteMessa
this.syncCheckpointDuration = checkpointDuration;
this.syncCheckpointRecords = checkpointRecords;
this.pkFieldName = stateManager.getPrimaryKeyInfo(pair).pkFieldName();
this.pkStatus = stateManager.getPrimaryKeyLoadStatus(pair);
}
@CheckForNull
@@ -57,13 +58,7 @@ public class MySqlInitialSyncStateIterator extends AbstractIterator<AirbyteMessa
protected AirbyteMessage computeNext() {
if (messageIterator.hasNext()) {
if ((recordCount >= syncCheckpointRecords || Duration.between(lastCheckpoint, OffsetDateTime.now()).compareTo(syncCheckpointDuration) > 0)
&& Objects.nonNull(lastPk)) {
final PrimaryKeyLoadStatus pkStatus = new PrimaryKeyLoadStatus()
.withVersion(MYSQL_STATUS_VERSION)
.withStateType(StateType.PRIMARY_KEY)
.withPkName(pkFieldName)
.withPkVal(lastPk)
.withIncrementalState(streamStateForIncrementalRun);
&& Objects.nonNull(pkStatus)) {
LOGGER.info("Emitting initial sync pk state for stream {}, state is {}", pair, pkStatus);
recordCount = 0L;
lastCheckpoint = Instant.now();
@@ -75,7 +70,14 @@ public class MySqlInitialSyncStateIterator extends AbstractIterator<AirbyteMessa
try {
final AirbyteMessage message = messageIterator.next();
if (Objects.nonNull(message)) {
lastPk = message.getRecord().getData().get(pkFieldName).asText();
final String lastPk = message.getRecord().getData().get(pkFieldName).asText();
pkStatus = new PrimaryKeyLoadStatus()
.withVersion(MYSQL_STATUS_VERSION)
.withStateType(StateType.PRIMARY_KEY)
.withPkName(pkFieldName)
.withPkVal(lastPk)
.withIncrementalState(streamStateForIncrementalRun);
stateManager.updatePrimaryKeyLoadState(pair, pkStatus);
}
recordCount++;
return message;

View File

@@ -0,0 +1,31 @@
package io.airbyte.integrations.source.mysql;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.airbyte.integrations.source.mysql.MySqlQueryUtils.TableSizeInfo;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadHandler;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import org.junit.jupiter.api.Test;
public class MySqlInitialLoadHandlerTest {
private static final long ONE_GB = 1_073_741_824;
private static final long ONE_MB = 1_048_576;
@Test
void testInvalidOrNullTableSizeInfo() {
final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair("table_name", "schema_name");
assertEquals(MySqlInitialLoadHandler.calculateChunkSize(null, pair), 1_000_000L);
final TableSizeInfo invalidRowLengthInfo = new TableSizeInfo(ONE_GB, 0L);
assertEquals(MySqlInitialLoadHandler.calculateChunkSize(invalidRowLengthInfo, pair), 1_000_000L);
final TableSizeInfo invalidTableSizeInfo = new TableSizeInfo(0L, 0L);
assertEquals(MySqlInitialLoadHandler.calculateChunkSize(invalidTableSizeInfo, pair), 1_000_000L);
}
@Test
void testTableSizeInfo() {
final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair("table_name", "schema_name");
assertEquals(MySqlInitialLoadHandler.calculateChunkSize(new TableSizeInfo(ONE_GB, 2 * ONE_MB), pair), 512L);
assertEquals(MySqlInitialLoadHandler.calculateChunkSize(new TableSizeInfo(ONE_GB, 200L), pair), 5368709L);
}
}