1
0
mirror of synced 2025-12-25 02:09:19 -05:00

[source-postgres/mssql/mysql] Send state and count for full refresh (#39349)

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: btkcodedev <btk.codedev@gmail.com>
Co-authored-by: Anjay Goel <anjay.g@dashtoon.com>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
Co-authored-by: Baz <oleksandr.bazarnov@globallogic.com>
Co-authored-by: Natalie Kwong <38087517+nataliekwong@users.noreply.github.com>
Co-authored-by: Audrey Maldonado <audrey.maldonado@gmail.com>
Co-authored-by: gosusnp <gosusnp@users.noreply.github.com>
Co-authored-by: Augustin <augustin@airbyte.io>
Co-authored-by: Abdul Rahman Zantout <abed-zantout@live.com>
Co-authored-by: Anatolii Yatsuk <35109939+tolik0@users.noreply.github.com>
Co-authored-by: Dhroov Makwana <pabloescoder@gmail.com>
Co-authored-by: Alexandre Girard <alexandre@airbyte.io>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: Ben Church <ben@airbyte.io>
Co-authored-by: Justin Flannery <juftin@juftin.com>
Co-authored-by: Mal Hancock <mallory@archangelic.space>
Co-authored-by: Enrique Alcázar Garzás <enrique.alcazar@omniloy.com>
Co-authored-by: Daniela García Nistor <58893405+gndaniela@users.noreply.github.com>
Co-authored-by: Carlo Nuccio <carlonuccio91@gmail.com>
Co-authored-by: Natik Gadzhi <natik@respawn.io>
Co-authored-by: Charlie Duong <charlie.duong@hey.com>
Co-authored-by: Yue Li <61070669+theyueli@users.noreply.github.com>
Co-authored-by: Cristina Mariscal <166420606+cmm-airbyte@users.noreply.github.com>
Co-authored-by: cristina.mariscal <cristina.mariscal@cristina.mariscal--MacBook-Pro---DFJ27FJFXX>
Co-authored-by: Gonzalo Villafañe Tapia <gvillafanetapia@gmail.com>
Co-authored-by: Jérémy Denquin <jdenquin@users.noreply.github.com>
Co-authored-by: Maxime Carbonneau-Leclerc <3360483+maxi297@users.noreply.github.com>
Co-authored-by: williammcguinness <wmcguinness@findoctave.com>
Co-authored-by: Marius Posta <marius@airbyte.io>
Co-authored-by: Stephane Geneix <147216312+stephane-airbyte@users.noreply.github.com>
Co-authored-by: Gireesh Sreepathi <gisripa@gmail.com>
Co-authored-by: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com>
Co-authored-by: Danylo Jablonski <150933663+DanyloGL@users.noreply.github.com>
Co-authored-by: Serhii Lazebnyi <serhii.lazebnyi@globallogic.com>
Co-authored-by: alafanechere <augustin.lafanechere@gmail.com>
This commit is contained in:
Xiaohan Song
2024-06-17 10:30:56 -07:00
committed by GitHub
parent f9bade8aac
commit b80a72888c
23 changed files with 222 additions and 69 deletions

View File

@@ -173,7 +173,8 @@ corresponds to that version.
### Java CDK
| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|:--------| :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 0.40.1 | 2024-06-14 | [\#39349](https://github.com/airbytehq/airbyte/pull/39349) | Source stats for full refresh streams |
| 0.40.0 | 2024-06-17 | [\#38622](https://github.com/airbytehq/airbyte/pull/38622) | Destinations: Implement refreshes logic in AbstractStreamOperation |
| 0.39.0 | 2024-06-17 | [\#38067](https://github.com/airbytehq/airbyte/pull/38067) | Destinations: Breaking changes for refreshes (fail on INCOMPLETE stream status; ignore OVERWRITE sync mode) |
| 0.38.2 | 2024-06-14 | [\#39460](https://github.com/airbytehq/airbyte/pull/39460) | Bump postgres JDBC driver version |

View File

@@ -1 +1 @@
version=0.40.0
version=0.40.1

View File

@@ -46,6 +46,9 @@ import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager
import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.commons.functional.CheckedConsumer
@@ -70,6 +73,7 @@ import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLException
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.function.Consumer
@@ -173,12 +177,40 @@ abstract class AbstractJdbcSource<Datatype>(
cursorField,
)
if (airbyteStream.syncMode == FULL_REFRESH) {
var defaultProducer = getSourceStateProducerForNonResumableFullRefreshStream(database)
if (defaultProducer != null) {
iterator =
AutoCloseableIterators.transform(
{ autoCloseableIterator: AutoCloseableIterator<AirbyteMessage> ->
SourceStateIterator(
autoCloseableIterator,
airbyteStream,
defaultProducer,
StateEmitFrequency(stateEmissionFrequency.toLong(), Duration.ZERO)
)
},
iterator,
AirbyteStreamUtils.convertFromNameAndNamespace(
airbyteStream.stream.name,
airbyteStream.stream.namespace
)
)
}
}
return when (airbyteStream.syncMode) {
FULL_REFRESH -> augmentWithStreamStatus(airbyteStream, iterator)
else -> iterator
}
}
protected open fun getSourceStateProducerForNonResumableFullRefreshStream(
database: JdbcDatabase
): SourceStateMessageProducer<AirbyteMessage>? {
return null
}
open fun augmentWithStreamStatus(
airbyteStream: ConfiguredAirbyteStream,
streamItrator: AutoCloseableIterator<AirbyteMessage>

View File

@@ -0,0 +1,52 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.integrations.source.relationaldb.state
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStreamState
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.StreamDescriptor
class NonResumableStateMessageProducer<T>(
private val isCdc: Boolean,
private val sourceStateMessageProducer: SourceStateMessageProducer<T>
) : SourceStateMessageProducer<AirbyteMessage> {
override fun generateStateMessageAtCheckpoint(
stream: ConfiguredAirbyteStream?
): AirbyteStateMessage? {
return null
}
override fun processRecordMessage(
stream: ConfiguredAirbyteStream?,
message: AirbyteMessage
): AirbyteMessage {
return message
}
override fun createFinalStateMessage(stream: ConfiguredAirbyteStream?): AirbyteStateMessage? {
if (isCdc) {
return sourceStateMessageProducer.createFinalStateMessage(stream)
} else {
val airbyteStreamState =
AirbyteStreamState()
.withStreamDescriptor(
StreamDescriptor()
.withName(stream!!.stream.name)
.withNamespace(stream.stream.namespace),
)
return AirbyteStateMessage()
.withType(AirbyteStateMessage.AirbyteStateType.STREAM)
.withStream(airbyteStreamState)
}
}
// no intermediate state message.
override fun shouldEmitStateMessage(stream: ConfiguredAirbyteStream?): Boolean {
return false
}
}

View File

@@ -7,11 +7,13 @@ import com.google.common.collect.AbstractIterator
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateStats
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.SyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Duration
import java.time.Instant
import java.time.OffsetDateTime
private val LOGGER = KotlinLogging.logger {}
open class SourceStateIterator<T>(
private val messageIterator: Iterator<T>,
private val stream: ConfiguredAirbyteStream?,
@@ -40,11 +42,9 @@ open class SourceStateIterator<T>(
) {
val stateMessage =
sourceStateMessageProducer.generateStateMessageAtCheckpoint(stream)
if (shouldAttachCountWithState()) {
stateMessage!!.withSourceStats(
AirbyteStateStats().withRecordCount(recordCount.toDouble())
)
}
stateMessage!!.withSourceStats(
AirbyteStateStats().withRecordCount(recordCount.toDouble())
)
recordCount = 0L
lastCheckpoint = Instant.now()
@@ -65,11 +65,10 @@ open class SourceStateIterator<T>(
hasEmittedFinalState = true
val finalStateMessageForStream =
sourceStateMessageProducer.createFinalStateMessage(stream)
if (shouldAttachCountWithState()) {
finalStateMessageForStream!!.withSourceStats(
AirbyteStateStats().withRecordCount(recordCount.toDouble())
)
}
finalStateMessageForStream!!.withSourceStats(
AirbyteStateStats().withRecordCount(recordCount.toDouble())
)
recordCount = 0L
return AirbyteMessage()
.withType(AirbyteMessage.Type.STATE)
@@ -79,14 +78,6 @@ open class SourceStateIterator<T>(
}
}
/**
* We are disabling counts for FULL_REFRESH streams cause there is are issues with it. We should
* re-enable it once we do the work for project Counts: Emit Counts in Full Refresh
*/
private fun shouldAttachCountWithState(): Boolean {
return stream?.syncMode != SyncMode.FULL_REFRESH
}
// This method is used to check if we should emit a state message. If the record count is set to
// 0,
// we should not emit a state message.

View File

@@ -781,7 +781,11 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
modelsSchema(),
)
} else {
assertExpectedStateMessageCountMatches(stateMessages1, MODEL_RECORDS.size.toLong())
// We are expecting count match for all streams, including non RFR streams.
assertExpectedStateMessageCountMatches(
stateMessages1,
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong()
)
// Expect state and record message from MODEL_RECORDS_2.
assertStreamStatusTraceMessageIndex(
@@ -923,8 +927,11 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
waitForCdcRecords(modelsSchema(), MODELS_STREAM_NAME, 1)
// assertExpectedStateMessages(stateMessages1)
// Non resumeable full refresh does not get any state messages.
assertExpectedStateMessageCountMatches(stateMessages1, MODEL_RECORDS.size.toLong())
// Non resumeable full refresh will also get state messages with count.
assertExpectedStateMessageCountMatches(
stateMessages1,
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong()
)
assertExpectedRecords(
(MODEL_RECORDS_2 + MODEL_RECORDS).toSet(),
recordMessages1,
@@ -933,15 +940,18 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
modelsSchema(),
)
// Platform will remove non RFR streams before each new sync.
val state = Jsons.jsonNode(listOf(stateMessages1[stateMessages1.size - 1]))
val streamStates = state.get(0).get("global").get("stream_states") as ArrayNode
removeStreamState(MODELS_STREAM_NAME_2, streamStates)
val read2 = source().read(config()!!, configuredCatalog, state)
val actualRecords2 = AutoCloseableIterators.toListAndClose(read2)
val recordMessages2 = extractRecordMessages(actualRecords2)
val stateMessages2 = extractStateMessages(actualRecords2)
assertExpectedStateMessagesFromIncrementalSync(stateMessages2)
assertExpectedStateMessageCountMatches(stateMessages2, 1)
assertExpectedStateMessageCountMatches(stateMessages2, 1 + MODEL_RECORDS_2.size.toLong())
assertExpectedRecords(
(MODEL_RECORDS_2 + listOf(puntoRecord)).toSet(),
recordMessages2,

View File

@@ -3,7 +3,7 @@ plugins {
}
airbyteJavaConnector {
cdkVersionRequired = '0.38.1'
cdkVersionRequired = '0.40.1'
features = ['db-sources']
useLocalCdk = false
}

View File

@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.0.29
dockerImageTag: 4.0.30
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql

View File

@@ -9,6 +9,7 @@ import static io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConver
import static io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter.CDC_UPDATED_AT;
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.*;
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbReadUtil.identifyStreamsForCursorBased;
import static io.airbyte.integrations.source.mssql.MssqlCdcHelper.isCdc;
import static io.airbyte.integrations.source.mssql.MssqlQueryUtils.getCursorBasedSyncStatusForStreams;
import static io.airbyte.integrations.source.mssql.MssqlQueryUtils.getTableSizeInfoForStreams;
import static io.airbyte.integrations.source.mssql.initialsync.MssqlInitialReadUtil.*;
@@ -37,6 +38,8 @@ import io.airbyte.cdk.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler;
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
import io.airbyte.cdk.integrations.source.relationaldb.models.CursorBasedStatus;
import io.airbyte.cdk.integrations.source.relationaldb.state.NonResumableStateMessageProducer;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateGeneratorUtils;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManagerFactory;
@@ -639,7 +642,7 @@ public class MssqlSource extends AbstractJdbcSource<JDBCType> implements Source
return;
}
var sourceConfig = database.getSourceConfig();
if (MssqlCdcHelper.isCdc(sourceConfig)) {
if (isCdc(sourceConfig)) {
initialLoadStateManager = getMssqlInitialLoadGlobalStateManager(database, catalog, stateManager, tableNameToTable, getQuoteString());
} else {
final MssqlCursorBasedStateManager cursorBasedStateManager = new MssqlCursorBasedStateManager(stateManager.getRawStateMessages(), catalog);
@@ -656,7 +659,7 @@ public class MssqlSource extends AbstractJdbcSource<JDBCType> implements Source
final ConfiguredAirbyteCatalog catalog,
final StateManager stateManager) {
var sourceConfig = database.getSourceConfig();
if (MssqlCdcHelper.isCdc(sourceConfig)) {
if (isCdc(sourceConfig)) {
return getMssqlFullRefreshInitialLoadHandler(database, catalog, initialLoadStateManager, stateManager, airbyteStream, Instant.now(),
getQuoteString())
.get();
@@ -677,6 +680,11 @@ public class MssqlSource extends AbstractJdbcSource<JDBCType> implements Source
return false;
}
@Override
protected SourceStateMessageProducer<AirbyteMessage> getSourceStateProducerForNonResumableFullRefreshStream(final JdbcDatabase database) {
return new NonResumableStateMessageProducer<>(isCdc(database.getSourceConfig()), initialLoadStateManager);
}
@NotNull
@Override
public AutoCloseableIterator<AirbyteMessage> augmentWithStreamStatus(@NotNull final ConfiguredAirbyteStream airbyteStream,

View File

@@ -30,6 +30,7 @@ public class MssqlInitialLoadGlobalStateManager extends MssqlInitialLoadStateMan
// No special handling for resumable full refresh streams. We will report the cursor as it is.
private Set<AirbyteStreamNameNamespacePair> resumableFullRefreshStreams;
private Set<AirbyteStreamNameNamespacePair> nonResumableFullRefreshStreams;
public MssqlInitialLoadGlobalStateManager(final InitialLoadStreams initialLoadStreams,
final Map<AirbyteStreamNameNamespacePair, OrderedColumnInfo> pairToOrderedColInfo,
@@ -60,16 +61,21 @@ public class MssqlInitialLoadGlobalStateManager extends MssqlInitialLoadStateMan
final ConfiguredAirbyteCatalog catalog) {
this.streamsThatHaveCompletedSnapshot = new HashSet<>();
this.resumableFullRefreshStreams = new HashSet<>();
this.nonResumableFullRefreshStreams = new HashSet<>();
catalog.getStreams().forEach(configuredAirbyteStream -> {
var pairInStream =
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace());
if (!initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)
&& configuredAirbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
this.streamsThatHaveCompletedSnapshot.add(
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
this.streamsThatHaveCompletedSnapshot.add(pairInStream);
}
if (initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)
&& configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
this.resumableFullRefreshStreams.add(
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
if (initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)) {
this.resumableFullRefreshStreams.add(pairInStream);
} else {
this.nonResumableFullRefreshStreams.add(pairInStream);
}
}
});
}
@@ -128,6 +134,12 @@ public class MssqlInitialLoadGlobalStateManager extends MssqlInitialLoadStateMan
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(ocStatus)));
});
nonResumableFullRefreshStreams.forEach(stream -> {
streamStates.add(new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));
});
return new AirbyteStateMessage()
.withType(AirbyteStateType.GLOBAL)
.withGlobal(generateGlobalState(streamStates));

View File

@@ -6,7 +6,7 @@ plugins {
}
airbyteJavaConnector {
cdkVersionRequired = '0.38.0'
cdkVersionRequired = '0.40.1'
features = ['db-sources']
useLocalCdk = false
}

View File

@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.4.9
dockerImageTag: 3.4.10
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql

View File

@@ -46,6 +46,8 @@ import io.airbyte.cdk.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
import io.airbyte.cdk.integrations.source.relationaldb.DbSourceDiscoverUtil;
import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler;
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
import io.airbyte.cdk.integrations.source.relationaldb.state.NonResumableStateMessageProducer;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateGeneratorUtils;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManagerFactory;
@@ -228,6 +230,11 @@ public class MySqlSource extends AbstractJdbcSource<MysqlType> implements Source
}
}
@Override
protected SourceStateMessageProducer<AirbyteMessage> getSourceStateProducerForNonResumableFullRefreshStream(final JdbcDatabase database) {
return new NonResumableStateMessageProducer<>(isCdc(database.getSourceConfig()), initialLoadStateManager);
}
private static AirbyteStream overrideSyncModes(final AirbyteStream stream) {
return stream.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL));
}

View File

@@ -41,6 +41,9 @@ public class MySqlInitialLoadGlobalStateManager extends MySqlInitialLoadStateMan
// No special handling for resumable full refresh streams. We will report the cursor as it is.
private Set<AirbyteStreamNameNamespacePair> resumableFullRefreshStreams;
// non ResumableFullRefreshStreams do not have any state. We only report count for them.
private Set<AirbyteStreamNameNamespacePair> nonResumableFullRefreshStreams;
private final boolean savedOffsetStillPresentOnServer;
private final ConfiguredAirbyteCatalog catalog;
private final CdcState defaultCdcState;
@@ -66,16 +69,21 @@ public class MySqlInitialLoadGlobalStateManager extends MySqlInitialLoadStateMan
final ConfiguredAirbyteCatalog catalog) {
this.streamsThatHaveCompletedSnapshot = new HashSet<>();
this.resumableFullRefreshStreams = new HashSet<>();
this.nonResumableFullRefreshStreams = new HashSet<>();
catalog.getStreams().forEach(configuredAirbyteStream -> {
var pairInStream =
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace());
if (!initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)
&& configuredAirbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
this.streamsThatHaveCompletedSnapshot.add(
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
this.streamsThatHaveCompletedSnapshot.add(pairInStream);
}
if (initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)
&& configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
this.resumableFullRefreshStreams.add(
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
if (initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)) {
this.resumableFullRefreshStreams.add(pairInStream);
} else {
this.nonResumableFullRefreshStreams.add(pairInStream);
}
}
});
}
@@ -137,6 +145,12 @@ public class MySqlInitialLoadGlobalStateManager extends MySqlInitialLoadStateMan
streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(pkStatus))));
});
nonResumableFullRefreshStreams.forEach(stream -> {
streamStates.add(new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));
});
return new AirbyteStateMessage()
.withType(AirbyteStateType.GLOBAL)
.withGlobal(generateGlobalState(streamStates));

View File

@@ -12,7 +12,7 @@ java {
}
airbyteJavaConnector {
cdkVersionRequired = '0.38.2'
cdkVersionRequired = '0.40.1'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}

View File

@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.4.17
dockerImageTag: 3.4.18
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres

View File

@@ -70,6 +70,8 @@ import io.airbyte.cdk.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
import io.airbyte.cdk.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler;
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
import io.airbyte.cdk.integrations.source.relationaldb.state.NonResumableStateMessageProducer;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator;
import io.airbyte.commons.exceptions.ConfigErrorException;
@@ -862,6 +864,11 @@ public class PostgresSource extends AbstractJdbcSource<PostgresType> implements
}
}
@Override
protected SourceStateMessageProducer<AirbyteMessage> getSourceStateProducerForNonResumableFullRefreshStream(final JdbcDatabase database) {
return new NonResumableStateMessageProducer<>(isCdc(database.getSourceConfig()), ctidStateManager);
}
@Override
public boolean supportResumableFullRefresh(final JdbcDatabase database, final ConfiguredAirbyteStream airbyteStream) {
// finalListOfStreamsToBeSyncedViaCtid will be initialized as part of state manager initialization

View File

@@ -36,6 +36,7 @@ public class CtidGlobalStateManager extends CtidStateManager {
private final StateManager stateManager;
private Set<AirbyteStreamNameNamespacePair> resumableFullRefreshStreams;
private Set<AirbyteStreamNameNamespacePair> nonResumableFullRefreshStreams;
private Set<AirbyteStreamNameNamespacePair> streamsThatHaveCompletedSnapshot;
private final boolean savedOffsetAfterReplicationSlotLSN;
private final CdcState defaultCdcState;
@@ -50,23 +51,29 @@ public class CtidGlobalStateManager extends CtidStateManager {
this.stateManager = stateManager;
this.savedOffsetAfterReplicationSlotLSN = savedOffsetAfterReplicationSlotLSN;
this.defaultCdcState = defaultCdcState;
initStream(ctidStreams, catalog);
this.fileNodeHandler = fileNodeHandler;
initStream(ctidStreams, catalog);
}
private void initStream(final CtidStreams ctidStreams,
final ConfiguredAirbyteCatalog catalog) {
this.streamsThatHaveCompletedSnapshot = new HashSet<>();
this.resumableFullRefreshStreams = new HashSet<>();
this.nonResumableFullRefreshStreams = new HashSet<>();
catalog.getStreams().forEach(configuredAirbyteStream -> {
var pair =
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace());
if (!ctidStreams.streamsForCtidSync().contains(configuredAirbyteStream) && configuredAirbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
streamsThatHaveCompletedSnapshot.add(
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
streamsThatHaveCompletedSnapshot.add(pair);
}
if (ctidStreams.streamsForCtidSync().contains(configuredAirbyteStream)
&& configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
this.resumableFullRefreshStreams.add(
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
if (fileNodeHandler.hasFileNode(
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()))) {
this.resumableFullRefreshStreams.add(pair);
} else {
this.nonResumableFullRefreshStreams.add(pair);
}
}
});
}
@@ -104,7 +111,13 @@ public class CtidGlobalStateManager extends CtidStateManager {
streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(ctidStatusForFullRefreshStream))));
});
if (!resumableFullRefreshStreams.contains(pair)) {
nonResumableFullRefreshStreams.forEach(stream -> {
streamStates.add(new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));
});
if (!resumableFullRefreshStreams.contains(pair) && !nonResumableFullRefreshStreams.contains(pair)) {
streamStates.add(getAirbyteStreamState(pair, (Jsons.jsonNode(ctidStatus))));
}
@@ -148,6 +161,12 @@ public class CtidGlobalStateManager extends CtidStateManager {
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(ctidStatusForFullRefreshStream)));
});
nonResumableFullRefreshStreams.forEach(stream -> {
streamStates.add(new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));
});
return new AirbyteStateMessage()
.withType(AirbyteStateType.GLOBAL)
.withGlobal(generateGlobalState(streamStates));

View File

@@ -66,6 +66,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.junit.jupiter.api.BeforeEach;
@@ -84,11 +85,9 @@ public class CdcPostgresSourceTest extends CdcSourceTest<PostgresSource, Postgre
@Override
protected void assertExpectedStateMessageCountMatches(final List<? extends AirbyteStateMessage> stateMessages, long totalCount) {
// Count has been disabled due to we do not support RFR for non resumeable full refresh.
// AtomicLong count = new AtomicLong(0L);
// stateMessages.stream().forEach(stateMessage ->
// count.addAndGet(stateMessage.getSourceStats().getRecordCount().longValue()));
// assertEquals(totalCount, count.get());
AtomicLong count = new AtomicLong(0L);
stateMessages.stream().forEach(stateMessage -> count.addAndGet(stateMessage.getSourceStats().getRecordCount().longValue()));
assertEquals(totalCount, count.get());
}
@Override

View File

@@ -296,10 +296,8 @@ class PostgresSourceTest {
final Set<AirbyteMessage> actualMessages =
MoreIterators.toSet(source().read(anotherUserConfig, CONFIGURED_CATALOG, null));
setEmittedAtToNull(actualMessages);
// expect 6 records, 3 state messages and 4 stream status messages. (view does not have its own
// state message because it goes
// to non resumable full refresh path).
assertEquals(13, actualMessages.size());
// expect 6 records, 4 state messages and 4 stream status messages.
assertEquals(14, actualMessages.size());
final var actualRecordMessages = filterRecords(actualMessages);
assertEquals(PRIVILEGE_TEST_CASE_EXPECTED_MESSAGES, actualRecordMessages);
}

View File

@@ -422,6 +422,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.0.30 | 2024-06-14 | [39349](https://github.com/airbytehq/airbyte/pull/39349) | Full refresh stream sending internal count metadata. |
| 4.0.29 | 2024-06-14 | [39506](https://github.com/airbytehq/airbyte/pull/39506) | Adopt latest CDK. |
| 4.0.28 | 2024-06-08 | [39342](https://github.com/airbytehq/airbyte/pull/39342) | Fix custom conversion in CDC for datetimeoffset type. |
| 4.0.27 | 2024-05-29 | [38584](https://github.com/airbytehq/airbyte/pull/38584) | Set is_resumable flag in discover. |

View File

@@ -232,11 +232,12 @@ Any database or table encoding combination of charset and collation is supported
<summary>Expand to review</summary>
| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--------------------------------------------------------- |:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.4.9 | 2024-06-11 | [39405](https://github.com/airbytehq/airbyte/pull/39405) | Adopt latest CDK. |
| 3.4.8 | 2024-06-05 | [39144](https://github.com/airbytehq/airbyte/pull/39144) | Upgrade Debezium to 2.5.4 |
| 3.4.7 | 2024-05-29 | [38584](https://github.com/airbytehq/airbyte/pull/38584) | Set is_resumable flag in discover. |
| 3.4.6 | 2024-05-29 | [38538](https://github.com/airbytehq/airbyte/pull/38538) | Exit connector when encountering a config error. |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.4.10 | 2024-06-14 | [39349](https://github.com/airbytehq/airbyte/pull/39349) | Full refresh stream sending internal count metadata. |
| 3.4.9 | 2024-06-11 | [39405](https://github.com/airbytehq/airbyte/pull/39405) | Adopt latest CDK. |
| 3.4.8 | 2024-06-05 | [39144](https://github.com/airbytehq/airbyte/pull/39144) | Upgrade Debezium to 2.5.4 |
| 3.4.7 | 2024-05-29 | [38584](https://github.com/airbytehq/airbyte/pull/38584) | Set is_resumable flag in discover. |
| 3.4.6 | 2024-05-29 | [38538](https://github.com/airbytehq/airbyte/pull/38538) | Exit connector when encountering a config error. |
| 3.4.5 | 2024-05-23 | [38198](https://github.com/airbytehq/airbyte/pull/38198) | Sync sending trace status messages indicating progress. |
| 3.4.4 | 2024-05-15 | [38208](https://github.com/airbytehq/airbyte/pull/38208) | disable counts in full refresh stream in state message. |
| 3.4.3 | 2024-05-13 | [38104](https://github.com/airbytehq/airbyte/pull/38104) | Handle transient error messages. |

View File

@@ -310,7 +310,8 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp
<summary>Expand to review</summary>
| Version | Date | Pull Request | Subject |
| ------- | ---------- | -------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|---------| ---------- | -------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 3.4.18 | 2024-06-14 | [39349](https://github.com/airbytehq/airbyte/pull/39349) | Full refresh stream sending internal count metadata. |
| 3.4.17 | 2024-06-13 | [39460](https://github.com/airbytehq/airbyte/pull/39460) | Bump postgres JDBC driver version |
| 3.4.16 | 2024-05-29 | [39474](https://github.com/airbytehq/airbyte/pull/39474) | Adopt latest CDK. |
| 3.4.15 | 2024-05-29 | [38773](https://github.com/airbytehq/airbyte/pull/38773) | Connect with adaptiveFetch=true. |