Implement majority of more efficient Redshift Copy Strategy. (#2547)
Instead of issuing inserts, we write the data to S3 and issue a COPY command that loads it from S3 into Redshift. We use a single file, as it's sufficiently performant and we do not want to introduce file-destination-related operations yet. We use an open source library for the upload, since AWS does not natively support streaming uploads. My intention with this PR is to first implement the meat of the write-and-copy strategy, which is mainly centered around the RedshiftCopier class. I plan to hook the RedshiftCopier up to the actual Destination class, and implement all the plumbing, in a follow-up PR.

Co-authored-by: Davin Chia <davinchia@Davins-MacBook-Pro.local>
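The write-and-copy flow described above bottoms out in a single COPY statement that Redshift executes against the staged S3 file. A minimal sketch of building that statement is below; the class and method names are illustrative, not the actual RedshiftCopier API from this PR.

```java
// Illustrative sketch of the S3-then-COPY strategy: records are first staged
// as a single file in S3, then bulk-loaded into Redshift with one COPY
// statement. Class/method names here are hypothetical, not the PR's API.
public final class CopyStatementSketch {

  // Builds the COPY statement Redshift runs to load the staged file.
  // CREDENTIALS carries the AWS keys; CSV matches a comma-delimited stage file.
  static String buildCopyStatement(String schema, String table, String s3Path,
                                   String accessKeyId, String secretKey) {
    return String.format(
        "COPY %s.%s FROM '%s' CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s' CSV;",
        schema, table, s3Path, accessKeyId, secretKey);
  }

  public static void main(String[] args) {
    String sql = buildCopyStatement("public", "users",
        "s3://my-bucket/staging/users.csv", "ACCESS_KEY", "SECRET_KEY");
    System.out.println(sql);
  }
}
```

In practice the statement would be executed over a JDBC connection to the cluster after the S3 upload completes; building the SQL separately keeps that step easy to test.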
@@ -26,12 +26,24 @@ package io.airbyte.integrations.base;
import io.airbyte.commons.functional.CheckedConsumer;

// Lifecycle:
// 1. Instantiate consumer.
// 2. start() to initialize any resources that need to be created BEFORE the consumer consumes any
// messages.
// 2. consumes ALL records via {@link DestinationConsumer#accept(T)}
// 3. Always (on success or failure) finalize by calling {@link DestinationConsumer#close()}
/**
 * Interface for the destination's consumption of incoming records wrapped in an
 * {@link io.airbyte.protocol.models.AirbyteMessage}.
 *
 * This is via the accept method, which commonly handles parsing, validation, batching and writing
 * of the transformed data to the final destination i.e. the technical system data is being written
 * to.
 *
 * Lifecycle:
 * <li>1. Instantiate consumer.</li>
 * <li>2. start() to initialize any resources that need to be created BEFORE the consumer consumes
 * any messages.</li>
 * <li>3. Consumes ALL records via {@link DestinationConsumer#accept(T)}</li>
 * <li>4. Always (on success or failure) finalize by calling
 * {@link DestinationConsumer#close()}</li>
 *
 * We encourage implementing this interface using the {@link FailureTrackingConsumer} class.
 */
public interface DestinationConsumer<T> extends CheckedConsumer<T, Exception>, AutoCloseable {

  void start() throws Exception;

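The lifecycle the javadoc documents (instantiate, start(), accept all records, always close()) can be illustrated with a minimal toy implementation; the interface is simplified here to keep the sketch self-contained, and none of this is code from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Toy consumer illustrating the documented lifecycle: start(), then accept()
// for every record, then close() exactly once (on success or failure).
// The interface is simplified from the real DestinationConsumer.
public class LifecycleSketch {

  interface DestinationConsumer<T> extends AutoCloseable {
    void start() throws Exception;
    void accept(T message) throws Exception;
  }

  static class RecordingConsumer implements DestinationConsumer<String> {
    final List<String> calls = new ArrayList<>();
    public void start() { calls.add("start"); }
    public void accept(String message) { calls.add("accept:" + message); }
    public void close() { calls.add("close"); }
  }

  public static void main(String[] args) throws Exception {
    RecordingConsumer consumer = new RecordingConsumer();  // 1. instantiate
    consumer.start();                                      // 2. start()
    try (consumer) {                                       // 4. close() always runs
      consumer.accept("record-1");                         // 3. accept ALL records
      consumer.accept("record-2");
    }
    System.out.println(consumer.calls);
    // [start, accept:record-1, accept:record-2, close]
  }
}
```

Putting the consumer itself in the try-with-resources header (legal since Java 9 for effectively final variables) is what guarantees step 4 regardless of failures in step 3.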
@@ -27,6 +27,18 @@ package io.airbyte.integrations.base;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Minimal abstract class intended to provide a consistent structure to classes seeking to implement
 * the {@link DestinationConsumer} interface. The original interface methods are wrapped in generic
 * exception handlers - any exception is caught and logged.
 *
 * Two methods are intended for extension: - startTracked: Wraps set up of necessary
 * infrastructure/configuration before message consumption. - acceptTracked: Wraps actual processing
 * of each {@link io.airbyte.protocol.models.AirbyteMessage}.
 *
 * Though not necessary, we highly encourage using this class when implementing destinations. See
 * child classes for examples.
 */
public abstract class FailureTrackingConsumer<T> implements DestinationConsumer<T> {

  private static final Logger LOGGER = LoggerFactory.getLogger(FailureTrackingConsumer.class);

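The extension pattern the javadoc describes — public methods delegating to `*Tracked` hooks while recording failures — might be sketched as follows. This is a simplified stand-in, not the actual Airbyte class; the `closeTracked(boolean)` hook is an assumption added here to show why tracking failures is useful at close time.

```java
// Simplified sketch of the failure-tracking pattern: the public method wraps
// the *Tracked extension point, records any failure, and lets close() react
// to whether processing succeeded. Not the actual Airbyte implementation.
public abstract class FailureTrackingSketch<T> implements AutoCloseable {

  private boolean hasFailed = false;

  // Extension point: actual per-message processing lives in the subclass.
  protected abstract void acceptTracked(T message) throws Exception;

  // Hypothetical hook: lets a subclass commit on success or clean up on failure.
  protected abstract void closeTracked(boolean hasFailed) throws Exception;

  public void accept(T message) throws Exception {
    try {
      acceptTracked(message);
    } catch (Exception e) {
      hasFailed = true;  // remember the failure so close() can react
      throw e;
    }
  }

  @Override
  public void close() throws Exception {
    closeTracked(hasFailed);
  }
}
```

A destination subclass can then, for example, commit staged data in `closeTracked(false)` and discard it in `closeTracked(true)`.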
@@ -120,6 +120,7 @@ public class IntegrationRunner {
    LOGGER.info("Completed integration: {}", integration.getClass().getName());
  }

  @VisibleForTesting
  static void consumeWriteStream(DestinationConsumer<AirbyteMessage> consumer) throws Exception {
    final Scanner input = new Scanner(System.in);
    try (consumer) {
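The consumeWriteStream pattern shown in the last hunk — scanning newline-delimited input and handing each line to the consumer inside a try-with-resources on the consumer — can be sketched in isolation like this. The stream is injectable here for testability (the real method reads `System.in`), and the string-typed consumer is a simplification.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

// Sketch of the consumeWriteStream pattern: each input line is one serialized
// message; try-with-resources guarantees consumer.close() runs whether the
// loop finishes or throws. Simplified from the real IntegrationRunner code.
public class ConsumeStreamSketch {

  interface DestinationConsumer<T> extends AutoCloseable {
    void accept(T message) throws Exception;
  }

  static void consumeWriteStream(DestinationConsumer<String> consumer, InputStream in)
      throws Exception {
    final Scanner input = new Scanner(in, StandardCharsets.UTF_8);
    try (consumer) {
      while (input.hasNextLine()) {
        consumer.accept(input.nextLine());
      }
    }
  }

  public static void main(String[] args) throws Exception {
    List<String> seen = new ArrayList<>();
    DestinationConsumer<String> consumer = new DestinationConsumer<>() {
      public void accept(String m) { seen.add(m); }
      public void close() { seen.add("<closed>"); }
    };
    consumeWriteStream(consumer,
        new ByteArrayInputStream("msg1\nmsg2\n".getBytes(StandardCharsets.UTF_8)));
    System.out.println(seen);  // [msg1, msg2, <closed>]
  }
}
```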