1
0
mirror of synced 2025-12-23 21:03:15 -05:00

🎉 New Destination: Exasol (#21200)

* initial Exasol connector stub

* CUrrent version

* Exasol connector current version

* Exasol connector current version

* Exasol connector current version

* Exasol connector current version

* Remove unwanted changes

* Upgrade dependencies

* Use Exasol container for integration tests

* Purge database before test

* Fix integration tests

* Adapt to new API

* Fix integration tests

* Use separate fields for host, port and fingerprint

* Deactivate action on forks

* Activate action on forks

* Update documentation

* Adapt type of ID column

* Fix Exasol integration tests

* Pass certificate fingerprint when creating a DB connection
* Use the build directory for temp Exasol credentials

* Add reason for overriding testcontainers dependency

* Replace deprecated method

* Code cleanup

* Enable SSH support

* Use JdbcDestinationAcceptanceTest instead of generic base class

* Remove SSH support (will be added later)

* Apply suggestions from code review

Co-authored-by: Christoph Kuhnke <github@kuhnke.net>

* Improve debugging

* Use default name transformer implementation where possible

* Add unit tests for name transformer

* Add unit tests for destination

* Implement review findings by @ckunki

* Implement review findings by @itaseskii

* Apply suggestions from code review

Co-authored-by: Christoph Kuhnke <github@kuhnke.net>

* Describe data type limitation

* Make schema name mandatory

* Remove workaround for bug in ExasolContainer

* Add test for executing statements in a transaction

Replace Exasol specific code with default method

* add destination-exasol to destination_definitions.yaml

* add destination-exasol to destination_specs.yaml

---------

Co-authored-by: ThomasBestfleisch <thomas.bestfleisch@exasol.com>
Co-authored-by: Ivica Taseski <ivica.taseski94@gmail.com>
Co-authored-by: Christoph Kuhnke <github@kuhnke.net>
Co-authored-by: Sunny <6833405+sh4sh@users.noreply.github.com>
This commit is contained in:
Christoph Pirkl
2023-02-22 00:10:03 +01:00
committed by GitHub
parent aad9d400b1
commit f9bc114d0d
18 changed files with 910 additions and 0 deletions

View File

@@ -128,6 +128,12 @@
documentationUrl: https://docs.airbyte.com/integrations/destinations/elasticsearch
icon: elasticsearch.svg
releaseStage: alpha
- name: Exasol
destinationDefinitionId: bb6071d9-6f34-4766-bec2-d1d4ed81a653
dockerRepository: airbyte/destination-exasol
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.com/integrations/destinations/exasol
releaseStage: alpha
- name: Firebolt
destinationDefinitionId: 18081484-02a5-4662-8dba-b270b582f321
dockerRepository: airbyte/destination-firebolt

View File

@@ -2238,6 +2238,71 @@
- "overwrite"
- "append"
supportsNamespaces: true
- dockerImage: "airbyte/destination-exasol:0.1.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/exasol"
connectionSpecification:
$schema: "http://json-schema.org/draft-07/schema#"
title: "Exasol Destination Spec"
type: "object"
required:
- "host"
- "port"
- "username"
- "schema"
additionalProperties: true
properties:
host:
title: "Host"
description: "Hostname of the database."
type: "string"
order: 0
port:
title: "Port"
description: "Port of the database."
type: "integer"
minimum: 0
maximum: 65536
default: 8563
examples:
- "8563"
order: 1
certificateFingerprint:
title: "Certificate Fingerprint"
description: "Fingerprint of the Exasol server's TLS certificate"
type: "string"
examples:
- "ABC123..."
order: 2
username:
title: "User"
description: "Username to use to access the database."
type: "string"
order: 3
password:
title: "Password"
description: "Password associated with the username."
type: "string"
airbyte_secret: true
order: 4
schema:
title: "Schema Name"
description: "Schema Name"
type: "string"
order: 5
jdbc_url_params:
description: "Additional properties to pass to the JDBC URL string when\
\ connecting to the database formatted as 'key=value' pairs separated\
\ by the symbol ';'. (example: key1=value1;key2=value2;key3=value3)."
title: "JDBC URL Params"
type: "string"
order: 6
supportsIncremental: true
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-firebolt:0.1.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/firebolt"

View File

@@ -20,6 +20,7 @@ public enum DatabaseDriver {
REDSHIFT("com.amazon.redshift.jdbc.Driver", "jdbc:redshift://%s:%d/%s"),
SNOWFLAKE("net.snowflake.client.jdbc.SnowflakeDriver", "jdbc:snowflake://%s/"),
YUGABYTEDB("com.yugabyte.Driver", "jdbc:yugabytedb://%s:%d/%s"),
EXASOL("com.exasol.jdbc.EXADriver", "jdbc:exa:%s:%d"),
TERADATA("com.teradata.jdbc.TeraDriver", "jdbc:teradata://%s/");
private final String driverClassName;

View File

@@ -163,6 +163,7 @@
| Dev Null | [![destination-dev-null](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-dev-null%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/connectors/destination-dev-null) |
| Elasticsearch | (Temporarily Not Available) |
| End-to-End Testing | [![destination-e2e-test](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-e2e-test%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/connectors/destination-e2e-test) |
| Exasol | [![destination-exasol](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-exasol%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-exasol) |
| Google Cloud Storage (GCS) | [![destination-gcs](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-gcs%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/connectors/destination-gcs) |
| Google Firestore | [![destination-firestore](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-firestore%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/connectors/destination-firestore) |
| Google PubSub | [![destination-pubsub](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-pubsub%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/connectors/destination-pubsub) |

View File

@@ -0,0 +1,3 @@
*
!Dockerfile
!build

View File

@@ -0,0 +1,18 @@
FROM airbyte/integration-base-java:dev AS build
WORKDIR /airbyte
ENV APPLICATION destination-exasol
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar
FROM airbyte/integration-base-java:dev
WORKDIR /airbyte
ENV APPLICATION destination-exasol
COPY --from=build /airbyte /airbyte
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-exasol

View File

@@ -0,0 +1,67 @@
# Destination Exasol
This is the repository for the Exasol destination connector in Java.
For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/exasol).
## Local development
#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:destination-exasol:build
```
#### Create credentials
**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`.
Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information.
**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials.
### Locally running the connector docker image
#### Build
Build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:destination-exasol:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
the Dockerfile.
#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-exasol:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-exasol:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-exasol:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-exasol:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
## Testing
The connector uses `JUnit` for Java tests.
### Unit and Integration Tests
Place unit tests under `src/test/io/airbyte/integrations/destinations/exasol`.
#### Acceptance Tests
Airbyte has a standard test suite that all destination connectors must pass.
### Using gradle to run tests
All commands should be run from airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:destination-exasol:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:destination-exasol:integrationTest
```
## Dependency Management
### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately according to [semantic versioning](https://semver.org/).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from the Airbyte team will take a look at your PR and iterate with you to merge it into master.

View File

@@ -0,0 +1,19 @@
# Exasol
## Overview
Exasol is the in-memory database built for analytics.
## Endpoints
The destination-exasol connector uses the official [Exasol JDBC driver](https://docs.exasol.com/db/latest/connect_exasol/drivers/jdbc.htm).
## Quick Notes
- TLS connections are used by default. If the Exasol database uses a self-signed certificate, specify the certificate fingerprint.
## Reference
- [Exasol homepage](https://www.exasol.com/)
- [Exasol documentation](https://docs.exasol.com/db/latest/home.htm)
- [Exasol JDBC driver documentation](https://docs.exasol.com/db/latest/connect_exasol/drivers/jdbc.htm)

View File

@@ -0,0 +1,31 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}
application {
mainClass = 'io.airbyte.integrations.destination.exasol.ExasolDestination'
}
dependencies {
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-integrations:bases:base-java')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
implementation project(':airbyte-integrations:connectors:destination-jdbc')
implementation 'com.exasol:exasol-jdbc:7.1.17'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
// Explicitly upgrade testcontainers to avoid java.lang.NoSuchMethodError:
// 'org.testcontainers.containers.GenericContainer com.exasol.containers.ExasolContainer.withCopyToContainer(org.testcontainers.images.builder.Transferable, java.lang.String)'
testImplementation 'org.testcontainers:testcontainers:1.17.6'
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-exasol')
integrationTestJavaImplementation 'com.exasol:exasol-testcontainers:6.5.0'
integrationTestJavaImplementation 'org.testcontainers:testcontainers:1.17.6'
}

View File

@@ -0,0 +1,60 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.exasol;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import java.util.HashMap;
import java.util.Map;
public class ExasolDestination extends AbstractJdbcDestination implements Destination {
public static final String DRIVER_CLASS = DatabaseDriver.EXASOL.getDriverClassName();
public ExasolDestination() {
super(DRIVER_CLASS, new ExasolSQLNameTransformer(), new ExasolSqlOperations());
}
public static void main(String[] args) throws Exception {
new IntegrationRunner(new ExasolDestination()).run(args);
}
@Override
public JsonNode toJdbcConfig(final JsonNode config) {
final String jdbcUrl = String.format(DatabaseDriver.EXASOL.getUrlFormatString(),
config.get(JdbcUtils.HOST_KEY).asText(), config.get(JdbcUtils.PORT_KEY).asInt());
final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put(JdbcUtils.USERNAME_KEY, config.get(JdbcUtils.USERNAME_KEY).asText())
.put(JdbcUtils.JDBC_URL_KEY, jdbcUrl)
.put("schema", config.get(JdbcUtils.SCHEMA_KEY).asText());
if (config.has(JdbcUtils.PASSWORD_KEY)) {
configBuilder.put(JdbcUtils.PASSWORD_KEY, config.get(JdbcUtils.PASSWORD_KEY).asText());
}
if (config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) {
configBuilder.put(JdbcUtils.JDBC_URL_PARAMS_KEY, config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText());
}
return Jsons.jsonNode(configBuilder.build());
}
@Override
protected Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
Map<String, String> properties = new HashMap<>();
properties.put("autocommit", "0");
if (config.has("certificateFingerprint")) {
properties.put("fingerprint", config.get("certificateFingerprint").asText());
}
return properties;
}
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.exasol;
import io.airbyte.commons.text.Names;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
public class ExasolSQLNameTransformer extends ExtendedNameTransformer {
@Override
public String applyDefaultCase(final String input) {
return input.toUpperCase();
}
@Override
public String getRawTableName(final String streamName) {
// Exasol identifiers starting with _ must be quoted
return Names.doubleQuote(super.getRawTableName(streamName));
}
@Override
public String getTmpTableName(final String streamName) {
// Exasol identifiers starting with _ must be quoted
return Names.doubleQuote(super.getTmpTableName(streamName));
}
@Override
public String convertStreamName(final String input) {
// Sometimes the stream name is already quoted, so remove quotes before converting.
// Exasol identifiers starting with _ must be quoted.
return Names.doubleQuote(super.convertStreamName(unquote(input)));
}
private static String unquote(final String input) {
String result = input;
if(result.startsWith("\"")) {
result = result.substring(1);
}
if(result.endsWith("\"")) {
result = result.substring(0, result.length()-1);
}
return result;
}
}

View File

@@ -0,0 +1,72 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.exasol;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
public class ExasolSqlOperations extends JdbcSqlOperations {
public static final String COLUMN_NAME_AB_ID =
"\"" + JavaBaseConstants.COLUMN_NAME_AB_ID.toUpperCase() + "\"";
public static final String COLUMN_NAME_DATA =
"\"" + JavaBaseConstants.COLUMN_NAME_DATA.toUpperCase() + "\"";
public static final String COLUMN_NAME_EMITTED_AT =
"\"" + JavaBaseConstants.COLUMN_NAME_EMITTED_AT.toUpperCase() + "\"";
@Override
public String createTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) {
String query = String.format("""
CREATE TABLE IF NOT EXISTS %s.%s (
%s VARCHAR(64),
%s VARCHAR(2000000),
%s TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(%s)
)""",
schemaName, tableName,
ExasolSqlOperations.COLUMN_NAME_AB_ID,
ExasolSqlOperations.COLUMN_NAME_DATA,
ExasolSqlOperations.COLUMN_NAME_EMITTED_AT,
ExasolSqlOperations.COLUMN_NAME_AB_ID);
LOGGER.info("Create table query: {}", query);
return query;
}
@Override
public void executeTransaction(final JdbcDatabase database, final List<String> queries) throws Exception {
database.executeWithinTransaction(queries);
}
@Override
protected void insertRecordsInternal(JdbcDatabase database, List<AirbyteRecordMessage> records, String schemaName, String tableName) throws Exception {
if (records.isEmpty()) {
return;
}
Path tmpFile = createBatchFile(tableName, records);
try {
String importStatement = String.format("""
IMPORT INTO %s.%s
FROM LOCAL CSV FILE '%s'
ROW SEPARATOR = 'CRLF'
COLUMN SEPARATOR = ','""", schemaName, tableName, tmpFile.toAbsolutePath());
LOGGER.info("IMPORT statement: {}", importStatement);
database.execute(connection -> connection.createStatement().execute(importStatement));
} finally {
Files.delete(tmpFile);
}
}
private Path createBatchFile(String tableName, List<AirbyteRecordMessage> records) throws Exception {
Path tmpFile = Files.createTempFile(tableName + "-", ".tmp");
writeBatchToFile(tmpFile.toFile(), records);
return tmpFile;
}
}

View File

@@ -0,0 +1,64 @@
{
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/exasol",
"supportsIncremental": true,
"supportsNormalization": false,
"supportsDBT": false,
"supported_destination_sync_modes": ["overwrite", "append"],
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Exasol Destination Spec",
"type": "object",
"required": ["host", "port", "username", "schema"],
"additionalProperties": true,
"properties": {
"host": {
"title": "Host",
"description": "Hostname of the database.",
"type": "string",
"order": 0
},
"port": {
"title": "Port",
"description": "Port of the database.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 8563,
"examples": ["8563"],
"order": 1
},
"certificateFingerprint": {
"title": "Certificate Fingerprint",
"description": "Fingerprint of the Exasol server's TLS certificate",
"type": "string",
"examples": ["ABC123..."],
"order": 2
},
"username": {
"title": "User",
"description": "Username to use to access the database.",
"type": "string",
"order": 3
},
"password": {
"title": "Password",
"description": "Password associated with the username.",
"type": "string",
"airbyte_secret": true,
"order": 4
},
"schema": {
"title": "Schema Name",
"description": "Schema Name",
"type": "string",
"order": 5
},
"jdbc_url_params": {
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol ';'. (example: key1=value1;key2=value2;key3=value3).",
"title": "JDBC URL Params",
"type": "string",
"order": 6
}
}
}
}

View File

@@ -0,0 +1,147 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.exasol;
import com.exasol.containers.ExasolContainer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
import org.jooq.DSLContext;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class ExasolDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ExasolDestinationAcceptanceTest.class);
private static final ExasolContainer<? extends ExasolContainer<?>> EXASOL = new ExasolContainer<>()
.withReuse(true);
private final NamingConventionTransformer namingResolver = new ExasolSQLNameTransformer();
private static JsonNode config;
@BeforeAll
static void startExasolContainer() {
EXASOL.start();
config = createExasolConfig(EXASOL);
}
private static JsonNode createExasolConfig(ExasolContainer<? extends ExasolContainer<?>> exasol) {
return Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, exasol.getHost())
.put(JdbcUtils.PORT_KEY, exasol.getFirstMappedDatabasePort())
.put("certificateFingerprint", exasol.getTlsCertificateFingerprint().orElseThrow())
.put(JdbcUtils.USERNAME_KEY, exasol.getUsername())
.put(JdbcUtils.PASSWORD_KEY, exasol.getPassword())
.put(JdbcUtils.SCHEMA_KEY, "TEST")
.build());
}
@AfterAll
static void stopExasolContainer() {
EXASOL.stop();
}
@Override
protected String getImageName() {
return "airbyte/destination-exasol:dev";
}
@Override
protected JsonNode getConfig() {
return Jsons.clone(config);
}
@Override
protected JsonNode getFailCheckConfig() {
final JsonNode clone = Jsons.clone(getConfig());
((ObjectNode) clone).put(JdbcUtils.PASSWORD_KEY, "wrong password");
return clone;
}
@Override
protected boolean supportBasicDataTypeTest() {
return true;
}
@Override
protected boolean supportArrayDataTypeTest() {
return true;
}
@Override
protected boolean supportObjectDataTypeTest() {
return true;
}
@Override
protected boolean implementsNamespaces() {
return true;
}
@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
String streamName,
String namespace,
JsonNode streamSchema) throws SQLException {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), "\""+namespace+"\"")
.stream()
.map(r -> r.get(JavaBaseConstants.COLUMN_NAME_DATA.toUpperCase()))
.map(node -> Jsons.deserialize(node.asText()))
.collect(Collectors.toList());
}
private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
String query = String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, ExasolSqlOperations.COLUMN_NAME_EMITTED_AT);
LOGGER.info("Retrieving records using query {}", query);
try (final DSLContext dslContext = getDSLContext(config)) {
final List<org.jooq.Record> result = new Database(dslContext)
.query(ctx -> new ArrayList<>(ctx.fetch(query)));
return result
.stream()
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList());
}
}
private static DSLContext getDSLContext(final JsonNode config) {
String jdbcUrl = String.format(DatabaseDriver.EXASOL.getUrlFormatString(), config.get(JdbcUtils.HOST_KEY).asText(), config.get(JdbcUtils.PORT_KEY).asInt());
Map<String, String> jdbcConnectionProperties = Map.of("fingerprint", config.get("certificateFingerprint").asText());
return DSLContextFactory.create(
config.get(JdbcUtils.USERNAME_KEY).asText(),
config.get(JdbcUtils.PASSWORD_KEY).asText(),
DatabaseDriver.EXASOL.getDriverClassName(),
jdbcUrl,
null,
jdbcConnectionProperties);
}
@Override
protected void setup(TestDestinationEnv testEnv) {
// Nothing to do
}
@Override
protected void tearDown(TestDestinationEnv testEnv) {
EXASOL.purgeDatabase();
}
}

View File

@@ -0,0 +1,88 @@
package io.airbyte.integrations.destination.exasol;
import com.exasol.containers.ExasolContainer;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.db.jdbc.JdbcDatabase;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLSyntaxErrorException;
import java.util.Arrays;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertThrows;
class ExasolSqlOperationsAcceptanceTest {
private static final ExasolContainer<? extends ExasolContainer<?>> EXASOL = new ExasolContainer<>()
.withReuse(true);
private ExasolSqlOperations operations;
@BeforeAll
static void startExasolContainer() {
EXASOL.start();
}
@AfterAll
static void stopExasolContainer() {
EXASOL.stop();
}
@BeforeEach void setup() {
this.operations = new ExasolSqlOperations();
EXASOL.purgeDatabase();
}
@Test
void executeTransactionEmptyList() {
assertDoesNotThrow(() -> executeTransaction());
}
@Test
void executeTransactionSingleStatementSuccess() throws Exception {
executeTransaction("CREATE SCHEMA TESTING_SCHEMA");
assertSchemaExists("TESTING_SCHEMA", true);
}
@Test
void executeTransactionTowStatementsSuccess() throws Exception {
executeTransaction("CREATE SCHEMA TESTING_SCHEMA", "CREATE TABLE TESTING_TABLE (C1 VARCHAR(5))");
assertSchemaExists("TESTING_SCHEMA", true);
assertTableExists("TESTING_SCHEMA", "TESTING_TABLE");
}
@Test
void executeTransactionTwoStatementsFailure() throws Exception {
assertThrows(SQLSyntaxErrorException.class, () -> executeTransaction("CREATE SCHEMA TESTING_SCHEMA", "INVALID STATEMENT"));
assertSchemaExists("TESTING_SCHEMA", false);
}
private static void assertSchemaExists(String schemaName, boolean exists) throws SQLException {
try (ResultSet rs = EXASOL.createConnection().getMetaData().getSchemas(null, schemaName)) {
assertThat("Schema exists", rs.next(), equalTo(exists));
}
}
private static void assertTableExists(String schemaName, String tableName) throws SQLException {
try (ResultSet rs = EXASOL.createConnection().getMetaData().getTables(null, schemaName, tableName, null)) {
assertThat("Table exists", rs.next(), equalTo(true));
}
}
private void executeTransaction(String... statements) throws Exception {
this.operations.executeTransaction(createDatabase(), Arrays.asList(statements));
}
private JdbcDatabase createDatabase() {
DataSource dataSource = DataSourceFactory.create(EXASOL.getUsername(), EXASOL.getPassword(), ExasolDestination.DRIVER_CLASS, EXASOL.getJdbcUrl());
return new DefaultJdbcDatabase(dataSource);
}
}

View File

@@ -0,0 +1,86 @@
package io.airbyte.integrations.destination.exasol;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.db.jdbc.JdbcUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
class ExasolDestinationTest {
private ExasolDestination destination;
@BeforeEach
void setup() {
destination = new ExasolDestination();
}
private JsonNode createConfig() {
return createConfig(new HashMap<>());
}
private JsonNode createConfig(final Map<String, Object> additionalConfigs) {
return Jsons.jsonNode(MoreMaps.merge(baseParameters(), additionalConfigs));
}
private Map<String, Object> baseParameters() {
return ImmutableMap.<String, Object>builder()
.put(JdbcUtils.HOST_KEY, "localhost")
.put(JdbcUtils.PORT_KEY, "8563")
.put(JdbcUtils.USERNAME_KEY, "sys")
.put(JdbcUtils.SCHEMA_KEY, "mySchema")
.build();
}
@Test
void toJdbcConfigDefault() {
var result = destination.toJdbcConfig(createConfig());
assertAll(
() -> assertThat(result.size(), equalTo(3)),
() -> assertThat(result.get(JdbcUtils.USERNAME_KEY).asText(), equalTo("sys")),
() -> assertThat(result.get(JdbcUtils.JDBC_URL_KEY).asText(), equalTo("jdbc:exa:localhost:8563")),
() -> assertThat(result.get(JdbcUtils.SCHEMA_KEY).asText(), equalTo("mySchema"))
);
}
@Test
void toJdbcConfigWithPassword() {
var result = destination.toJdbcConfig(createConfig(Map.of(JdbcUtils.PASSWORD_KEY, "exasol")));
assertAll(
() -> assertThat(result.size(), equalTo(4)),
() -> assertThat(result.get(JdbcUtils.PASSWORD_KEY).asText(), equalTo("exasol"))
);
}
@Test
void toJdbcConfigWithJdbcUrlParameters() {
var result = destination.toJdbcConfig(createConfig(Map.of(JdbcUtils.JDBC_URL_PARAMS_KEY, "param=value")));
assertAll(
() -> assertThat(result.size(), equalTo(4)),
() -> assertThat(result.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText(), equalTo("param=value"))
);
}
@Test
void getDefaultConnectionProperties() {
var result = destination.getDefaultConnectionProperties(createConfig());
assertThat(result, equalTo(Map.of("autocommit", "0")));
}
@Test
void getDefaultConnectionPropertiesWithFingerprint() {
var result = destination.getDefaultConnectionProperties(createConfig(Map.of("certificateFingerprint", "ABC")));
assertThat(result, equalTo(Map.of("fingerprint", "ABC", "autocommit", "0")));
}
}

View File

@@ -0,0 +1,65 @@
package io.airbyte.integrations.destination.exasol;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import static org.junit.jupiter.api.Assertions.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
class ExasolSQLNameTransformerTest {
private ExasolSQLNameTransformer transformer;
@BeforeEach
void setUp() {
transformer = new ExasolSQLNameTransformer();
}
@ParameterizedTest
@CsvSource({"text, TEXT", "Text, TEXT", "TEXT, TEXT", "_äöüß, _ÄÖÜSS"})
void applyDefaultCase(String input, String expectedOutput) {
assertEquals(expectedOutput, transformer.applyDefaultCase(input));
}
@ParameterizedTest
@CsvSource({"stream, \"_airbyte_raw_stream\"",
"Stream, \"_airbyte_raw_Stream\"",
"stream*, \"_airbyte_raw_stream_\"",
"äöü, \"_airbyte_raw_aou\""})
void getRawTableName(String streamName, String expectedTableName) {
assertEquals(expectedTableName, transformer.getRawTableName(streamName));
}
@Test
void getTmpTableNamePrefixSuffix() {
String tmpTableName = transformer.getTmpTableName("stream");
assertThat(tmpTableName, allOf(
startsWith("\"_airbyte_tmp_"),
endsWith("_stream\"")));
}
@Test
void getTmpTableNameDifferentForEachCall() {
String name1 = transformer.getTmpTableName("stream");
String name2 = transformer.getTmpTableName("stream");
assertThat(name1, not(equalTo(name2)));
}
@ParameterizedTest
@CsvSource({"stream, stream",
"Stream, Stream",
"STREAM, STREAM",
"stream*, stream_",
"_stream_, _stream_",
"äöü, aou",
"\"stream, stream",
"stream\", stream",
"\"stream\", stream",})
void convertStreamName(String streamName, String expectedTableName) {
assertThat(transformer.convertStreamName(streamName), equalTo("\"" + expectedTableName + "\""));
}
}

View File

@@ -0,0 +1,72 @@
# Exasol
Exasol is the in-memory database built for analytics.
## Sync overview
### Output schema
Each Airbyte Stream becomes an Exasol table and each Airbyte Field becomes an Exasol column. Each Exasol table created by Airbyte will contain 3 columns:
* `_AIRBYTE_AB_ID`: a uuid assigned by Airbyte to each event that is processed. The column type in Exasol is `VARCHAR(64)`.
* `_AIRBYTE_DATA`: a json blob representing with the event data. The column type in Exasol is `VARCHAR(2000000)`.
* `_AIRBYTE_EMITTED_AT`: a timestamp representing when the event was pulled from the data source. The column type in Exasol is `TIMESTAMP`.
### Features
The Exasol destination supports the following features:
| Feature | Supported? (Yes/No) | Notes |
| :--- | :--- | :--- |
| Full Refresh Sync | Yes | |
| Incremental - Append Sync | Yes | |
| Incremental - Deduped History | No | |
| Normalization | No | |
| Namespaces | Yes | |
| SSL connection | Yes | TLS |
| SSH Tunnel Support | No | |
### Limitations
#### Maximum data size two million characters
Exasol does not have a special data type for storing data of arbitrary length or JSON. That's why this connector uses type `VARCHAR(2000000)` for storing Airbyte data.
## Getting started
### Requirements
To use the Exasol destination, you'll need Exasol database version 7.1 or above.
#### Network Access
Make sure your Exasol database can be accessed by Airbyte. If your database is within a VPC, you may need to allow access from the IP you're using to expose Airbyte.
#### **Permissions**
As Airbyte namespaces allow to store data into different schemas, there are different scenarios requiring different permissions assigned to the user account. The following table describes 4 scenarios regarding the login user and the destination user.
| Login user | Destination user | Required permissions | Comment |
| :--- | :--- | :--- | :--- |
| DBA User | Any user | - | |
| Regular user | Same user as login | Create, drop and write table, create session | |
| Regular user | Any existing user | Create, drop and write ANY table, create session | Grants can be provided on a system level by DBA or by target user directly |
| Regular user | Not existing user | Create, drop and write ANY table, create user, create session | Grants should be provided on a system level by DBA |
We highly recommend creating an Airbyte-specific user for this purpose.
### Setup guide
You should now have all the requirements needed to configure Exasol as a destination in the UI. You'll need the following information to configure the Exasol destination:
* Host
* Port
* Fingerprint of the Exasol server's TLS certificate (if the database uses a self-signed certificate)
* Username
* Password
## Changelog
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------|
| 0.1.0 | 2023-01-?? | [21200](https://github.com/airbytehq/airbyte/pull/21200) | Initial version of the Exasol destination |