1
0
mirror of synced 2025-12-25 02:09:19 -05:00

[Source-MongoDb] : Better logging for MongoDB (#33430)

This commit is contained in:
Akash Kulkarni
2023-12-13 10:13:48 -08:00
committed by GitHub
parent f8da0be7fa
commit f2e0585508
4 changed files with 7 additions and 6 deletions

View File

@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.0.11
dockerImageTag: 1.0.12
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2

View File

@@ -67,7 +67,7 @@ public class MongoDbCdcInitialSnapshotUtils {
final List<ConfiguredAirbyteStream> initialSnapshotStreams = new ArrayList<>();
if (!savedOffsetIsValid) {
LOGGER.debug("Offset state is invalid. Add all {} stream(s) from the configured catalog to perform an initial snapshot.",
LOGGER.info("Offset state is invalid. Add all {} stream(s) from the configured catalog to perform an initial snapshot.",
fullCatalog.getStreams().size());
/*
@@ -87,7 +87,7 @@ public class MongoDbCdcInitialSnapshotUtils {
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
LOGGER.debug("There are {} stream(s) that are still in progress of an initial snapshot sync.", streamsStillInInitialSnapshot.size());
LOGGER.info("There are {} stream(s) that are still in progress of an initial snapshot sync.", streamsStillInInitialSnapshot.size());
// Fetch the streams from the catalog that still need to complete the initial snapshot sync
initialSnapshotStreams.addAll(fullCatalog.getStreams().stream()
@@ -98,7 +98,7 @@ public class MongoDbCdcInitialSnapshotUtils {
// Fetch the streams added to the catalog since the last sync
final List<ConfiguredAirbyteStream> newStreams = identifyStreamsToSnapshot(fullCatalog,
new HashSet<>(stateManager.getStreamStates().keySet()));
LOGGER.debug("There are {} stream(s) that have been added to the catalog since the last sync.", newStreams.size());
LOGGER.info("There are {} stream(s) that have been added to the catalog since the last sync.", newStreams.size());
initialSnapshotStreams.addAll(newStreams);
}

View File

@@ -110,11 +110,11 @@ public class MongoDbCdcInitializer {
optSavedOffset.filter(savedOffset -> mongoDbDebeziumStateUtil.isValidResumeToken(savedOffset, mongoClient)).isPresent();
if (!savedOffsetIsValid) {
LOGGER.debug("Saved offset is not valid. Airbyte will trigger a full refresh.");
LOGGER.info("Saved offset is not valid. Airbyte will trigger a full refresh.");
// If the offset in the state is invalid, reset the state to the initial STATE
stateManager.resetState(new MongoDbCdcState(initialDebeziumState));
} else {
LOGGER.debug("Valid offset state discovered. Updating state manager with retrieved CDC state {}...", cdcState);
LOGGER.info("Valid offset state discovered. Updating state manager with retrieved CDC state {}...", cdcState);
stateManager.updateCdcState(new MongoDbCdcState(cdcState));
}