1
0
mirror of synced 2025-12-25 02:09:19 -05:00

🎉 New source: MongoDb ported to java (#5530)

* New abstraction for NoSql database sources

* New MongoDbSource: partial impl

* Added MongoDataType

* Improved MongoDatabase and fixed read method

* code review changes;

* merge clean up;

* Renamed NoSqlDatabase to AbstractDatabase

* formatter changes;

* code review changes: changed mongodb-new to mongodb-v2; left only new connector info in all docs

* code review changes: changed mongodb-new to mongodb-v2; left only new connector info in all docs

* updated spec.json and toDatabaseConfig() method

* updated doc accordingly to spec.json changes

Co-authored-by: Iryna Kruk <iryna.o.kruk@globallogic.com>
This commit is contained in:
irynakruk
2021-09-09 08:14:16 -04:00
committed by GitHub
parent 984cebe129
commit ffecc1c9a2
24 changed files with 1571 additions and 478 deletions

View File

@@ -1,8 +0,0 @@
{
"sourceDefinitionId": "487b930d-7f6a-43ce-8bac-46e6b2de0a55",
"name": "Mongo DB",
"dockerRepository": "airbyte/source-mongodb",
"dockerImageTag": "0.3.3",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mongodb",
"icon": "mongodb.svg"
}

View File

@@ -0,0 +1,8 @@
{
"sourceDefinitionId": "b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e",
"name": "MongoDb",
"dockerRepository": "airbyte/source-mongodb-v2",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mongodb-v2",
"icon": "mongodb.svg"
}

View File

@@ -251,12 +251,6 @@
dockerImageTag: 0.2.4
documentationUrl: https://docs.airbyte.io/integrations/sources/appstore
icon: appstore.svg
- sourceDefinitionId: 487b930d-7f6a-43ce-8bac-46e6b2de0a55
name: Mongo DB
dockerRepository: airbyte/source-mongodb
dockerImageTag: 0.3.3
documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb
icon: mongodb.svg
- sourceDefinitionId: d19ae824-e289-4b14-995a-0632eb46d246
name: Google Directory
dockerRepository: airbyte/source-google-directory
@@ -464,3 +458,9 @@
dockerRepository: airbyte/source-amazon-ads
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.io/integrations/sources/amazon-ads
- sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
name: MongoDb
dockerRepository: airbyte/source-mongodb-v2
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2
icon: mongodb.svg

View File

@@ -22,6 +22,9 @@ dependencies {
// Lombok
implementation 'org.projectlombok:lombok:1.18.20'
annotationProcessor('org.projectlombok:lombok:1.18.20')
// MongoDB
compile 'org.mongodb:mongodb-driver-sync:4.3.0'
}
task(newConfigsMigration, dependsOn: 'classes', type: JavaExec) {

View File

@@ -0,0 +1,50 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.db;
import com.fasterxml.jackson.databind.JsonNode;
/**
 * Base class for connector database wrappers (SQL and NoSQL alike). It carries the two
 * configuration documents every connector works with: the raw, user-supplied source
 * configuration and the connector-specific configuration derived from it.
 */
public abstract class AbstractDatabase implements AutoCloseable {

  private JsonNode sourceConfig;
  private JsonNode databaseConfig;

  /** @return the raw user-supplied source configuration, or {@code null} if never set */
  public JsonNode getSourceConfig() {
    return this.sourceConfig;
  }

  public void setSourceConfig(JsonNode sourceConfig) {
    this.sourceConfig = sourceConfig;
  }

  /** @return the connector-specific database configuration, or {@code null} if never set */
  public JsonNode getDatabaseConfig() {
    return this.databaseConfig;
  }

  public void setDatabaseConfig(JsonNode databaseConfig) {
    this.databaseConfig = databaseConfig;
  }

}

View File

@@ -32,6 +32,7 @@ import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.jdbc.StreamingJdbcDatabase;
import io.airbyte.db.mongodb.MongoDatabase;
import java.util.Optional;
import java.util.function.Function;
import org.apache.commons.dbcp2.BasicDataSource;
@@ -202,4 +203,8 @@ public class Databases {
return new BigQueryDatabase(projectId, jsonCreds);
}
/**
 * Creates a {@link MongoDatabase} wrapper (opens a Mongo client) for the given connection
 * string and database name. The caller owns the returned instance and must close it.
 */
public static MongoDatabase createMongoDatabase(final String connectionString, final String databaseName) {
return new MongoDatabase(connectionString, databaseName);
}
}

View File

@@ -27,29 +27,10 @@ package io.airbyte.db;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.stream.Stream;
public abstract class SqlDatabase implements AutoCloseable {
private JsonNode sourceConfig;
private JsonNode databaseConfig;
public abstract class SqlDatabase extends AbstractDatabase {
public abstract void execute(String sql) throws Exception;
public abstract Stream<JsonNode> query(String sql, String... params) throws Exception;
public JsonNode getSourceConfig() {
return sourceConfig;
}
public void setSourceConfig(JsonNode sourceConfig) {
this.sourceConfig = sourceConfig;
}
public JsonNode getDatabaseConfig() {
return databaseConfig;
}
public void setDatabaseConfig(JsonNode databaseConfig) {
this.databaseConfig = databaseConfig;
}
}

View File

@@ -0,0 +1,144 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.db.mongodb;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.mongodb.ConnectionString;
import com.mongodb.ReadConcern;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoIterable;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.db.AbstractDatabase;
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Thin wrapper around a {@link MongoClient} bound to one logical database. Provides
 * collection listing/lookup and a streaming read of documents mapped to {@link JsonNode}.
 * Instances own the underlying client and must be closed.
 */
public class MongoDatabase extends AbstractDatabase {

  private static final Logger LOGGER = LoggerFactory.getLogger(MongoDatabase.class);
  private static final int BATCH_SIZE = 1000;

  private final ConnectionString connectionString;
  private final String databaseName;
  private final MongoClient mongoClient;

  /**
   * @param uri full MongoDB connection string
   * @param databaseName logical database to operate on
   * @throws RuntimeException if the URI is invalid or the client cannot be created
   */
  public MongoDatabase(String uri, String databaseName) {
    try {
      connectionString = new ConnectionString(uri);
      mongoClient = MongoClients.create(connectionString);
      this.databaseName = databaseName;
    } catch (Exception e) {
      // log with placeholder AND the throwable so the stack trace is not lost
      // (original logged only e.getMessage())
      LOGGER.error("Failed to create MongoClient: {}", e.getMessage(), e);
      throw new RuntimeException(e);
    }
  }

  @Override
  public void close() throws Exception {
    mongoClient.close();
  }

  public com.mongodb.client.MongoDatabase getDatabase() {
    return mongoClient.getDatabase(databaseName);
  }

  /** Lists collection names; each call issues a server round-trip. */
  public MongoIterable<String> getCollectionNames() {
    return getDatabase().listCollectionNames();
  }

  /** Returns the named collection with read concern MAJORITY (read committed-to-majority data). */
  public MongoCollection<Document> getCollection(String collectionName) {
    return getDatabase().getCollection(collectionName)
        .withReadConcern(ReadConcern.MAJORITY);
  }

  @VisibleForTesting
  public MongoCollection<Document> createCollection(String name) {
    getDatabase().createCollection(name);
    return getDatabase().getCollection(name);
  }

  @VisibleForTesting
  public String getName() {
    return getDatabase().getName();
  }

  /**
   * Streams documents from a collection as JSON nodes.
   *
   * @param collectionName collection to read
   * @param columnNames discovered column names (used to detect suffixed "transform" fields)
   * @param filter optional query filter; empty means read everything
   * @return a lazy stream; callers must close it so the underlying cursor is released
   */
  public Stream<JsonNode> read(String collectionName, List<String> columnNames, Optional<Bson> filter) {
    try {
      final MongoCollection<Document> collection = getDatabase().getCollection(collectionName);
      final MongoCursor<Document> cursor = collection
          .find(filter.orElse(new BsonDocument()))
          .batchSize(BATCH_SIZE)
          .cursor();
      return getStream(cursor, (document) -> MongoUtils.toJsonNode(document, columnNames))
          .onClose(() -> {
            try {
              cursor.close();
            } catch (Exception e) {
              // preserve the cause — the original threw a bare RuntimeException()
              throw new RuntimeException(e);
            }
          });
    } catch (Exception e) {
      // SLF4J placeholders were missing in the original, so collectionName and the
      // message were silently dropped; also pass the throwable for the stack trace.
      LOGGER.error("Exception attempting to read data from collection {}: {}", collectionName, e.getMessage(), e);
      throw new RuntimeException(e);
    }
  }

  // Adapts a MongoCursor into an ordered, sequential Stream, mapping each document via mapper.
  private Stream<JsonNode> getStream(MongoCursor<Document> cursor, CheckedFunction<Document, JsonNode, Exception> mapper) {
    return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {

      @Override
      public boolean tryAdvance(Consumer<? super JsonNode> action) {
        try {
          Document document = cursor.tryNext();
          if (document == null) {
            return false;
          }
          action.accept(mapper.apply(document));
          return true;
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }

    }, false);
  }

}

View File

@@ -0,0 +1,187 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.db.mongodb;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.bson.BsonBinary;
import org.bson.BsonDocument;
import org.bson.BsonDocumentReader;
import org.bson.BsonReader;
import org.bson.BsonType;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.Decimal128;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Helpers for mapping BSON documents and types to Airbyte's JSON representation.
 */
public class MongoUtils {

  private static final Logger LOGGER = LoggerFactory.getLogger(MongoUtils.class);

  private static final int DISCOVERY_BATCH_SIZE = 10000;
  // NOTE: "_aibyte_" (sic) is the suffix used throughout the codebase; keep the spelling
  // as-is for compatibility with existing catalogs.
  private static final String AIRBYTE_SUFFIX = "_aibyte_transform";

  /** Maps a BSON type to the closest Airbyte JSON schema primitive (STRING as fallback). */
  public static JsonSchemaPrimitive getType(final BsonType dataType) {
    return switch (dataType) {
      case BOOLEAN -> JsonSchemaPrimitive.BOOLEAN;
      case INT32, INT64, DOUBLE, DECIMAL128 -> JsonSchemaPrimitive.NUMBER;
      case STRING, SYMBOL, BINARY, DATE_TIME, TIMESTAMP, OBJECT_ID, REGULAR_EXPRESSION, JAVASCRIPT, JAVASCRIPT_WITH_SCOPE -> JsonSchemaPrimitive.STRING;
      case ARRAY -> JsonSchemaPrimitive.ARRAY;
      case DOCUMENT -> JsonSchemaPrimitive.OBJECT;
      default -> JsonSchemaPrimitive.STRING;
    };
  }

  /**
   * Converts a Mongo document into a flat JSON object node.
   *
   * @param columnNames discovered column names; a "<field>_aibyte_transform" entry forces the
   *        field's value to be emitted as text
   */
  public static JsonNode toJsonNode(final Document document, final List<String> columnNames) {
    final ObjectNode objectNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());
    readBson(document, objectNode, columnNames);
    return objectNode;
  }

  // Walks the BSON document field by field and writes each value into o with the
  // appropriate JSON type; nested documents and arrays are stringified (MVP behavior).
  private static void readBson(final Document document, final ObjectNode o, final List<String> columnNames) {
    final BsonDocument bsonDocument = toBsonDocument(document);
    try (BsonReader reader = new BsonDocumentReader(bsonDocument)) {
      reader.readStartDocument();
      while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) {
        final var fieldName = reader.readName();
        final var fieldType = reader.getCurrentBsonType();
        switch (fieldType) {
          case BOOLEAN -> o.put(fieldName, reader.readBoolean());
          case INT32 -> o.put(fieldName, reader.readInt32());
          case INT64 -> o.put(fieldName, reader.readInt64());
          case DOUBLE -> o.put(fieldName, reader.readDouble());
          case DECIMAL128 -> o.put(fieldName, toDouble(reader.readDecimal128()));
          case TIMESTAMP -> o.put(fieldName, toString(reader.readTimestamp()));
          case DATE_TIME -> o.put(fieldName, DataTypeUtils.toISO8601String(reader.readDateTime()));
          case BINARY -> o.put(fieldName, toByteArray(reader.readBinaryData()));
          case SYMBOL -> o.put(fieldName, reader.readSymbol());
          case STRING -> o.put(fieldName, reader.readString());
          case OBJECT_ID -> o.put(fieldName, toString(reader.readObjectId()));
          case JAVASCRIPT -> o.put(fieldName, reader.readJavaScript());
          case JAVASCRIPT_WITH_SCOPE -> o.put(fieldName, reader.readJavaScriptWithScope());
          case REGULAR_EXPRESSION -> o.put(fieldName, toString(reader.readRegularExpression()));
          case DOCUMENT -> o.put(fieldName, documentToString(document.get(fieldName), reader));
          case ARRAY -> o.put(fieldName, arrayToString(document.get(fieldName), reader));
          default -> reader.skipValue();
        }
        // Fields flagged with the transform suffix during discovery are forced to text.
        if (columnNames.contains(fieldName + AIRBYTE_SUFFIX)) {
          o.put(fieldName, o.get(fieldName).asText());
        }
      }
      reader.readEndDocument();
    } catch (Exception e) {
      // placeholder + throwable: the original dropped e.getMessage() and the stack trace
      LOGGER.error("Exception while parsing BsonDocument: {}", e.getMessage(), e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Gets 10.000 documents from collection, gathers all unique fields and its type. In case when one field has different types in 2 and more
   * documents, the type is set to String.
   *
   * @param collection mongo collection
   * @return map of unique fields and its type
   */
  public static Map<String, BsonType> getUniqueFields(MongoCollection<Document> collection) {
    Map<String, BsonType> uniqueFields = new HashMap<>();
    try (MongoCursor<Document> cursor = collection.find().batchSize(DISCOVERY_BATCH_SIZE).iterator()) {
      while (cursor.hasNext()) {
        BsonDocument document = toBsonDocument(cursor.next());
        try (BsonReader reader = new BsonDocumentReader(document)) {
          reader.readStartDocument();
          while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) {
            var fieldName = reader.readName();
            var fieldType = reader.getCurrentBsonType();
            reader.skipValue();
            if (uniqueFields.containsKey(fieldName) && fieldType.compareTo(uniqueFields.get(fieldName)) != 0) {
              // Conflicting types across documents: record the field under the transform
              // suffix as STRING. The original called Map.replace() on the suffixed key,
              // which is a no-op when that key is absent, so conflicts were silently lost.
              uniqueFields.put(fieldName + AIRBYTE_SUFFIX, BsonType.STRING);
            } else {
              uniqueFields.put(fieldName, fieldType);
            }
          }
          reader.readEndDocument();
        }
      }
    }
    return uniqueFields;
  }

  private static BsonDocument toBsonDocument(final Document document) {
    try {
      return document.toBsonDocument(BsonDocument.class, Bson.DEFAULT_CODEC_REGISTRY);
    } catch (Exception e) {
      LOGGER.error("Exception while converting Document to BsonDocument: {}", e.getMessage(), e);
      throw new RuntimeException(e);
    }
  }

  // Null-safe toString for BSON value wrappers.
  private static String toString(Object value) {
    return value == null ? null : value.toString();
  }

  private static Double toDouble(Decimal128 value) {
    return value == null ? null : value.doubleValue();
  }

  private static byte[] toByteArray(BsonBinary value) {
    return value == null ? null : value.getData();
  }

  // temporary method for MVP: nested documents are serialized to their JSON text
  private static String documentToString(Object obj, BsonReader reader) {
    try {
      reader.skipValue();
      Document document = (Document) obj;
      return document.toJson();
    } catch (Exception e) {
      LOGGER.error("Failed to convert document to a String: {}", e.getMessage(), e);
      return null;
    }
  }

  // temporary method for MVP: arrays are serialized via toString
  private static String arrayToString(Object obj, BsonReader reader) {
    try {
      reader.skipValue();
      return obj.toString();
    } catch (Exception e) {
      LOGGER.error("Failed to convert array to a String: {}", e.getMessage(), e);
      return null;
    }
  }

}

View File

@@ -48,7 +48,7 @@
| Microsoft SQL Server \(MSSQL\) | [![source-mssql](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-mssql%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-mssql) |
| Microsoft Teams | [![source-microsoft-teams](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-microsoft-teams%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-microsoft-teams) |
| Mixpanel | [![source-mixpanel](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-mixpanel%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-mixpanel) |
| Mongo DB | [![source-mongodb](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-mongodb%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-mongodb) |
| Mongo DB | [![source-mongodb-v2](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-mongodb-v2%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-mongodb-v2) |
| MySQL | [![source-mysql](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-mysql%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-mysql) |
| Oracle DB | [![source-oracle](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-oracle%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-oracle) |
| Paypal Transaction | [![paypal-transaction](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-paypal-transaction%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-paypal-transaction) |

View File

@@ -140,6 +140,12 @@ public class BigQuerySource extends AbstractRelationalDbSource<StandardSQLTypeNa
return result;
}
@Override
protected List<TableInfo<CommonField<StandardSQLTypeName>>> discoverInternal(BigQueryDatabase database, String schema) throws Exception {
// TODO: dataset (schema) filtering is not implemented yet — the schema argument is
// currently ignored and the schema-less overload is delegated to instead.
return discoverInternal(database);
}
@Override
protected Map<String, List<String>> discoverPrimaryKeys(BigQueryDatabase database, List<TableInfo<CommonField<StandardSQLTypeName>>> tableInfos) {
return Collections.emptyMap();

View File

@@ -0,0 +1,3 @@
*
!Dockerfile
!build

View File

@@ -0,0 +1,12 @@
FROM airbyte/integration-base-java:dev
WORKDIR /airbyte
ENV APPLICATION source-mongodb-v2
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-mongodb-v2

View File

@@ -0,0 +1,35 @@
# MongoDb Source
## Documentation
This is the repository for the MongoDb source connector in Java.
For information about how to use this connector within Airbyte, see [User Documentation](https://docs.airbyte.io/integrations/sources/mongodb-v2)
## Local development
#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:source-mongodb-v2:build
```
### Locally running the connector docker image
#### Build
Build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:source-mongodb-v2:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
the Dockerfile.
## Testing
We use `JUnit` for Java tests.
### Test Configuration
No specific configuration needed for testing, MongoDb Test Container is used.
#### Acceptance Tests
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:source-mongodb-v2:integrationTest
```

View File

@@ -0,0 +1,25 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}
application {
mainClass = 'io.airbyte.integrations.source.mongodb.MongoDbSource'
}
dependencies {
implementation project(':airbyte-db:lib')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
implementation project(':airbyte-integrations:connectors:source-relational-db')
compile 'org.mongodb:mongodb-driver-sync:4.3.0'
testImplementation "org.testcontainers:mongodb:1.15.3"
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-mongodb-v2')
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
}

View File

@@ -0,0 +1,225 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mongodb;
import static com.mongodb.client.model.Filters.gt;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.mongodb.client.MongoCollection;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.Databases;
import io.airbyte.db.mongodb.MongoDatabase;
import io.airbyte.db.mongodb.MongoUtils;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.relationaldb.AbstractDbSource;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.bson.BsonType;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Airbyte source connector for MongoDB. Builds a connection string from the user config
 * (standalone host/port, Atlas cluster URL, or replica set), discovers collections as
 * streams, and reads documents with optional incremental cursor filtering.
 */
public class MongoDbSource extends AbstractDbSource<BsonType, MongoDatabase> {

  private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSource.class);

  // URL templates; each ends with a separator so options can be appended directly.
  private static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/?";
  private static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?retryWrites=true&w=majority&";
  private static final String MONGODB_REPLICA_URL = "mongodb://%s%s/?replicaSet=%s&";
  private static final String USER = "user";
  private static final String PASSWORD = "password";
  private static final String INSTANCE_TYPE = "instance_type";
  private static final String HOST = "host";
  private static final String PORT = "port";
  private static final String CLUSTER_URL = "cluster_url";
  private static final String DATABASE = "database";
  private static final String SERVER_ADDRESSES = "server_addresses";
  private static final String REPLICA_SET = "replica_set";
  private static final String AUTH_SOURCE = "auth_source";
  private static final String TLS = "tls";
  private static final String PRIMARY_KEY = "_id";

  public static void main(String[] args) throws Exception {
    final Source source = new MongoDbSource();
    LOGGER.info("starting source: {}", MongoDbSource.class);
    new IntegrationRunner(source).run(args);
    LOGGER.info("completed source: {}", MongoDbSource.class);
  }

  /**
   * Translates the user-facing spec config into a connection string + database name pair.
   * Instance type is inferred from which keys are present (host/port, cluster_url, or
   * server_addresses/replica_set), matching the spec's oneOf options.
   */
  @Override
  public JsonNode toDatabaseConfig(JsonNode config) {
    var credentials = config.has(USER) && config.has(PASSWORD)
        ? String.format("%s:%s@", config.get(USER).asText(), config.get(PASSWORD).asText())
        : StringUtils.EMPTY;

    JsonNode instanceConfig = config.get(INSTANCE_TYPE);
    String instanceConnectUrl;
    if (instanceConfig.has(HOST) && instanceConfig.has(PORT)) {
      instanceConnectUrl = String.format(MONGODB_SERVER_URL,
          credentials, instanceConfig.get(HOST).asText(), instanceConfig.get(PORT).asText());
    } else if (instanceConfig.has(CLUSTER_URL)) {
      instanceConnectUrl = String.format(MONGODB_CLUSTER_URL,
          credentials, instanceConfig.get(CLUSTER_URL).asText(), config.get(DATABASE).asText());
    } else {
      instanceConnectUrl = String.format(MONGODB_REPLICA_URL,
          credentials, instanceConfig.get(SERVER_ADDRESSES).asText(), config.get(REPLICA_SET).asText());
    }

    String options = "authSource=".concat(config.get(AUTH_SOURCE).asText());
    // Guard against an absent optional "tls" key (spec default: false) to avoid an NPE.
    if (config.has(TLS) && config.get(TLS).asBoolean()) {
      // BUG FIX: String.concat returns a NEW string; the original discarded the result,
      // so "&tls=true" was never actually appended and TLS was silently disabled.
      options = options.concat("&tls=true");
    }
    return Jsons.jsonNode(ImmutableMap.builder()
        .put("connectionString", instanceConnectUrl + options)
        .put("database", config.get(DATABASE).asText())
        .build());
  }

  @Override
  protected MongoDatabase createDatabase(JsonNode config) throws Exception {
    var dbConfig = toDatabaseConfig(config);
    return Databases.createMongoDatabase(dbConfig.get("connectionString").asText(),
        dbConfig.get("database").asText());
  }

  /** Connectivity check: we must be able to list at least one collection name. */
  @Override
  public List<CheckedConsumer<MongoDatabase, Exception>> getCheckOperations(JsonNode config)
      throws Exception {
    List<CheckedConsumer<MongoDatabase, Exception>> checkList = new ArrayList<>();
    checkList.add(database -> {
      // fetch once instead of issuing two listCollectionNames round-trips
      final var collectionNames = database.getCollectionNames();
      if (collectionNames == null || collectionNames.first() == null) {
        throw new Exception("Unable to execute any operation on the source!");
      } else {
        LOGGER.info("The source passed the basic operation test!");
      }
    });
    return checkList;
  }

  @Override
  protected JsonSchemaPrimitive getType(BsonType fieldType) {
    return MongoUtils.getType(fieldType);
  }

  @Override
  public Set<String> getExcludedInternalNameSpaces() {
    return Collections.emptySet();
  }

  /**
   * Discovers one stream per collection by sampling documents for unique fields/types.
   * Every collection gets the reserved "_id" field as its primary key.
   */
  @Override
  protected List<TableInfo<CommonField<BsonType>>> discoverInternal(MongoDatabase database)
      throws Exception {
    List<TableInfo<CommonField<BsonType>>> tableInfos = new ArrayList<>();
    for (String collectionName : database.getCollectionNames()) {
      MongoCollection<Document> collection = database.getCollection(collectionName);
      Map<String, BsonType> uniqueFields = MongoUtils.getUniqueFields(collection);
      List<CommonField<BsonType>> fields = uniqueFields.keySet().stream()
          .map(field -> new CommonField<>(field, uniqueFields.get(field)))
          .collect(Collectors.toList());

      // The field name _id is reserved for use as a primary key;
      TableInfo<CommonField<BsonType>> tableInfo = TableInfo.<CommonField<BsonType>>builder()
          .nameSpace(database.getName())
          .name(collectionName)
          .fields(fields)
          .primaryKeys(List.of(PRIMARY_KEY))
          .build();
      tableInfos.add(tableInfo);
    }
    return tableInfos;
  }

  @Override
  protected List<TableInfo<CommonField<BsonType>>> discoverInternal(MongoDatabase database, String schema) throws Exception {
    // MongoDb doesn't support schemas
    return discoverInternal(database);
  }

  @Override
  protected Map<String, List<String>> discoverPrimaryKeys(MongoDatabase database,
                                                          List<TableInfo<CommonField<BsonType>>> tableInfos) {
    return tableInfos.stream()
        .collect(Collectors.toMap(
            TableInfo::getName,
            TableInfo::getPrimaryKeys));
  }

  @Override
  protected String getQuoteString() {
    // Mongo field names need no quoting
    return "";
  }

  @Override
  public AutoCloseableIterator<JsonNode> queryTableFullRefresh(MongoDatabase database,
                                                               List<String> columnNames,
                                                               String schemaName,
                                                               String tableName) {
    return queryTable(database, columnNames, tableName, null);
  }

  @Override
  public AutoCloseableIterator<JsonNode> queryTableIncremental(MongoDatabase database,
                                                               List<String> columnNames,
                                                               String schemaName,
                                                               String tableName,
                                                               String cursorField,
                                                               BsonType cursorFieldType,
                                                               String cursor) {
    // incremental = only documents whose cursor field is strictly greater than the saved cursor
    Bson greaterComparison = gt(cursorField, cursor);
    return queryTable(database, columnNames, tableName, greaterComparison);
  }

  // Shared lazy read path; a null filter means full refresh.
  private AutoCloseableIterator<JsonNode> queryTable(MongoDatabase database, List<String> columnNames, String tableName, Bson filter) {
    return AutoCloseableIterators.lazyIterator(() -> {
      try {
        final Stream<JsonNode> stream = database.read(tableName, columnNames, Optional.ofNullable(filter));
        return AutoCloseableIterators.fromStream(stream);
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });
  }

}

View File

@@ -0,0 +1,111 @@
{
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mongodb-v2",
"changelogUrl": "https://docs.airbyte.io/integrations/sources/mongodb-v2",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MongoDb Source Spec",
"type": "object",
"required": ["database"],
"additionalProperties": false,
"properties": {
"instance_type": {
"type": "object",
"title": "MongoDb instance type",
"description": "MongoDb instance to connect to.",
"order": 0,
"oneOf": [
{
"title": "Standalone MongoDb Instance",
"additionalProperties": false,
"required": ["host", "port"],
"properties": {
"host": {
"title": "Host",
"type": "string",
"description": "Host of a Mongo database to be replicated.",
"order": 0
},
"port": {
"title": "Port",
"type": "integer",
"description": "Port of a Mongo database to be replicated.",
"minimum": 0,
"maximum": 65536,
"default": 27017,
"examples": ["27017"],
"order": 1
}
}
},
{
"title": "Replica Set",
"additionalProperties": false,
"required": ["server_addresses", "replica_set"],
"properties": {
"server_addresses": {
"title": "Server addresses",
"type": "string",
"description": "The members of a replica set. Please specify `host`:`port` of each member separated by comma.",
"examples": ["host1:27017,host2:27017,host3:27017"],
"order": 0
},
"replica_set": {
"title": "Replica Set",
"type": "string",
"description": "A replica set name.",
"order": 1
}
}
},
{
"title": "MongoDB Atlas",
"additionalProperties": false,
"required": ["cluster_url"],
"properties": {
"cluster_url": {
"title": "Cluster URL",
"type": "string",
"description": "URL of a cluster to connect to.",
"order": 0
}
}
}
]
},
"database": {
"title": "Database name",
"type": "string",
"description": "Database to be replicated.",
"order": 1
},
"user": {
"title": "User",
"type": "string",
"description": "User",
"order": 2
},
"password": {
"title": "Password",
"type": "string",
"description": "Password",
"airbyte_secret": true,
"order": 3
},
"auth_source": {
"title": "Authentication source",
"type": "string",
"description": "Authentication source where user information is stored",
"default": "admin",
"examples": ["admin"],
"order": 4
},
"tls": {
"title": "TLS connection",
"type": "boolean",
"description": "If this switch is enabled, TLS connections will be used to connect to MongoDB.",
"default": false,
"order": 5
}
}
}
}

View File

@@ -0,0 +1,136 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.mongodb.client.MongoCollection;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.mongodb.MongoDatabase;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.bson.Document;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.utility.DockerImageName;
/**
 * Acceptance test for the source-mongodb-v2 connector. Spins up a standalone MongoDB instance in a
 * Testcontainers container, seeds a small collection, and verifies the connector can discover and
 * incrementally read it using {@code _id} as the cursor.
 */
public class MongoDbSourceAcceptanceTest extends SourceAcceptanceTest {

  private static final String DATABASE_NAME = "test";
  private static final String COLLECTION_NAME = "acceptance_test";

  private MongoDBContainer mongoDBContainer;
  private JsonNode config;
  private MongoDatabase database;

  @Override
  protected String getImageName() {
    return "airbyte/source-mongodb-v2:dev";
  }

  @Override
  protected JsonNode getConfig() throws Exception {
    return config;
  }

  @Override
  protected void setupEnvironment(TestDestinationEnv environment) throws Exception {
    mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:4.0.10"));
    mongoDBContainer.start();

    // Build the connector config pointing at the container's mapped host/port. The instance_type
    // object mirrors the "Standalone MongoDb Instance" option of spec.json.
    final JsonNode instanceConfig = Jsons.jsonNode(ImmutableMap.builder()
        .put("host", mongoDBContainer.getHost())
        .put("port", mongoDBContainer.getFirstMappedPort())
        .build());

    config = Jsons.jsonNode(ImmutableMap.builder()
        .put("instance_type", instanceConfig)
        .put("database", DATABASE_NAME)
        .put("auth_source", "admin")
        .put("tls", false)
        .build());

    // Seed a few documents so read operations have data to replicate.
    final String connectionString = String.format("mongodb://%s:%s/",
        mongoDBContainer.getHost(),
        mongoDBContainer.getFirstMappedPort());
    database = new MongoDatabase(connectionString, DATABASE_NAME);

    final MongoCollection<Document> collection = database.createCollection(COLLECTION_NAME);
    final var doc1 = new Document("id", "0001").append("name", "Test");
    final var doc2 = new Document("id", "0002").append("name", "Mongo");
    final var doc3 = new Document("id", "0003").append("name", "Source");
    collection.insertMany(List.of(doc1, doc2, doc3));
  }

  @Override
  protected void tearDown(TestDestinationEnv testEnv) throws Exception {
    database.close();
    mongoDBContainer.close();
  }

  @Override
  protected ConnectorSpecification getSpec() throws Exception {
    return Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class);
  }

  @Override
  protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
    // Note: the original implementation called withCursorField twice; the second call overwrote
    // the first. A single call with List.of("_id") is kept here.
    return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
        new ConfiguredAirbyteStream()
            .withSyncMode(SyncMode.INCREMENTAL)
            .withDestinationSyncMode(DestinationSyncMode.APPEND)
            .withCursorField(List.of("_id"))
            .withStream(CatalogHelpers.createAirbyteStream(
                DATABASE_NAME + "." + COLLECTION_NAME,
                Field.of("_id", JsonSchemaPrimitive.STRING),
                Field.of("id", JsonSchemaPrimitive.STRING),
                Field.of("name", JsonSchemaPrimitive.STRING))
                .withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL))
                .withDefaultCursorField(List.of("_id")))));
  }

  @Override
  protected JsonNode getState() throws Exception {
    // Start from an empty state: the first sync behaves like a full refresh.
    return Jsons.jsonNode(new HashMap<>());
  }

  @Override
  protected List<String> getRegexTests() throws Exception {
    return Collections.emptyList();
  }

}

View File

@@ -0,0 +1,502 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.relationaldb;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.type.Types;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.AbstractDatabase;
import io.airbyte.db.IncrementalUtils;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This class contains helper functions and boilerplate for implementing a source connector for a
 * NoSql DB source. Concrete sources supply discovery, type mapping and query primitives via the
 * abstract methods; this class wires them into the Airbyte check/discover/read lifecycle.
 *
 * @param <DataType> the source's native column/field type
 * @param <Database> the database handle type used by the concrete source
 */
public abstract class AbstractDbSource<DataType, Database extends AbstractDatabase> extends
    BaseConnector implements Source {

  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDbSource.class);

  @Override
  public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
    // Run every configured check operation against a fresh connection; any failure is reported
    // back as a FAILED status rather than propagated, so the UI can show the error message.
    try (final Database database = createDatabaseInternal(config)) {
      for (final CheckedConsumer<Database, Exception> checkOperation : getCheckOperations(config)) {
        checkOperation.accept(database);
      }
      return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
    } catch (final Exception e) {
      LOGGER.info("Exception while checking connection: ", e);
      return new AirbyteConnectionStatus()
          .withStatus(Status.FAILED)
          .withMessage("Could not connect with provided configuration. Error: " + e.getMessage());
    }
  }

  @Override
  public AirbyteCatalog discover(final JsonNode config) throws Exception {
    // Every discovered table supports both sync modes; system namespaces are filtered out.
    try (final Database database = createDatabaseInternal(config)) {
      final List<AirbyteStream> streams = getTables(database).stream()
          .map(tableInfo -> CatalogHelpers
              .createAirbyteStream(tableInfo.getName(), tableInfo.getNameSpace(),
                  tableInfo.getFields())
              .withSupportedSyncModes(
                  Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
              .withSourceDefinedPrimaryKey(Types.boxToListofList(tableInfo.getPrimaryKeys())))
          .collect(Collectors.toList());
      return new AirbyteCatalog().withStreams(streams);
    }
  }

  @Override
  public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
                                                   final ConfiguredAirbyteCatalog catalog,
                                                   final JsonNode state)
      throws Exception {
    final StateManager stateManager = new StateManager(
        state == null ? StateManager.emptyState() : Jsons.object(state, DbState.class),
        catalog);
    final Instant emittedAt = Instant.now();

    // The database handle is intentionally NOT opened in try-with-resources: it must stay open
    // until the returned iterator is fully consumed, and is closed via appendOnClose below.
    final Database database = createDatabaseInternal(config);

    final Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo =
        discoverWithoutSystemTables(database)
            .stream()
            .collect(Collectors.toMap(t -> String.format("%s.%s", t.getNameSpace(), t.getName()), Function
                .identity()));

    final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators =
        getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, emittedAt);
    final List<AutoCloseableIterator<AirbyteMessage>> fullRefreshIterators =
        getFullRefreshIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, emittedAt);

    // Incremental streams are read first so their state is emitted before full-refresh work.
    final List<AutoCloseableIterator<AirbyteMessage>> iteratorList = Stream
        .of(incrementalIterators, fullRefreshIterators)
        .flatMap(Collection::stream)
        .collect(Collectors.toList());

    return AutoCloseableIterators
        .appendOnClose(AutoCloseableIterators.concatWithEagerClose(iteratorList), () -> {
          LOGGER.info("Closing database connection pool.");
          Exceptions.toRuntime(database::close);
          LOGGER.info("Closed database connection pool.");
        });
  }

  /**
   * Discovers all tables, then removes any whose namespace is declared internal/system by the
   * concrete source via {@link #getExcludedInternalNameSpaces()}.
   */
  protected List<TableInfo<CommonField<DataType>>> discoverWithoutSystemTables(final Database database) throws Exception {
    final Set<String> systemNameSpaces = getExcludedInternalNameSpaces();
    final List<TableInfo<CommonField<DataType>>> discoveredTables = discoverInternal(database);
    return (systemNameSpaces == null || systemNameSpaces.isEmpty() ? discoveredTables
        : discoveredTables.stream().filter(table -> !systemNameSpaces.contains(table.getNameSpace())).collect(
            Collectors.toList()));
  }

  /** Builds read iterators for the catalog streams configured with FULL_REFRESH sync mode. */
  protected List<AutoCloseableIterator<AirbyteMessage>> getFullRefreshIterators(final Database database,
                                                                                final ConfiguredAirbyteCatalog catalog,
                                                                                final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
                                                                                final StateManager stateManager,
                                                                                final Instant emittedAt) {
    return getSelectedIterators(
        database,
        catalog,
        tableNameToTable,
        stateManager,
        emittedAt,
        configuredStream -> configuredStream.getSyncMode().equals(SyncMode.FULL_REFRESH));
  }

  /** Builds read iterators for the catalog streams configured with INCREMENTAL sync mode. */
  protected List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final Database database,
                                                                                final ConfiguredAirbyteCatalog catalog,
                                                                                final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
                                                                                final StateManager stateManager,
                                                                                final Instant emittedAt) {
    return getSelectedIterators(
        database,
        catalog,
        tableNameToTable,
        stateManager,
        emittedAt,
        configuredStream -> configuredStream.getSyncMode().equals(SyncMode.INCREMENTAL));
  }

  /**
   * Builds one read iterator per catalog stream accepted by {@code selector}. Streams that are in
   * the catalog but no longer present in the source are skipped with a log message.
   */
  protected List<AutoCloseableIterator<AirbyteMessage>> getSelectedIterators(final Database database,
                                                                             final ConfiguredAirbyteCatalog catalog,
                                                                             final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
                                                                             final StateManager stateManager,
                                                                             final Instant emittedAt,
                                                                             final Predicate<ConfiguredAirbyteStream> selector) {
    final List<AutoCloseableIterator<AirbyteMessage>> iteratorList = new ArrayList<>();
    for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
      if (selector.test(airbyteStream)) {
        final AirbyteStream stream = airbyteStream.getStream();
        final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(),
            stream.getName());
        if (!tableNameToTable.containsKey(fullyQualifiedTableName)) {
          LOGGER
              .info("Skipping stream {} because it is not in the source", fullyQualifiedTableName);
          continue;
        }

        final TableInfo<CommonField<DataType>> table = tableNameToTable
            .get(fullyQualifiedTableName);
        final AutoCloseableIterator<AirbyteMessage> tableReadIterator = createReadIterator(
            database,
            airbyteStream,
            table,
            stateManager,
            emittedAt);
        iteratorList.add(tableReadIterator);
      }
    }
    return iteratorList;
  }

  /**
   * Creates the read iterator for a single stream, honoring its sync mode. Incremental streams
   * with an existing cursor resume from it; without a cursor the first incremental read is
   * equivalent to a full refresh. The resulting iterator also logs progress every 10000 records.
   */
  protected AutoCloseableIterator<AirbyteMessage> createReadIterator(final Database database,
                                                                     final ConfiguredAirbyteStream airbyteStream,
                                                                     final TableInfo<CommonField<DataType>> table,
                                                                     final StateManager stateManager,
                                                                     final Instant emittedAt) {
    final String streamName = airbyteStream.getStream().getName();
    final String namespace = airbyteStream.getStream().getNamespace();
    final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamName, namespace);
    final Set<String> selectedFieldsInCatalog = CatalogHelpers.getTopLevelFieldNames(airbyteStream);
    final List<String> selectedDatabaseFields = table.getFields()
        .stream()
        .map(CommonField::getName)
        .filter(selectedFieldsInCatalog::contains)
        .collect(Collectors.toList());

    final AutoCloseableIterator<AirbyteMessage> iterator;
    if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
      final String cursorField = IncrementalUtils.getCursorField(airbyteStream);
      final Optional<String> cursorOptional = stateManager.getCursor(pair);

      final AutoCloseableIterator<AirbyteMessage> airbyteMessageIterator;
      if (cursorOptional.isPresent()) {
        airbyteMessageIterator = getIncrementalStream(database, airbyteStream, selectedDatabaseFields, table, cursorOptional.get(), emittedAt);
      } else {
        // if no cursor is present then this is the first read for is the same as doing a full refresh read.
        airbyteMessageIterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt);
      }

      final JsonSchemaPrimitive cursorType = IncrementalUtils
          .getCursorType(airbyteStream, cursorField);

      // Wrap the record iterator so state messages are emitted as the cursor advances.
      iterator = AutoCloseableIterators.transform(autoCloseableIterator -> new StateDecoratingIterator(
          autoCloseableIterator,
          stateManager,
          pair,
          cursorField,
          cursorOptional.orElse(null),
          cursorType),
          airbyteMessageIterator);
    } else if (airbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
      iterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt);
    } else if (airbyteStream.getSyncMode() == null) {
      throw new IllegalArgumentException(String.format("%s requires a source sync mode", this.getClass()));
    } else {
      throw new IllegalArgumentException(String.format("%s does not support sync mode: %s.", this.getClass(), airbyteStream.getSyncMode()));
    }

    final AtomicLong recordCount = new AtomicLong();
    return AutoCloseableIterators.transform(iterator, r -> {
      final long count = recordCount.incrementAndGet();
      if (count % 10000 == 0) {
        LOGGER.info("Reading stream {}. Records read: {}", streamName, count);
      }
      return r;
    });
  }

  /**
   * Builds an incremental read iterator for a stream, resuming from {@code cursor}.
   *
   * @throws IllegalStateException if the configured cursor field does not exist in the table
   */
  protected AutoCloseableIterator<AirbyteMessage> getIncrementalStream(final Database database,
                                                                       final ConfiguredAirbyteStream airbyteStream,
                                                                       final List<String> selectedDatabaseFields,
                                                                       final TableInfo<CommonField<DataType>> table,
                                                                       final String cursor,
                                                                       final Instant emittedAt) {
    final String streamName = airbyteStream.getStream().getName();
    final String namespace = airbyteStream.getStream().getNamespace();
    final String cursorField = IncrementalUtils.getCursorField(airbyteStream);

    // Look up the cursor field's type without throwing a bare NoSuchElementException: validate
    // presence first so a missing cursor field produces the descriptive error below. (Previously
    // orElseThrow() ran before the checkState, making the friendly message unreachable.)
    final Optional<DataType> cursorTypeOptional = table.getFields().stream()
        .filter(info -> info.getName().equals(cursorField))
        .map(CommonField::getType)
        .findFirst();
    Preconditions.checkState(cursorTypeOptional.isPresent(),
        String.format("Could not find cursor field %s in table %s", cursorField, table.getName()));
    final DataType cursorType = cursorTypeOptional.get();

    final AutoCloseableIterator<JsonNode> queryIterator = queryTableIncremental(
        database,
        selectedDatabaseFields,
        table.getNameSpace(),
        table.getName(),
        cursorField,
        cursorType,
        cursor);

    return getMessageIterator(queryIterator, streamName, namespace, emittedAt.toEpochMilli());
  }

  /** Builds a full-refresh read iterator over all selected fields of a table. */
  protected AutoCloseableIterator<AirbyteMessage> getFullRefreshStream(final Database database,
                                                                       final String streamName,
                                                                       final String namespace,
                                                                       final List<String> selectedDatabaseFields,
                                                                       final TableInfo<CommonField<DataType>> table,
                                                                       final Instant emittedAt) {
    final AutoCloseableIterator<JsonNode> queryStream =
        queryTableFullRefresh(database, selectedDatabaseFields, table.getNameSpace(), table.getName());
    return getMessageIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
  }

  /** Joins namespace and table name with a dot; returns just the table name if namespace is null. */
  protected String getFullyQualifiedTableName(final String nameSpace, final String tableName) {
    return nameSpace != null ? nameSpace + "." + tableName : tableName;
  }

  /** Wraps raw JSON records into Airbyte RECORD messages for the given stream. */
  public AutoCloseableIterator<AirbyteMessage> getMessageIterator(final AutoCloseableIterator<JsonNode> recordIterator,
                                                                  final String streamName,
                                                                  final String namespace,
                                                                  final long emittedAt) {
    return AutoCloseableIterators.transform(recordIterator, r -> new AirbyteMessage()
        .withType(Type.RECORD)
        .withRecord(new AirbyteRecordMessage()
            .withStream(streamName)
            .withNamespace(namespace)
            .withEmittedAt(emittedAt)
            .withData(r)));
  }

  /**
   * Get list of source tables/data structures for schema discovery.
   *
   * @param database instance
   * @return list of table/data structure info
   * @throws Exception might throw an error during connection to database
   */
  protected List<TableInfo<Field>> getTables(final Database database) throws Exception {
    final List<TableInfo<CommonField<DataType>>> tableInfos = discoverWithoutSystemTables(database);
    final Map<String, List<String>> fullyQualifiedTableNameToPrimaryKeys = discoverPrimaryKeys(database, tableInfos);

    return tableInfos.stream()
        .map(t -> {
          // some databases return multiple copies of the same record for a column (e.g. redshift) because
          // they have at least once delivery guarantees. we want to dedupe these, but first we check that the
          // records are actually the same and provide a good error message if they are not.
          assertColumnsWithSameNameAreSame(t.getNameSpace(), t.getName(), t.getFields());
          final List<Field> fields = t.getFields()
              .stream()
              .map(f -> Field.of(f.getName(), getType(f.getType())))
              .distinct()
              .collect(Collectors.toList());
          final String fullyQualifiedTableName = getFullyQualifiedTableName(t.getNameSpace(), t.getName());
          final List<String> primaryKeys = fullyQualifiedTableNameToPrimaryKeys.getOrDefault(fullyQualifiedTableName, Collections
              .emptyList());
          return TableInfo.<Field>builder().nameSpace(t.getNameSpace()).name(t.getName()).fields(fields).primaryKeys(primaryKeys)
              .build();
        })
        .collect(Collectors.toList());
  }

  /**
   * Verifies that columns sharing a name within one table are fully identical; otherwise the
   * dedupe in {@link #getTables} would silently drop differing definitions.
   */
  protected void assertColumnsWithSameNameAreSame(final String nameSpace, final String tableName, final List<CommonField<DataType>> columns) {
    columns.stream()
        .collect(Collectors.groupingBy(CommonField<DataType>::getName))
        .values()
        .forEach(columnsWithSameName -> {
          final CommonField<DataType> comparisonColumn = columnsWithSameName.get(0);

          columnsWithSameName.forEach(column -> {
            if (!column.equals(comparisonColumn)) {
              throw new RuntimeException(
                  String.format("Found multiple columns with same name: %s in table: %s.%s but the columns are not the same. columns: %s",
                      comparisonColumn.getName(), nameSpace, tableName, columns));
            }
          });
        });
  }

  /**
   * Map a database implementation-specific configuration to json object that adheres to the database
   * config spec. See resources/spec.json.
   *
   * @param config database implementation-specific configuration.
   * @return database spec config
   */
  public abstract JsonNode toDatabaseConfig(JsonNode config);

  /**
   * Creates a database instance using the database spec config.
   *
   * @param config database spec config
   * @return database instance
   * @throws Exception might throw an error during connection to database
   */
  protected abstract Database createDatabase(JsonNode config) throws Exception;

  /**
   * Configures a list of operations that can be used to check the connection to the source.
   *
   * @return list of consumers that run queries for the check command.
   */
  public abstract List<CheckedConsumer<Database, Exception>> getCheckOperations(JsonNode config)
      throws Exception;

  /**
   * Map source types and Airbyte types
   *
   * @param columnType source data type
   * @return airbyte data type
   */
  protected abstract JsonSchemaPrimitive getType(DataType columnType);

  /**
   * Get list of system namespaces(schemas) in order to exclude them from the discover result list.
   *
   * @return set of system namespaces(schemas) to be excluded
   */
  public abstract Set<String> getExcludedInternalNameSpaces();

  /**
   * Discover all available tables in the source database.
   *
   * @param database source database
   * @return list of the source tables
   * @throws Exception access to the database might lead to an exceptions.
   */
  protected abstract List<TableInfo<CommonField<DataType>>> discoverInternal(
                                                                             final Database database)
      throws Exception;

  /**
   * Discovers all available tables within a schema in the source database.
   *
   * @param database - source database
   * @param schema - source schema
   * @return list of source tables
   * @throws Exception - access to the database might lead to exceptions.
   */
  protected abstract List<TableInfo<CommonField<DataType>>> discoverInternal(final Database database, String schema)
      throws Exception;

  /**
   * Discover Primary keys for each table and @return a map of namespace.table name to their
   * associated list of primary key fields.
   *
   * @param database source database
   * @param tableInfos list of tables
   * @return map of namespace.table and primary key fields.
   */
  protected abstract Map<String, List<String>> discoverPrimaryKeys(Database database,
                                                                   List<TableInfo<CommonField<DataType>>> tableInfos);

  /**
   * Returns quote symbol of the database
   *
   * @return quote symbol
   */
  protected abstract String getQuoteString();

  /**
   * Read all data from a table.
   *
   * @param database source database
   * @param columnNames interested column names
   * @param schemaName table namespace
   * @param tableName target table
   * @return iterator with read data
   */
  public abstract AutoCloseableIterator<JsonNode> queryTableFullRefresh(final Database database,
                                                                        final List<String> columnNames,
                                                                        final String schemaName,
                                                                        final String tableName);

  /**
   * Read incremental data from a table. Incremental read should returns only records where cursor
   * column value is bigger than cursor.
   *
   * @param database source database
   * @param columnNames interested column names
   * @param schemaName table namespace
   * @param tableName target table
   * @param cursorField cursor field name
   * @param cursorFieldType cursor field type
   * @param cursor cursor value
   * @return iterator with read data
   */
  public abstract AutoCloseableIterator<JsonNode> queryTableIncremental(Database database,
                                                                        List<String> columnNames,
                                                                        String schemaName,
                                                                        String tableName,
                                                                        String cursorField,
                                                                        DataType cursorFieldType,
                                                                        String cursor);

  /**
   * Creates a database handle and attaches both the raw source config and the normalized database
   * config so downstream code can inspect either form.
   */
  private Database createDatabaseInternal(final JsonNode sourceConfig) throws Exception {
    final Database database = createDatabase(sourceConfig);
    database.setSourceConfig(sourceConfig);
    database.setDatabaseConfig(toDatabaseConfig(sourceConfig));
    return database;
  }

}

View File

@@ -25,47 +25,12 @@
package io.airbyte.integrations.source.relationaldb;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.type.Types;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.IncrementalUtils;
import io.airbyte.db.SqlDatabase;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,358 +42,20 @@ import org.slf4j.LoggerFactory;
* @see io.airbyte.integrations.source.jdbc.AbstractJdbcSource if you are implementing a relational
* DB which can be accessed via JDBC driver.
*/
public abstract class AbstractRelationalDbSource<DataType, Database extends SqlDatabase> extends BaseConnector implements Source {
public abstract class AbstractRelationalDbSource<DataType, Database extends SqlDatabase> extends
AbstractDbSource<DataType, Database> implements Source {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRelationalDbSource.class);
/**
* Map a database implementation-specific configuration to json object that adheres to the database
* config spec. See resources/spec.json.
*
* @param config database implementation-specific configuration.
* @return database spec config
*/
public abstract JsonNode toDatabaseConfig(JsonNode config);
/**
* Creates a database instance using the database spec config.
*
* @param config database spec config
* @return database instance
* @throws Exception might throw an error during connection to database
*/
protected abstract Database createDatabase(JsonNode config) throws Exception;
/**
* Configures a list of operations that can be used to check the connection to the source.
*
* @return list of consumers that run queries for the check command.
*/
public abstract List<CheckedConsumer<Database, Exception>> getCheckOperations(JsonNode config) throws Exception;
/**
* Map source types and Airbyte types
*
* @param columnType source data type
* @return airbyte data type
*/
protected abstract JsonSchemaPrimitive getType(DataType columnType);
/**
* Get list of system namespaces(schemas) in order to exclude them from the discover result list.
*
* @return
*/
public abstract Set<String> getExcludedInternalNameSpaces();
/**
* Discover all available tables in the source database.
*
* @param database source database
* @return list of the source tables
* @throws Exception access to the database might lead to an exceptions.
*/
protected abstract List<TableInfo<CommonField<DataType>>> discoverInternal(final Database database)
throws Exception;
/**
* Discovers all available tables within a schema in the source database.
*
* @param database - source database
* @param schema - source schema
* @return list of source tables
* @throws Exception - access to the database might lead to exceptions.
*/
protected abstract List<TableInfo<CommonField<DataType>>> discoverInternal(final Database database, String schema)
throws Exception;
/**
* Discover Primary keys for each table and @return a map of namespace.table name to their
* associated list of primary key fields.
*
* @param database source database
* @param tableInfos list of tables
* @return map of namespace.table and primary key fields.
*/
protected abstract Map<String, List<String>> discoverPrimaryKeys(Database database,
List<TableInfo<CommonField<DataType>>> tableInfos);
/**
* Returns quote symbol of the database
*
* @return quote symbol
*/
protected abstract String getQuoteString();
@Override
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
try (final Database database = createDatabaseInternal(config)) {
for (final CheckedConsumer<Database, Exception> checkOperation : getCheckOperations(config)) {
checkOperation.accept(database);
}
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.info("Exception while checking connection: ", e);
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage("Could not connect with provided configuration. Error: " + e.getMessage());
}
}
@Override
public AirbyteCatalog discover(final JsonNode config) throws Exception {
try (final Database database = createDatabaseInternal(config)) {
final List<AirbyteStream> streams = getTables(database).stream()
.map(tableInfo -> CatalogHelpers
.createAirbyteStream(tableInfo.getName(), tableInfo.getNameSpace(), tableInfo.getFields())
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(Types.boxToListofList(tableInfo.getPrimaryKeys())))
.collect(Collectors.toList());
return new AirbyteCatalog().withStreams(streams);
}
}
@Override
public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state)
throws Exception {
final StateManager stateManager = new StateManager(
state == null ? StateManager.emptyState() : Jsons.object(state, DbState.class),
catalog);
final Instant emittedAt = Instant.now();
final Database database = createDatabaseInternal(config);
final Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo =
discoverWithoutSystemTables(database)
.stream()
.collect(Collectors.toMap(t -> String.format("%s.%s", t.getNameSpace(), t.getName()), Function
.identity()));
final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators =
getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, emittedAt);
final List<AutoCloseableIterator<AirbyteMessage>> fullRefreshIterators =
getFullRefreshIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, emittedAt);
final List<AutoCloseableIterator<AirbyteMessage>> iteratorList = Stream
.of(incrementalIterators, fullRefreshIterators)
.flatMap(Collection::stream)
.collect(Collectors.toList());
return AutoCloseableIterators
.appendOnClose(AutoCloseableIterators.concatWithEagerClose(iteratorList), () -> {
LOGGER.info("Closing database connection pool.");
Exceptions.toRuntime(database::close);
LOGGER.info("Closed database connection pool.");
});
}
/**
 * Creates one record iterator per configured stream whose sync mode is INCREMENTAL.
 *
 * @param database source database
 * @param catalog configured catalog of streams to sync
 * @param tableNameToTable map of fully-qualified table name to table metadata
 * @param stateManager manager of per-stream cursor state
 * @param emittedAt timestamp stamped onto every emitted record
 * @return iterators for the selected incremental streams
 */
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final Database database,
                                                                           final ConfiguredAirbyteCatalog catalog,
                                                                           final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
                                                                           final StateManager stateManager,
                                                                           final Instant emittedAt) {
  return getSelectedIterators(
      database,
      catalog,
      tableNameToTable,
      stateManager,
      emittedAt,
      // Constant-first equals is null-safe: a stream with no sync mode set is simply not
      // selected, instead of throwing a NullPointerException from the predicate.
      configuredStream -> SyncMode.INCREMENTAL.equals(configuredStream.getSyncMode()));
}
/**
 * Creates one record iterator per configured stream whose sync mode is FULL_REFRESH.
 *
 * @param database source database
 * @param catalog configured catalog of streams to sync
 * @param tableNameToTable map of fully-qualified table name to table metadata
 * @param stateManager manager of per-stream cursor state
 * @param emittedAt timestamp stamped onto every emitted record
 * @return iterators for the selected full-refresh streams
 */
public List<AutoCloseableIterator<AirbyteMessage>> getFullRefreshIterators(final Database database,
                                                                           final ConfiguredAirbyteCatalog catalog,
                                                                           final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
                                                                           final StateManager stateManager,
                                                                           final Instant emittedAt) {
  return getSelectedIterators(
      database,
      catalog,
      tableNameToTable,
      stateManager,
      emittedAt,
      // Constant-first equals is null-safe: a stream with no sync mode set is simply not
      // selected, instead of throwing a NullPointerException from the predicate.
      configuredStream -> SyncMode.FULL_REFRESH.equals(configuredStream.getSyncMode()));
}
/**
 * Creates a record iterator for each configured stream accepted by {@code selector}.
 * Streams that are configured but no longer present in the source are skipped with a log line.
 *
 * @param database source database
 * @param catalog configured catalog of streams to sync
 * @param tableNameToTable map of fully-qualified table name to table metadata
 * @param stateManager manager of per-stream cursor state
 * @param emittedAt timestamp stamped onto every emitted record
 * @param selector predicate choosing which configured streams to include
 * @return iterators for the selected streams
 */
protected List<AutoCloseableIterator<AirbyteMessage>> getSelectedIterators(final Database database,
                                                                           final ConfiguredAirbyteCatalog catalog,
                                                                           final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
                                                                           final StateManager stateManager,
                                                                           final Instant emittedAt,
                                                                           final Predicate<ConfiguredAirbyteStream> selector) {
  final List<AutoCloseableIterator<AirbyteMessage>> iterators = new ArrayList<>();
  for (final ConfiguredAirbyteStream configuredStream : catalog.getStreams()) {
    if (!selector.test(configuredStream)) {
      continue;
    }
    final AirbyteStream stream = configuredStream.getStream();
    final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(), stream.getName());
    final TableInfo<CommonField<DataType>> table = tableNameToTable.get(fullyQualifiedTableName);
    if (table == null) {
      // The stream was configured in a previous discovery but no longer exists in the source.
      LOGGER.info("Skipping stream {} because it is not in the source", fullyQualifiedTableName);
      continue;
    }
    iterators.add(createReadIterator(
        database,
        configuredStream,
        table,
        stateManager,
        emittedAt));
  }
  return iterators;
}
/**
 * Creates the record iterator for a single configured stream, honoring its sync mode.
 * For INCREMENTAL streams the iterator is wrapped in a {@link StateDecoratingIterator} that
 * tracks and emits cursor state; all iterators log progress every 10,000 records.
 *
 * @param database source database
 * @param airbyteStream configured stream to read
 * @param table metadata of the backing table
 * @param stateManager manager of per-stream cursor state
 * @param emittedAt timestamp stamped onto every emitted record
 * @return iterator over the stream's records
 * @throws IllegalArgumentException if the stream has no sync mode or an unsupported one
 */
protected AutoCloseableIterator<AirbyteMessage> createReadIterator(final Database database,
                                                                   final ConfiguredAirbyteStream airbyteStream,
                                                                   final TableInfo<CommonField<DataType>> table,
                                                                   final StateManager stateManager,
                                                                   final Instant emittedAt) {
  final String streamName = airbyteStream.getStream().getName();
  final String namespace = airbyteStream.getStream().getNamespace();
  final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamName, namespace);
  // Only query the columns that are both present in the source table and selected in the
  // configured catalog (intersection, preserving the table's field order).
  final Set<String> selectedFieldsInCatalog = CatalogHelpers.getTopLevelFieldNames(airbyteStream);
  final List<String> selectedDatabaseFields = table.getFields()
      .stream()
      .map(CommonField::getName)
      .filter(selectedFieldsInCatalog::contains)
      .collect(Collectors.toList());

  final AutoCloseableIterator<AirbyteMessage> iterator;
  if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
    final String cursorField = IncrementalUtils.getCursorField(airbyteStream);
    final Optional<String> cursorOptional = stateManager.getCursor(pair);

    final AutoCloseableIterator<AirbyteMessage> airbyteMessageIterator;
    if (cursorOptional.isPresent()) {
      airbyteMessageIterator = getIncrementalStream(database, airbyteStream, selectedDatabaseFields, table, cursorOptional.get(), emittedAt);
    } else {
      // if no cursor is present then this is the first read; it is the same as doing a full refresh read.
      airbyteMessageIterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt);
    }
    final JsonSchemaPrimitive cursorType = IncrementalUtils
        .getCursorType(airbyteStream, cursorField);
    // Wrap so that state messages are emitted as the cursor advances.
    iterator = AutoCloseableIterators.transform(autoCloseableIterator -> new StateDecoratingIterator(
        autoCloseableIterator,
        stateManager,
        pair,
        cursorField,
        cursorOptional.orElse(null),
        cursorType),
        airbyteMessageIterator);
  } else if (airbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
    iterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt);
  } else if (airbyteStream.getSyncMode() == null) {
    throw new IllegalArgumentException(String.format("%s requires a source sync mode", this.getClass()));
  } else {
    throw new IllegalArgumentException(String.format("%s does not support sync mode: %s.", this.getClass(), airbyteStream.getSyncMode()));
  }

  // Decorate with progress logging every 10,000 records.
  final AtomicLong recordCount = new AtomicLong();
  return AutoCloseableIterators.transform(iterator, r -> {
    final long count = recordCount.incrementAndGet();
    if (count % 10000 == 0) {
      LOGGER.info("Reading stream {}. Records read: {}", streamName, count);
    }
    return r;
  });
}
/**
 * Creates an iterator that reads only records whose cursor column value is greater than the
 * given cursor.
 *
 * @param database source database
 * @param airbyteStream configured stream to read
 * @param selectedDatabaseFields column names to select
 * @param table metadata of the backing table
 * @param cursor last persisted cursor value
 * @param emittedAt timestamp stamped onto every emitted record
 * @return iterator over the new records
 * @throws IllegalStateException if the cursor field is not a column of the table
 */
protected AutoCloseableIterator<AirbyteMessage> getIncrementalStream(final Database database,
                                                                     final ConfiguredAirbyteStream airbyteStream,
                                                                     final List<String> selectedDatabaseFields,
                                                                     final TableInfo<CommonField<DataType>> table,
                                                                     final String cursor,
                                                                     final Instant emittedAt) {
  final String streamName = airbyteStream.getStream().getName();
  final String namespace = airbyteStream.getStream().getNamespace();
  final String cursorField = IncrementalUtils.getCursorField(airbyteStream);
  // Validate BEFORE extracting the cursor type: previously this check ran after
  // orElseThrow(), so a missing cursor field surfaced as a bare NoSuchElementException and
  // the descriptive message below was dead code.
  Preconditions.checkState(table.getFields().stream().anyMatch(f -> f.getName().equals(cursorField)),
      String.format("Could not find cursor field %s in table %s", cursorField, table.getName()));
  final DataType cursorType = table.getFields().stream()
      .filter(info -> info.getName().equals(cursorField))
      .map(CommonField::getType)
      .findFirst()
      .orElseThrow();

  final AutoCloseableIterator<JsonNode> queryIterator = queryTableIncremental(
      database,
      selectedDatabaseFields,
      table.getNameSpace(),
      table.getName(),
      cursorField,
      cursorType,
      cursor);
  return getMessageIterator(queryIterator, streamName, namespace, emittedAt.toEpochMilli());
}
/**
 * Creates an iterator that reads every record of the given table.
 *
 * @param database source database
 * @param streamName name of the stream being read
 * @param namespace namespace of the stream being read
 * @param selectedDatabaseFields column names to select
 * @param table metadata of the backing table
 * @param emittedAt timestamp stamped onto every emitted record
 * @return iterator over all records of the table
 */
protected AutoCloseableIterator<AirbyteMessage> getFullRefreshStream(final Database database,
                                                                     final String streamName,
                                                                     final String namespace,
                                                                     final List<String> selectedDatabaseFields,
                                                                     final TableInfo<CommonField<DataType>> table,
                                                                     final Instant emittedAt) {
  final AutoCloseableIterator<JsonNode> recordStream = queryTableFullRefresh(
      database,
      selectedDatabaseFields,
      table.getNameSpace(),
      table.getName());
  return getMessageIterator(recordStream, streamName, namespace, emittedAt.toEpochMilli());
}
/**
 * Joins a namespace and table name as {@code "<nameSpace>.<tableName>"}; returns the bare
 * table name when the namespace is null.
 *
 * @param nameSpace table namespace, may be null
 * @param tableName table name
 * @return fully-qualified table name
 */
protected String getFullyQualifiedTableName(final String nameSpace, final String tableName) {
  if (nameSpace == null) {
    return tableName;
  }
  return nameSpace + "." + tableName;
}
/**
 * Assembles per-table metadata (deduplicated fields plus primary keys) for catalog discovery.
 *
 * @param database source database
 * @return table metadata for every non-system table
 * @throws Exception if the database cannot be introspected
 */
protected List<TableInfo<Field>> getTables(final Database database) throws Exception {
  final List<TableInfo<CommonField<DataType>>> tableInfos = discoverWithoutSystemTables(database);
  final Map<String, List<String>> fullyQualifiedTableNameToPrimaryKeys = discoverPrimaryKeys(database, tableInfos);

  final List<TableInfo<Field>> result = new ArrayList<>();
  for (final TableInfo<CommonField<DataType>> tableInfo : tableInfos) {
    // some databases return multiple copies of the same record for a column (e.g. redshift)
    // because they have at least once delivery guarantees. we want to dedupe these, but first
    // we check that the records are actually the same and provide a good error message if
    // they are not.
    assertColumnsWithSameNameAreSame(tableInfo.getNameSpace(), tableInfo.getName(), tableInfo.getFields());
    final List<Field> fields = tableInfo.getFields()
        .stream()
        .map(commonField -> Field.of(commonField.getName(), getType(commonField.getType())))
        .distinct()
        .collect(Collectors.toList());
    final String fullyQualifiedTableName = getFullyQualifiedTableName(tableInfo.getNameSpace(), tableInfo.getName());
    final List<String> primaryKeys =
        fullyQualifiedTableNameToPrimaryKeys.getOrDefault(fullyQualifiedTableName, Collections.emptyList());
    result.add(TableInfo.<Field>builder()
        .nameSpace(tableInfo.getNameSpace())
        .name(tableInfo.getName())
        .fields(fields)
        .primaryKeys(primaryKeys)
        .build());
  }
  return result;
}
/**
 * Verifies that any columns sharing a name within a table are fully identical, so that
 * deduplication downstream cannot silently drop a divergent definition.
 *
 * @param nameSpace table namespace (used in the error message)
 * @param tableName table name (used in the error message)
 * @param columns all columns reported for the table, possibly with duplicates
 * @throws RuntimeException if two same-named columns differ
 */
protected void assertColumnsWithSameNameAreSame(final String nameSpace, final String tableName, final List<CommonField<DataType>> columns) {
  final Map<String, List<CommonField<DataType>>> columnsByName = columns.stream()
      .collect(Collectors.groupingBy(CommonField::getName));
  for (final List<CommonField<DataType>> sameNameGroup : columnsByName.values()) {
    final CommonField<DataType> reference = sameNameGroup.get(0);
    for (final CommonField<DataType> candidate : sameNameGroup) {
      if (!candidate.equals(reference)) {
        throw new RuntimeException(
            String.format("Found multiple columns with same name: %s in table: %s.%s but the columns are not the same. columns: %s",
                reference.getName(), nameSpace, tableName, columns));
      }
    }
  }
}
/**
 * Discovers all tables, filtering out any that live in a system/internal namespace as
 * reported by {@code getExcludedInternalNameSpaces()}.
 *
 * @param database source database
 * @return metadata for all non-system tables
 * @throws Exception if the database cannot be introspected
 */
protected List<TableInfo<CommonField<DataType>>> discoverWithoutSystemTables(final Database database) throws Exception {
  final Set<String> systemNameSpaces = getExcludedInternalNameSpaces();
  final List<TableInfo<CommonField<DataType>>> discoveredTables = discoverInternal(database);
  // A null or empty exclusion set means no filtering is needed.
  return (systemNameSpaces == null || systemNameSpaces.isEmpty() ? discoveredTables
      : discoveredTables.stream().filter(table -> !systemNameSpaces.contains(table.getNameSpace())).collect(
          Collectors.toList()));
  // NOTE(review): the closing brace of this method is missing in this chunk — looks like
  // diff/extraction truncation; verify against the full file.
/**
 * Builds a lazy full-refresh query ({@code SELECT <columns> FROM <table>}) over the table.
 *
 * @param database source database
 * @param columnNames column names to select
 * @param schemaName table namespace
 * @param tableName target table
 * @return lazy iterator over the query results
 */
public AutoCloseableIterator<JsonNode> queryTableFullRefresh(final Database database,
                                                             final List<String> columnNames,
                                                             final String schemaName,
                                                             final String tableName) {
  LOGGER.info("Queueing query for table: {}", tableName);
  // Identifiers are quoted to tolerate reserved words and case-sensitive names.
  return queryTable(database, String.format("SELECT %s FROM %s",
      enquoteIdentifierList(columnNames),
      getFullTableName(schemaName, tableName)));
}
protected String getIdentifierWithQuoting(final String identifier) {
@@ -448,19 +75,6 @@ public abstract class AbstractRelationalDbSource<DataType, Database extends SqlD
: getIdentifierWithQuoting(nameSpace) + "." + getIdentifierWithQuoting(tableName));
}
/**
 * Wraps a raw record iterator so each JSON row is emitted as an Airbyte RECORD message
 * carrying the stream name, namespace, and emission timestamp.
 *
 * @param recordIterator iterator over raw JSON rows
 * @param streamName name of the stream the records belong to
 * @param namespace namespace of the stream
 * @param emittedAt epoch-millisecond timestamp stamped onto every record
 * @return iterator over wrapped Airbyte messages
 */
public AutoCloseableIterator<AirbyteMessage> getMessageIterator(final AutoCloseableIterator<JsonNode> recordIterator,
                                                                final String streamName,
                                                                final String namespace,
                                                                final long emittedAt) {
  return AutoCloseableIterators.transform(recordIterator, data -> {
    final AirbyteRecordMessage record = new AirbyteRecordMessage()
        .withStream(streamName)
        .withNamespace(namespace)
        .withEmittedAt(emittedAt)
        .withData(data);
    return new AirbyteMessage().withType(Type.RECORD).withRecord(record);
  });
}
protected AutoCloseableIterator<JsonNode> queryTable(final Database database, final String sqlQuery) {
return AutoCloseableIterators.lazyIterator(() -> {
try {
@@ -472,42 +86,4 @@ public abstract class AbstractRelationalDbSource<DataType, Database extends SqlD
});
}
/**
 * Builds a lazy full-refresh query ({@code SELECT <columns> FROM <table>}) over the table.
 *
 * NOTE(review): this method appears twice in this chunk (see the earlier identical
 * definition) — almost certainly diff residue; only one copy should exist in the real file.
 *
 * @param database source database
 * @param columnNames column names to select
 * @param schemaName table namespace
 * @param tableName target table
 * @return lazy iterator over the query results
 */
public AutoCloseableIterator<JsonNode> queryTableFullRefresh(final Database database,
                                                             final List<String> columnNames,
                                                             final String schemaName,
                                                             final String tableName) {
  LOGGER.info("Queueing query for table: {}", tableName);
  return queryTable(database, String.format("SELECT %s FROM %s",
      enquoteIdentifierList(columnNames),
      getFullTableName(schemaName, tableName)));
}
/**
 * Reads incremental data from a table. An incremental read should return only records where
 * the cursor column value is greater than the given cursor.
 *
 * @param database source database
 * @param columnNames column names to select
 * @param schemaName table namespace
 * @param tableName target table
 * @param cursorField cursor field name
 * @param cursorFieldType cursor field type
 * @param cursor cursor value; records at or below this value are excluded
 * @return lazy iterator over the new records
 */
public abstract AutoCloseableIterator<JsonNode> queryTableIncremental(Database database,
                                                                      List<String> columnNames,
                                                                      String schemaName,
                                                                      String tableName,
                                                                      String cursorField,
                                                                      DataType cursorFieldType,
                                                                      String cursor);
/**
 * Creates a database handle and attaches both the raw source config and its
 * connector-specific database config.
 *
 * @param sourceConfig raw connector configuration
 * @return fully-configured database handle
 * @throws Exception if the database cannot be created
 */
private Database createDatabaseInternal(final JsonNode sourceConfig) throws Exception {
  final Database result = createDatabase(sourceConfig);
  result.setSourceConfig(sourceConfig);
  result.setDatabaseConfig(toDatabaseConfig(sourceConfig));
  return result;
}
}

View File

@@ -1,7 +0,0 @@
{
"sourceDefinitionId": "487b930d-7f6a-43ce-8bac-46e6b2de0a55",
"name": "Mongo DB",
"dockerRepository": "airbyte/source-mongodb",
"dockerImageTag": "0.2.0",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-mongodb"
}

View File

@@ -83,7 +83,7 @@
* [Microsoft SQL Server \(MSSQL\)](integrations/sources/mssql.md)
* [Microsoft Teams](integrations/sources/microsoft-teams.md)
* [Mixpanel](integrations/sources/mixpanel.md)
* [Mongo DB](integrations/sources/mongodb.md)
* [Mongo DB](integrations/sources/mongodb-v2.md)
* [MySQL](integrations/sources/mysql.md)
* [Okta](integrations/sources/okta.md)
* [Oracle DB](integrations/sources/oracle.md)

View File

@@ -65,7 +65,7 @@ Airbyte uses a grading system for connectors to help users understand what to ex
|[Microsoft Dynamics NAV](./sources/microsoft-dynamics-nav.md)| Beta |
|[Microsoft Teams](./sources/microsoft-teams.md)| Certified |
|[Mixpanel](./sources/mixpanel.md)| Beta |
|[Mongo DB](./sources/mongodb.md)| Alpha |
|[Mongo DB](./sources/mongodb-v2.md)| Beta |
|[MySQL](./sources/mysql.md)| Certified |
|[Okta](./sources/okta.md)| Beta |
|[Oracle DB](./sources/oracle.md)| Certified |

View File

@@ -0,0 +1,99 @@
# Mongo DB
The MongoDB source allows you to sync data from MongoDB. The source supports Full Refresh and Incremental sync strategies.
## Resulting schema
MongoDB does not have anything like a table definition, so column types have to be inferred from the actual attributes and their values. The discover phase has two steps:
### Step 1. Find all unique properties
The connector selects 10,000 documents to collect all distinct fields.
### Step 2. Determine property types
For each property found, the connector determines its type. If all the selected values have the same type, the connector sets that type on the property; in all other cases the connector falls back to the `string` type.
## Features
| Feature | Supported |
| :--- | :--- |
| Full Refresh Sync | Yes |
| Incremental - Append Sync | Yes |
| Replicate Incremental Deletes | No |
| Namespaces | No |
### Full Refresh sync
Works as usual full refresh sync.
### Incremental sync
Cursor field can not be nested. Currently only top level document properties are supported.
Cursor should **never** be blank. In case cursor is blank - the incremental sync results might be unpredictable and will totally rely on MongoDB comparison algorithm.
Only `datetime` and `integer` cursor types are supported. Cursor type is determined based on the cursor field name:
* `datetime` - if cursor field name contains a string from: `time`, `date`, `_at`, `timestamp`, `ts`
* `integer` - otherwise
## Getting started
This guide describes in details how you can configure MongoDB for integration with Airbyte.
### Create users
Run `mongo` shell, switch to `admin` database and create a `READ_ONLY_USER`. `READ_ONLY_USER` will be used for Airbyte integration. Please make sure that user has read-only privileges.
```javascript
mongo
use admin;
db.createUser({user: "READ_ONLY_USER", pwd: "READ_ONLY_PASSWORD", roles: [{role: "read", db: "TARGET_DATABASE"}]})
```
Make sure the user has the appropriate access levels.
### Enable MongoDB authentication
Open `/etc/mongod.conf` and add/replace specific keys:
```yaml
net:
bindIp: 0.0.0.0
security:
authorization: enabled
```
Binding to `0.0.0.0` will allow connections to the database from any IP address.
The last line will enable MongoDB security. Now only authenticated users will be able to access the database.
### Configure firewall
Make sure that MongoDB is accessible from external servers. Specific commands will depend on the firewall you are using \(UFW/iptables/AWS/etc\). Please refer to appropriate documentation.
Your `READ_ONLY_USER` should now be ready for use with Airbyte.
### Configuration Parameters
* Database: database name
* Authentication Source: specifies the database that the supplied credentials should be validated against. Defaults to `admin`.
* User: username to use when connecting
* Password: used to authenticate the user
* TLS: whether to use a TLS connection
* **Standalone MongoDb instance**
* Host: URL of the database
* Port: Port to use for connecting to the database
* **Replica Set**
* Server addresses: the members of a replica set
* Replica Set: A replica set name
* **MongoDb Atlas Cluster**
* Cluster URL: URL of a cluster to connect to
For more information regarding configuration parameters, please see [MongoDb Documentation](https://docs.mongodb.com/drivers/java/sync/v4.3/fundamentals/connection/).
## Changelog
| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.1.0 | 2021-08-30 | [5530](https://github.com/airbytehq/airbyte/pull/5530) | New source: MongoDb ported to java |