1
0
mirror of synced 2025-12-25 02:09:19 -05:00

🎉 New destination: Cassandra (#7186)

* add cassandra destination connector

* refactor and docs.

* delete test dockerfile

* revert Dockerfile rm change

* refactor & fix acceptance tests & format

* revert stream peek

* remove get pip

* add address example

* improved copy and code refactor

* add docker-compose and improved docs

Co-authored-by: itaseski <ivica.taseski@seavus.com>
This commit is contained in:
itaseskii
2021-11-05 23:02:01 +01:00
committed by GitHub
parent 1f295f24e6
commit f53fd5e66b
33 changed files with 1722 additions and 2 deletions

View File

@@ -0,0 +1,7 @@
{
"destinationDefinitionId": "707456df-6f4f-4ced-b5c6-03f73bcad1c5",
"name": "Cassandra",
"dockerRepository": "airbyte/destination-cassandra",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/cassandra"
}

View File

@@ -128,7 +128,6 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume
@Override
protected void acceptTracked(final AirbyteMessage message) throws Exception {
Preconditions.checkState(hasStarted, "Cannot accept records until consumer has started");
if (message.getType() == Type.RECORD) {
final AirbyteRecordMessage recordMessage = message.getRecord();
final AirbyteStreamNameNamespacePair stream = AirbyteStreamNameNamespacePair.fromRecordMessage(recordMessage);

View File

@@ -496,7 +496,6 @@ public abstract class DestinationAcceptanceTest {
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
final JsonNode config = getConfig();
runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredCatalog, false);
final List<AirbyteMessage> secondSyncMessages = Lists.newArrayList(
new AirbyteMessage()
.withType(Type.RECORD)

View File

@@ -112,3 +112,4 @@
| Redshift | [![destination-redshift](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-redshift%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-redshift) |
| S3 | [![destination-s3](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-s3%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-s3) |
| Snowflake | [![destination-snowflake](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-snowflake%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-snowflake) |
| Cassandra | [![destination-cassandra](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-cassandra%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-cassandra) |

View File

@@ -0,0 +1,3 @@
*
!Dockerfile
!build

View File

@@ -0,0 +1,11 @@
FROM airbyte/integration-base-java:dev
WORKDIR /airbyte
ENV APPLICATION destination-cassandra
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-cassandra

View File

@@ -0,0 +1,68 @@
# Destination Cassandra
This is the repository for the Cassandra destination connector in Java.
For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/cassandra).
## Local development
#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:destination-cassandra:build
```
#### Create credentials
**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`.
Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information.
**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials.
### Locally running the connector docker image
#### Build
Build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:destination-cassandra:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
the Dockerfile.
#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-cassandra:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-cassandra:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-cassandra:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-cassandra:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
## Testing
We use `JUnit` for Java tests.
### Unit and Integration Tests
Place unit tests under `src/test/io/airbyte/integrations/destinations/cassandra`.
#### Acceptance Tests
Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in
`src/test-integration/java/io/airbyte/integrations/destinations/cassandraDestinationAcceptanceTest.java`.
### Using gradle to run tests
All commands should be run from airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:destination-cassandra:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:destination-cassandra:integrationTest
```
## Dependency Management
### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.

View File

@@ -0,0 +1,30 @@
# Cassandra Destination
Cassandra is a free and open-source, distributed, wide-column store, NoSQL database management system designed to handle
large amounts of data across many commodity servers, providing high availability with no single point of failure
The data is structured in keyspaces and tables and is partitioned and replicated across different nodes in the
cluster.
[Read more about Cassandra](https://cassandra.apache.org/_/index.html)
This connector maps an incoming `stream` to a Cassandra `table` and a `namespace` to a Cassandra`keyspace`.
When using destination sync mode `append` and `append_dedup`, an `insert` operation is performed against an existing
Cassandra table.
When using `overwrite`, the records are first placed in a temp table. When all the messages have been received the data
is copied to the final table which is first truncated and the temp table is deleted.
The Implementation uses the [Datastax](https://github.com/datastax/java-driver) driver in order to access
Cassandra. [CassandraCqlProvider](./src/main/java/io/airbyte/integrations/destination/cassandra/CassandraCqlProvider.java)
handles the communication with the Cassandra cluster and internally it uses
the [SessionManager](./src/main/java/io/airbyte/integrations/destination/cassandra/SessionManager.java) to retrieve a
CqlSession to the cluster.
The [CassandraMessageConsumer](./src/main/java/io/airbyte/integrations/destination/cassandra/CassandraMessageConsumer.java)
class contains the logic for handling airbyte messages, events and copying data between tables.
## Development
See the [CassandraCqlProvider](./src/main/java/io/airbyte/integrations/destination/cassandra/CassandraCqlProvider.java)
class on how to use the datastax driver.
[Datastax docs.](https://docs.datastax.com/en/developer/java-driver/3.0/)

View File

@@ -0,0 +1,33 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}
application {
mainClass = 'io.airbyte.integrations.destination.cassandra.CassandraDestination'
}
def cassandraDriver = '4.13.0'
def testContainersVersion = '1.16.0'
def assertVersion = '3.21.0'
dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:bases:base-java')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
implementation "com.datastax.oss:java-driver-core:${cassandraDriver}"
implementation "com.datastax.oss:java-driver-query-builder:${cassandraDriver}"
implementation "com.datastax.oss:java-driver-mapper-runtime:${cassandraDriver}"
// https://mvnrepository.com/artifact/org.assertj/assertj-core
testImplementation "org.assertj:assertj-core:${assertVersion}"
testImplementation "org.testcontainers:cassandra:${testContainersVersion}"
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-cassandra')
}

View File

@@ -0,0 +1,24 @@
version: '3.7'
services:
cassandra1:
image: cassandra:4.0
ports:
- "9042:9042"
environment:
- "MAX_HEAP_SIZE=2048M"
- "HEAP_NEWSIZE=1024M"
- "CASSANDRA_CLUSTER_NAME=cassandra_cluster"
# Uncomment if you want to run a Cassandra cluster
# cassandra2:
# image: cassandra:4.0
# ports:
# - "9043:9042"
# environment:
# - "MAX_HEAP_SIZE=2048M"
# - "HEAP_NEWSIZE=1024M"
# - "CASSANDRA_SEEDS=cassandra1"
# - "CASSANDRA_CLUSTER_NAME=cassandra_cluster"
# depends_on:
# - cassandra1

View File

@@ -0,0 +1,4 @@
{
"username": "paste-username-here",
"password": "paste-password-here"
}

View File

@@ -0,0 +1,114 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Objects;
/*
* Immutable configuration class for storing cassandra related config.
*/
class CassandraConfig {
private final String keyspace;
private final String username;
private final String password;
private final String address;
private final int port;
private final String datacenter;
private final int replication;
public CassandraConfig(String keyspace,
String username,
String password,
String address,
int port,
String datacenter,
int replication) {
this.keyspace = keyspace;
this.username = username;
this.password = password;
this.address = address;
this.port = port;
this.datacenter = datacenter;
this.replication = replication;
}
public CassandraConfig(JsonNode config) {
this.keyspace = config.get("keyspace").asText();
this.username = config.get("username").asText();
this.password = config.get("password").asText();
this.address = config.get("address").asText();
this.port = config.get("port").asInt(9042);
this.datacenter = config.get("datacenter").asText("datacenter1");
this.replication = config.get("replication").asInt(1);
}
public String getKeyspace() {
return keyspace;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public String getAddress() {
return address;
}
public int getPort() {
return port;
}
public String getDatacenter() {
return datacenter;
}
public int getReplication() {
return replication;
}
@Override
public String toString() {
return "CassandraConfig{" +
"keyspace='" + keyspace + '\'' +
", username='" + username + '\'' +
", password='" + password + '\'' +
", address='" + address + '\'' +
", port=" + port +
", datacenter='" + datacenter + '\'' +
", replication=" + replication +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CassandraConfig that = (CassandraConfig) o;
return port == that.port && username.equals(that.username) && password.equals(that.password) &&
address.equals(that.address) && datacenter.equals(that.datacenter);
}
@Override
public int hashCode() {
return Objects.hash(username, password, address, port, datacenter);
}
}

View File

@@ -0,0 +1,179 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.now;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.metadata.TokenMap;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import io.airbyte.integrations.base.JavaBaseConstants;
import java.io.Closeable;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class CassandraCqlProvider implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraCqlProvider.class);
private static final int N_THREADS = Runtime.getRuntime().availableProcessors();
private final ExecutorService executorService;
private final CqlSession cqlSession;
private final CassandraConfig cassandraConfig;
private final String columnId;
private final String columnData;
private final String columnTimestamp;
public CassandraCqlProvider(CassandraConfig cassandraConfig) {
this.cassandraConfig = cassandraConfig;
this.cqlSession = SessionManager.initSession(cassandraConfig);
var nameTransformer = new CassandraNameTransformer(cassandraConfig);
this.columnId = nameTransformer.outputColumn(JavaBaseConstants.COLUMN_NAME_AB_ID);
this.columnData = nameTransformer.outputColumn(JavaBaseConstants.COLUMN_NAME_DATA);
this.columnTimestamp = nameTransformer.outputColumn(JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
this.executorService = Executors.newFixedThreadPool(N_THREADS);
}
public void createKeySpaceIfNotExists(String keyspace, int replicationFactor) {
var query = SchemaBuilder.createKeyspace(keyspace)
.ifNotExists()
.withSimpleStrategy(replicationFactor)
.build();
cqlSession.execute(query);
}
public void createTableIfNotExists(String keyspace, String tableName) {
var query = SchemaBuilder.createTable(keyspace, tableName)
.ifNotExists()
.withPartitionKey(columnId, DataTypes.UUID)
.withColumn(columnData, DataTypes.TEXT)
.withColumn(columnTimestamp, DataTypes.TIMESTAMP)
.build();
cqlSession.execute(query);
}
public void dropTableIfExists(String keyspace, String tableName) {
var query = SchemaBuilder.dropTable(keyspace, tableName)
.ifExists()
.build();
cqlSession.execute(query);
}
public void insert(String keyspace, String tableName, String jsonData) {
var query = QueryBuilder.insertInto(keyspace, tableName)
.value(columnId, QueryBuilder.literal(Uuids.random()))
.value(columnData, QueryBuilder.literal(jsonData))
.value(columnTimestamp, QueryBuilder.toTimestamp(now()))
.build();
cqlSession.execute(query);
}
public void truncate(String keyspace, String tableName) {
var query = QueryBuilder.truncate(keyspace, tableName).build();
cqlSession.execute(query);
}
public List<CassandraRecord> select(String keyspace, String tableName) {
var query = QueryBuilder.selectFrom(keyspace, tableName)
.columns(columnId, columnData, columnTimestamp)
.build();
return cqlSession.execute(query)
.map(result -> new CassandraRecord(
result.get(columnId, UUID.class),
result.get(columnData, String.class),
result.get(columnTimestamp, Instant.class)))
.all();
}
public List<Tuple<String, List<String>>> retrieveMetadata() {
return cqlSession.getMetadata().getKeyspaces().values().stream()
.map(keyspace -> Tuple.of(keyspace.getName().toString(), keyspace.getTables().values()
.stream()
.map(table -> table.getName().toString())
.collect(Collectors.toList())))
.collect(Collectors.toList());
}
public void copy(String keyspace, String sourceTable, String destinationTable) {
var select = String.format("SELECT * FROM %s.%s WHERE token(%s) > ? AND token(%s) <= ?",
keyspace, sourceTable, columnId, columnId);
var selectStatement = cqlSession.prepare(select);
var insert = String.format("INSERT INTO %s.%s (%s, %s, %s) VALUES (?, ?, ?)",
keyspace, destinationTable, columnId, columnData, columnTimestamp);
var insertStatement = cqlSession.prepare(insert);
// perform full table scan in parallel using token ranges
// optimal for copying large amounts of data
cqlSession.getMetadata().getTokenMap()
.map(TokenMap::getTokenRanges)
.orElseThrow(IllegalStateException::new)
.stream()
.flatMap(range -> range.unwrap().stream())
.map(range -> selectStatement.bind(range.getStart(), range.getEnd()))
// explore datastax 4.x async api as an alternative for async processing
.map(selectBoundStatement -> executorService.submit(() -> batchInsert(selectBoundStatement, insertStatement)))
.forEach(this::awaitThread);
}
private void batchInsert(BoundStatement select, PreparedStatement insert) {
// unlogged removes the log record for increased insert speed
var batchStatement = BatchStatement.builder(BatchType.UNLOGGED);
cqlSession.execute(select).all().stream()
.map(r -> CassandraRecord.of(
r.get(columnId, UUID.class),
r.get(columnData, String.class),
r.get(columnTimestamp, Instant.class)))
.map(r -> insert.bind(r.getId(), r.getData(), r.getTimestamp()))
.forEach(batchStatement::addStatement);
cqlSession.execute(batchStatement.build());
}
private void awaitThread(Future<?> future) {
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error("Interrupted thread while copying data with reason: ", e);
} catch (ExecutionException e) {
LOGGER.error("Error while copying data with reason: ", e);
}
}
@Override
public void close() {
// wait for tasks completion and terminate executor gracefully
executorService.shutdown();
// close cassandra session for the given config
SessionManager.closeSession(cassandraConfig);
}
}

View File

@@ -0,0 +1,64 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.UUID;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class CassandraDestination extends BaseConnector implements Destination {
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraDestination.class);
public static void main(String[] args) throws Exception {
new IntegrationRunner(new CassandraDestination()).run(args);
}
@Override
public AirbyteConnectionStatus check(JsonNode config) {
var cassandraConfig = new CassandraConfig(config);
// add random uuid to avoid conflicts with existing tables.
String tableName = "table_" + UUID.randomUUID().toString().replace("-", "");
CassandraCqlProvider cassandraCqlProvider = null;
try {
cassandraCqlProvider = new CassandraCqlProvider(cassandraConfig);
// check connection and write permissions
cassandraCqlProvider.createKeySpaceIfNotExists(cassandraConfig.getKeyspace(),
cassandraConfig.getReplication());
cassandraCqlProvider.createTableIfNotExists(cassandraConfig.getKeyspace(), tableName);
cassandraCqlProvider.insert(cassandraConfig.getKeyspace(), tableName, "{}");
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (Exception e) {
LOGGER.error("Can't establish Cassandra connection with reason: ", e);
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED);
} finally {
if (cassandraCqlProvider != null) {
try {
cassandraCqlProvider.dropTableIfExists(cassandraConfig.getKeyspace(), tableName);
} catch (Exception e) {
LOGGER.error("Error while deleting temp table {} with reason: ", tableName, e);
}
cassandraCqlProvider.close();
}
}
}
@Override
public AirbyteMessageConsumer getConsumer(JsonNode config,
ConfiguredAirbyteCatalog configuredCatalog,
Consumer<AirbyteMessage> outputRecordCollector) {
return new CassandraMessageConsumer(new CassandraConfig(config), configuredCatalog, outputRecordCollector);
}
}

View File

@@ -0,0 +1,109 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class CassandraMessageConsumer extends FailureTrackingAirbyteMessageConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageConsumer.class);
private final CassandraConfig cassandraConfig;
private final Consumer<AirbyteMessage> outputRecordCollector;
private final Map<AirbyteStreamNameNamespacePair, CassandraStreamConfig> cassandraStreams;
private final CassandraCqlProvider cassandraCqlProvider;
private AirbyteMessage lastMessage = null;
public CassandraMessageConsumer(CassandraConfig cassandraConfig,
ConfiguredAirbyteCatalog configuredCatalog,
Consumer<AirbyteMessage> outputRecordCollector) {
this.cassandraConfig = cassandraConfig;
this.outputRecordCollector = outputRecordCollector;
this.cassandraCqlProvider = new CassandraCqlProvider(cassandraConfig);
var nameTransformer = new CassandraNameTransformer(cassandraConfig);
this.cassandraStreams = configuredCatalog.getStreams().stream()
.collect(Collectors.toUnmodifiableMap(
AirbyteStreamNameNamespacePair::fromConfiguredAirbyteSteam,
k -> new CassandraStreamConfig(
nameTransformer.outputKeyspace(k.getStream().getNamespace()),
nameTransformer.outputTable(k.getStream().getName()),
nameTransformer.outputTmpTable(k.getStream().getName()),
k.getDestinationSyncMode())));
}
@Override
protected void startTracked() {
cassandraStreams.forEach((k, v) -> {
cassandraCqlProvider.createKeySpaceIfNotExists(v.getKeyspace(), cassandraConfig.getReplication());
cassandraCqlProvider.createTableIfNotExists(v.getKeyspace(), v.getTempTableName());
});
}
@Override
protected void acceptTracked(AirbyteMessage message) {
if (message.getType() == AirbyteMessage.Type.RECORD) {
var messageRecord = message.getRecord();
var streamConfig =
cassandraStreams.get(AirbyteStreamNameNamespacePair.fromRecordMessage(messageRecord));
if (streamConfig == null) {
throw new IllegalArgumentException("Unrecognized destination stream");
}
var data = Jsons.serialize(messageRecord.getData());
cassandraCqlProvider.insert(streamConfig.getKeyspace(), streamConfig.getTempTableName(), data);
} else if (message.getType() == AirbyteMessage.Type.STATE) {
this.lastMessage = message;
} else {
LOGGER.warn("Unsupported airbyte message type: {}", message.getType());
}
}
@Override
protected void close(boolean hasFailed) {
if (!hasFailed) {
cassandraStreams.forEach((k, v) -> {
try {
cassandraCqlProvider.createTableIfNotExists(v.getKeyspace(), v.getTableName());
switch (v.getDestinationSyncMode()) {
case APPEND -> {
cassandraCqlProvider.copy(v.getKeyspace(), v.getTempTableName(), v.getTableName());
}
case OVERWRITE -> {
cassandraCqlProvider.truncate(v.getKeyspace(), v.getTableName());
cassandraCqlProvider.copy(v.getKeyspace(), v.getTempTableName(), v.getTableName());
}
default -> throw new UnsupportedOperationException();
}
} catch (Exception e) {
LOGGER.error("Error while copying data to table {}: : ", v.getTableName(), e);
}
});
outputRecordCollector.accept(lastMessage);
}
cassandraStreams.forEach((k, v) -> {
try {
cassandraCqlProvider.dropTableIfExists(v.getKeyspace(), v.getTempTableName());
} catch (Exception e) {
LOGGER.error("Error while deleting temp table {} with reason: ", v.getTempTableName(), e);
}
});
cassandraCqlProvider.close();
}
}

View File

@@ -0,0 +1,42 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import com.google.common.base.CharMatcher;
import io.airbyte.commons.text.Names;
import io.airbyte.integrations.destination.StandardNameTransformer;
class CassandraNameTransformer extends StandardNameTransformer {
private final CassandraConfig cassandraConfig;
public CassandraNameTransformer(CassandraConfig cassandraConfig) {
this.cassandraConfig = cassandraConfig;
}
String outputKeyspace(String namespace) {
if (namespace == null || namespace.isBlank()) {
return cassandraConfig.getKeyspace();
}
return CharMatcher.is('_').trimLeadingFrom(Names.toAlphanumericAndUnderscore(namespace));
}
String outputTable(String streamName) {
var tableName = super.getRawTableName(streamName.toLowerCase()).substring(1);
// max allowed length for a cassandra table is 48 characters
return tableName.length() > 48 ? tableName.substring(0, 48) : tableName;
}
String outputTmpTable(String streamName) {
var tableName = super.getTmpTableName(streamName.toLowerCase()).substring(1);
// max allowed length for a cassandra table is 48 characters
return tableName.length() > 48 ? tableName.substring(0, 48) : tableName;
}
String outputColumn(String columnName) {
return Names.doubleQuote(columnName.toLowerCase());
}
}

View File

@@ -0,0 +1,49 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import java.time.Instant;
import java.util.UUID;
class CassandraRecord {
private final UUID id;
private final String data;
private final Instant timestamp;
public CassandraRecord(UUID id, String data, Instant timestamp) {
this.id = id;
this.data = data;
this.timestamp = timestamp;
}
static CassandraRecord of(UUID id, String data, Instant timestamp) {
return new CassandraRecord(id, data, timestamp);
}
public UUID getId() {
return id;
}
public String getData() {
return data;
}
public Instant getTimestamp() {
return timestamp;
}
@Override
public String toString() {
return "CassandraRecord{" +
"id=" + id +
", data='" + data + '\'' +
", timestamp=" + timestamp +
'}';
}
}

View File

@@ -0,0 +1,58 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import io.airbyte.protocol.models.DestinationSyncMode;
/*
* Immutable configuration class for storing destination stream config.
*/
class CassandraStreamConfig {
private final String keyspace;
private final String tableName;
private final String tempTableName;
private final DestinationSyncMode destinationSyncMode;
public CassandraStreamConfig(String keyspace,
String tableName,
String tempTableName,
DestinationSyncMode destinationSyncMode) {
this.keyspace = keyspace;
this.tableName = tableName;
this.tempTableName = tempTableName;
this.destinationSyncMode = destinationSyncMode;
}
public String getKeyspace() {
return keyspace;
}
public String getTableName() {
return tableName;
}
public String getTempTableName() {
return tempTableName;
}
public DestinationSyncMode getDestinationSyncMode() {
return destinationSyncMode;
}
@Override
public String toString() {
return "CassandraStreamConfig{" +
"keyspace='" + keyspace + '\'' +
", tableName='" + tableName + '\'' +
", tempTableName='" + tempTableName + '\'' +
", destinationSyncMode=" + destinationSyncMode +
'}';
}
}

View File

@@ -0,0 +1,66 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import com.datastax.oss.driver.api.core.CqlSession;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
class SessionManager {
// AtomicInteger is used for convenience, this class is not thread safe
// and needs additional synchronization for that.
private static final ConcurrentHashMap<CassandraConfig, Tuple<CqlSession, AtomicInteger>> sessions;
static {
sessions = new ConcurrentHashMap<>();
}
private SessionManager() {
}
/*
* CqlSession objects are heavyweight and can hold several tcp connections to the Cassandra cluster,
* for that reason it is better if sessions are reused per configuration. Sessions are thread-safe
* and can be accessed from different threads.
*
*/
public static CqlSession initSession(CassandraConfig cassandraConfig) {
var cachedSession = sessions.get(cassandraConfig);
if (cachedSession != null) {
cachedSession.value2().incrementAndGet();
return cachedSession.value1();
} else {
var session = CqlSession.builder()
.withLocalDatacenter(cassandraConfig.getDatacenter())
.addContactPoint(new InetSocketAddress(cassandraConfig.getAddress(), cassandraConfig.getPort()))
.withAuthCredentials(cassandraConfig.getUsername(), cassandraConfig.getPassword())
.build();
sessions.put(cassandraConfig, Tuple.of(session, new AtomicInteger(1)));
return session;
}
}
/*
* Close session configured with cassandra config. if the session is being used by more than one
* external instance only decrease the usage count, otherwise close the session and remove it from
* the map.
*
*/
public static void closeSession(CassandraConfig cassandraConfig) {
var cachedSession = sessions.get(cassandraConfig);
if (cachedSession == null) {
throw new IllegalStateException("No session for the provided config");
}
int count = cachedSession.value2().decrementAndGet();
if (count < 1) {
cachedSession.value1().close();
sessions.remove(cassandraConfig);
}
}
}

View File

@@ -0,0 +1,38 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
public class Tuple<V1, V2> {
private final V1 value1;
private final V2 value2;
public Tuple(V1 value1, V2 value2) {
this.value1 = value1;
this.value2 = value2;
}
public static <V1, V2> Tuple<V1, V2> of(V1 value1, V2 value2) {
return new Tuple<>(value1, value2);
}
public V1 value1() {
return value1;
}
public V2 value2() {
return value2;
}
@Override
public String toString() {
return "Tuple{" +
"value1=" + value1 +
", value2=" + value2 +
'}';
}
}

View File

@@ -0,0 +1,65 @@
{
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/cassandra",
"supportsIncremental": true,
"supportsNormalization": false,
"supportsDBT": false,
"supported_destination_sync_modes": ["overwrite", "append"],
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Cassandra Destination Spec",
"type": "object",
"required": ["keyspace", "username", "password", "address", "port"],
"additionalProperties": true,
"properties": {
"keyspace": {
"title": "Keyspace",
"description": "Default Cassandra keyspace to create data in.",
"type": "string",
"order": 0
},
"username": {
"title": "Username",
"description": "Username to use to access Cassandra.",
"type": "string",
"order": 1
},
"password": {
"title": "Password",
"description": "Password associated with Cassandra.",
"type": "string",
"airbyte_secret": true,
"order": 2
},
"address": {
"title": "Address",
"description": "Address to connect to.",
"type": "string",
"examples": ["localhost,127.0.0.1"],
"order": 3
},
"port": {
"title": "Port",
"description": "Port of Cassandra.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 9042,
"order": 4
},
"datacenter": {
"title": "Datacenter",
"description": "Datacenter of the cassandra cluster.",
"type": "string",
"default": "datacenter1",
"order": 5
},
"replication": {
"title": "Replication factor",
"type": "integer",
"description": "Indicates to how many nodes the data should be replicated to.",
"default": 1,
"order": 6
}
}
}
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import org.testcontainers.containers.CassandraContainer;
class CassandraContainerInitializr {
private static ConfiguredCassandraContainer cassandraContainer;
private CassandraContainerInitializr() {
}
public static ConfiguredCassandraContainer initContainer() {
if (cassandraContainer == null) {
cassandraContainer = new ConfiguredCassandraContainer();
}
cassandraContainer.start();
return cassandraContainer;
}
public static class ConfiguredCassandraContainer extends CassandraContainer<ConfiguredCassandraContainer> {
ConfiguredCassandraContainer() {
// latest compatible version with the internal testcontainers datastax driver.
super("cassandra:3.11.11");
}
}
}

View File

@@ -0,0 +1,135 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class CassandraCqlProviderIT {
private static final String CASSANDRA_KEYSPACE = "cassandra_keyspace";
private static final String CASSANDRA_TABLE = "cassandra_table";
private CassandraCqlProvider cassandraCqlProvider;
private CassandraNameTransformer nameTransformer;
@BeforeAll
void setup() {
var cassandraContainer = CassandraContainerInitializr.initContainer();
var cassandraConfig = TestDataFactory.createCassandraConfig(
cassandraContainer.getUsername(),
cassandraContainer.getPassword(),
cassandraContainer.getHost(),
cassandraContainer.getFirstMappedPort());
this.cassandraCqlProvider = new CassandraCqlProvider(cassandraConfig);
this.nameTransformer = new CassandraNameTransformer(cassandraConfig);
cassandraCqlProvider.createKeySpaceIfNotExists(CASSANDRA_KEYSPACE, 1);
cassandraCqlProvider.createTableIfNotExists(CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
}
@AfterEach
void clean() {
cassandraCqlProvider.truncate(CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
}
@Test
void testCreateKeySpaceIfNotExists() {
String keyspace = nameTransformer.outputKeyspace("test_keyspace");
assertDoesNotThrow(() -> cassandraCqlProvider.createKeySpaceIfNotExists(keyspace, 1));
}
@Test
void testCreateTableIfNotExists() {
String table = nameTransformer.outputTable("test_stream");
assertDoesNotThrow(() -> cassandraCqlProvider.createTableIfNotExists(CASSANDRA_KEYSPACE, table));
}
@Test
void testInsert() {
// given
cassandraCqlProvider.insert(CASSANDRA_KEYSPACE, CASSANDRA_TABLE, "{\"property\":\"data1\"}");
cassandraCqlProvider.insert(CASSANDRA_KEYSPACE, CASSANDRA_TABLE, "{\"property\":\"data2\"}");
cassandraCqlProvider.insert(CASSANDRA_KEYSPACE, CASSANDRA_TABLE, "{\"property\":\"data3\"}");
// when
var resultSet = cassandraCqlProvider.select(CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
// then
assertThat(resultSet)
.isNotNull()
.hasSize(3)
.anyMatch(r -> r.getData().equals("{\"property\":\"data1\"}"))
.anyMatch(r -> r.getData().equals("{\"property\":\"data2\"}"))
.anyMatch(r -> r.getData().equals("{\"property\":\"data3\"}"));
}
@Test
void testTruncate() {
// given
cassandraCqlProvider.insert(CASSANDRA_KEYSPACE, CASSANDRA_TABLE, "{\"property\":\"data1\"}");
cassandraCqlProvider.insert(CASSANDRA_KEYSPACE, CASSANDRA_TABLE, "{\"property\":\"data2\"}");
cassandraCqlProvider.insert(CASSANDRA_KEYSPACE, CASSANDRA_TABLE, "{\"property\":\"data3\"}");
// when
cassandraCqlProvider.truncate(CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
var resultSet = cassandraCqlProvider.select(CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
// then
assertThat(resultSet)
.isNotNull()
.isEmpty();
}
@Test
void testDropTableIfExists() {
// given
String table = nameTransformer.outputTmpTable("test_stream");
cassandraCqlProvider.createTableIfNotExists(CASSANDRA_KEYSPACE, table);
// when
cassandraCqlProvider.dropTableIfExists(CASSANDRA_KEYSPACE, table);
// then
assertThrows(InvalidQueryException.class, () -> cassandraCqlProvider.select(CASSANDRA_KEYSPACE, table));
}
@Test
void testCopy() {
// given
String tmpTable = nameTransformer.outputTmpTable("test_stream_copy");
cassandraCqlProvider.createTableIfNotExists(CASSANDRA_KEYSPACE, tmpTable);
cassandraCqlProvider.insert(CASSANDRA_KEYSPACE, tmpTable, "{\"property\":\"data1\"}");
cassandraCqlProvider.insert(CASSANDRA_KEYSPACE, tmpTable, "{\"property\":\"data2\"}");
cassandraCqlProvider.insert(CASSANDRA_KEYSPACE, tmpTable, "{\"property\":\"data3\"}");
String rawTable = nameTransformer.outputTable("test_stream_copy");
cassandraCqlProvider.createTableIfNotExists(CASSANDRA_KEYSPACE, rawTable);
// when
cassandraCqlProvider.copy(CASSANDRA_KEYSPACE, tmpTable, rawTable);
var resultSet = cassandraCqlProvider.select(CASSANDRA_KEYSPACE, rawTable);
// then
assertThat(resultSet)
.isNotNull()
.hasSize(3)
.anyMatch(r -> r.getData().equals("{\"property\":\"data1\"}"))
.anyMatch(r -> r.getData().equals("{\"property\":\"data2\"}"))
.anyMatch(r -> r.getData().equals("{\"property\":\"data3\"}"));
}
}

View File

@@ -0,0 +1,92 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CassandraDestinationAcceptanceTest extends DestinationAcceptanceTest {
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraDestinationAcceptanceTest.class);
private JsonNode configJson;
private CassandraCqlProvider cassandraCqlProvider;
private CassandraNameTransformer cassandraNameTransformer;
private static CassandraContainerInitializr.ConfiguredCassandraContainer cassandraContainer;
@BeforeAll
static void initContainer() {
cassandraContainer = CassandraContainerInitializr.initContainer();
}
@Override
protected void setup(TestDestinationEnv testEnv) {
configJson = TestDataFactory.createJsonConfig(
cassandraContainer.getUsername(),
cassandraContainer.getPassword(),
cassandraContainer.getHost(),
cassandraContainer.getFirstMappedPort());
var cassandraConfig = new CassandraConfig(configJson);
cassandraCqlProvider = new CassandraCqlProvider(cassandraConfig);
cassandraNameTransformer = new CassandraNameTransformer(cassandraConfig);
}
@Override
protected void tearDown(TestDestinationEnv testEnv) {
cassandraCqlProvider.retrieveMetadata().forEach(meta -> {
var keyspace = meta.value1();
meta.value2().forEach(table -> cassandraCqlProvider.truncate(keyspace, table));
});
}
@Override
protected String getImageName() {
return "airbyte/destination-cassandra:dev";
}
@Override
protected JsonNode getConfig() {
return configJson;
}
@Override
protected boolean implementsNamespaces() {
return true;
}
@Override
protected JsonNode getFailCheckConfig() {
return TestDataFactory.createJsonConfig(
"usr",
"pw",
"127.0.192.1",
8080);
}
@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
String streamName,
String namespace,
JsonNode streamSchema) {
var keyspace = cassandraNameTransformer.outputKeyspace(namespace);
var table = cassandraNameTransformer.outputTable(streamName);
return cassandraCqlProvider.select(keyspace, table).stream()
.sorted(Comparator.comparing(CassandraRecord::getTimestamp))
.map(CassandraRecord::getData)
.map(Jsons::deserialize)
.collect(Collectors.toList());
}
}

View File

@@ -0,0 +1,57 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import static org.assertj.core.api.Assertions.assertThat;
import io.airbyte.integrations.destination.cassandra.CassandraContainerInitializr.ConfiguredCassandraContainer;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class CassandraDestinationIT {
private CassandraDestination cassandraDestination;
private ConfiguredCassandraContainer cassandraContainer;
@BeforeAll
void setup() {
this.cassandraContainer = CassandraContainerInitializr.initContainer();
this.cassandraDestination = new CassandraDestination();
}
@Test
void testCheckWithStatusSucceeded() {
var jsonConfiguration = TestDataFactory.createJsonConfig(
cassandraContainer.getUsername(),
cassandraContainer.getPassword(),
cassandraContainer.getHost(),
cassandraContainer.getFirstMappedPort());
var connectionStatus = cassandraDestination.check(jsonConfiguration);
assertThat(connectionStatus.getStatus()).isEqualTo(AirbyteConnectionStatus.Status.SUCCEEDED);
}
@Test
void testCheckWithStatusFailed() {
var jsonConfiguration = TestDataFactory.createJsonConfig(
"usr",
"pw",
"192.0.2.1",
8080);
var connectionStatus = cassandraDestination.check(jsonConfiguration);
assertThat(connectionStatus.getStatus()).isEqualTo(AirbyteConnectionStatus.Status.FAILED);
}
}

View File

@@ -0,0 +1,130 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.util.function.Function;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestMethodOrder;
@TestMethodOrder(OrderAnnotation.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class CassandraMessageConsumerIT {
private static final String AIRBYTE_NAMESPACE_1 = "airbyte_namespace_1";
private static final String AIRBYTE_NAMESPACE_2 = "airbyte_namespace_2";
private static final String AIRBYTE_STREAM_1 = "airbyte_stream_1";
private static final String AIRBYTE_STREAM_2 = "airbyte_stream_2";
private CassandraMessageConsumer cassandraMessageConsumer;
private CassandraCqlProvider cassandraCqlProvider;
private CassandraNameTransformer nameTransformer;
@BeforeAll
void setup() {
var cassandraContainer = CassandraContainerInitializr.initContainer();
var cassandraConfig = TestDataFactory.createCassandraConfig(
cassandraContainer.getUsername(),
cassandraContainer.getPassword(),
cassandraContainer.getHost(),
cassandraContainer.getFirstMappedPort());
var stream1 = TestDataFactory.createAirbyteStream(AIRBYTE_STREAM_1, AIRBYTE_NAMESPACE_1);
var stream2 = TestDataFactory.createAirbyteStream(AIRBYTE_STREAM_2, AIRBYTE_NAMESPACE_2);
var cStream1 = TestDataFactory.createConfiguredAirbyteStream(DestinationSyncMode.APPEND, stream1);
var cStream2 = TestDataFactory.createConfiguredAirbyteStream(DestinationSyncMode.OVERWRITE, stream2);
var catalog = TestDataFactory.createConfiguredAirbyteCatalog(cStream1, cStream2);
cassandraMessageConsumer = new CassandraMessageConsumer(cassandraConfig, catalog, message -> {});
cassandraCqlProvider = new CassandraCqlProvider(cassandraConfig);
nameTransformer = new CassandraNameTransformer(cassandraConfig);
}
@Test
@Order(1)
void testStartTracked() {
assertDoesNotThrow(() -> cassandraMessageConsumer.startTracked());
}
@Test
@Order(2)
void testAcceptTracked() {
Function<String, JsonNode> function =
data -> Jsons.jsonNode(ImmutableMap.builder().put("property", data).build());
assertDoesNotThrow(() -> {
cassandraMessageConsumer.acceptTracked(
TestDataFactory.createAirbyteMessage(AirbyteMessage.Type.RECORD, AIRBYTE_STREAM_1, AIRBYTE_NAMESPACE_1,
function.apply("data1")));
cassandraMessageConsumer.acceptTracked(
TestDataFactory.createAirbyteMessage(AirbyteMessage.Type.RECORD, AIRBYTE_STREAM_1, AIRBYTE_NAMESPACE_1,
function.apply("data2")));
cassandraMessageConsumer.acceptTracked(
TestDataFactory.createAirbyteMessage(AirbyteMessage.Type.RECORD, AIRBYTE_STREAM_2, AIRBYTE_NAMESPACE_2,
function.apply("data3")));
cassandraMessageConsumer.acceptTracked(
TestDataFactory.createAirbyteMessage(AirbyteMessage.Type.RECORD, AIRBYTE_STREAM_2, AIRBYTE_NAMESPACE_2,
function.apply("data4")));
cassandraMessageConsumer.acceptTracked(
TestDataFactory.createAirbyteMessage(AirbyteMessage.Type.STATE, AIRBYTE_STREAM_2, AIRBYTE_NAMESPACE_2,
function.apply("data5")));
});
}
@Test
@Order(3)
void testClose() {
assertDoesNotThrow(() -> cassandraMessageConsumer.close(false));
}
@Test
@Order(4)
void testFinalState() {
var keyspace1 = nameTransformer.outputKeyspace(AIRBYTE_NAMESPACE_1);
var keyspace2 = nameTransformer.outputKeyspace(AIRBYTE_NAMESPACE_2);
var table1 = nameTransformer.outputTable(AIRBYTE_STREAM_1);
var table2 = nameTransformer.outputTable(AIRBYTE_STREAM_2);
var resultSet1 = cassandraCqlProvider.select(keyspace1, table1);
var resultSet2 = cassandraCqlProvider.select(keyspace2, table2);
assertThat(resultSet1)
.isNotNull()
.hasSize(2)
.anyMatch(r -> r.getData().equals("{\"property\":\"data1\"}"))
.anyMatch(r -> r.getData().equals("{\"property\":\"data2\"}"));
assertThat(resultSet2)
.isNotNull()
.hasSize(2)
.anyMatch(r -> r.getData().equals("{\"property\":\"data3\"}"))
.anyMatch(r -> r.getData().equals("{\"property\":\"data4\"}"));
}
}

View File

@@ -0,0 +1,77 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.time.Instant;
import java.util.List;
public class TestDataFactory {
private TestDataFactory() {
}
static CassandraConfig createCassandraConfig(String username, String password, String address, int port) {
return new CassandraConfig(
"default_keyspace",
username,
password,
address,
port,
"datacenter1",
1);
}
static JsonNode createJsonConfig(String username, String password, String address, int port) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("keyspace", "default_keyspace")
.put("username", username)
.put("password", password)
.put("address", address)
.put("port", port)
.put("datacenter", "datacenter1")
.put("replication", 1)
.build());
}
static AirbyteMessage createAirbyteMessage(AirbyteMessage.Type type,
String streamName,
String namespace,
JsonNode data) {
return new AirbyteMessage()
.withType(type)
.withRecord(new AirbyteRecordMessage()
.withStream(streamName)
.withNamespace(namespace)
.withData(data)
.withEmittedAt(Instant.now().toEpochMilli()));
}
static AirbyteStream createAirbyteStream(String name, String namespace) {
return new AirbyteStream()
.withName(name)
.withNamespace(namespace);
}
static ConfiguredAirbyteStream createConfiguredAirbyteStream(DestinationSyncMode syncMode, AirbyteStream stream) {
return new ConfiguredAirbyteStream()
.withDestinationSyncMode(syncMode)
.withStream(stream);
}
static ConfiguredAirbyteCatalog createConfiguredAirbyteCatalog(ConfiguredAirbyteStream... configuredStreams) {
return new ConfiguredAirbyteCatalog().withStreams(List.of(configuredStreams));
}
}

View File

@@ -0,0 +1,40 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class CassandraConfigTest {
private CassandraConfig cassandraConfig;
@BeforeEach
void setup() {
var jsonNode = TestDataFactory.createJsonConfig(
"usr",
"pw",
"127.0.0.1",
9042);
this.cassandraConfig = new CassandraConfig(jsonNode);
}
@Test
void testConfig() {
assertThat(cassandraConfig)
.hasFieldOrPropertyWithValue("keyspace", "default_keyspace")
.hasFieldOrPropertyWithValue("username", "usr")
.hasFieldOrPropertyWithValue("password", "pw")
.hasFieldOrPropertyWithValue("address", "127.0.0.1")
.hasFieldOrPropertyWithValue("port", 9042)
.hasFieldOrPropertyWithValue("datacenter", "datacenter1")
.hasFieldOrPropertyWithValue("replication", 1);
}
}

View File

@@ -0,0 +1,64 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class CassandraNameTransformerTest {
private CassandraNameTransformer cassandraNameTransformer;
@BeforeAll
void setup() {
var cassandraConfig = TestDataFactory.createCassandraConfig(
"usr",
"pw",
"127.0.0.1",
9042);
this.cassandraNameTransformer = new CassandraNameTransformer(cassandraConfig);
}
@Test
void testOutputTable() {
var table = cassandraNameTransformer.outputTable("stream_name");
assertThat(table).matches("airbyte_raw_stream_name");
}
@Test
void testOutputTmpTable() {
var table = cassandraNameTransformer.outputTmpTable("stream_name");
assertThat(table).matches("airbyte_tmp_+[a-z]+_stream_name");
}
@Test
void testOutputKeyspace() {
var keyspace = cassandraNameTransformer.outputKeyspace("***keyspace^h");
assertThat(keyspace).matches("keyspace_h");
}
@Test
void outputColumn() {
var column = cassandraNameTransformer.outputColumn("_airbyte_data");
assertThat(column).matches("\"_airbyte_data\"");
}
}

View File

@@ -0,0 +1,77 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.cassandra;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.time.Instant;
import java.util.List;
public class TestDataFactory {
private TestDataFactory() {
}
static CassandraConfig createCassandraConfig(String username, String password, String address, int port) {
return new CassandraConfig(
"default_keyspace",
username,
password,
address,
port,
"datacenter1",
1);
}
static JsonNode createJsonConfig(String username, String password, String address, int port) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("keyspace", "default_keyspace")
.put("username", username)
.put("password", password)
.put("address", address)
.put("port", port)
.put("datacenter", "datacenter1")
.put("replication", 1)
.build());
}
static AirbyteMessage createAirbyteMessage(AirbyteMessage.Type type,
String streamName,
String namespace,
JsonNode data) {
return new AirbyteMessage()
.withType(type)
.withRecord(new AirbyteRecordMessage()
.withStream(streamName)
.withNamespace(namespace)
.withData(data)
.withEmittedAt(Instant.now().toEpochMilli()));
}
static AirbyteStream createAirbyteStream(String name, String namespace) {
return new AirbyteStream()
.withName(name)
.withNamespace(namespace);
}
static ConfiguredAirbyteStream createConfiguredAirbyteStream(DestinationSyncMode syncMode, AirbyteStream stream) {
return new ConfiguredAirbyteStream()
.withDestinationSyncMode(syncMode)
.withStream(stream);
}
static ConfiguredAirbyteCatalog createConfiguredAirbyteCatalog(ConfiguredAirbyteStream... configuredStreams) {
return new ConfiguredAirbyteCatalog().withStreams(List.of(configuredStreams));
}
}

View File

@@ -162,6 +162,7 @@
* [Redshift](integrations/destinations/redshift.md)
* [S3](integrations/destinations/s3.md)
* [Snowflake](integrations/destinations/snowflake.md)
* [Cassandra](integrations/destinations/cassandra.md)
* [Custom or New Connector](integrations/custom-connectors.md)
* [Connector Development](connector-development/README.md)
* [Tutorials](connector-development/tutorials/README.md)

View File

@@ -147,4 +147,5 @@ Airbyte uses a grading system for connectors to help users understand what to ex
| [S3](destinations/s3.md) | Certified |
| [SQL Server \(MSSQL\)](destinations/mssql.md) | Alpha |
| [Snowflake](destinations/snowflake.md) | Certified |
| [Cassandra](destinations/cassandra.md) | Alpha |

View File

@@ -0,0 +1,49 @@
# Cassandra
## Sync overview
### Output schema
The incoming airbyte data is structured in keyspaces and tables and is partitioned and replicated across different nodes
in the cluster. This connector maps an incoming `stream` to a Cassandra `table` and a `namespace` to a
Cassandra`keyspace`. Fields in the airbyte message become different columns in the Cassandra tables. Each table will
contain the following columns.
* `_airbyte_ab_id`: A random uuid generator to be used as a partition key.
* `_airbyte_emitted_at`: a timestamp representing when the event was received from the data source.
* `_airbyte_data`: a json text representing the extracted data.
### Features
| Feature | Support | Notes |
| :--- | :---: | :--- |
| Full Refresh Sync | ✅ | Warning: this mode deletes all previously synced data in the configured DynamoDB table. |
| Incremental - Append Sync | ✅ | |
| Incremental - Deduped History | ❌ | As this connector does not support dbt, we don't support this sync mode on this destination. |
| Namespaces | ✅ | Namespace will be used as part of the table name. |
### Performance considerations
Cassandra is designed to handle large amounts of data by using different nodes in the cluster in order to perform write
operations. As long as you have enough nodes in the cluster the database can scale infinitely and handle any amount of
data from the connector.
## Getting started
### Requirements
* The driver is compatible with _Cassandra >= 2.1_
* Configuration
* Keyspace [default keyspace to use when writing data]
* Username [authentication username]
* Password [authentication password]
* Address [cluster address]
* Port [default: 9042]
* Datacenter [optional] [default: datacenter1]
* Replication [optional] [default: 1]
### Setup guide
######TODO: more info, screenshots?, etc...