1
0
mirror of synced 2025-12-23 21:03:15 -05:00

🐛 Source MongoDB : Improve our discovery phase to not create large BSON objects (#32886)

Co-authored-by: akashkulk <akashkulk@users.noreply.github.com>
This commit is contained in:
Akash Kulkarni
2023-11-30 06:49:15 -08:00
committed by GitHub
parent 6671bced15
commit 1a09927d3e
5 changed files with 10 additions and 23 deletions

View File

@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.0.9
dockerImageTag: 1.0.10
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2

View File

@@ -130,7 +130,7 @@ public class MongoUtil {
final OptionalInt sizeFromConfig = config.getQueueSize();
if (sizeFromConfig.isPresent()) {
int size = sizeFromConfig.getAsInt();
final int size = sizeFromConfig.getAsInt();
if (size < MIN_QUEUE_SIZE) {
LOGGER.warn("Queue size is overridden to {} , which is the min allowed for safety.",
MIN_QUEUE_SIZE);
@@ -231,8 +231,7 @@ public class MongoUtil {
final Document arrayToObjectAggregation = new Document("$arrayToObject", mapFunction);
final Map<String, Object> groupMap = new HashMap<>();
groupMap.put("_id", null);
groupMap.put("fields", Map.of("$addToSet", "$fields"));
groupMap.put("_id", "$fields");
final List<Bson> aggregateList = new ArrayList<>();
/*
@@ -248,15 +247,14 @@ public class MongoUtil {
* Runs the following aggregation query: db.<collection name>.aggregate( [ { "$sample": { "size" :
* 10000 } }, { "$project" : { "fields" : { "$arrayToObject": { "$map" : { "input" : {
* "$objectToArray" : "$$ROOT" }, "as" : "each", "in" : { "k" : "$$each.k", "v" : { "$type" :
* "$$each.v" } } } } } } }, { "$unwind" : "$fields" }, { "$group" : { "_id" : null, "fields" : {
* "$addToSet" : "$fields" } } } ] )
* "$$each.v" } } } } } } }, { "$unwind" : "$fields" }, { "$group" : { "_id" : $fields } } ] )
*/
final AggregateIterable<Document> output = collection.aggregate(aggregateList);
try (final MongoCursor<Document> cursor = output.allowDiskUse(true).cursor()) {
while (cursor.hasNext()) {
@SuppressWarnings("unchecked")
final Map<String, String> fields = ((List<Map<String, String>>) cursor.next().get("fields")).get(0);
final Map<String, String> fields = (Map<String, String>) cursor.next().get("_id");
discoveredFields.addAll(fields.entrySet().stream()
.map(e -> new MongoField(e.getKey(), convertToSchemaType(e.getValue())))
.collect(Collectors.toSet()));

View File

@@ -1,8 +1,6 @@
[
{
"_id": null,
"fields": [
{
"_id": {
"_id": "string",
"name": "string",
"last_updated": "date",
@@ -12,12 +10,9 @@
"owners": "object",
"amount": "null"
}
]
},
{
"_id": null,
"fields": [
{
"_id": {
"_id": "string",
"name": "string",
"last_updated": "date",
@@ -27,6 +22,5 @@
"owners": "object",
"other": "string"
}
]
}
]

View File

@@ -1,8 +1,6 @@
[
{
"_id": null,
"fields": [
{
"_id": {
"_id": "string",
"name": "string",
"last_updated": "date",
@@ -11,12 +9,9 @@
"items": "array",
"owners": "object"
}
]
},
{
"_id": null,
"fields": [
{
"_id": {
"_id": "string",
"name": "string",
"last_updated": "date",
@@ -26,6 +21,5 @@
"owners": "object",
"other": "string"
}
]
}
]