
Redshift Destination & refactoring to introduce destination-jdbc

Closes #193
Closes #1126
Christophe Duong
2020-12-03 18:07:46 +01:00
committed by GitHub
parent 91bb8397ba
commit d06392e900
45 changed files with 2073 additions and 341 deletions

View File

@@ -43,4 +43,19 @@ public class Names {
.replaceAll(NON_ALPHANUMERIC_AND_UNDERSCORE_PATTERN, "_");
}
/**
* Concatenates strings together, handling the case where the first string is already quoted
*
* @param name quoted or unquoted name to append more characters to
* @param suffix additional string to concatenate
* @return name and suffix concatenated together
*/
public static String concatQuotedNames(String name, String suffix) {
if (name.endsWith("\"")) {
return name.substring(0, name.length() - 1) + suffix + "\"";
} else {
return name + suffix;
}
}
}
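
For illustration (not part of the commit), a quick sketch of where the suffix lands relative to an existing closing quote:

import io.airbyte.commons.text.Names;

// Sketch: concatQuotedNames splices the suffix inside a trailing double quote.
public class ConcatQuotedNamesExample {

  public static void main(String[] args) {
    System.out.println(Names.concatQuotedNames("users", "_raw"));     // users_raw
    System.out.println(Names.concatQuotedNames("\"users\"", "_raw")); // "users_raw"
  }

}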

View File

@@ -2,6 +2,6 @@
"destinationDefinitionId": "25c5221d-dce2-4163-ade9-739ef790f503",
"name": "Postgres",
"dockerRepository": "airbyte/destination-postgres",
"dockerImageTag": "0.1.6",
"dockerImageTag": "0.1.7",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-postgres-destination"
}

View File

@@ -0,0 +1,7 @@
{
"destinationDefinitionId": "f7a7d195-377f-cf5b-70a5-be6b819019dc",
"name": "Redshift",
"dockerRepository": "airbyte/destination-redshift",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift"
}

View File

@@ -6,7 +6,7 @@
- destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503
name: Postgres
dockerRepository: airbyte/destination-postgres
dockerImageTag: 0.1.6
dockerImageTag: 0.1.7
documentationUrl: https://hub.docker.com/r/airbyte/integration-singer-postgres-destination
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
name: BigQuery
@@ -18,3 +18,8 @@
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift

View File

@@ -6,8 +6,10 @@ plugins {
dependencies {
implementation 'commons-cli:commons-cli:1.4'
implementation project(':airbyte-db')
implementation project(':airbyte-config:models')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-queue')
implementation files(project(':airbyte-integrations:bases:base').airbyteDocker.outputs)
}

View File

@@ -0,0 +1,141 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.base;
import com.google.common.base.Charsets;
import io.airbyte.commons.concurrency.GracefulShutdownHandler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.CloseableQueue;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.queue.BigQueue;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BufferedStreamConsumer extends FailureTrackingConsumer<AirbyteMessage> implements DestinationConsumerStrategy {
private static final Logger LOGGER = LoggerFactory.getLogger(BufferedStreamConsumer.class);
private static final long THREAD_DELAY_MILLIS = 500L;
private static final long GRACEFUL_SHUTDOWN_MINUTES = 5L;
private static final int MIN_RECORDS = 500;
private static final int BATCH_SIZE = 500;
private final BufferedWriteOperations destination;
private Map<String, DestinationWriteContext> writeConfigs;
private Map<String, CloseableQueue<byte[]>> writeBuffers;
private final ScheduledExecutorService writerPool;
private final ConfiguredAirbyteCatalog catalog;
public BufferedStreamConsumer(BufferedWriteOperations destination, ConfiguredAirbyteCatalog catalog) {
this.destination = destination;
this.writeConfigs = new HashMap<>();
this.writeBuffers = new HashMap<>();
this.writerPool = Executors.newSingleThreadScheduledExecutor();
this.catalog = catalog;
Runtime.getRuntime().addShutdownHook(new GracefulShutdownHandler(Duration.ofMinutes(GRACEFUL_SHUTDOWN_MINUTES), writerPool));
}
@Override
public void setContext(Map<String, DestinationWriteContext> configs) throws IOException {
writeConfigs = configs;
writeBuffers = new HashMap<>();
final Path queueRoot = Files.createTempDirectory("queues");
for (String streamName : configs.keySet()) {
final BigQueue writeBuffer = new BigQueue(queueRoot.resolve(streamName), streamName);
writeBuffers.put(streamName, writeBuffer);
}
writerPool.scheduleWithFixedDelay(
() -> writeStreamsWithNRecords(MIN_RECORDS, BATCH_SIZE, writeConfigs, writeBuffers, destination),
THREAD_DELAY_MILLIS,
THREAD_DELAY_MILLIS,
TimeUnit.MILLISECONDS);
}
private static void writeStreamsWithNRecords(int minRecords,
int batchSize,
Map<String, DestinationWriteContext> writeConfigs,
Map<String, CloseableQueue<byte[]>> writeBuffers,
BufferedWriteOperations destination) {
for (final Map.Entry<String, DestinationWriteContext> entry : writeConfigs.entrySet()) {
final String schemaName = entry.getValue().getOutputNamespaceName();
final String tmpTableName = entry.getValue().getOutputTableName();
final CloseableQueue<byte[]> writeBuffer = writeBuffers.get(entry.getKey());
while (writeBuffer.size() > minRecords) {
try {
destination.insertBufferedRecords(batchSize, writeBuffer, schemaName, tmpTableName);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
@Override
protected void acceptTracked(AirbyteMessage message) throws Exception {
// ignore other message types.
if (message.getType() == AirbyteMessage.Type.RECORD) {
if (!writeConfigs.containsKey(message.getRecord().getStream())) {
throw new IllegalArgumentException(
String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s",
Jsons.serialize(catalog), Jsons.serialize(message)));
}
writeBuffers.get(message.getRecord().getStream()).offer(Jsons.serialize(message.getRecord()).getBytes(Charsets.UTF_8));
}
}
@Override
protected void close(boolean hasFailed) throws Exception {
if (hasFailed) {
LOGGER.error("executing on failed close procedure.");
// kill executor pool fast.
writerPool.shutdown();
writerPool.awaitTermination(1, TimeUnit.SECONDS);
} else {
LOGGER.info("executing on success close procedure.");
// shutdown executor pool with time to complete writes.
writerPool.shutdown();
writerPool.awaitTermination(GRACEFUL_SHUTDOWN_MINUTES, TimeUnit.MINUTES);
// write anything that is left in the buffers.
writeStreamsWithNRecords(0, BATCH_SIZE, writeConfigs, writeBuffers, destination);
}
for (CloseableQueue<byte[]> writeBuffer : writeBuffers.values()) {
writeBuffer.close();
}
}
}
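
As a hedged sketch of the intended wiring (stream name, schema, and table are illustrative; the DestinationWriteContext constructor is package-private, so this assumes the same package): build the per-stream contexts, hand them to setContext, then stream records through accept; the scheduled writer pool flushes each stream's on-disk queue in batches of 500.

package io.airbyte.integrations.base;

import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.util.Map;

// Sketch: buffering records for a single "users" stream through the consumer.
public class BufferedStreamConsumerExample {

  public static void main(String[] args) throws Exception {
    // BufferedWriteOperations is a single-method interface, so a lambda will do here
    final BufferedWriteOperations destination =
        (batchSize, writeBuffer, namespace, streamName) -> { /* e.g. render and run an INSERT */ };
    final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
        CatalogHelpers.createConfiguredAirbyteStream("users", Field.of("name", JsonSchemaPrimitive.STRING))));

    final DestinationConsumerStrategy consumer = new BufferedStreamConsumer(destination, catalog);
    consumer.setContext(Map.of("users", new DestinationWriteContext("public", "users_raw", SyncMode.FULL_REFRESH)));
    // each record is appended to the stream's on-disk queue and flushed in batches
    consumer.accept(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
        .withRecord(new AirbyteRecordMessage().withStream("users")
            .withData(Jsons.jsonNode(Map.of("name", "john")))
            .withEmittedAt(System.currentTimeMillis())));
    consumer.close(); // drains whatever is left in the buffers, then closes the queues
  }

}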

View File

@@ -0,0 +1,33 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.base;
import io.airbyte.commons.lang.CloseableQueue;
public interface BufferedWriteOperations {
void insertBufferedRecords(int batchSize, CloseableQueue<byte[]> writeBuffer, String namespace, String streamName) throws Exception;
}
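
A minimal sketch of the contract (not part of the commit): drain at most batchSize serialized records from the queue per call, writing them to the destination in one round trip.

import io.airbyte.commons.lang.CloseableQueue;
import io.airbyte.integrations.base.BufferedWriteOperations;
import java.nio.charset.StandardCharsets;

// Sketch: a logging implementation that shows how the buffer is drained.
public class LoggingBufferedWriter implements BufferedWriteOperations {

  @Override
  public void insertBufferedRecords(int batchSize, CloseableQueue<byte[]> writeBuffer, String namespace, String streamName) {
    for (int i = 0; i < batchSize; i++) {
      final byte[] record = writeBuffer.poll(); // null once the queue is empty
      if (record == null) {
        break;
      }
      System.out.printf("%s.%s <- %s%n", namespace, streamName, new String(record, StandardCharsets.UTF_8));
    }
  }

}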

View File

@@ -0,0 +1,38 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.base;
import io.airbyte.protocol.models.AirbyteMessage;
import java.util.Map;
/**
* A DestinationConsumer configured with DestinationWriteContext configuration objects so it can
* slightly change its behavior at runtime.
*/
public interface DestinationConsumerStrategy extends DestinationConsumer<AirbyteMessage> {
void setContext(Map<String, DestinationWriteContext> configs) throws Exception;
}

View File

@@ -0,0 +1,46 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.base;
import io.airbyte.protocol.models.SyncMode;
/**
* This configuration is used by TmpToFinalTable consumers to determine where to apply their copy
* and data operations.
*/
public class DestinationCopyContext extends DestinationWriteContext {
private final String inputTableName;
DestinationCopyContext(String outputNamespaceName, String inputTableName, String outputTableName, SyncMode syncMode) {
super(outputNamespaceName, outputTableName, syncMode);
this.inputTableName = inputTableName;
}
public String getInputTableName() {
return inputTableName;
}
}

View File

@@ -0,0 +1,57 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.base;
import io.airbyte.protocol.models.SyncMode;
/**
* This configuration is used by RecordConsumers to adapt their behavior at runtime, such as
* where to apply their tasks and which kind of data operations to perform.
*/
public class DestinationWriteContext {
private final String outputNamespaceName;
private final String outputTableName;
private final SyncMode syncMode;
DestinationWriteContext(String outputNamespaceName, String outputTableName, SyncMode syncMode) {
this.outputNamespaceName = outputNamespaceName;
this.outputTableName = outputTableName;
this.syncMode = syncMode;
}
public String getOutputNamespaceName() {
return outputNamespaceName;
}
public String getOutputTableName() {
return outputTableName;
}
public SyncMode getSyncMode() {
return syncMode;
}
}

View File

@@ -0,0 +1,74 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.base;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.text.Names;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import java.util.HashMap;
import java.util.Map;
/**
* Factory class to convert config and catalog objects into DestinationWriteContext
* configuration objects. These configurations are then used by the RecordConsumers to configure
* where to apply their tasks and data operations.
*/
public class DestinationWriteContextFactory {
private final SQLNamingResolvable namingResolver;
public DestinationWriteContextFactory(SQLNamingResolvable namingResolver) {
this.namingResolver = namingResolver;
}
public Map<String, DestinationWriteContext> build(JsonNode config, ConfiguredAirbyteCatalog catalog) {
Map<String, DestinationWriteContext> result = new HashMap<>();
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final String streamName = stream.getStream().getName();
final String schemaName = getNamingResolver().getIdentifier(getSchemaName(config, stream));
final String tableName = Names.concatQuotedNames(getNamingResolver().getIdentifier(streamName), "_raw");
final SyncMode syncMode = stream.getSyncMode() != null ? stream.getSyncMode() : SyncMode.FULL_REFRESH;
result.put(streamName, new DestinationWriteContext(schemaName, tableName, syncMode));
}
return result;
}
protected String getSchemaName(JsonNode config, ConfiguredAirbyteStream stream) {
// do we need to retrieve another more specific schema from this stream?
if (config.has("schema")) {
return config.get("schema").asText();
} else {
return "public";
}
}
public SQLNamingResolvable getNamingResolver() {
return namingResolver;
}
}
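
Concretely (a hedged illustration; the exact identifier casing depends on the SQLNamingResolvable implementation), a stream named "users" with no "schema" field in the config maps to roughly:

// Sketch: what build() yields for a stream named "users" when the config has no
// "schema" field and the stream sets no sync mode (helper name hypothetical).
static void describe(SQLNamingResolvable namingResolver, JsonNode config, ConfiguredAirbyteCatalog catalog) {
  final Map<String, DestinationWriteContext> configs =
      new DestinationWriteContextFactory(namingResolver).build(config, catalog);
  final DestinationWriteContext users = configs.get("users");
  System.out.println(users.getOutputNamespaceName()); // "public" (the default schema)
  System.out.println(users.getOutputTableName());     // "users_raw" (stream name + "_raw")
  System.out.println(users.getSyncMode());            // FULL_REFRESH (the default)
}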

View File

@@ -0,0 +1,39 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.base;
/**
* Operations to create tables and insert rows that some RecordConsumers may require to function
* properly.
*/
public interface InsertTableOperations {
void createDestinationTable(String schemaName, String tmpTableName) throws Exception;
void truncateTable(String schemaName, String tableName) throws Exception;
void insertIntoFromSelect(String schemaName, String srcTableName, String dstTableName) throws Exception;
}

View File

@@ -0,0 +1,40 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.base;
/**
* Operations to create (or drop) schemas and tables that some RecordConsumers may require to
* function properly.
*/
public interface TableCreationOperations {
void createSchema(String schemaName) throws Exception;
void createDestinationTable(String schemaName, String tmpTableName) throws Exception;
void dropDestinationTable(String schemaName, String tmpTableName) throws Exception;
}

View File

@@ -0,0 +1,114 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.base;
import io.airbyte.commons.text.Names;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.SyncMode;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This consumer delegates the actual record-consuming tasks to the consumers provided in its
* constructor.
*
* Records are first dispatched to a temporary consumer so that data accumulates in a staging
* location.
*
* If that staging phase completes without errors, the second consumer is then invoked to finalize
* the copy from the temporary location to the final destination.
*/
public class TmpDestinationConsumer extends FailureTrackingConsumer<AirbyteMessage> implements DestinationConsumerStrategy {
private static final Logger LOGGER = LoggerFactory.getLogger(TmpDestinationConsumer.class);
private final TableCreationOperations destination;
private final DestinationConsumerStrategy tmpDestinationConsumer;
private final TmpToFinalTable finalDestinationConsumer;
private Map<String, DestinationWriteContext> tmpConfigs;
public TmpDestinationConsumer(TableCreationOperations destination,
DestinationConsumerStrategy tmpDestinationConsumer,
TmpToFinalTable finalDestinationConsumer) {
this.destination = destination;
this.tmpDestinationConsumer = tmpDestinationConsumer;
this.finalDestinationConsumer = finalDestinationConsumer;
tmpConfigs = new HashMap<>();
}
@Override
public void setContext(Map<String, DestinationWriteContext> configs) throws Exception {
final Set<String> schemaSet = new HashSet<>();
tmpConfigs = new HashMap<>();
final Map<String, DestinationCopyContext> copyConfigs = new HashMap<>();
for (Entry<String, DestinationWriteContext> entry : configs.entrySet()) {
DestinationWriteContext config = entry.getValue();
final String schemaName = config.getOutputNamespaceName();
final String tableName = config.getOutputTableName();
final String tmpTableName = Names.concatQuotedNames(tableName, "_" + Instant.now().toEpochMilli());
DestinationWriteContext tmpConfig = new DestinationWriteContext(schemaName, tmpTableName, SyncMode.FULL_REFRESH);
tmpConfigs.put(entry.getKey(), tmpConfig);
DestinationCopyContext copyConfig = new DestinationCopyContext(schemaName, tmpTableName, tableName, config.getSyncMode());
copyConfigs.put(entry.getKey(), copyConfig);
if (!schemaSet.contains(schemaName)) {
destination.createSchema(schemaName);
schemaSet.add(schemaName);
}
destination.createDestinationTable(schemaName, tmpTableName);
}
tmpDestinationConsumer.setContext(tmpConfigs);
finalDestinationConsumer.setContext(copyConfigs);
}
@Override
protected void acceptTracked(AirbyteMessage airbyteMessage) throws Exception {
tmpDestinationConsumer.accept(airbyteMessage);
}
@Override
protected void close(boolean hasFailed) throws Exception {
tmpDestinationConsumer.close();
if (!hasFailed) {
LOGGER.info("executing on success close procedure.");
finalDestinationConsumer.execute();
}
for (Entry<String, DestinationWriteContext> entry : tmpConfigs.entrySet()) {
final String schemaName = entry.getValue().getOutputNamespaceName();
final String tmpTableName = entry.getValue().getOutputTableName();
destination.dropDestinationTable(schemaName, tmpTableName);
}
}
}

View File

@@ -0,0 +1,40 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.base;
import java.util.Map;
/**
* Interface to move data from a temporary location to a final target destination.
*
* Per-stream parameters are first set via the setContext method before executing the actual move.
*/
public interface TmpToFinalTable {
void setContext(Map<String, DestinationCopyContext> configs);
void execute() throws Exception;
}

View File

@@ -0,0 +1,74 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.base;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
/**
* Implementation of TmpToFinalTable that moves data from a tmp destination to a final target.
*
* This implementation is geared towards databases, using queries based on TRUNCATE TABLE and
* INSERT INTO SQL statements. (The truncate is only performed for full-refresh sync modes; for
* incremental syncs it is skipped, so the insert appends data to the table.)
*/
public class TruncateInsertIntoConsumer implements TmpToFinalTable {
private final InsertTableOperations destination;
private Map<String, DestinationCopyContext> copyConfigs;
public TruncateInsertIntoConsumer(InsertTableOperations destination) {
this.destination = destination;
this.copyConfigs = new HashMap<>();
}
@Override
public void setContext(Map<String, DestinationCopyContext> configs) {
copyConfigs = configs;
}
@Override
public void execute() throws Exception {
if (copyConfigs.isEmpty()) {
throw new RuntimeException("copyConfigs is empty, did you setContext() beforehand?");
}
for (Entry<String, DestinationCopyContext> entry : copyConfigs.entrySet()) {
final DestinationCopyContext config = entry.getValue();
final String schemaName = config.getOutputNamespaceName();
final String srcTableName = config.getInputTableName();
final String dstTableName = config.getOutputTableName();
destination.createDestinationTable(schemaName, dstTableName);
switch (config.getSyncMode()) {
case FULL_REFRESH -> destination.truncateTable(schemaName, dstTableName);
case INCREMENTAL -> {}
default -> throw new IllegalStateException("Unrecognized sync mode: " + config.getSyncMode());
}
destination.insertIntoFromSelect(schemaName, srcTableName, dstTableName);
}
}
}
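
Put together with the default query builders in AbstractJdbcDestination (later in this diff), a FULL_REFRESH copy for one stream reduces to a statement sequence like the following (table names hypothetical):

// Statements issued by execute() for one FULL_REFRESH stream, using the default
// queries from AbstractJdbcDestination; the tmp table suffix is the epoch-millis
// timestamp appended by TmpDestinationConsumer.
//
//   CREATE TABLE IF NOT EXISTS public.users_raw ( ... );
//   TRUNCATE TABLE public.users_raw;
//   INSERT INTO public.users_raw SELECT * FROM public.users_raw_1607000000000;
//
// For INCREMENTAL the TRUNCATE is skipped, so the INSERT appends to existing data.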

View File

@@ -50,7 +50,7 @@
{%- endmacro %}
{% macro redshift__json_extract(json_column, json_path_list) -%}
json_extract_path_text({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }})
case when json_extract_path_text({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }}) != '' then json_extract_path_text({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }}) end
{%- endmacro %}
{% macro snowflake__json_extract(json_column, json_path_list) -%}
@@ -76,7 +76,7 @@
{%- endmacro %}
{% macro redshift__json_extract_scalar(json_column, json_path_list) -%}
json_extract_path_text({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }})
case when json_extract_path_text({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }}) != '' then json_extract_path_text({{ adapter.quote_as_configured(json_column, 'identifier')|trim }}, {{ format_json_path(json_path_list) }}) end
{%- endmacro %}
{% macro snowflake__json_extract_scalar(json_column, json_path_list) -%}

View File

@@ -28,4 +28,5 @@ from enum import Enum
class DestinationType(Enum):
bigquery = "bigquery"
postgres = "postgres"
redshift = "redshift"
snowflake = "snowflake"

View File

@@ -34,6 +34,7 @@ import yaml
class DestinationType(Enum):
bigquery = "bigquery"
postgres = "postgres"
redshift = "redshift"
snowflake = "snowflake"
@@ -70,6 +71,7 @@ class TransformConfig:
transformed_integration_config = {
DestinationType.bigquery: self.transform_bigquery,
DestinationType.postgres: self.transform_postgres,
DestinationType.redshift: self.transform_redshift,
DestinationType.snowflake: self.transform_snowflake,
}[integration_type](config)
@@ -113,6 +115,22 @@ class TransformConfig:
return dbt_config
def transform_redshift(self, config: dict):
print("transform_redshift")
dbt_config = dict()
# https://docs.getdbt.com/reference/warehouse-profiles/redshift-profile
dbt_config["type"] = "redshift"
dbt_config["host"] = config["host"]
dbt_config["user"] = config["username"]
dbt_config["pass"] = config["password"]
dbt_config["port"] = config["port"]
dbt_config["dbname"] = config["database"]
dbt_config["schema"] = config["schema"]
dbt_config["threads"] = 32
return dbt_config
def transform_snowflake(self, config: dict):
print("transform_snowflake")
dbt_config = dict()

View File

@@ -1,8 +1,8 @@
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "date": "2020-08-29T00:00:00Z", "NZD": 0.12, "HKD": 2.13}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-08-31"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589100, "data": { "date": "2020-08-30T00:00:00Z", "NZD": 1.14, "HKD": 7.15}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637689100, "data": { "date": "2020-08-30T00:00:00Z", "NZD": 1.14, "HKD": 7.15}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-01"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589200, "data": { "date": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589300, "data": { "date": "2020-08-31T00:00:00Z", "NZD": 1.99, "HKD": 7.99, "USD": 10.99}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589400, "data": { "date": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637789200, "data": { "date": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "date": "2020-08-31T00:00:00Z", "NZD": 1.99, "HKD": 7.99, "USD": 10.99}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637989400, "data": { "date": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}}

View File

@@ -0,0 +1,3 @@
*
!Dockerfile
!build

View File

@@ -0,0 +1,12 @@
FROM airbyte/integration-base-java:dev
WORKDIR /airbyte
ENV APPLICATION destination-jdbc
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/destination-jdbc

View File

@@ -0,0 +1,24 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
// todo: needs standard destination test
}
application {
mainClass = 'io.airbyte.integrations.destination.jdbc.JdbcDestination'
}
dependencies {
implementation project(':airbyte-db')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
testImplementation "org.testcontainers:postgresql:1.15.0-rc2"
integrationTestImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestImplementation "org.testcontainers:postgresql:1.15.0-rc2"
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
}

View File

@@ -0,0 +1,7 @@
# JDBC Destination
We are not planning to expose this destination in the UI yet. It serves as a base upon which we can build all of our other JDBC-compliant destinations.
The reasons we are not exposing this destination by itself are:
1. It is not terribly user-friendly (JDBC URLs are hard for a human to parse).
1. For each JDBC-compliant db, we need to make sure the appropriate drivers are installed on the image. We don't want to frontload installing all possible drivers; instead we would like to be more methodical. For each JDBC-compliant destination, we will extend this one and install only the necessary JDBC drivers on that destination's image.

View File

@@ -0,0 +1,202 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.destination.jdbc;
import static org.jooq.impl.DSL.currentSchema;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.CloseableQueue;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.BufferedStreamConsumer;
import io.airbyte.integrations.base.BufferedWriteOperations;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.integrations.base.DestinationConsumerStrategy;
import io.airbyte.integrations.base.DestinationWriteContext;
import io.airbyte.integrations.base.DestinationWriteContextFactory;
import io.airbyte.integrations.base.InsertTableOperations;
import io.airbyte.integrations.base.SQLNamingResolvable;
import io.airbyte.integrations.base.TableCreationOperations;
import io.airbyte.integrations.base.TmpDestinationConsumer;
import io.airbyte.integrations.base.TmpToFinalTable;
import io.airbyte.integrations.base.TruncateInsertIntoConsumer;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Map;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractJdbcDestination implements Destination {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcDestination.class);
protected static final String COLUMN_NAME = "data";
private final String driverClass;
private final SQLDialect dialect;
private final SQLNamingResolvable namingResolver;
public AbstractJdbcDestination(final String driverClass, final SQLDialect dialect, final SQLNamingResolvable namingResolver) {
this.driverClass = driverClass;
this.dialect = dialect;
this.namingResolver = namingResolver;
}
@Override
public ConnectorSpecification spec() throws IOException {
// return a JsonSchema representation of the spec for the integration.
final String resourceString = MoreResources.readResource("spec.json");
return Jsons.deserialize(resourceString, ConnectorSpecification.class);
}
@Override
public AirbyteConnectionStatus check(JsonNode config) {
try (final Database database = getDatabase(config)) {
// attempt to get current schema. this is a cheap query to sanity check that we can connect to the
// database. `currentSchema()` is a jooq method that will run the appropriate query based on which
// database it is connected to.
database.query(this::getCurrentDatabaseName);
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (Exception e) {
LOGGER.debug("Exception while checking connection: ", e);
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage("Can't connect with provided configuration.");
}
}
protected Database getDatabase(JsonNode config) {
final JsonNode jdbcConfig = toJdbcConfig(config);
return Databases.createDatabase(
jdbcConfig.get("username").asText(),
jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null,
jdbcConfig.get("jdbc_url").asText(),
driverClass,
dialect);
}
public abstract JsonNode toJdbcConfig(JsonNode config);
private String getCurrentDatabaseName(DSLContext ctx) {
return ctx.select(currentSchema()).fetch().get(0).getValue(0, String.class);
}
@Override
public SQLNamingResolvable getNamingResolver() {
return namingResolver;
}
@Override
public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception {
final Map<String, DestinationWriteContext> writeConfigs =
new DestinationWriteContextFactory(getNamingResolver()).build(config, catalog);
final DestinationImpl destination = new DestinationImpl(getDatabase(config));
DestinationConsumerStrategy buffer = new BufferedStreamConsumer(destination, catalog);
TmpToFinalTable commit = new TruncateInsertIntoConsumer(destination);
DestinationConsumerStrategy result = new TmpDestinationConsumer(destination, buffer, commit);
result.setContext(writeConfigs);
return result;
}
protected String getDefaultSchemaName(JsonNode config) {
if (config.has("schema")) {
return config.get("schema").asText();
} else {
return "public";
}
}
protected String createSchemaQuery(String schemaName) {
return String.format("CREATE SCHEMA IF NOT EXISTS %s;\n", schemaName);
}
protected abstract String createDestinationTableQuery(String schemaName, String tableName);
protected String dropDestinationTableQuery(String schemaName, String tableName) {
return String.format("DROP TABLE IF EXISTS %s.%s;\n", schemaName, tableName);
}
protected String truncateTableQuery(String schemaName, String tableName) {
return String.format("TRUNCATE TABLE %s.%s;\n", schemaName, tableName);
}
protected String insertIntoFromSelectQuery(String schemaName, String srcTableName, String dstTableName) {
return String.format("INSERT INTO %s.%s SELECT * FROM %s.%s;\n", schemaName, dstTableName, schemaName, srcTableName);
}
protected abstract String insertBufferedRecordsQuery(int batchSize, CloseableQueue<byte[]> writeBuffer, String schemaName, String tableName);
private class DestinationImpl implements TableCreationOperations, InsertTableOperations, BufferedWriteOperations {
private final Database database;
public DestinationImpl(Database database) {
this.database = database;
}
@Override
public void createSchema(String schemaName) throws Exception {
database.query(ctx -> ctx.execute(createSchemaQuery(schemaName)));
}
@Override
public void createDestinationTable(String schemaName, String tableName) throws SQLException {
database.query(ctx -> ctx.execute(createDestinationTableQuery(schemaName, tableName)));
}
@Override
public void insertBufferedRecords(int batchSize, CloseableQueue<byte[]> writeBuffer, String schemaName, String tableName) throws Exception {
database.query(ctx -> ctx.execute(insertBufferedRecordsQuery(batchSize, writeBuffer, schemaName, tableName)));
}
@Override
public void truncateTable(String schemaName, String tableName) throws Exception {
database.query(ctx -> ctx.execute(truncateTableQuery(schemaName, tableName)));
}
@Override
public void insertIntoFromSelect(String schemaName, String srcTableName, String dstTableName) throws Exception {
database.query(ctx -> ctx.execute(insertIntoFromSelectQuery(schemaName, srcTableName, dstTableName)));
}
@Override
public void dropDestinationTable(String schemaName, String tableName) throws SQLException {
database.query(ctx -> ctx.execute(dropDestinationTableQuery(schemaName, tableName)));
}
}
}
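
To build a concrete connector on top of this class (as the Redshift destination in this commit does), a subclass only needs to map its UI config to a JDBC config and supply the dialect-specific queries. A hedged sketch, with driver, dialect, and DDL purely illustrative:

package io.airbyte.integrations.destination.jdbc;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.lang.CloseableQueue;
import io.airbyte.integrations.base.ExtendedSQLNaming;
import org.jooq.SQLDialect;

// Sketch of a concrete subclass; everything dialect-specific lives in the overrides.
public class MyJdbcDestination extends AbstractJdbcDestination {

  public MyJdbcDestination() {
    super("org.postgresql.Driver", SQLDialect.POSTGRES, new ExtendedSQLNaming());
  }

  @Override
  public JsonNode toJdbcConfig(JsonNode config) {
    // translate host/port/database fields from the UI config into a jdbc_url here
    return config;
  }

  @Override
  protected String createDestinationTableQuery(String schemaName, String tableName) {
    return String.format(
        "CREATE TABLE IF NOT EXISTS %s.%s (ab_id VARCHAR PRIMARY KEY, %s JSONB, emitted_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP);",
        schemaName, tableName, COLUMN_NAME);
  }

  @Override
  protected String insertBufferedRecordsQuery(int batchSize, CloseableQueue<byte[]> writeBuffer, String schemaName, String tableName) {
    // drain up to batchSize records from writeBuffer and render one batched INSERT,
    // as JdbcDestination does in the next file of this diff
    throw new UnsupportedOperationException("left out of this sketch");
  }

}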

View File

@@ -0,0 +1,102 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.destination.jdbc;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.table;
import static org.jooq.impl.DSL.unquotedName;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Charsets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.CloseableQueue;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.ExtendedSQLNaming;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.UUID;
import org.jooq.InsertValuesStep3;
import org.jooq.JSONB;
import org.jooq.Record;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JdbcDestination extends AbstractJdbcDestination implements Destination {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcDestination.class);
public JdbcDestination() {
super("org.postgresql.Driver", SQLDialect.POSTGRES, new ExtendedSQLNaming());
}
// no-op for JdbcDestination since the config it receives is already designed to be used for JDBC.
@Override
public JsonNode toJdbcConfig(JsonNode config) {
return config;
}
@Override
public String createDestinationTableQuery(String schemaName, String tableName) {
return String.format(
"CREATE TABLE IF NOT EXISTS %s.%s ( \n"
+ "ab_id VARCHAR PRIMARY KEY,\n"
+ "%s JSONB,\n"
+ "emitted_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP\n"
+ ");\n",
schemaName, tableName, COLUMN_NAME);
}
@Override
public String insertBufferedRecordsQuery(int batchSize, CloseableQueue<byte[]> writeBuffer, String schemaName, String tableName) {
InsertValuesStep3<Record, String, JSONB, OffsetDateTime> step =
DSL.insertInto(table(unquotedName(schemaName, tableName)), field("ab_id", String.class),
field(COLUMN_NAME, JSONB.class), field("emitted_at", OffsetDateTime.class));
for (int i = 0; i < batchSize; i++) {
final byte[] record = writeBuffer.poll();
if (record == null) {
break;
}
final AirbyteRecordMessage message = Jsons.deserialize(new String(record, Charsets.UTF_8), AirbyteRecordMessage.class);
step = step.values(UUID.randomUUID().toString(), JSONB.valueOf(Jsons.serialize(message.getData())),
OffsetDateTime.of(LocalDateTime.ofEpochSecond(message.getEmittedAt() / 1000, 0, ZoneOffset.UTC), ZoneOffset.UTC));
}
return step.toString();
}
public static void main(String[] args) throws Exception {
final Destination destination = new JdbcDestination();
LOGGER.info("starting destination: {}", JdbcDestination.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", JdbcDestination.class);
}
}

View File

@@ -0,0 +1,31 @@
{
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/postgres",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "JDBC Destination Spec",
"type": "object",
"required": ["username", "jdbc_url"],
"additionalProperties": false,
"properties": {
"username": {
"description": "Username to use to access the database.",
"type": "string"
},
"password": {
"description": "Password associated with the username.",
"type": "string",
"airbyte_secret": true
},
"jdbc_url": {
"description": "JDBC formatted url. See the standard <a href=\"https://docs.oracle.com/cd/E17952_01/connector-j-8.0-en/connector-j-reference-jdbc-url-format.html\">here</a>.",
"type": "string"
},
"schema": {
"description": "Unless specifically configured, the usual value for this field is \"public\".",
"type": "string",
"examples": ["public"],
"default": "public"
}
}
}
}

View File

@@ -0,0 +1,145 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.destination.jdbc;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.ExtendedSQLNaming;
import io.airbyte.integrations.standardtest.destination.TestDestination;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.testcontainers.containers.PostgreSQLContainer;
public class JdbcIntegrationTest extends TestDestination {
private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);
private static final String RAW_DATA_COLUMN = "data";
private PostgreSQLContainer<?> db;
private final ExtendedSQLNaming namingResolver = new ExtendedSQLNaming();
@Override
protected String getImageName() {
return "airbyte/destination-jdbc:dev";
}
@Override
protected JsonNode getConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("schema", "public")
.put("jdbc_url", String.format("jdbc:postgresql://%s:%s/%s",
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()))
.build());
}
@Override
protected JsonNode getFailCheckConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
.put("username", db.getUsername())
.put("password", "wrong password")
.put("schema", "public")
.put("jdbc_url", String.format("jdbc:postgresql://%s:%s/%s",
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()))
.build());
}
@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv env, String streamName) throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName))
.stream()
.map(r -> Jsons.deserialize(r.get(RAW_DATA_COLUMN).asText()))
.collect(Collectors.toList());
}
@Override
protected boolean implementsBasicNormalization() {
return false;
}
@Override
protected boolean implementsIncremental() {
return true;
}
@Override
protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv env, String streamName)
throws Exception {
String tableName = namingResolver.getIdentifier(streamName);
if (!tableName.startsWith("\"")) {
// Currently, normalization always quotes table identifiers
tableName = "\"" + tableName + "\"";
}
return retrieveRecordsFromTable(tableName);
}
@Override
protected List<String> resolveIdentifier(String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}
private List<JsonNode> retrieveRecordsFromTable(String tableName) throws SQLException {
return Databases.createPostgresDatabase(db.getUsername(), db.getPassword(),
db.getJdbcUrl()).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s ORDER BY emitted_at ASC;", tableName))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}
@Override
protected void setup(TestDestinationEnv testEnv) {
db = new PostgreSQLContainer<>("postgres:13-alpine");
db.start();
}
@Override
protected void tearDown(TestDestinationEnv testEnv) {
db.stop();
db.close();
}
}

View File

@@ -0,0 +1,316 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.destination.jdbc;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.io.IOException;
import java.sql.SQLException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
class JdbcDestinationTest {
private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);
private static final Instant NOW = Instant.now();
private static final String USERS_STREAM_NAME = "users";
private static final String TASKS_STREAM_NAME = "tasks-list";
private static final AirbyteMessage MESSAGE_USERS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "john").put("id", "10").build()))
.withEmittedAt(NOW.toEpochMilli()));
private static final AirbyteMessage MESSAGE_USERS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "susan").put("id", "30").build()))
.withEmittedAt(NOW.toEpochMilli()));
private static final AirbyteMessage MESSAGE_TASKS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(TASKS_STREAM_NAME)
.withData(Jsons.jsonNode(ImmutableMap.builder().put("goal", "announce the game.").build()))
.withEmittedAt(NOW.toEpochMilli()));
// also used for testing quote escaping
private static final AirbyteMessage MESSAGE_TASKS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(TASKS_STREAM_NAME)
.withData(Jsons.jsonNode(ImmutableMap.builder().put("goal", "ship some 'code'.").build()))
.withEmittedAt(NOW.toEpochMilli()));
private static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE)
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build())));
private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, Field.of("name", JsonSchemaPrimitive.STRING),
Field.of("id", JsonSchemaPrimitive.STRING)),
CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING))));
private PostgreSQLContainer<?> container;
private JsonNode config;
private Database database;
@BeforeEach
void setup() {
container = new PostgreSQLContainer<>("postgres:13-alpine");
container.start();
config = createConfig("public");
database = Databases.createPostgresDatabase(
config.get("username").asText(),
config.get("password").asText(),
config.get("jdbc_url").asText());
}
@AfterEach
void tearDown() throws Exception {
database.close();
container.close();
}
// todo - same test as csv destination
@Test
void testSpec() throws IOException {
final ConnectorSpecification actual = new JdbcDestination().spec();
final String resourceString = MoreResources.readResource("spec.json");
final ConnectorSpecification expected = Jsons.deserialize(resourceString, ConnectorSpecification.class);
assertEquals(expected, actual);
}
// todo - same test as csv destination
@Test
void testCheckSuccess() {
final AirbyteConnectionStatus actual = new JdbcDestination().check(config);
final AirbyteConnectionStatus expected = new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
assertEquals(expected, actual);
}
@Test
void testCheckFailure() {
((ObjectNode) config).put("password", "fake");
final AirbyteConnectionStatus actual = new JdbcDestination().check(config);
final AirbyteConnectionStatus expected = new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage("Can't connect with provided configuration.");
assertEquals(expected, actual);
}
@Test
void testWriteSuccess() throws Exception {
final JdbcDestination destination = new JdbcDestination();
final DestinationConsumer<AirbyteMessage> consumer = destination.write(config, CATALOG);
consumer.accept(MESSAGE_USERS1);
consumer.accept(MESSAGE_TASKS1);
consumer.accept(MESSAGE_USERS2);
consumer.accept(MESSAGE_TASKS2);
consumer.accept(MESSAGE_STATE);
consumer.close();
Set<JsonNode> usersActual = recordRetriever(destination.getNamingResolver().getRawTableName(USERS_STREAM_NAME));
final Set<JsonNode> expectedUsersJson = Sets.newHashSet(MESSAGE_USERS1.getRecord().getData(), MESSAGE_USERS2.getRecord().getData());
assertEquals(expectedUsersJson, usersActual);
Set<JsonNode> tasksActual = recordRetriever(destination.getNamingResolver().getRawTableName(TASKS_STREAM_NAME));
final Set<JsonNode> expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData());
assertEquals(expectedTasksJson, tasksActual);
assertTmpTablesNotPresent(
CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getStream).map(AirbyteStream::getName).collect(Collectors.toList()));
}
@Test
void testWriteIncremental() throws Exception {
final ConfiguredAirbyteCatalog catalog = Jsons.clone(CATALOG);
catalog.getStreams().forEach(stream -> stream.withSyncMode(SyncMode.INCREMENTAL));
final JdbcDestination destination = new JdbcDestination();
final DestinationConsumer<AirbyteMessage> consumer = destination.write(config, catalog);
consumer.accept(MESSAGE_USERS1);
consumer.accept(MESSAGE_TASKS1);
consumer.accept(MESSAGE_USERS2);
consumer.accept(MESSAGE_TASKS2);
consumer.accept(MESSAGE_STATE);
consumer.close();
final DestinationConsumer<AirbyteMessage> consumer2 = destination.write(config, catalog);
final AirbyteMessage messageUser3 = new AirbyteMessage().withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "michael").put("id", "87").build()))
.withEmittedAt(NOW.toEpochMilli()));
consumer2.accept(messageUser3);
consumer2.close();
Set<JsonNode> usersActual = recordRetriever(destination.getNamingResolver().getRawTableName(USERS_STREAM_NAME));
final Set<JsonNode> expectedUsersJson = Sets.newHashSet(
MESSAGE_USERS1.getRecord().getData(),
MESSAGE_USERS2.getRecord().getData(),
messageUser3.getRecord().getData());
assertEquals(expectedUsersJson, usersActual);
Set<JsonNode> tasksActual = recordRetriever(destination.getNamingResolver().getRawTableName(TASKS_STREAM_NAME));
final Set<JsonNode> expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData());
assertEquals(expectedTasksJson, tasksActual);
assertTmpTablesNotPresent(
CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getStream).map(AirbyteStream::getName).collect(Collectors.toList()));
}
@Test
void testWriteNewSchema() throws Exception {
JsonNode newConfig = createConfig("new_schema");
final JdbcDestination destination = new JdbcDestination();
final DestinationConsumer<AirbyteMessage> consumer = destination.write(newConfig, CATALOG);
consumer.accept(MESSAGE_USERS1);
consumer.accept(MESSAGE_TASKS1);
consumer.accept(MESSAGE_USERS2);
consumer.accept(MESSAGE_TASKS2);
consumer.accept(MESSAGE_STATE);
consumer.close();
final String schemaName = destination.getNamingResolver().getIdentifier("new_schema");
String streamName = schemaName + "." + destination.getNamingResolver().getRawTableName(USERS_STREAM_NAME);
Set<JsonNode> usersActual = recordRetriever(streamName);
final Set<JsonNode> expectedUsersJson = Sets.newHashSet(MESSAGE_USERS1.getRecord().getData(), MESSAGE_USERS2.getRecord().getData());
assertEquals(expectedUsersJson, usersActual);
streamName = schemaName + "." + destination.getNamingResolver().getRawTableName(TASKS_STREAM_NAME);
Set<JsonNode> tasksActual = recordRetriever(streamName);
final Set<JsonNode> expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData());
assertEquals(expectedTasksJson, tasksActual);
assertTmpTablesNotPresent(
CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getStream).map(AirbyteStream::getName).collect(Collectors.toList()));
assertThrows(RuntimeException.class, () -> recordRetriever(destination.getNamingResolver().getRawTableName(USERS_STREAM_NAME)));
}
@SuppressWarnings("ResultOfMethodCallIgnored")
@Test
void testWriteFailure() throws Exception {
// hack to force an exception to be thrown from within the consumer.
final AirbyteMessage spiedMessage = spy(MESSAGE_USERS1);
doThrow(new RuntimeException()).when(spiedMessage).getRecord();
final JdbcDestination destination = new JdbcDestination();
final DestinationConsumer<AirbyteMessage> consumer = spy(destination.write(config, CATALOG));
assertThrows(RuntimeException.class, () -> consumer.accept(spiedMessage));
consumer.accept(MESSAGE_USERS2);
consumer.close();
final List<String> tableNames = CATALOG.getStreams()
.stream()
.map(ConfiguredAirbyteStream::getStream)
.map(s -> destination.getNamingResolver().getRawTableName(s.getName()))
.collect(Collectors.toList());
assertTmpTablesNotPresent(CATALOG.getStreams()
.stream()
.map(ConfiguredAirbyteStream::getStream)
.map(AirbyteStream::getName)
.collect(Collectors.toList()));
// assert that no tables were created.
assertTrue(fetchNamesOfTablesInDb().stream().noneMatch(tableName -> tableNames.stream().anyMatch(tableName::startsWith)));
}
private List<String> fetchNamesOfTablesInDb() throws SQLException {
return database.query(
ctx -> ctx.fetch("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';"))
.stream()
.map(record -> (String) record.get("table_name")).collect(Collectors.toList());
}
private void assertTmpTablesNotPresent(List<String> tableNames) throws SQLException {
Set<String> tmpTableNamePrefixes = tableNames.stream().map(name -> name + "_\\d+").collect(Collectors.toSet());
assertTrue(fetchNamesOfTablesInDb().stream().noneMatch(tableName -> tmpTableNamePrefixes.stream().anyMatch(tableName::matches)));
}
private Set<JsonNode> recordRetriever(String streamName) throws Exception {
return database.query(ctx -> ctx
.fetch(String.format("SELECT * FROM %s ORDER BY emitted_at ASC;", streamName))
.stream()
.peek(record -> {
// ensure emitted_at is not in the future
OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC);
OffsetDateTime emitted_at = record.get("emitted_at", OffsetDateTime.class);
assertTrue(now.toEpochSecond() >= emitted_at.toEpochSecond());
})
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)
.map(r -> Jsons.deserialize(r.get(JdbcDestination.COLUMN_NAME).asText()))
.collect(Collectors.toSet()));
}
private JsonNode createConfig(String schemaName) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("username", container.getUsername())
.put("password", container.getPassword())
.put("schema", schemaName)
.put("jdbc_url", String.format("jdbc:postgresql://%s:%s/%s",
container.getHost(),
container.getFirstMappedPort(),
container.getDatabaseName()))
.build());
}
}

View File

@@ -9,5 +9,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.name=airbyte/destination-postgres

View File

@@ -2,6 +2,7 @@ plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
// todo: needs standard destination test
}
application {
@@ -12,7 +13,7 @@ dependencies {
implementation project(':airbyte-db')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-queue')
implementation project(':airbyte-integrations:connectors:destination-jdbc')
testImplementation "org.testcontainers:postgresql:1.15.0-rc2"

View File

@@ -30,370 +30,83 @@ import static org.jooq.impl.DSL.unquotedName;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Charsets;
import io.airbyte.commons.concurrency.GracefulShutdownHandler;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.CloseableQueue;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.integrations.base.FailureTrackingConsumer;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.SQLNamingResolvable;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.queue.BigQueue;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.jooq.DSLContext;
import org.jooq.InsertValuesStep3;
import org.jooq.JSONB;
import org.jooq.Record;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PostgresDestination implements Destination {
public class PostgresDestination extends AbstractJdbcDestination implements Destination {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresDestination.class);
static final String COLUMN_NAME = "data";
private final SQLNamingResolvable namingResolver;
protected static final String COLUMN_NAME = AbstractJdbcDestination.COLUMN_NAME;
public PostgresDestination() {
namingResolver = new PostgresSQLNaming();
super("org.postgresql.Driver", SQLDialect.POSTGRES, new PostgresSQLNaming());
}
@Override
public ConnectorSpecification spec() throws IOException {
// return a jsonschema representation of the spec for the integration.
final String resourceString = MoreResources.readResource("spec.json");
return Jsons.deserialize(resourceString, ConnectorSpecification.class);
}
public JsonNode toJdbcConfig(JsonNode config) {
ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put("username", config.get("username").asText())
.put("jdbc_url", String.format("jdbc:postgresql://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()))
.put("schema", getDefaultSchemaName(config));
@Override
public AirbyteConnectionStatus check(JsonNode config) {
try (final Database database = getDatabase(config)) {
database.query(ctx -> ctx.execute(
"SELECT *\n"
+ "FROM pg_catalog.pg_tables\n"
+ "WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema' LIMIT 1;"));
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (Exception e) {
// todo (cgardens) - better error messaging for common cases. e.g. wrong password.
return new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage("Can't connect with provided configuration.");
if (config.has("password")) {
configBuilder.put("password", config.get("password").asText());
}
return Jsons.jsonNode(configBuilder.build());
}
@Override
public SQLNamingResolvable getNamingResolver() {
return namingResolver;
}
/**
* Strategy:
* <p>
* 1. Create a temporary table for each stream
* </p>
* <p>
* 2. Accumulate records in a buffer. One buffer per stream.
* </p>
* <p>
* 3. As records accumulate write them in batch to the database. We set a minimum numbers of records
* before writing to avoid wasteful record-wise writes.
* </p>
* <p>
* 4. Once all records have been written to buffer, flush the buffer and write any remaining records
* to the database (regardless of how few are left).
* </p>
* <p>
* 5. In a single transaction, delete the target tables if they exist and rename the temp tables to
* the final table name.
* </p>
*
* @param config - integration-specific configuration object as json. e.g. { "username": "airbyte",
* "password": "super secure" }
* @param catalog - schema of the incoming messages.
* @return consumer that writes singer messages to the database.
* @throws Exception - anything could happen!
*/
@Override
public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception {
// connect to db.
final Database database = getDatabase(config);
final Map<String, WriteConfig> writeBuffers = new HashMap<>();
final Set<String> schemaSet = new HashSet<>();
// create tmp tables if not exist
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final String streamName = stream.getStream().getName();
final String schemaName = getNamingResolver().getIdentifier(getConfigSchemaName(config));
final String tableName = getNamingResolver().getRawTableName(streamName);
final String tmpTableName = getNamingResolver().getTmpTableName(streamName);
if (!schemaSet.contains(schemaName)) {
database.query(ctx -> ctx.execute(createSchemaQuery(schemaName)));
schemaSet.add(schemaName);
}
database.query(ctx -> ctx.execute(createRawTableQuery(schemaName, tmpTableName)));
final Path queueRoot = Files.createTempDirectory("queues");
final BigQueue writeBuffer = new BigQueue(queueRoot.resolve(streamName), streamName);
final SyncMode syncMode = stream.getSyncMode() == null ? SyncMode.FULL_REFRESH : stream.getSyncMode();
writeBuffers.put(streamName, new WriteConfig(schemaName, tableName, tmpTableName, writeBuffer, syncMode));
}
// write to tmp tables
// if success copy delete main table if exists. rename tmp tables to real tables.
return new RecordConsumer(database, writeBuffers, catalog);
}
static String createSchemaQuery(String schemaName) {
return String.format("CREATE SCHEMA IF NOT EXISTS %s;\n", schemaName);
}
static String createRawTableQuery(String schemaName, String streamName) {
protected String createDestinationTableQuery(String schemaName, String tableName) {
return String.format(
"CREATE TABLE IF NOT EXISTS %s.%s ( \n"
+ "ab_id VARCHAR PRIMARY KEY,\n"
+ "%s JSONB,\n"
+ "emitted_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP\n"
+ ");\n",
schemaName, streamName, COLUMN_NAME);
schemaName, tableName, COLUMN_NAME);
}
public static class RecordConsumer extends FailureTrackingConsumer<AirbyteMessage> implements DestinationConsumer<AirbyteMessage> {
@Override
protected String insertBufferedRecordsQuery(int batchSize, CloseableQueue<byte[]> writeBuffer, String schemaName, String tableName) {
InsertValuesStep3<Record, String, JSONB, OffsetDateTime> step =
DSL.insertInto(
table(unquotedName(schemaName, tableName)),
field("ab_id", String.class),
field(COLUMN_NAME, JSONB.class),
field("emitted_at", OffsetDateTime.class));
private static final long THREAD_DELAY_MILLIS = 500L;
private static final long GRACEFUL_SHUTDOWN_MINUTES = 5L;
private static final int MIN_RECORDS = 500;
private static final int BATCH_SIZE = 500;
private final ScheduledExecutorService writerPool;
private final Database database;
private final Map<String, WriteConfig> writeConfigs;
private final ConfiguredAirbyteCatalog catalog;
public RecordConsumer(Database database, Map<String, WriteConfig> writeConfigs, ConfiguredAirbyteCatalog catalog) {
this.database = database;
this.writeConfigs = writeConfigs;
this.catalog = catalog;
this.writerPool = Executors.newSingleThreadScheduledExecutor();
// todo (cgardens) - how long? boh.
Runtime.getRuntime().addShutdownHook(new GracefulShutdownHandler(Duration.ofMinutes(GRACEFUL_SHUTDOWN_MINUTES), writerPool));
writerPool.scheduleWithFixedDelay(
() -> writeStreamsWithNRecords(MIN_RECORDS, BATCH_SIZE, writeConfigs, database),
THREAD_DELAY_MILLIS,
THREAD_DELAY_MILLIS,
TimeUnit.MILLISECONDS);
}
/**
* Write records from buffer to postgres in batch.
*
* @param minRecords - the minimum number of records in the buffer before writing. helps avoid
* wastefully writing one record at a time.
* @param batchSize - the maximum number of records to write in a single insert.
* @param writeBuffers - map of stream name to its respective buffer.
* @param database - connection to the db.
*/
private static void writeStreamsWithNRecords(int minRecords,
int batchSize,
Map<String, WriteConfig> writeBuffers,
Database database) {
for (final Map.Entry<String, WriteConfig> entry : writeBuffers.entrySet()) {
final String schemaName = entry.getValue().getSchemaName();
final String tmpTableName = entry.getValue().getTmpTableName();
final CloseableQueue<byte[]> writeBuffer = entry.getValue().getWriteBuffer();
while (writeBuffer.size() > minRecords) {
try {
database.query(ctx -> buildWriteQuery(ctx, batchSize, writeBuffer, schemaName, tmpTableName).execute());
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
for (int i = 0; i < batchSize; i++) {
final byte[] record = writeBuffer.poll();
if (record == null) {
break;
}
final AirbyteRecordMessage message = Jsons.deserialize(new String(record, Charsets.UTF_8), AirbyteRecordMessage.class);
step = step.values(
UUID.randomUUID().toString(),
JSONB.valueOf(Jsons.serialize(message.getData())),
OffsetDateTime.of(LocalDateTime.ofEpochSecond(message.getEmittedAt() / 1000, 0, ZoneOffset.UTC), ZoneOffset.UTC));
}
// build the following query:
// INSERT INTO <schemaName>.<tableName>(data)
// VALUES
// ({ "my": "data" }),
// ({ "my": "data" });
private static InsertValuesStep3<Record, String, JSONB, OffsetDateTime> buildWriteQuery(DSLContext ctx,
int batchSize,
CloseableQueue<byte[]> writeBuffer,
String schemaName,
String tmpTableName) {
InsertValuesStep3<Record, String, JSONB, OffsetDateTime> step =
ctx.insertInto(table(unquotedName(schemaName, tmpTableName)), field("ab_id", String.class),
field(COLUMN_NAME, JSONB.class), field("emitted_at", OffsetDateTime.class));
for (int i = 0; i < batchSize; i++) {
final byte[] record = writeBuffer.poll();
if (record == null) {
break;
}
final AirbyteRecordMessage message = Jsons.deserialize(new String(record, Charsets.UTF_8), AirbyteRecordMessage.class);
step = step.values(UUID.randomUUID().toString(), JSONB.valueOf(Jsons.serialize(message.getData())),
OffsetDateTime.of(LocalDateTime.ofEpochSecond(message.getEmittedAt() / 1000, 0, ZoneOffset.UTC), ZoneOffset.UTC));
}
return step;
}
@Override
public void acceptTracked(AirbyteMessage message) {
// ignore other message types.
if (message.getType() == AirbyteMessage.Type.RECORD) {
if (!writeConfigs.containsKey(message.getRecord().getStream())) {
throw new IllegalArgumentException(
String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s",
Jsons.serialize(catalog), Jsons.serialize(message)));
}
writeConfigs.get(message.getRecord().getStream()).getWriteBuffer().offer(Jsons.serialize(message.getRecord()).getBytes(Charsets.UTF_8));
}
}
@Override
public void close(boolean hasFailed) throws Exception {
if (hasFailed) {
LOGGER.error("executing on failed close procedure.");
// kill executor pool fast.
writerPool.shutdown();
writerPool.awaitTermination(1, TimeUnit.SECONDS);
} else {
LOGGER.error("executing on success close procedure.");
// shutdown executor pool with time to complete writes.
writerPool.shutdown();
writerPool.awaitTermination(GRACEFUL_SHUTDOWN_MINUTES, TimeUnit.MINUTES);
// write anything that is left in the buffers.
writeStreamsWithNRecords(0, 500, writeConfigs, database);
database.transaction(ctx -> {
final StringBuilder query = new StringBuilder();
for (final WriteConfig writeConfig : writeConfigs.values()) {
// create tables if not exist.
final String schemaName = writeConfig.getSchemaName();
query.append(createRawTableQuery(writeConfig.getSchemaName(), writeConfig.getTableName()));
switch (writeConfig.getSyncMode()) {
case FULL_REFRESH -> {
// truncate table if already exist.
query.append(String.format("TRUNCATE TABLE %s.%s;\n", writeConfig.getSchemaName(), writeConfig.getTableName()));
}
case INCREMENTAL -> {}
default -> throw new IllegalStateException("Unrecognized sync mode: " + writeConfig.getSyncMode());
}
// always copy data from tmp table into "main" table.
query.append(String.format("INSERT INTO %s.%s SELECT * FROM %s.%s;\n", writeConfig.getSchemaName(),
writeConfig.getTableName(), writeConfig.getSchemaName(), writeConfig.getTmpTableName()));
}
return ctx.execute(query.toString());
});
}
// close buffers.
for (final WriteConfig writeConfig : writeConfigs.values()) {
writeConfig.getWriteBuffer().close();
}
cleanupTmpTables(database, writeConfigs);
database.close();
}
private static void cleanupTmpTables(Database database, Map<String, WriteConfig> writeConfigs) {
for (WriteConfig writeConfig : writeConfigs.values()) {
try {
database.query(
ctx -> ctx.execute(String.format("DROP TABLE IF EXISTS %s.%s;", writeConfig.getSchemaName(), writeConfig.getTmpTableName())));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
}
private static class WriteConfig {
private final String schemaName;
private final String tableName;
private final String tmpTableName;
private final CloseableQueue<byte[]> writeBuffer;
private final SyncMode syncMode;
private WriteConfig(String schemaName, String tableName, String tmpTableName, CloseableQueue<byte[]> writeBuffer, SyncMode syncMode) {
this.schemaName = schemaName;
this.tableName = tableName;
this.tmpTableName = tmpTableName;
this.writeBuffer = writeBuffer;
this.syncMode = syncMode;
}
public String getSchemaName() {
return schemaName;
}
public String getTableName() {
return tableName;
}
public String getTmpTableName() {
return tmpTableName;
}
public CloseableQueue<byte[]> getWriteBuffer() {
return writeBuffer;
}
public SyncMode getSyncMode() {
return syncMode;
}
}
private Database getDatabase(JsonNode config) {
return Databases.createPostgresDatabase(
config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:postgresql://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()));
}
private static String getConfigSchemaName(JsonNode config) {
if (config.has("schema")) {
return config.get("schema").asText();
} else {
return "public";
}
return step.toString();
}
public static void main(String[] args) throws Exception {

View File

@@ -4,7 +4,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Postgres Destination Spec",
"type": "object",
"required": ["host", "port", "username", "database", "schema"],
"required": ["host", "port", "username", "database"],
"additionalProperties": false,
"properties": {
"host": {

View File

@@ -0,0 +1,3 @@
*
!Dockerfile
!build

View File

@@ -0,0 +1,12 @@
FROM airbyte/integration-base-java:dev
WORKDIR /airbyte
ENV APPLICATION destination-redshift
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-redshift

View File

@@ -0,0 +1,36 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
// todo: needs standard destination test
}
application {
mainClass = 'io.airbyte.integrations.destination.redshift.RedshiftDestination'
}
repositories {
maven {
url "https://repository.mulesoft.org/nexus/content/repositories/public/"
}
}
dependencies {
implementation project(':airbyte-db')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:connectors:destination-jdbc')
implementation 'com.amazon.redshift:redshift-jdbc42:1.2.43.1067'
testImplementation 'org.apache.commons:commons-text:1.9'
testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.apache.commons:commons-dbcp2:2.7.0'
testImplementation project(':airbyte-test-utils')
integrationTestImplementation project(':airbyte-integrations:bases:standard-destination-test')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
}

View File

@@ -0,0 +1,116 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.destination.redshift;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.table;
import static org.jooq.impl.DSL.unquotedName;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.CloseableQueue;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.UUID;
import org.jooq.InsertValuesStep3;
import org.jooq.Record;
import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RedshiftDestination extends AbstractJdbcDestination implements Destination {
private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftDestination.class);
protected static final String COLUMN_NAME = AbstractJdbcDestination.COLUMN_NAME;
public RedshiftDestination() {
super("com.amazon.redshift.jdbc.Driver", null, new RedshiftSQLNaming());
}
@Override
public JsonNode toJdbcConfig(JsonNode redshiftConfig) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("username", redshiftConfig.get("username").asText())
.put("password", redshiftConfig.get("password").asText())
.put("jdbc_url", String.format("jdbc:redshift://%s:%s/%s",
redshiftConfig.get("host").asText(),
redshiftConfig.get("port").asText(),
redshiftConfig.get("database").asText()))
.put("schema", getDefaultSchemaName(redshiftConfig))
.build());
}
@Override
protected String createDestinationTableQuery(String schemaName, String tableName) {
return String.format(
"CREATE TABLE IF NOT EXISTS %s.%s ( \n"
+ "ab_id VARCHAR PRIMARY KEY,\n"
+ "%s VARCHAR(max),\n"
+ "emitted_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP\n"
+ ");\n",
schemaName, tableName, COLUMN_NAME);
}
@Override
protected String insertBufferedRecordsQuery(int batchSize,
CloseableQueue<byte[]> writeBuffer,
String schemaName,
String tmpTableName) {
InsertValuesStep3<Record, String, String, OffsetDateTime> step = DSL.insertInto(
table(unquotedName(schemaName, tmpTableName)),
field("ab_id", String.class),
field(COLUMN_NAME, String.class),
field("emitted_at", OffsetDateTime.class));
for (int i = 0; i < batchSize; i++) {
final byte[] record = writeBuffer.poll();
if (record == null) {
break;
}
final AirbyteRecordMessage message = Jsons.deserialize(new String(record, Charsets.UTF_8), AirbyteRecordMessage.class);
step = step.values(
UUID.randomUUID().toString(),
Jsons.serialize(message.getData()),
OffsetDateTime.of(LocalDateTime.ofEpochSecond(message.getEmittedAt() / 1000, 0, ZoneOffset.UTC), ZoneOffset.UTC));
}
return step.toString();
}
public static void main(String[] args) throws Exception {
final Destination destination = new RedshiftDestination();
LOGGER.info("starting destination: {}", RedshiftDestination.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", RedshiftDestination.class);
}
}

View File

@@ -22,7 +22,9 @@
* SOFTWARE.
*/
package io.airbyte.integrations.base;
package io.airbyte.integrations.destination.redshift;
import io.airbyte.integrations.base.ExtendedSQLNaming;
public class RedshiftSQLNaming extends ExtendedSQLNaming {

View File

@@ -0,0 +1,50 @@
{
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Redshift Destination Spec",
"type": "object",
"required": ["host", "port", "database", "username", "password"],
"additionalProperties": false,
"properties": {
"host": {
"description": "Host Endpoint of the Redshift Cluster (must include the cluster-id, region and end with .redshift.amazonaws.com)",
"type": "string"
},
"port": {
"description": "Port of the database.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 5439,
"examples": ["5439"]
},
"username": {
"description": "Username to use to access the database.",
"type": "string"
},
"password": {
"description": "Password associated with the username.",
"type": "string",
"airbyte_secret": true
},
"database": {
"description": "Name of the database.",
"type": "string"
},
"schema": {
"description": "Unless specifically configured, the usual value for this field is \"public\".",
"type": "string",
"examples": ["public"],
"default": "public"
},
"basic_normalization": {
"type": "boolean",
"default": false,
"description": "Whether or not to normalize the data in the destination. See <a href=\"https://docs.airbyte.io/architecture/basic-normalization.md\">basic normalization</a> for more details.",
"title": "Basic Normalization",
"examples": [true, false]
}
}
}
}

View File

@@ -0,0 +1,156 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.destination.redshift;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.ExtendedSQLNaming;
import io.airbyte.integrations.standardtest.destination.TestDestination;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
public class RedshiftIntegrationTest extends TestDestination {
private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);
private static final String COLUMN_NAME = "data";
// config from which to create / delete schemas.
private JsonNode baseConfig;
// config which refers to the schema that the test is being run in.
private JsonNode config;
private final ExtendedSQLNaming namingResolver = new ExtendedSQLNaming();
@Override
protected String getImageName() {
return "airbyte/destination-redshift:dev";
}
@Override
protected JsonNode getConfig() {
return config;
}
private static JsonNode getStaticConfig() {
return Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json")));
}
@Override
protected JsonNode getFailCheckConfig() {
final JsonNode invalidConfig = Jsons.clone(config);
((ObjectNode) invalidConfig).put("password", "wrong password");
return invalidConfig;
}
@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv env, String streamName) throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName))
.stream()
.map(j -> Jsons.deserialize(j.get(COLUMN_NAME).asText()))
.collect(Collectors.toList());
}
@Override
protected boolean implementsBasicNormalization() {
return true;
}
@Override
protected boolean implementsIncremental() {
return true;
}
@Override
protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName) throws Exception {
String tableName = namingResolver.getIdentifier(streamName);
if (!tableName.startsWith("\"")) {
// Currently, Normalization always quotes table identifiers
tableName = "\"" + tableName + "\"";
}
return retrieveRecordsFromTable(tableName);
}
@Override
protected List<String> resolveIdentifier(String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}
private List<JsonNode> retrieveRecordsFromTable(String tableName) throws SQLException {
final String schemaName = config.get("schema").asText();
return getDatabase().query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY emitted_at ASC;", schemaName, tableName))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}
// for each test we create a new schema in the database. run the test in there and then remove it.
@Override
protected void setup(TestDestinationEnv testEnv) throws Exception {
final String schemaName = ("integration_test_" + RandomStringUtils.randomAlphanumeric(5));
final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName);
baseConfig = getStaticConfig();
getDatabase().query(ctx -> ctx.execute(createSchemaQuery));
final JsonNode configForSchema = Jsons.clone(baseConfig);
((ObjectNode) configForSchema).put("schema", schemaName);
config = configForSchema;
}
@Override
protected void tearDown(TestDestinationEnv testEnv) throws Exception {
final String dropSchemaQuery = String.format("DROP SCHEMA IF EXISTS %s CASCADE", config.get("schema").asText());
getDatabase().query(ctx -> ctx.execute(dropSchemaQuery));
}
private Database getDatabase() {
return Databases.createDatabase(
baseConfig.get("username").asText(),
baseConfig.get("password").asText(),
String.format("jdbc:redshift://%s:%s/%s",
baseConfig.get("host").asText(),
baseConfig.get("port").asText(),
baseConfig.get("database").asText()),
"com.amazon.redshift.jdbc.Driver", null);
}
}

View File

@@ -1,5 +1,5 @@
{
"documentationUrl": "https://docs.airbyte.io/integrations/destinations",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/postgres",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "JDBC Source Spec",
@@ -13,7 +13,8 @@
},
"password": {
"description": "Password associated with the username.",
"type": "string"
"type": "string",
"airbyte_secret": true
},
"jdbc_url": {
"description": "JDBC formatted url. See the standard <a href=\"https://docs.oracle.com/cd/E17952_01/connector-j-8.0-en/connector-j-reference-jdbc-url-format.html\">here</a>.",

View File

@@ -53,6 +53,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
public enum DestinationType {
BIGQUERY,
POSTGRES,
REDSHIFT,
SNOWFLAKE
}

View File

@@ -38,6 +38,7 @@ public class NormalizationRunnerFactory {
ImmutableMap.<String, DefaultNormalizationRunner.DestinationType>builder()
.put("airbyte/destination-bigquery", DefaultNormalizationRunner.DestinationType.BIGQUERY)
.put("airbyte/destination-postgres", DefaultNormalizationRunner.DestinationType.POSTGRES)
.put("airbyte/destination-redshift", DefaultNormalizationRunner.DestinationType.REDSHIFT)
.put("airbyte/destination-snowflake", DefaultNormalizationRunner.DestinationType.SNOWFLAKE)
.build();

View File

@@ -26,6 +26,7 @@
* [Microsoft SQL Server \(MSSQL\)](integrations/sources/mssql.md)
* [Postgres](integrations/sources/postgres.md)
* [Recurly](integrations/sources/recurly.md)
* [Redshift](integrations/sources/redshift.md)
* [Salesforce](integrations/sources/salesforce.md)
* [Sendgrid](integrations/sources/sendgrid.md)
* [Shopify](integrations/sources/shopify.md)
@@ -35,6 +36,7 @@
* [BigQuery](integrations/destinations/bigquery.md)
* [Local CSV](integrations/destinations/local-csv.md)
* [Postgres](integrations/destinations/postgres.md)
* [Redshift](integrations/destinations/redshift.md)
* [Snowflake](integrations/destinations/snowflake.md)
* [Custom or New Connector](integrations/custom-connectors.md)
* [Connector Changelog](integrations/integrations-changelog.md)

View File

@@ -2,7 +2,59 @@
## Overview
Not released yet
The Airbyte Redshift destination allows you to sync data to Redshift.
This Redshift destination connector is built on top of the destination-jdbc code base. It relies on the JDBC 4.2 standard drivers provided by Amazon via Mulesoft [here](https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42), as described in the Redshift documentation [here](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-install.html).
### Sync overview
#### Output schema
Each stream will be output into its own raw table in Redshift. Each table will contain 3 columns:
* `ab_id`: a uuid assigned by Airbyte to each event that is processed. The column type in Redshift is `VARCHAR`.
* `emitted_at`: a timestamp representing when the event was pulled from the data source. The column type in Redshift is `TIMESTAMP WITH TIME ZONE`.
* `data`: a json blob containing the event data. The column type in Redshift is `VARCHAR`, but it can be parsed with Redshift's JSON functions (see the sketch below).
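For reference, here is a sketch of the raw table layout and of how to query the `data` column. The schema and table names below are examples only (actual names are derived from the stream name by the connector's naming resolver); the column layout mirrors the `CREATE TABLE` statement this connector uses.
```sql
-- Sketch of a raw table as created by this connector (names are examples).
CREATE TABLE IF NOT EXISTS public.users_raw (
  ab_id VARCHAR PRIMARY KEY,
  data VARCHAR(max),
  emitted_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- The data column holds serialized JSON; Redshift's JSON functions can unpack it.
SELECT JSON_EXTRACT_PATH_TEXT(data, 'name') AS name, emitted_at
FROM public.users_raw
ORDER BY emitted_at;
```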
#### Features
| Feature | Supported?\(Yes/No\) | Notes |
| :--- | :--- | :--- |
| Full Refresh Sync | Yes | |
#### Target Database
You will need to choose an existing database or create a new database that will be used to store synced data from Airbyte.
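For example, to create a dedicated database from a SQL client connected to your cluster (the name below is just an example):
```sql
-- Example only; any database name works.
CREATE DATABASE airbyte;
```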
## Getting started
### Requirements
1. Active Redshift cluster
2. Allow connections from Airbyte to your Redshift cluster \(if they exist in separate VPCs\)
### Setup guide
#### 1. Make sure your cluster is active and accessible from the machine running Airbyte
This is dependent on your networking setup. The easiest way to verify if Airbyte is able to connect to your Redshift cluster is via the check connection tool in the UI.
The AWS Redshift documentation has a tutorial on how to properly configure your cluster's access [here](https://docs.aws.amazon.com/redshift/latest/gsg/rs-gsg-authorize-cluster-access.html).
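As an optional sanity check (not something the connector runs itself), you can verify from a SQL client that the user you plan to give Airbyte can create objects in the target schema:
```sql
-- Returns true if the current user may create objects in the given schema.
SELECT HAS_SCHEMA_PRIVILEGE('public', 'create');
```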
#### 2. Fill up connection info
Next, provide the necessary information for connecting to your cluster, such as the `host`, which is the connection string or Endpoint
(accessible [here](https://docs.aws.amazon.com/redshift/latest/gsg/rs-gsg-connect-to-cluster.html#rs-gsg-how-to-get-connection-string)) without the `port` and `database` name
(it typically includes the cluster-id and region, and ends with `.redshift.amazonaws.com`).
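For example, given a (made-up) endpoint of `examplecluster.abc123xyz789.us-west-2.redshift.amazonaws.com:5439/dev`, the `host` would be `examplecluster.abc123xyz789.us-west-2.redshift.amazonaws.com`, the `port` would be `5439`, and the `database` would be `dev`.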
You should have all the requirements needed to configure Redshift as a destination in the UI. You'll need the following information to configure the destination:
* **Host**
* **Port**
* **Username**
* **Password**
* **Schema**
* **Database**
* This database needs to exist within the cluster provided.
## Notes about Redshift Naming Conventions