
🚨 Destination Snowflake: Remove GCS/S3 Staging. (#29236)

As the title says, remove the GCS/S3 staging methods.

These staging methods see little usage, so we can remove them. Snowflake also recommends Internal Staging, which is both cheaper and faster.
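
For reference, existing configs that select S3 or GCS staging will need their `loading_method` switched to Internal Staging. A minimal sketch of the target config, with placeholder host and credentials (field names taken from the connector spec and test configs further down in this diff):

    {
      "host": "example.snowflakecomputing.com",
      "role": "AIRBYTE_ROLE",
      "warehouse": "AIRBYTE_WAREHOUSE",
      "database": "AIRBYTE_DATABASE",
      "schema": "AIRBYTE_SCHEMA",
      "username": "AIRBYTE_USER",
      "credentials": { "password": "..." },
      "loading_method": { "method": "Internal Staging" }
    }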

Co-authored-by: davinchia <davinchia@users.noreply.github.com>
Co-authored-by: Evan Tahler <evan@airbyte.io>
Co-authored-by: Pedro S. Lopez <pedroslopez@me.com>
Davin Chia
2023-08-18 13:15:25 -07:00
committed by GitHub
parent 6ba7c03e44
commit ec0f83b03f
23 changed files with 21 additions and 1359 deletions

View File

@@ -37,7 +37,8 @@ public interface Destination extends Integration {
/**
* Default implementation allows us to not have to touch existing destinations while avoiding a lot
* of conditional statements in {@link IntegrationRunner}.
* of conditional statements in {@link IntegrationRunner}. This is preferred over #getConsumer and
* is the default Async Framework method.
*
* @param config config
* @param catalog catalog

View File

@@ -49,7 +49,7 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1
ENV ENABLE_SENTRY true
LABEL io.airbyte.version=1.3.3
LABEL io.airbyte.version=2.0.0
LABEL io.airbyte.name=airbyte/destination-snowflake
ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"

View File

@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 1.3.3
dockerImageTag: 2.0.0
dockerRepository: airbyte/destination-snowflake
githubIssueLabel: destination-snowflake
icon: snowflake.svg
@@ -28,6 +28,11 @@ data:
supportsDbt: true
tags:
- language:java
releases:
breakingChanges:
2.0.0:
message: "Remove GCS/S3 loading method support."
upgradeDeadline: "2023-08-31"
ab_internal:
sl: 200
ql: 400

View File

@@ -10,15 +10,12 @@ import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
// TODO: Remove the Switching Destination from this class as part of code cleanup.
@Slf4j
public class SnowflakeDestination extends SwitchingDestination<SnowflakeDestination.DestinationType> {
@@ -26,8 +23,6 @@ public class SnowflakeDestination extends SwitchingDestination<SnowflakeDestinat
private final String airbyteEnvironment;
enum DestinationType {
COPY_S3,
COPY_GCS,
INTERNAL_STAGING
}
@@ -40,29 +35,8 @@ public class SnowflakeDestination extends SwitchingDestination<SnowflakeDestinat
@Override
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
log.info("destination class: {}", getClass());
final var useAsyncSnowflake = useAsyncSnowflake(config);
log.info("using async snowflake: {}", useAsyncSnowflake);
if (useAsyncSnowflake) {
return new SnowflakeInternalStagingDestination(airbyteEnvironment).getSerializedMessageConsumer(config, catalog, outputRecordCollector);
} else {
return new ShimToSerializedAirbyteMessageConsumer(getConsumer(config, catalog, outputRecordCollector));
}
}
public static boolean useAsyncSnowflake(final JsonNode config) {
final Set<String> stagingLoadingMethods = Set.of("internal staging", "internal-staging", "internal_staging");
return Optional.of(config)
.map(node -> node.get("loading_method"))
.map(node -> node.get("method"))
.map(JsonNode::asText)
.map(String::toLowerCase)
.map(loadingMethod -> stagingLoadingMethods.contains(loadingMethod))
.orElse(false);
final Consumer<AirbyteMessage> outputRecordCollector) {
return new SnowflakeInternalStagingDestination(airbyteEnvironment).getSerializedMessageConsumer(config, catalog, outputRecordCollector);
}
}

View File

@@ -14,42 +14,13 @@ import java.util.Map;
public class SnowflakeDestinationResolver {
public static DestinationType getTypeFromConfig(final JsonNode config) {
if (isS3Copy(config)) {
return DestinationType.COPY_S3;
} else if (isGcsCopy(config)) {
return DestinationType.COPY_GCS;
} else {
return DestinationType.INTERNAL_STAGING;
}
}
public static boolean isS3Copy(final JsonNode config) {
return config.has("loading_method") && config.get("loading_method").isObject() && config.get("loading_method").has("s3_bucket_name");
}
public static boolean isGcsCopy(final JsonNode config) {
return config.has("loading_method") && config.get("loading_method").isObject() && config.get("loading_method").has("project_id");
}
public static int getNumberOfFileBuffers(final JsonNode config) {
int numOfFileBuffers = FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER;
if (config.has(FileBuffer.FILE_BUFFER_COUNT_KEY)) {
numOfFileBuffers = Math.min(config.get(FileBuffer.FILE_BUFFER_COUNT_KEY).asInt(), FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER);
}
// Only allows for values 10 <= numOfFileBuffers <= 50
return Math.max(numOfFileBuffers, FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER);
}
public static Map<DestinationType, Destination> getTypeToDestination(
final String airbyteEnvironment) {
final SnowflakeS3StagingDestination s3StagingDestination = new SnowflakeS3StagingDestination(airbyteEnvironment);
final SnowflakeGcsStagingDestination gcsStagingDestination = new SnowflakeGcsStagingDestination(airbyteEnvironment);
public static Map<DestinationType, Destination> getTypeToDestination(final String airbyteEnvironment) {
final SnowflakeInternalStagingDestination internalStagingDestination = new SnowflakeInternalStagingDestination(airbyteEnvironment);
return ImmutableMap.of(
DestinationType.COPY_S3, s3StagingDestination,
DestinationType.COPY_GCS, gcsStagingDestination,
DestinationType.INTERNAL_STAGING, internalStagingDestination);
return ImmutableMap.of(DestinationType.INTERNAL_STAGING, internalStagingDestination);
}
}

View File

@@ -1,174 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.snowflake;
import static io.airbyte.integrations.destination.snowflake.SnowflakeDestinationResolver.getNumberOfFileBuffers;
import static io.airbyte.integrations.destination.snowflake.SnowflakeS3StagingDestination.isPurgeStagingData;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser;
import io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.NoOpDestinationV1V2Migrator;
import io.airbyte.integrations.base.destination.typing_deduping.NoopTyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.jdbc.copy.gcs.GcsConfig;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator;
import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SnowflakeGcsStagingDestination extends AbstractJdbcDestination implements Destination {
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeGcsStagingDestination.class);
private final String airbyteEnvironment;
public SnowflakeGcsStagingDestination(final String airbyteEnvironment) {
this(new SnowflakeSQLNameTransformer(), airbyteEnvironment);
}
public SnowflakeGcsStagingDestination(final SnowflakeSQLNameTransformer nameTransformer, final String airbyteEnvironment) {
super("", nameTransformer, new SnowflakeSqlOperations());
this.airbyteEnvironment = airbyteEnvironment;
}
@Override
public AirbyteConnectionStatus check(final JsonNode config) {
if (TypingAndDedupingFlag.isDestinationV2()) {
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("GCS staging is slated for deprecation and will not be upgraded to Destinations v2. Please use the Snowflake internal staging destination instead.");
}
final GcsConfig gcsConfig = GcsConfig.getGcsConfig(config);
final NamingConventionTransformer nameTransformer = getNamingResolver();
final SnowflakeGcsStagingSqlOperations snowflakeGcsStagingSqlOperations =
new SnowflakeGcsStagingSqlOperations(nameTransformer, gcsConfig);
final DataSource dataSource = getDataSource(config);
try {
final JdbcDatabase database = getDatabase(dataSource);
final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
attemptTableOperations(outputSchema, database, nameTransformer, snowflakeGcsStagingSqlOperations,
true);
attemptWriteAndDeleteGcsObject(gcsConfig, outputSchema);
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception while checking connection: ", e);
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect with provided configuration. \n" + e.getMessage());
} finally {
try {
DataSourceFactory.close(dataSource);
} catch (final Exception e) {
LOGGER.warn("Unable to close data source.", e);
}
}
}
private static void attemptWriteAndDeleteGcsObject(final GcsConfig gcsConfig, final String outputTableName) throws IOException {
final Storage storageClient = getStorageClient(gcsConfig);
final BlobId blobId = BlobId.of(gcsConfig.getBucketName(), "check-content/" + outputTableName);
final BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
storageClient.create(blobInfo);
try (final WriteChannel writer = storageClient.writer(blobInfo)) {
// Try to write a dummy message to make sure user has all required permissions
final byte[] content = "Hello, World!".getBytes(UTF_8);
writer.write(ByteBuffer.wrap(content, 0, content.length));
} finally {
storageClient.delete(blobId);
}
}
public static Storage getStorageClient(final GcsConfig gcsConfig) throws IOException {
final InputStream credentialsInputStream = new ByteArrayInputStream(gcsConfig.getCredentialsJson().getBytes(StandardCharsets.UTF_8));
final GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsInputStream);
return StorageOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(gcsConfig.getProjectId())
.build()
.getService();
}
@Override
public DataSource getDataSource(final JsonNode config) {
return SnowflakeDatabase.createDataSource(config, airbyteEnvironment);
}
@Override
protected JdbcDatabase getDatabase(final DataSource dataSource) {
return SnowflakeDatabase.getDatabase(dataSource);
}
@Override
protected Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
return Collections.emptyMap();
}
// this is a no op since we override getDatabase.
@Override
public JsonNode toJdbcConfig(final JsonNode config) {
return Jsons.emptyObject();
}
@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final GcsConfig gcsConfig = GcsConfig.getGcsConfig(config);
return new StagingConsumerFactory().create(
outputRecordCollector,
getDatabase(getDataSource(config)),
new SnowflakeGcsStagingSqlOperations(getNamingResolver(), gcsConfig),
getNamingResolver(),
CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX, getNumberOfFileBuffers(config))),
config,
catalog,
isPurgeStagingData(config),
new TypeAndDedupeOperationValve(),
new NoopTyperDeduper(),
null,
null);
}
}

View File

@@ -1,217 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.snowflake;
import static io.airbyte.integrations.destination.snowflake.SnowflakeInternalStagingSqlOperations.UPLOAD_RETRY_LIMIT;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.copy.gcs.GcsConfig;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.staging.StagingOperations;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.joda.time.DateTime;
public class SnowflakeGcsStagingSqlOperations extends SnowflakeSqlOperations implements StagingOperations {
private final NamingConventionTransformer nameTransformer;
private final Storage storageClient;
private final GcsConfig gcsConfig;
private final Set<String> fullObjectKeys = new HashSet<>();
public SnowflakeGcsStagingSqlOperations(final NamingConventionTransformer nameTransformer, final GcsConfig gcsConfig) {
this.nameTransformer = nameTransformer;
this.gcsConfig = gcsConfig;
this.storageClient = getStorageClient(gcsConfig);
}
private Storage getStorageClient(final GcsConfig gcsConfig) {
try {
final InputStream credentialsInputStream = new ByteArrayInputStream(gcsConfig.getCredentialsJson().getBytes(StandardCharsets.UTF_8));
final GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsInputStream);
return StorageOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(gcsConfig.getProjectId())
.build()
.getService();
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
@Override
public String getStageName(final String namespace, final String streamName) {
return nameTransformer.applyDefaultCase(String.join(".",
nameTransformer.convertStreamName(namespace),
nameTransformer.convertStreamName(streamName)));
}
@Override
public String getStagingPath(final UUID connectionId, final String namespace, final String streamName, final DateTime writeDatetime) {
// see https://docs.snowflake.com/en/user-guide/data-load-considerations-stage.html
return nameTransformer.applyDefaultCase(String.format("%s/%s_%02d_%02d_%02d/%s/",
getStageName(namespace, streamName),
writeDatetime.year().get(),
writeDatetime.monthOfYear().get(),
writeDatetime.dayOfMonth().get(),
writeDatetime.hourOfDay().get(),
connectionId));
}
@Override
public void createStageIfNotExists(final JdbcDatabase database, final String stageName) throws Exception {
final String bucket = gcsConfig.getBucketName();
if (!doesBucketExist(bucket)) {
LOGGER.info("Bucket {} does not exist; creating...", bucket);
storageClient.create(BucketInfo.newBuilder(bucket).build());
LOGGER.info("Bucket {} has been created.", bucket);
}
}
private boolean doesBucketExist(final String bucket) {
return storageClient.get(bucket, Storage.BucketGetOption.fields()) != null;
}
@Override
public String uploadRecordsToStage(final JdbcDatabase database,
final SerializableBuffer recordsData,
final String schemaName,
final String stageName,
final String stagingPath)
throws Exception {
final List<Exception> exceptionsThrown = new ArrayList<>();
while (exceptionsThrown.size() < UPLOAD_RETRY_LIMIT) {
try {
final String fileName = loadDataIntoBucket(stagingPath, recordsData);
LOGGER.info("Successfully loaded records to stage {} with {} re-attempt(s)", stagingPath, exceptionsThrown.size());
return fileName;
} catch (final Exception e) {
LOGGER.error("Failed to upload records into storage {}", stagingPath, e);
exceptionsThrown.add(e);
}
}
throw new RuntimeException(String.format("Exceptions thrown while uploading records into storage: %s", Strings.join(exceptionsThrown, "\n")));
}
/**
* Upload the file from {@code recordsData} to GCS and simplify the filename as <partId>.<extension>.
*
* <p>
* Method mirrors similarly named method within
* {@link io.airbyte.integrations.destination.s3.S3StorageOperations}
* </p>
*
* @param objectPath filepath to the object
* @param recordsData serialized {@link io.airbyte.protocol.models.AirbyteRecordMessage}s
* @return the uploaded filename, which is different from the serialized buffer filename
*/
private String loadDataIntoBucket(final String objectPath, final SerializableBuffer recordsData) throws IOException {
final String fullObjectKey = objectPath + recordsData.getFilename();
fullObjectKeys.add(fullObjectKey);
final var blobId = BlobId.of(gcsConfig.getBucketName(), fullObjectKey);
final var blobInfo = BlobInfo.newBuilder(blobId).build();
final var blob = storageClient.create(blobInfo);
final var channel = blob.writer();
try (channel) {
final OutputStream outputStream = Channels.newOutputStream(channel);
final InputStream dataInputStream = recordsData.getInputStream();
dataInputStream.transferTo(outputStream);
} catch (final Exception e) {
LOGGER.error("Failed to load data into storage {}", objectPath, e);
throw new RuntimeException(e);
}
return recordsData.getFilename();
}
@Override
public void copyIntoTableFromStage(final JdbcDatabase database,
final String stageName,
final String stagingPath,
final List<String> stagedFiles,
final String tableName,
final String schemaName)
throws Exception {
LOGGER.info("Starting copy to target table from stage: {} in destination from stage: {}, schema: {}, .",
tableName, stagingPath, schemaName);
// Print actual SQL query if user needs to manually force reload from staging
Exceptions.toRuntime(() -> database.execute(getCopyQuery(stagingPath, stagedFiles,
tableName, schemaName)));
LOGGER.info("Copy to target table {}.{} in destination complete.", schemaName, tableName);
}
private String getCopyQuery(final String stagingPath, final List<String> stagedFiles, final String dstTableName, final String schemaName) {
return String.format(
"COPY INTO %s.%s FROM '%s' storage_integration = gcs_airbyte_integration "
+ " file_format = (type = csv compression = auto field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"' NULL_IF=('') ) "
+ generateFilesList(stagedFiles) + ";",
schemaName,
dstTableName,
generateBucketPath(stagingPath));
}
private String generateBucketPath(final String stagingPath) {
return "gcs://" + gcsConfig.getBucketName() + "/" + stagingPath;
}
@Override
public void cleanUpStage(final JdbcDatabase database, final String stageName, final List<String> stagedFiles) throws Exception {
cleanUpBucketObject(stagedFiles);
}
private void cleanUpBucketObject(final List<String> currentStagedFiles) {
currentStagedFiles.forEach(candidate -> fullObjectKeys.forEach(fullBlobPath -> {
if (fullBlobPath.contains(candidate)) {
removeBlob(fullBlobPath);
}
}));
}
private void removeBlob(final String file) {
final var blobId = BlobId.of(gcsConfig.getBucketName(), file);
storageClient.delete(blobId);
}
@Override
public void dropStageIfExists(final JdbcDatabase database, final String stageName) throws Exception {
dropBucketObject();
}
private void dropBucketObject() {
if (!fullObjectKeys.isEmpty()) {
final Iterator<String> iterator = fullObjectKeys.iterator();
while (iterator.hasNext()) {
final String element = iterator.next();
if (element != null) {
removeBlob(element);
iterator.remove();
}
}
}
}
}

View File

@@ -4,14 +4,11 @@
package io.airbyte.integrations.destination.snowflake;
import static io.airbyte.integrations.destination.snowflake.SnowflakeDestinationResolver.getNumberOfFileBuffers;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
@@ -24,8 +21,6 @@ import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOpe
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator;
import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
@@ -124,54 +119,6 @@ public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination
return Jsons.emptyObject();
}
@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final String defaultNamespace = config.get("schema").asText();
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
if (StringUtils.isEmpty(stream.getStream().getNamespace())) {
stream.getStream().setNamespace(defaultNamespace);
}
}
final SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator();
final ParsedCatalog parsedCatalog;
final TyperDeduper typerDeduper;
final JdbcDatabase database = getDatabase(getDataSource(config));
if (TypingAndDedupingFlag.isDestinationV2()) {
final String databaseName = config.get(JdbcUtils.DATABASE_KEY).asText();
final SnowflakeDestinationHandler snowflakeDestinationHandler = new SnowflakeDestinationHandler(databaseName, database);
final CatalogParser catalogParser;
if (TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent()) {
catalogParser = new CatalogParser(sqlGenerator, TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get());
} else {
catalogParser = new CatalogParser(sqlGenerator);
}
parsedCatalog = catalogParser.parseCatalog(catalog);
// TODO make a SnowflakeV1V2Migrator
NoOpDestinationV1V2Migrator migrator = new NoOpDestinationV1V2Migrator();
typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator);
} else {
parsedCatalog = null;
typerDeduper = new NoopTyperDeduper();
}
return new StagingConsumerFactory().create(
outputRecordCollector,
database,
new SnowflakeInternalStagingSqlOperations(getNamingResolver()),
getNamingResolver(),
CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX, getNumberOfFileBuffers(config))),
config,
catalog,
true,
new TypeAndDedupeOperationValve(),
typerDeduper,
parsedCatalog,
defaultNamespace);
}
@Override
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,

View File

@@ -1,178 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.snowflake;
import static io.airbyte.integrations.destination.snowflake.SnowflakeDestinationResolver.getNumberOfFileBuffers;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser;
import io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.NoOpDestinationV1V2Migrator;
import io.airbyte.integrations.base.destination.typing_deduping.NoopTyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption;
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption.KeyType;
import io.airbyte.integrations.destination.s3.EncryptionConfig;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator;
import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SnowflakeS3StagingDestination extends AbstractJdbcDestination implements Destination {
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeS3StagingDestination.class);
private final String airbyteEnvironment;
public SnowflakeS3StagingDestination(final String airbyteEnvironment) {
this(new SnowflakeSQLNameTransformer(), airbyteEnvironment);
}
public SnowflakeS3StagingDestination(final SnowflakeSQLNameTransformer nameTransformer, final String airbyteEnvironment) {
super("", nameTransformer, new SnowflakeSqlOperations());
this.airbyteEnvironment = airbyteEnvironment;
}
@Override
public AirbyteConnectionStatus check(final JsonNode config) {
final S3DestinationConfig s3Config = getS3DestinationConfig(config);
final EncryptionConfig encryptionConfig = EncryptionConfig.fromJson(config.get("loading_method").get("encryption"));
if (!isPurgeStagingData(config) && encryptionConfig instanceof final AesCbcEnvelopeEncryption c && c.keyType() == KeyType.EPHEMERAL) {
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage(
"You cannot use ephemeral keys and disable purging your staging data. This would produce S3 objects that you cannot decrypt.");
}
if (TypingAndDedupingFlag.isDestinationV2()) {
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage("S3 staging is slated for deprecation and will not be upgraded to Destinations v2. Please use the Snowflake internal staging destination instead.");
}
final NamingConventionTransformer nameTransformer = getNamingResolver();
final SnowflakeS3StagingSqlOperations snowflakeS3StagingSqlOperations =
new SnowflakeS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config, encryptionConfig);
final DataSource dataSource = getDataSource(config);
try {
final JdbcDatabase database = getDatabase(dataSource);
final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
attemptTableOperations(outputSchema, database, nameTransformer, snowflakeS3StagingSqlOperations,
true);
attemptStageOperations(outputSchema, database, nameTransformer, snowflakeS3StagingSqlOperations);
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception while checking connection: ", e);
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect with provided configuration. \n" + e.getMessage());
} finally {
try {
DataSourceFactory.close(dataSource);
} catch (final Exception e) {
LOGGER.warn("Unable to close data source.", e);
}
}
}
private static void attemptStageOperations(final String outputSchema,
final JdbcDatabase database,
final NamingConventionTransformer namingResolver,
final SnowflakeS3StagingSqlOperations sqlOperations)
throws Exception {
// verify we have permissions to create/drop stage
final String outputTableName = namingResolver.getIdentifier("_airbyte_connection_test_" + UUID.randomUUID());
final String stageName = sqlOperations.getStageName(outputSchema, outputTableName);
sqlOperations.createStageIfNotExists(database, stageName);
// try to make test write to make sure we have required role
try {
sqlOperations.attemptWriteToStage(outputSchema, stageName, database);
} finally {
// drop created tmp stage
sqlOperations.dropStageIfExists(database, stageName);
}
}
@Override
protected DataSource getDataSource(final JsonNode config) {
return SnowflakeDatabase.createDataSource(config, airbyteEnvironment);
}
@Override
protected JdbcDatabase getDatabase(final DataSource dataSource) {
return SnowflakeDatabase.getDatabase(dataSource);
}
@Override
protected Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
return Collections.emptyMap();
}
// this is a no op since we override getDatabase.
@Override
public JsonNode toJdbcConfig(final JsonNode config) {
return Jsons.emptyObject();
}
@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final S3DestinationConfig s3Config = getS3DestinationConfig(config);
final EncryptionConfig encryptionConfig = EncryptionConfig.fromJson(config.get("loading_method").get("encryption"));
return new StagingConsumerFactory().create(
outputRecordCollector,
getDatabase(getDataSource(config)),
new SnowflakeS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, encryptionConfig),
getNamingResolver(),
CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX, getNumberOfFileBuffers(config))),
config,
catalog,
isPurgeStagingData(config),
new TypeAndDedupeOperationValve(),
new NoopTyperDeduper(),
null,
null);
}
private S3DestinationConfig getS3DestinationConfig(final JsonNode config) {
final JsonNode loadingMethod = config.get("loading_method");
return S3DestinationConfig.getS3DestinationConfig(loadingMethod);
}
public static boolean isPurgeStagingData(final JsonNode config) {
final JsonNode loadingMethod = config.get("loading_method");
if (!loadingMethod.has("purge_staging_data")) {
return true;
} else {
return loadingMethod.get("purge_staging_data").asBoolean();
}
}
}

View File

@@ -1,138 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.snowflake;
import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption;
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryptionBlobDecorator;
import io.airbyte.integrations.destination.s3.EncryptionConfig;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.S3StorageOperations;
import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig;
import java.util.Base64;
import java.util.Base64.Encoder;
import java.util.List;
import java.util.UUID;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SnowflakeS3StagingSqlOperations extends SnowflakeSqlStagingOperations {
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSqlOperations.class);
private static final Encoder BASE64_ENCODER = Base64.getEncoder();
private static final String COPY_QUERY =
"""
COPY INTO %s.%s FROM '%s'
CREDENTIALS=(aws_key_id='%s' aws_secret_key='%s')
file_format = (type = csv compression = auto field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '"' NULL_IF=('') )
""";
private final NamingConventionTransformer nameTransformer;
private final S3StorageOperations s3StorageOperations;
private final S3DestinationConfig s3Config;
private final byte[] keyEncryptingKey;
public SnowflakeS3StagingSqlOperations(final NamingConventionTransformer nameTransformer,
final AmazonS3 s3Client,
final S3DestinationConfig s3Config,
final EncryptionConfig encryptionConfig) {
this.nameTransformer = nameTransformer;
this.s3StorageOperations = new S3StorageOperations(nameTransformer, s3Client, s3Config);
this.s3Config = s3Config;
if (encryptionConfig instanceof AesCbcEnvelopeEncryption e) {
this.s3StorageOperations.addBlobDecorator(new AesCbcEnvelopeEncryptionBlobDecorator(e.key()));
this.keyEncryptingKey = e.key();
} else {
this.keyEncryptingKey = null;
}
}
@Override
public String getStageName(final String namespace, final String streamName) {
return nameTransformer.applyDefaultCase(String.join(".",
nameTransformer.convertStreamName(namespace),
nameTransformer.convertStreamName(streamName)));
}
@Override
public String getStagingPath(final UUID connectionId, final String namespace, final String streamName, final DateTime writeDatetime) {
// see https://docs.snowflake.com/en/user-guide/data-load-considerations-stage.html
return nameTransformer.applyDefaultCase(String.format("%s/%s/%02d/%02d/%02d/%s/",
getStageName(namespace, streamName),
writeDatetime.year().get(),
writeDatetime.monthOfYear().get(),
writeDatetime.dayOfMonth().get(),
writeDatetime.hourOfDay().get(),
connectionId));
}
@Override
public String uploadRecordsToStage(final JdbcDatabase database,
final SerializableBuffer recordsData,
final String schemaName,
final String stageName,
final String stagingPath) {
return s3StorageOperations.uploadRecordsToBucket(recordsData, schemaName, stageName, stagingPath);
}
@Override
public void createStageIfNotExists(final JdbcDatabase database, final String stageName) {
s3StorageOperations.createBucketIfNotExists();
}
@Override
public void copyIntoTableFromStage(final JdbcDatabase database,
final String stageName,
final String stagingPath,
final List<String> stagedFiles,
final String tableName,
final String schemaName) {
LOGGER.info("Starting copy to target table from stage: {} in destination from stage: {}, schema: {}, .",
tableName, stagingPath, schemaName);
// Print actual SQL query if user needs to manually force reload from staging
Exceptions.toRuntime(() -> database.execute(getCopyQuery(stagingPath, stagedFiles,
tableName, schemaName)));
LOGGER.info("Copy to target table {}.{} in destination complete.", schemaName, tableName);
}
protected String getCopyQuery(final String stagingPath,
final List<String> stagedFiles,
final String dstTableName,
final String schemaName) {
final S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) s3Config.getS3CredentialConfig();
final String encryptionClause;
if (keyEncryptingKey == null) {
encryptionClause = "";
} else {
encryptionClause = String.format(" encryption = (type = 'aws_cse' master_key = '%s')", BASE64_ENCODER.encodeToString(keyEncryptingKey));
}
return String.format(COPY_QUERY + generateFilesList(stagedFiles) + encryptionClause + ";",
schemaName,
dstTableName,
generateBucketPath(stagingPath),
credentialConfig.getAccessKeyId(),
credentialConfig.getSecretAccessKey());
}
private String generateBucketPath(final String stagingPath) {
return "s3://" + s3Config.getBucketName() + "/" + stagingPath;
}
@Override
public void dropStageIfExists(final JdbcDatabase database, final String stageName) {
s3StorageOperations.dropBucketObject(stageName);
}
@Override
public void cleanUpStage(final JdbcDatabase database, final String stageName, final List<String> stagedFiles) {
s3StorageOperations.cleanUpBucketObject(stageName, stagedFiles);
}
}

View File

@@ -161,231 +161,6 @@
"type": "string",
"order": 7
},
"loading_method": {
"type": "object",
"title": "Data Staging Method",
"description": "Select a data staging method",
"order": 8,
"oneOf": [
{
"title": "Select another option",
"description": "Select another option",
"required": ["method"],
"properties": {
"method": {
"title": "",
"description": "",
"type": "string",
"enum": ["Standard"],
"default": "Standard"
}
}
},
{
"title": "[Recommended] Internal Staging",
"description": "Recommended for large production workloads for better speed and scalability.",
"required": ["method"],
"properties": {
"method": {
"title": "",
"description": "",
"type": "string",
"enum": ["Internal Staging"],
"default": "Internal Staging"
}
}
},
{
"title": "AWS S3 Staging",
"description": "Recommended for large production workloads for better speed and scalability.",
"required": [
"method",
"s3_bucket_name",
"access_key_id",
"secret_access_key"
],
"properties": {
"method": {
"title": "",
"description": "",
"type": "string",
"enum": ["S3 Staging"],
"default": "S3 Staging",
"order": 0
},
"s3_bucket_name": {
"title": "S3 Bucket Name",
"type": "string",
"description": "Enter your S3 bucket name",
"examples": ["airbyte.staging"],
"order": 1
},
"s3_bucket_region": {
"title": "S3 Bucket Region",
"type": "string",
"default": "",
"description": "Enter the region where your S3 bucket resides",
"enum": [
"",
"us-east-1",
"us-east-2",
"us-west-1",
"us-west-2",
"af-south-1",
"ap-east-1",
"ap-south-1",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-southeast-1",
"ap-southeast-2",
"ca-central-1",
"cn-north-1",
"cn-northwest-1",
"eu-central-1",
"eu-west-1",
"eu-west-2",
"eu-west-3",
"eu-south-1",
"eu-north-1",
"sa-east-1",
"me-south-1"
],
"order": 2
},
"access_key_id": {
"type": "string",
"description": "Enter your <a href=\"https://docs.aws.amazon.com/powershell/latest/userguide/pstools-appendix-sign-up.html\">AWS access key ID</a>. Airbyte requires Read and Write permissions on your S3 bucket ",
"title": "AWS access key ID",
"airbyte_secret": true,
"order": 3
},
"secret_access_key": {
"type": "string",
"description": "Enter your <a href=\"https://docs.aws.amazon.com/powershell/latest/userguide/pstools-appendix-sign-up.html\">AWS secret access key</a>",
"title": "AWS secret access key",
"airbyte_secret": true,
"order": 4
},
"purge_staging_data": {
"title": "Purge Staging Files and Tables",
"type": "boolean",
"description": "Toggle to delete staging files from the S3 bucket after a successful sync",
"default": true,
"order": 5
},
"encryption": {
"title": "Encryption",
"type": "object",
"description": "Choose a data encryption method for the staging data",
"default": { "encryption_type": "none" },
"order": 6,
"oneOf": [
{
"title": "No encryption",
"description": "Staging data will be stored in plaintext.",
"type": "object",
"required": ["encryption_type"],
"properties": {
"encryption_type": {
"type": "string",
"const": "none",
"enum": ["none"],
"default": "none"
}
}
},
{
"title": "AES-CBC envelope encryption",
"description": "Staging data will be encrypted using AES-CBC envelope encryption.",
"type": "object",
"required": ["encryption_type"],
"properties": {
"encryption_type": {
"type": "string",
"const": "aes_cbc_envelope",
"enum": ["aes_cbc_envelope"],
"default": "aes_cbc_envelope"
},
"key_encrypting_key": {
"type": "string",
"title": "Key",
"description": "The key, base64-encoded. Must be either 128, 192, or 256 bits. Leave blank to have Airbyte generate an ephemeral key for each sync.",
"airbyte_secret": true
}
}
}
]
},
"file_name_pattern": {
"type": "string",
"description": "The pattern allows you to set the file-name format for the S3 staging file(s)",
"title": "S3 Filename pattern",
"examples": [
"{date}",
"{date:yyyy_MM}",
"{timestamp}",
"{part_number}",
"{sync_id}"
],
"order": 7
}
}
},
{
"title": "Google Cloud Storage Staging",
"description": "Recommended for large production workloads for better speed and scalability.",
"required": [
"method",
"project_id",
"bucket_name",
"credentials_json"
],
"properties": {
"method": {
"title": "",
"description": "",
"type": "string",
"enum": ["GCS Staging"],
"default": "GCS Staging",
"order": 0
},
"project_id": {
"title": "Google Cloud project ID",
"type": "string",
"description": "Enter the <a href=\"https://cloud.google.com/resource-manager/docs/creating-managing-projects#identifying_projects\">Google Cloud project ID</a>",
"examples": ["my-project"],
"order": 1
},
"bucket_name": {
"title": "Cloud Storage bucket name",
"type": "string",
"description": "Enter the <a href=\"https://cloud.google.com/storage/docs/creating-buckets\">Cloud Storage bucket name</a>",
"examples": ["airbyte-staging"],
"order": 2
},
"credentials_json": {
"title": "Google Application Credentials",
"type": "string",
"description": "Enter your <a href=\"https://cloud.google.com/iam/docs/creating-managing-service-account-keys#creating_service_account_keys\">Google Cloud service account key</a> in the JSON format with read/write access to your Cloud Storage staging bucket",
"airbyte_secret": true,
"multiline": true,
"order": 3
}
}
}
]
},
"file_buffer_count": {
"title": "File Buffer Count",
"type": "integer",
"minimum": 10,
"maximum": 50,
"default": 10,
"description": "Number of file buffers allocated for writing data. Increasing this number is beneficial for connections using Change Data Capture (CDC) and up to the number of streams within a connection. Increasing the number of file buffers past the maximum number of streams has deteriorating effects",
"examples": ["10"],
"order": 9
},
"use_1s1t_format": {
"type": "boolean",
"description": "(Beta) Use <a href=\"https://github.com/airbytehq/airbyte/issues/26028\" target=\"_blank\">Destinations V2</a>. Contact Airbyte Support to participate in the beta program.",

View File

@@ -1,44 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.snowflake;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.configoss.StandardCheckConnectionOutput;
import io.airbyte.configoss.StandardCheckConnectionOutput.Status;
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
public class SnowflakeGcsCopyDestinationAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest {
private static final String NO_GCS_PRIVILEGES_ERR_MSG =
"Permission 'storage.objects.create' denied on resource (or it may not exist).";
@Override
public JsonNode getStaticConfig() {
final JsonNode copyConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/copy_gcs_config.json")));
Preconditions.checkArgument(SnowflakeDestinationResolver.isGcsCopy(copyConfig));
Preconditions.checkArgument(!SnowflakeDestinationResolver.isS3Copy(copyConfig));
return copyConfig;
}
@Test
public void testCheckWithNoProperGcsPermissionConnection() throws Exception {
// Config to user (creds) that has no permission to schema
final JsonNode config = Jsons.deserialize(IOs.readFile(
Path.of("secrets/copy_insufficient_gcs_roles_config.json")));
final StandardCheckConnectionOutput standardCheckConnectionOutput = runCheck(config);
assertEquals(Status.FAILED, standardCheckConnectionOutput.getStatus());
assertThat(standardCheckConnectionOutput.getMessage()).contains(NO_GCS_PRIVILEGES_ERR_MSG);
}
}

View File

@@ -104,8 +104,6 @@ public class SnowflakeInsertDestinationAcceptanceTest extends DestinationAccepta
public JsonNode getStaticConfig() {
final JsonNode insertConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/insert_config.json")));
Preconditions.checkArgument(!SnowflakeDestinationResolver.isS3Copy(insertConfig));
Preconditions.checkArgument(!SnowflakeDestinationResolver.isGcsCopy(insertConfig));
return insertConfig;
}

View File

@@ -31,10 +31,7 @@ import org.junit.jupiter.params.provider.ArgumentsSource;
public class SnowflakeInternalStagingDestinationAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest {
public JsonNode getStaticConfig() {
final JsonNode internalStagingConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/internal_staging_config.json")));
Preconditions.checkArgument(!SnowflakeDestinationResolver.isS3Copy(internalStagingConfig));
Preconditions.checkArgument(!SnowflakeDestinationResolver.isGcsCopy(internalStagingConfig));
return internalStagingConfig;
return Jsons.deserialize(IOs.readFile(Path.of("secrets/internal_staging_config.json")));
}
@Disabled("See README for why this test is disabled")

View File

@@ -1,23 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.snowflake;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import java.nio.file.Path;
public class SnowflakeS3CopyDestinationAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest {
@Override
public JsonNode getStaticConfig() {
final JsonNode copyConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/copy_s3_config.json")));
Preconditions.checkArgument(SnowflakeDestinationResolver.isS3Copy(copyConfig));
Preconditions.checkArgument(!SnowflakeDestinationResolver.isGcsCopy(copyConfig));
return copyConfig;
}
}

View File

@@ -1,43 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.snowflake;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.configoss.StandardCheckConnectionOutput;
import io.airbyte.configoss.StandardCheckConnectionOutput.Status;
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
public class SnowflakeS3CopyEncryptedDestinationAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest {
private static final String NO_S3_PRIVILEGES_ERR_MSG = "Could not connect with provided configuration.";
@Override
public JsonNode getStaticConfig() {
final JsonNode copyConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/copy_s3_encrypted_config.json")));
Preconditions.checkArgument(SnowflakeDestinationResolver.isS3Copy(copyConfig));
Preconditions.checkArgument(!SnowflakeDestinationResolver.isGcsCopy(copyConfig));
return copyConfig;
}
@Test
public void testCheckWithNoProperS3PermissionConnection() throws Exception {
// Config to user (creds) that has no permission to schema
final JsonNode config = Jsons.deserialize(IOs.readFile(
Path.of("secrets/copy_s3_wrong_location_config.json")));
final StandardCheckConnectionOutput standardCheckConnectionOutput = runCheck(config);
assertEquals(Status.FAILED, standardCheckConnectionOutput.getStatus());
assertThat(standardCheckConnectionOutput.getMessage()).contains(NO_S3_PRIVILEGES_ERR_MSG);
}
}

View File

@@ -40,8 +40,6 @@ import org.junit.jupiter.params.provider.MethodSource;
public class SnowflakeDestinationTest {
private static final ObjectMapper mapper = MoreMappers.initMapper();
@BeforeEach
public void setup() {
DestinationConfig.initialize(Jsons.emptyObject());
@@ -89,43 +87,6 @@ public class SnowflakeDestinationTest {
assertEquals(isMatch, matcher.find());
}
@Test
@DisplayName("When given S3 credentials should use COPY")
public void useS3CopyStrategyTest() {
final var stubLoadingMethod = mapper.createObjectNode();
stubLoadingMethod.put("s3_bucket_name", "fake-bucket");
stubLoadingMethod.put("access_key_id", "test");
stubLoadingMethod.put("secret_access_key", "test key");
final var stubConfig = mapper.createObjectNode();
stubConfig.set("loading_method", stubLoadingMethod);
assertTrue(SnowflakeDestinationResolver.isS3Copy(stubConfig));
}
@Test
@DisplayName("When given GCS credentials should use COPY")
public void useGcsCopyStrategyTest() {
final var stubLoadingMethod = mapper.createObjectNode();
stubLoadingMethod.put("project_id", "my-project");
stubLoadingMethod.put("bucket_name", "my-bucket");
stubLoadingMethod.put("credentials_json", "hunter2");
final var stubConfig = mapper.createObjectNode();
stubConfig.set("loading_method", stubLoadingMethod);
assertTrue(SnowflakeDestinationResolver.isGcsCopy(stubConfig));
}
@Test
@DisplayName("When not given S3 credentials should use INSERT")
public void useInsertStrategyTest() {
final var stubLoadingMethod = mapper.createObjectNode();
final var stubConfig = mapper.createObjectNode();
stubConfig.set("loading_method", stubLoadingMethod);
assertFalse(SnowflakeDestinationResolver.isS3Copy(stubConfig));
}
@ParameterizedTest
@MethodSource("destinationTypeToConfig")
public void testS3ConfigType(final String configFileName, final DestinationType expectedDestinationType) throws Exception {
@@ -135,10 +96,7 @@ public class SnowflakeDestinationTest {
}
private static Stream<Arguments> destinationTypeToConfig() {
return Stream.of(
arguments("copy_gcs_config.json", DestinationType.COPY_GCS),
arguments("copy_s3_config.json", DestinationType.COPY_S3),
arguments("insert_config.json", DestinationType.INTERNAL_STAGING));
return Stream.of(arguments("insert_config.json", DestinationType.INTERNAL_STAGING));
}
@Test
@@ -149,43 +107,4 @@ public class SnowflakeDestinationTest {
assertEquals(AsyncStreamConsumer.class, consumer.getClass());
}
static class TestEnableAsyncArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception {
final var mapper = new ObjectMapper();
final var standard = mapper.createObjectNode();
final var internalStagingSpace = mapper.createObjectNode();
final var internalStagingSpaceCapital = mapper.createObjectNode();
final var internalStagingDash = mapper.createObjectNode();
final var internalStagingUnderscore = mapper.createObjectNode();
final var noLoadingMethod = mapper.createObjectNode();
standard.put("loading_method", mapper.createObjectNode().put("method", "standard"));
internalStagingSpace.put("loading_method", mapper.createObjectNode().put("method", "internal staging"));
internalStagingSpaceCapital.put("loading_method", mapper.createObjectNode().put("method", "INTERNAL STAGING"));
internalStagingDash.put("loading_method", mapper.createObjectNode().put("method", "internal-staging"));
internalStagingUnderscore.put("loading_method", mapper.createObjectNode().put("method", "internal_staging"));
noLoadingMethod.put("loading_method", "standard");
return Stream.of(
Arguments.of(standard, false),
Arguments.of(internalStagingSpace, true),
Arguments.of(internalStagingSpaceCapital, true),
Arguments.of(internalStagingDash, true),
Arguments.of(internalStagingUnderscore, true),
Arguments.of(mapper.createObjectNode(), false),
Arguments.of(noLoadingMethod, false)
);
}
}
@ParameterizedTest
@ArgumentsSource(TestEnableAsyncArgumentsProvider.class)
public void testEnableAsync(final JsonNode config, boolean expected) {
final var actual = SnowflakeDestination.useAsyncSnowflake(config);
Assertions.assertEquals(expected, actual);
}
}

View File

@@ -1,57 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.snowflake;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.DestinationConfig;
import io.airbyte.integrations.destination.s3.NoEncryption;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class SnowflakeS3StagingSqlOperationsTest {
private static final String SCHEMA_NAME = "schemaName";
private static final String STAGE_PATH = "stagePath/2022/";
private static final String TABLE_NAME = "tableName";
private static final String BUCKET_NAME = "bucket_name";
private final AmazonS3 s3Client = mock(AmazonS3.class);
private final S3DestinationConfig s3Config = mock(S3DestinationConfig.class);
private final S3AccessKeyCredentialConfig credentialConfig = mock(S3AccessKeyCredentialConfig.class);
private SnowflakeS3StagingSqlOperations snowflakeStagingSqlOperations;
@BeforeEach
public void setup() {
DestinationConfig.initialize(Jsons.emptyObject());
snowflakeStagingSqlOperations =
new SnowflakeS3StagingSqlOperations(new SnowflakeSQLNameTransformer(), s3Client, s3Config, new NoEncryption());
}
@Test
void copyIntoTmpTableFromStage() {
final String expectedQuery = """
COPY INTO schemaName.tableName FROM 's3://bucket_name/stagePath/2022/'
CREDENTIALS=(aws_key_id='aws_access_key_id' aws_secret_key='aws_secret_access_key')
file_format = (type = csv compression = auto field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '"' NULL_IF=('') )
files = ('filename1','filename2');""";
when(s3Config.getBucketName()).thenReturn(BUCKET_NAME);
when(s3Config.getS3CredentialConfig()).thenReturn(credentialConfig);
when(credentialConfig.getAccessKeyId()).thenReturn("aws_access_key_id");
when(credentialConfig.getSecretAccessKey()).thenReturn("aws_secret_access_key");
final String actualCopyQuery =
snowflakeStagingSqlOperations.getCopyQuery(STAGE_PATH, List.of("filename1", "filename2"), TABLE_NAME, SCHEMA_NAME);
assertEquals(expectedQuery, actualCopyQuery);
}
}

View File

@@ -1,17 +0,0 @@
{
"host": "testhost.snowflakecomputing.com",
"role": "AIRBYTE_ROLE",
"warehouse": "AIRBYTE_WAREHOUSE",
"database": "AIRBYTE_DATABASE",
"schema": "AIRBYTE_SCHEMA",
"username": "AIRBYTE_USER",
"credentials": {
"password": "test"
},
"loading_method": {
"method": "GCS Staging",
"project_id": "test",
"bucket_name": "test",
"credentials_json": "{\n\"type\": \"test\"}\n"
}
}

View File

@@ -1,18 +0,0 @@
{
"host": "testhost.snowflakecomputing.com",
"role": "AIRBYTE_ROLE",
"warehouse": "AIRBYTE_WAREHOUSE",
"database": "AIRBYTE_DATABASE",
"schema": "AIRBYTE_SCHEMA",
"username": "AIRBYTE_USER",
"credentials": {
"password": "test"
},
"loading_method": {
"method": "S3 Staging",
"s3_bucket_name": "airbyte-snowflake-integration-tests",
"s3_bucket_region": "us-east-2",
"access_key_id": "test",
"secret_access_key": "test"
}
}

View File

@@ -1,21 +0,0 @@
{
"host": "testhost.snowflakecomputing.com",
"role": "AIRBYTE_ROLE",
"warehouse": "AIRBYTE_WAREHOUSE",
"database": "AIRBYTE_DATABASE",
"schema": "AIRBYTE_SCHEMA",
"username": "AIRBYTE_USER",
"credentials": {
"password": "test"
},
"loading_method": {
"method": "S3 Staging",
"s3_bucket_name": "airbyte-snowflake-integration-tests",
"s3_bucket_region": "us-east-2",
"access_key_id": "test",
"secret_access_key": "test",
"encryption": {
"encryption_type": "aes_cbc_envelope"
}
}
}

View File

@@ -0,0 +1,4 @@
# Snowflake Migration Guide
## Upgrading to 2.0.0
The Snowflake destination no longer supports the GCS and S3 staging loading methods. Please migrate to the Internal Staging option, which Snowflake recommends and which is both cheaper and faster.
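As a sketch of the config change (placeholder values; all other connection fields stay the same), an S3 staging block such as:

    "loading_method": {
      "method": "S3 Staging",
      "s3_bucket_name": "airbyte-staging",
      "s3_bucket_region": "us-east-2",
      "access_key_id": "...",
      "secret_access_key": "..."
    }

becomes:

    "loading_method": {
      "method": "Internal Staging"
    }

No external bucket or cloud credentials are required; data is staged inside Snowflake itself.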

View File

@@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n
| Version | Date | Pull Request | Subject |
|:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.0.0 | 2023-08-09 | [\#29236](https://github.com/airbytehq/airbyte/pull/29236) | Remove support for Snowflake GCS/S3 loading method in favor of Snowflake Internal Staging |
| 1.3.3 | 2023-08-15 | [\#29461](https://github.com/airbytehq/airbyte/pull/29461) | Changing a static constant reference |
| 1.3.2 | 2023-08-11 | [\#29381](https://github.com/airbytehq/airbyte/pull/29381) | Destinations v2: Add support for streams with no columns |
| 1.3.1 | 2023-08-04 | [\#28894](https://github.com/airbytehq/airbyte/pull/28894) | Destinations v2: Update SqlGenerator |