1
0
mirror of synced 2025-12-25 02:09:19 -05:00

postgres+mysql sources: use constants to keep checkpointing behaviour same (#30197)

Co-authored-by: subodh1810 <subodh1810@users.noreply.github.com>
This commit is contained in:
Subodh Kant Chaturvedi
2023-09-12 13:13:23 +05:30
committed by GitHub
parent 51c67d7eaa
commit ad47884297
11 changed files with 52 additions and 21 deletions

View File

@@ -4,6 +4,11 @@
package io.airbyte.integrations.debezium;
import static io.airbyte.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION;
import static io.airbyte.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY;
import static io.airbyte.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS;
import static io.airbyte.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
@@ -117,10 +122,10 @@ public class AirbyteDebeziumHandler<T> {
firstRecordWaitTime);
final Duration syncCheckpointDuration =
config.get("sync_checkpoint_seconds") != null ? Duration.ofSeconds(config.get("sync_checkpoint_seconds").asLong())
: DebeziumStateDecoratingIterator.SYNC_CHECKPOINT_DURATION;
final Long syncCheckpointRecords = config.get("sync_checkpoint_records") != null ? config.get("sync_checkpoint_records").asLong()
: DebeziumStateDecoratingIterator.SYNC_CHECKPOINT_RECORDS;
config.get(SYNC_CHECKPOINT_DURATION_PROPERTY) != null ? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong())
: SYNC_CHECKPOINT_DURATION;
final Long syncCheckpointRecords = config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY) != null ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
: SYNC_CHECKPOINT_RECORDS;
return AutoCloseableIterators.fromIterator(new DebeziumStateDecoratingIterator<>(
eventIterator,
cdcStateHandler,

View File

@@ -0,0 +1,17 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium;
import java.time.Duration;
/**
 * Shared constants for the sync-checkpoint settings: the config property names that callers look
 * up, and the fallback values callers use when a property is absent from the connector config.
 *
 * <p>This is a constants holder only; it is not meant to be instantiated or subclassed.
 */
public final class DebeziumIteratorConstants {

  /** Config key whose value callers read as a number of seconds and convert to a {@link Duration}. */
  public static final String SYNC_CHECKPOINT_DURATION_PROPERTY = "sync_checkpoint_seconds";

  /** Config key whose value callers read as a record count. */
  public static final String SYNC_CHECKPOINT_RECORDS_PROPERTY = "sync_checkpoint_records";

  /** Fallback checkpoint duration used when {@link #SYNC_CHECKPOINT_DURATION_PROPERTY} is not set. */
  public static final Duration SYNC_CHECKPOINT_DURATION = Duration.ofMinutes(15);

  /**
   * Fallback checkpoint record count used when {@link #SYNC_CHECKPOINT_RECORDS_PROPERTY} is not
   * set. Kept as a boxed {@code Integer} so existing references to this constant keep compiling
   * unchanged.
   */
  public static final Integer SYNC_CHECKPOINT_RECORDS = 10_000;

  private DebeziumIteratorConstants() {
    // Constants holder: suppress the implicit public constructor.
  }

}

View File

@@ -27,9 +27,6 @@ import org.slf4j.LoggerFactory;
*/
public class DebeziumStateDecoratingIterator<T> extends AbstractIterator<AirbyteMessage> implements Iterator<AirbyteMessage> {
public static final Duration SYNC_CHECKPOINT_DURATION = Duration.ofMinutes(15);
public static final Integer SYNC_CHECKPOINT_RECORDS = 10_000;
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumStateDecoratingIterator.class);
private final Iterator<ChangeEventWithMetadata> changeEventIterator;

View File

@@ -4,6 +4,9 @@
package io.airbyte.integrations.source.mysql.initialsync;
import static io.airbyte.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY;
import static io.airbyte.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.mysql.cj.MysqlType;
@@ -164,9 +167,9 @@ public class MySqlInitialLoadHandler {
: currentPkLoadStatus.getIncrementalState();
final Duration syncCheckpointDuration =
config.get("sync_checkpoint_seconds") != null ? Duration.ofSeconds(config.get("sync_checkpoint_seconds").asLong())
config.get(SYNC_CHECKPOINT_DURATION_PROPERTY) != null ? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong())
: MySqlInitialSyncStateIterator.SYNC_CHECKPOINT_DURATION;
final Long syncCheckpointRecords = config.get("sync_checkpoint_records") != null ? config.get("sync_checkpoint_records").asLong()
final Long syncCheckpointRecords = config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY) != null ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
: MySqlInitialSyncStateIterator.SYNC_CHECKPOINT_RECORDS;
return AutoCloseableIterators.transformIterator(

View File

@@ -8,6 +8,7 @@ import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadS
import autovalue.shaded.com.google.common.collect.AbstractIterator;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.debezium.DebeziumIteratorConstants;
import io.airbyte.integrations.source.mysql.internal.models.InternalModels.StateType;
import io.airbyte.integrations.source.mysql.internal.models.PrimaryKeyLoadStatus;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
@@ -26,8 +27,8 @@ import org.slf4j.LoggerFactory;
public class MySqlInitialSyncStateIterator extends AbstractIterator<AirbyteMessage> implements Iterator<AirbyteMessage> {
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlInitialSyncStateIterator.class);
public static final Duration SYNC_CHECKPOINT_DURATION = Duration.ofMinutes(15);
public static final Integer SYNC_CHECKPOINT_RECORDS = 100_000;
public static final Duration SYNC_CHECKPOINT_DURATION = DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION;
public static final Integer SYNC_CHECKPOINT_RECORDS = DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS;
private final Iterator<AirbyteMessage> messageIterator;
private final AirbyteStreamNameNamespacePair pair;

View File

@@ -4,6 +4,7 @@
package io.airbyte.integrations.source.mysql;
import static io.airbyte.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil.MYSQL_CDC_OFFSET;
@@ -121,7 +122,7 @@ public class CdcMysqlSourceTest extends CdcSourceTest {
.put("username", container.getUsername())
.put("password", container.getPassword())
.put("replication_method", replicationMethod)
.put("sync_checkpoint_records", 1)
.put(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
.put("is_test", true)
.build());
}

View File

@@ -8,6 +8,7 @@ import static io.airbyte.integrations.source.postgres.ctid.CtidStateManager.CTID
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.AbstractIterator;
import io.airbyte.integrations.debezium.DebeziumIteratorConstants;
import io.airbyte.integrations.source.postgres.internal.models.CtidStatus;
import io.airbyte.integrations.source.postgres.internal.models.InternalModels.StateType;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
@@ -27,8 +28,8 @@ import org.slf4j.LoggerFactory;
public class CtidStateIterator extends AbstractIterator<AirbyteMessage> implements Iterator<AirbyteMessage> {
private static final Logger LOGGER = LoggerFactory.getLogger(CtidStateIterator.class);
public static final Duration SYNC_CHECKPOINT_DURATION = Duration.ofMinutes(15);
public static final Integer SYNC_CHECKPOINT_RECORDS = 10_000;
public static final Duration SYNC_CHECKPOINT_DURATION = DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION;
public static final Integer SYNC_CHECKPOINT_RECORDS = DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS;
private final Iterator<AirbyteMessageWithCtid> messageIterator;
private final AirbyteStreamNameNamespacePair pair;

View File

@@ -4,6 +4,8 @@
package io.airbyte.integrations.source.postgres.ctid;
import static io.airbyte.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY;
import static io.airbyte.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY;
import static io.airbyte.integrations.source.postgres.ctid.InitialSyncCtidIteratorConstants.USE_TEST_CHUNK_SIZE;
import com.fasterxml.jackson.databind.JsonNode;
@@ -162,9 +164,9 @@ public class PostgresCtidHandler {
(currentCtidStatus == null || currentCtidStatus.getIncrementalState() == null) ? streamStateForIncrementalRunSupplier.apply(pair)
: currentCtidStatus.getIncrementalState();
final Duration syncCheckpointDuration =
config.get("sync_checkpoint_seconds") != null ? Duration.ofSeconds(config.get("sync_checkpoint_seconds").asLong())
config.get(SYNC_CHECKPOINT_DURATION_PROPERTY) != null ? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong())
: CtidStateIterator.SYNC_CHECKPOINT_DURATION;
final Long syncCheckpointRecords = config.get("sync_checkpoint_records") != null ? config.get("sync_checkpoint_records").asLong()
final Long syncCheckpointRecords = config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY) != null ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
: CtidStateIterator.SYNC_CHECKPOINT_RECORDS;
return AutoCloseableIterators.transformIterator(

View File

@@ -4,6 +4,8 @@
package io.airbyte.integrations.source.postgres;
import static io.airbyte.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY;
import static io.airbyte.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_LSN;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
@@ -153,7 +155,7 @@ public class CdcPostgresSourceTest extends CdcSourceTest {
.put(JdbcUtils.SSL_KEY, false)
.put("is_test", true)
.put("replication_method", replicationMethod)
.put("sync_checkpoint_records", 1)
.put(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
.build());
}
@@ -933,8 +935,8 @@ public class CdcPostgresSourceTest extends CdcSourceTest {
writeModelRecord(record);
}
final JsonNode config = getConfig();
((ObjectNode) config).put("sync_checkpoint_seconds", 1);
((ObjectNode) config).put("sync_checkpoint_records", 100_000);
((ObjectNode) config).put(SYNC_CHECKPOINT_DURATION_PROPERTY, 1);
((ObjectNode) config).put(SYNC_CHECKPOINT_RECORDS_PROPERTY, 100_000);
final JsonNode stateAfterFirstSync = Jsons.jsonNode(Collections.singletonList(stateMessages.get(stateMessages.size() - 1)));
final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = getSource()

View File

@@ -4,6 +4,7 @@
package io.airbyte.integrations.source.postgres;
import static io.airbyte.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY;
import static io.airbyte.integrations.source.postgres.ctid.CtidStateManager.STATE_TYPE_KEY;
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.createRecord;
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.extractSpecificFieldFromCombinedMessages;
@@ -515,7 +516,7 @@ class PostgresJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
@Test
void testReadMultipleTablesIncrementally() throws Exception {
((ObjectNode) config).put("sync_checkpoint_records", 1);
((ObjectNode) config).put(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1);
final String namespace = getDefaultNamespace();
final String streamOneName = TABLE_NAME + "one";
// Create a fresh first table

View File

@@ -4,6 +4,7 @@
package io.airbyte.integrations.source.postgres;
import static io.airbyte.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY;
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.createRecord;
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.extractStateMessage;
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.filterRecords;
@@ -174,7 +175,7 @@ class XminPostgresSourceTest {
.put(JdbcUtils.PASSWORD_KEY, psqlDb.getPassword())
.put(JdbcUtils.SSL_KEY, false)
.put("replication_method", getReplicationMethod())
.put("sync_checkpoint_records", 1)
.put(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
.build());
}