1
0
mirror of synced 2025-12-23 21:03:15 -05:00

source-mongo: Warn (vs fail) on different _id types in collection (#39145)

This commit is contained in:
Evan Tahler
2024-07-22 16:34:11 -07:00
committed by GitHub
parent 3d53fb38a0
commit d2ae38209f
5 changed files with 12 additions and 41 deletions

View File

@@ -8,7 +8,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.4.2
dockerImageTag: 1.4.3
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2

View File

@@ -7,6 +7,7 @@ package io.airbyte.integrations.source.mongodb;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.*;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency;
import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator;
@@ -18,6 +19,7 @@ import io.airbyte.integrations.source.mongodb.MongoUtil.CollectionStatistics;
import io.airbyte.integrations.source.mongodb.state.IdType;
import io.airbyte.integrations.source.mongodb.state.MongoDbStateManager;
import io.airbyte.integrations.source.mongodb.state.MongoDbStreamState;
import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage;
import io.airbyte.protocol.models.v0.CatalogHelpers;
@@ -52,6 +54,8 @@ public class InitialSnapshotHandler {
final boolean decorateWithCompletedStatus) {
final boolean isEnforceSchema = config.getEnforceSchema();
final var checkpointInterval = config.getCheckpointInterval();
final String MULTIPLE_ID_TYPES_ANALYTICS_MESSAGE_KEY = "db-sources-mongo-multiple-id-types";
return streams
.stream()
.map(airbyteStream -> {
@@ -62,7 +66,10 @@ public class InitialSnapshotHandler {
final var idTypes = aggregateIdField(collection);
if (idTypes.size() > 1) {
throw new ConfigErrorException("The _id fields in a collection must be consistently typed (collection = " + collectionName + ").");
LOGGER.warn("The _id fields in this collection are not consistently typed, which may lead to data loss (collection = {}).",
collectionName);
AirbyteTraceMessageUtility
.emitAnalyticsTrace(new AirbyteAnalyticsTraceMessage().withType(MULTIPLE_ID_TYPES_ANALYTICS_MESSAGE_KEY).withValue("1"));
}
idTypes.stream().findFirst().ifPresent(idType -> {

View File

@@ -12,7 +12,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -378,25 +377,6 @@ class InitialSnapshotHandlerTest {
assertFalse(collection4.hasNext());
}
@Test
void testGetIteratorsThrowsExceptionWhenThereAreDifferentIdTypes() {
insertDocuments(COLLECTION1, List.of(
new Document(Map.of(
CURSOR_FIELD, OBJECT_ID1,
NAME_FIELD, NAME1)),
new Document(Map.of(
CURSOR_FIELD, "string-id",
NAME_FIELD, NAME2))));
final InitialSnapshotHandler initialSnapshotHandler = new InitialSnapshotHandler();
final MongoDbStateManager stateManager = mock(MongoDbStateManager.class);
final var thrown = assertThrows(ConfigErrorException.class,
() -> initialSnapshotHandler.getIterators(STREAMS, stateManager, mongoClient.getDatabase(DB_NAME),
/* MongoConstants.CHECKPOINT_INTERVAL, true */ CONFIG, false, false));
assertTrue(thrown.getMessage().contains("must be consistently typed"));
}
@Test
void testGetIteratorsThrowsExceptionWhenThereAreUnsupportedIdTypes() {
insertDocuments(COLLECTION1, List.of(

View File

@@ -273,23 +273,6 @@ class MongoDbCdcInitializerTest {
CONFIG));
}
@Test
void testMultipleIdTypesThrowsException() {
final Document aggregate1 = Document.parse("{\"_id\": {\"_id\": \"objectId\"}, \"count\": 1}");
final Document aggregate2 = Document.parse("{\"_id\": {\"_id\": \"string\"}, \"count\": 1}");
when(aggregateCursor.hasNext()).thenReturn(true, true, false);
when(aggregateCursor.next()).thenReturn(aggregate1, aggregate2);
doCallRealMethod().when(aggregateIterable).forEach(any(Consumer.class));
final MongoDbStateManager stateManager =
MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.IN_PROGRESS), CONFIG);
final var thrown = assertThrows(ConfigErrorException.class, () -> cdcInitializer
.createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG_STREAMS, stateManager, EMITTED_AT, CONFIG));
assertTrue(thrown.getMessage().contains("must be consistently typed"));
}
@Test
void testUnsupportedIdTypeThrowsException() {
final Document aggregate = Document.parse("{\"_id\": {\"_id\": \"exotic\"}, \"count\": 1}");