diff --git a/.github/workflows/publish-command.yml b/.github/workflows/publish-command.yml index 9c4ed391ed8..c9d44154170 100644 --- a/.github/workflows/publish-command.yml +++ b/.github/workflows/publish-command.yml @@ -79,6 +79,7 @@ jobs: SMARTSHEETS_TEST_CREDS: ${{ secrets.SMARTSHEETS_TEST_CREDS }} SNOWFLAKE_INTEGRATION_TEST_CREDS: ${{ secrets.SNOWFLAKE_INTEGRATION_TEST_CREDS }} SNOWFLAKE_S3_COPY_INTEGRATION_TEST_CREDS: ${{ secrets.SNOWFLAKE_S3_COPY_INTEGRATION_TEST_CREDS }} + SNOWFLAKE_GCS_COPY_INTEGRATION_TEST_CREDS: ${{ secrets.SNOWFLAKE_GCS_COPY_INTEGRATION_TEST_CREDS }} SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG: ${{ secrets.SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG }} SOURCE_RECURLY_INTEGRATION_TEST_CREDS: ${{ secrets.SOURCE_RECURLY_INTEGRATION_TEST_CREDS }} STRIPE_INTEGRATION_TEST_CREDS: ${{ secrets.STRIPE_INTEGRATION_TEST_CREDS }} diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml index 2c3dc59aa11..f57ab6640ce 100644 --- a/.github/workflows/test-command.yml +++ b/.github/workflows/test-command.yml @@ -79,6 +79,7 @@ jobs: SMARTSHEETS_TEST_CREDS: ${{ secrets.SMARTSHEETS_TEST_CREDS }} SNOWFLAKE_INTEGRATION_TEST_CREDS: ${{ secrets.SNOWFLAKE_INTEGRATION_TEST_CREDS }} SNOWFLAKE_S3_COPY_INTEGRATION_TEST_CREDS: ${{ secrets.SNOWFLAKE_S3_COPY_INTEGRATION_TEST_CREDS }} + SNOWFLAKE_GCS_COPY_INTEGRATION_TEST_CREDS: ${{ secrets.SNOWFLAKE_GCS_COPY_INTEGRATION_TEST_CREDS }} SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG: ${{ secrets.SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG }} SOURCE_RECURLY_INTEGRATION_TEST_CREDS: ${{ secrets.SOURCE_RECURLY_INTEGRATION_TEST_CREDS }} STRIPE_INTEGRATION_TEST_CREDS: ${{ secrets.STRIPE_INTEGRATION_TEST_CREDS }} diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json index 35cdb62e63d..d3ddddb8714 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba", "name": "Snowflake", "dockerRepository": "airbyte/destination-snowflake", - "dockerImageTag": "0.3.2", + "dockerImageTag": "0.3.3", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 3a1a36189d3..485bc67cde7 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -22,7 +22,7 @@ - destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba name: Snowflake dockerRepository: airbyte/destination-snowflake - dockerImageTag: 0.3.2 + dockerImageTag: 0.3.3 documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake - destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc name: Redshift diff --git a/airbyte-integrations/connectors/destination-jdbc/build.gradle b/airbyte-integrations/connectors/destination-jdbc/build.gradle index a027f3256eb..cec590fe388 100644 --- a/airbyte-integrations/connectors/destination-jdbc/build.gradle +++ b/airbyte-integrations/connectors/destination-jdbc/build.gradle @@ -9,6 +9,9 @@ application { } dependencies { + implementation 'com.google.cloud:google-cloud-storage:1.113.16' + implementation 'com.google.auth:google-auth-library-oauth2-http:0.25.5' + implementation project(':airbyte-db') implementation project(':airbyte-integrations:bases:base-java') implementation project(':airbyte-protocol:models') diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsConfig.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsConfig.java new file mode 100644 index 00000000000..bcddf1efa61 --- /dev/null +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsConfig.java @@ -0,0 +1,60 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.jdbc.copy.gcs; + +import com.fasterxml.jackson.databind.JsonNode; + +public class GcsConfig { + + private final String projectId; + private final String bucketName; + private final String credentialsJson; + + public GcsConfig(String projectId, String bucketName, String credentialsJson) { + this.projectId = projectId; + this.bucketName = bucketName; + this.credentialsJson = credentialsJson; + } + + public String getProjectId() { + return projectId; + } + + public String getBucketName() { + return bucketName; + } + + public String getCredentialsJson() { + return credentialsJson; + } + + public static GcsConfig getGcsConfig(JsonNode config) { + return new GcsConfig( + config.get("loading_method").get("project_id").asText(), + config.get("loading_method").get("bucket_name").asText(), + config.get("loading_method").get("credentials_json").asText()); + } + +} diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java new file mode 100644 index 00000000000..06d68a3c261 --- /dev/null +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java @@ -0,0 +1,209 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.jdbc.copy.gcs; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.WriteChannel; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; +import io.airbyte.protocol.models.DestinationSyncMode; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.UUID; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class GcsStreamCopier implements StreamCopier { + + private static final Logger LOGGER = LoggerFactory.getLogger(GcsStreamCopier.class); + + private final String gcsStagingFile; + private final Storage storageClient; + private final GcsConfig gcsConfig; + private final WriteChannel channel; + private final CSVPrinter csvPrinter; + private final String tmpTableName; + private final DestinationSyncMode destSyncMode; + private final String schemaName; + private final String streamName; + private final JdbcDatabase db; + private final ExtendedNameTransformer nameTransformer; + private final SqlOperations sqlOperations; + + public GcsStreamCopier(String stagingFolder, + DestinationSyncMode destSyncMode, + String schema, + String streamName, + Storage storageClient, + JdbcDatabase db, + GcsConfig gcsConfig, + ExtendedNameTransformer nameTransformer, + SqlOperations sqlOperations) { + this.destSyncMode = destSyncMode; + this.schemaName = schema; + this.streamName = streamName; + this.db = db; + this.nameTransformer = nameTransformer; + this.sqlOperations = sqlOperations; + this.tmpTableName = nameTransformer.getTmpTableName(streamName); + this.storageClient = storageClient; + this.gcsConfig = gcsConfig; + + this.gcsStagingFile = String.join("/", stagingFolder, schemaName, streamName); + + var blobId = BlobId.of(gcsConfig.getBucketName(), gcsStagingFile); + var blobInfo = BlobInfo.newBuilder(blobId).build(); + var blob = storageClient.create(blobInfo); + this.channel = blob.writer(); + OutputStream outputStream = Channels.newOutputStream(channel); + + var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8); + try { + this.csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception { + csvPrinter.printRecord(id, jsonDataString, emittedAt); + } + + @Override + public void closeStagingUploader(boolean hasFailed) throws Exception { + LOGGER.info("Uploading remaining data for {} stream.", streamName); + csvPrinter.close(); + channel.close(); + LOGGER.info("All data for {} stream uploaded.", streamName); + } + + @Override + public void copyStagingFileToTemporaryTable() throws Exception { + LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}.", tmpTableName, streamName, schemaName); + copyGcsCsvFileIntoTable(db, getFullGcsPath(gcsConfig.getBucketName(), gcsStagingFile), schemaName, tmpTableName, gcsConfig); + LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName); + } + + @Override + public void removeFileAndDropTmpTable() throws Exception { + LOGGER.info("Begin cleaning gcs staging file {}.", gcsStagingFile); + var blobId = BlobId.of(gcsConfig.getBucketName(), gcsStagingFile); + if (storageClient.get(blobId).exists()) { + storageClient.delete(blobId); + } + LOGGER.info("GCS staging file {} cleaned.", gcsStagingFile); + + LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName); + sqlOperations.dropTableIfExists(db, schemaName, tmpTableName); + LOGGER.info("{} tmp table in destination cleaned.", tmpTableName); + } + + @Override + public void createDestinationSchema() throws Exception { + LOGGER.info("Creating schema in destination if it doesn't exist: {}", schemaName); + sqlOperations.createSchemaIfNotExists(db, schemaName); + } + + @Override + public void createTemporaryTable() throws Exception { + LOGGER.info("Preparing tmp table in destination for stream: {}, schema: {}, tmp table name: {}.", streamName, schemaName, tmpTableName); + sqlOperations.createTableIfNotExists(db, schemaName, tmpTableName); + } + + @Override + public String createDestinationTable() throws Exception { + var destTableName = nameTransformer.getRawTableName(streamName); + LOGGER.info("Preparing table {} in destination.", destTableName); + sqlOperations.createTableIfNotExists(db, schemaName, destTableName); + LOGGER.info("Table {} in destination prepared.", tmpTableName); + + return destTableName; + } + + @Override + public String generateMergeStatement(String destTableName) throws Exception { + LOGGER.info("Preparing to merge tmp table {} to dest table: {}, schema: {}, in destination.", tmpTableName, destTableName, schemaName); + var queries = new StringBuilder(); + if (destSyncMode.equals(DestinationSyncMode.OVERWRITE)) { + queries.append(sqlOperations.truncateTableQuery(schemaName, destTableName)); + LOGGER.info("Destination OVERWRITE mode detected. Dest table: {}, schema: {}, will be truncated.", destTableName, schemaName); + } + queries.append(sqlOperations.copyTableQuery(schemaName, tmpTableName, destTableName)); + return queries.toString(); + } + + private static String getFullGcsPath(String bucketName, String stagingFile) { + // this is intentionally gcs:/ not gcs:// since the join adds the additional slash + return String.join("/", "gcs:/", bucketName, stagingFile); + } + + public static void attemptWriteToPersistence(GcsConfig gcsConfig) throws IOException { + final String outputTableName = "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""); + attemptWriteAndDeleteGcsObject(gcsConfig, outputTableName); + } + + private static void attemptWriteAndDeleteGcsObject(GcsConfig gcsConfig, String outputTableName) throws IOException { + var storage = getStorageClient(gcsConfig); + var blobId = BlobId.of(gcsConfig.getBucketName(), "check-content/" + outputTableName); + var blobInfo = BlobInfo.newBuilder(blobId).build(); + + storage.create(blobInfo, "".getBytes()); + storage.delete(blobId); + } + + public static Storage getStorageClient(GcsConfig gcsConfig) throws IOException { + InputStream credentialsInputStream = new ByteArrayInputStream(gcsConfig.getCredentialsJson().getBytes()); + GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsInputStream); + return StorageOptions.newBuilder() + .setCredentials(credentials) + .setProjectId(gcsConfig.getProjectId()) + .build() + .getService(); + } + + public abstract void copyGcsCsvFileIntoTable(JdbcDatabase database, + String gcsFileLocation, + String schema, + String tableName, + GcsConfig gcsConfig) + throws SQLException; + +} diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java new file mode 100644 index 00000000000..594eaf85285 --- /dev/null +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.java @@ -0,0 +1,95 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.jdbc.copy.gcs; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; +import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.DestinationSyncMode; +import java.io.ByteArrayInputStream; +import java.io.InputStream; + +public abstract class GcsStreamCopierFactory implements StreamCopierFactory { + + /** + * Used by the copy consumer. + */ + @Override + public StreamCopier create(String configuredSchema, + GcsConfig gcsConfig, + String stagingFolder, + DestinationSyncMode syncMode, + AirbyteStream stream, + ExtendedNameTransformer nameTransformer, + JdbcDatabase db, + SqlOperations sqlOperations) { + try { + var pair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream); + var schema = getSchema(stream, configuredSchema, nameTransformer); + + InputStream credentialsInputStream = new ByteArrayInputStream(gcsConfig.getCredentialsJson().getBytes()); + GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsInputStream); + Storage storageClient = StorageOptions.newBuilder() + .setCredentials(credentials) + .setProjectId(gcsConfig.getProjectId()) + .build() + .getService(); + + return create(stagingFolder, syncMode, schema, pair.getName(), storageClient, db, gcsConfig, nameTransformer, sqlOperations); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * For specific copier suppliers to implement. + */ + public abstract StreamCopier create(String stagingFolder, + DestinationSyncMode syncMode, + String schema, + String streamName, + Storage storageClient, + JdbcDatabase db, + GcsConfig gcsConfig, + ExtendedNameTransformer nameTransformer, + SqlOperations sqlOperations) + throws Exception; + + private String getSchema(AirbyteStream stream, String configuredSchema, ExtendedNameTransformer nameTransformer) { + if (stream.getNamespace() != null) { + return nameTransformer.convertStreamName(stream.getNamespace()); + } else { + return nameTransformer.convertStreamName(configuredSchema); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java index 9583dea8b70..5f37f8002a0 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java @@ -58,18 +58,18 @@ public abstract class S3StreamCopier implements StreamCopier { public static final int DEFAULT_PART_SIZE_MB = 10; private final String s3StagingFile; - private final DestinationSyncMode destSyncMode; - private final String schemaName; - private final String streamName; private final AmazonS3 s3Client; - private final JdbcDatabase db; private final S3Config s3Config; - private final ExtendedNameTransformer nameTransformer; - private final SqlOperations sqlOperations; private final StreamTransferManager multipartUploadManager; private final MultiPartOutputStream outputStream; private final CSVPrinter csvPrinter; private final String tmpTableName; + private final DestinationSyncMode destSyncMode; + private final String schemaName; + private final String streamName; + private final JdbcDatabase db; + private final ExtendedNameTransformer nameTransformer; + private final SqlOperations sqlOperations; public S3StreamCopier(String stagingFolder, DestinationSyncMode destSyncMode, @@ -83,15 +83,15 @@ public abstract class S3StreamCopier implements StreamCopier { this.destSyncMode = destSyncMode; this.schemaName = schema; this.streamName = streamName; - this.s3Client = client; this.db = db; - this.s3Config = s3Config; this.nameTransformer = nameTransformer; this.sqlOperations = sqlOperations; + this.tmpTableName = nameTransformer.getTmpTableName(streamName); + this.s3Client = client; + this.s3Config = s3Config; this.s3StagingFile = String.join("/", stagingFolder, schemaName, streamName); LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize()); - this.tmpTableName = nameTransformer.getTmpTableName(streamName); // The stream transfer manager lets us greedily stream into S3. The native AWS SDK does not // have support for streaming multipart uploads; // The alternative is first writing the entire output to disk before loading into S3. This is not diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index ec077d4efea..6b93ddf7070 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.2 +LABEL io.airbyte.version=0.3.3 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index e15b1e99370..7275ba952c2 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -9,6 +9,8 @@ application { } dependencies { + implementation 'com.google.cloud:google-cloud-storage:1.113.16' + implementation 'com.google.auth:google-auth-library-oauth2-http:0.25.5' implementation 'net.snowflake:snowflake-jdbc:3.12.14' implementation 'org.apache.commons:commons-csv:1.4' implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2' diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyGcsDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyGcsDestination.java new file mode 100644 index 00000000000..0e6da1db880 --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyGcsDestination.java @@ -0,0 +1,76 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.snowflake; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.copy.CopyConsumer; +import io.airbyte.integrations.destination.jdbc.copy.CopyDestination; +import io.airbyte.integrations.destination.jdbc.copy.gcs.GcsConfig; +import io.airbyte.integrations.destination.jdbc.copy.gcs.GcsStreamCopier; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; + +public class SnowflakeCopyGcsDestination extends CopyDestination { + + @Override + public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception { + return new CopyConsumer<>( + getConfiguredSchema(config), + GcsConfig.getGcsConfig(config), + catalog, + getDatabase(config), + new SnowflakeGcsStreamCopierFactory(), + getSqlOperations(), + getNameTransformer()); + } + + @Override + public void checkPersistence(JsonNode config) throws Exception { + GcsStreamCopier.attemptWriteToPersistence(GcsConfig.getGcsConfig(config)); + } + + @Override + public ExtendedNameTransformer getNameTransformer() { + return new SnowflakeSQLNameTransformer(); + } + + @Override + public JdbcDatabase getDatabase(JsonNode config) throws Exception { + return SnowflakeDatabase.getDatabase(config); + } + + @Override + public SqlOperations getSqlOperations() { + return new SnowflakeSqlOperations(); + } + + private String getConfiguredSchema(JsonNode config) { + return config.get("schema").asText(); + } + +} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java index a8bf011d865..331ba7cfe65 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java @@ -39,7 +39,8 @@ public class SnowflakeDestination extends SwitchingDestination getTypeToDestination() { final SnowflakeInsertDestination insertDestination = new SnowflakeInsertDestination(); final SnowflakeCopyS3Destination copyS3Destination = new SnowflakeCopyS3Destination(); + final SnowflakeCopyGcsDestination copyGcsDestination = new SnowflakeCopyGcsDestination(); return ImmutableMap.of( DestinationType.INSERT, insertDestination, - DestinationType.COPY_S3, copyS3Destination); + DestinationType.COPY_S3, copyS3Destination, + DestinationType.COPY_GCS, copyGcsDestination); } public static void main(String[] args) throws Exception { diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStreamCopier.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStreamCopier.java new file mode 100644 index 00000000000..304d58c669c --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStreamCopier.java @@ -0,0 +1,66 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.snowflake; + +import com.google.cloud.storage.Storage; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.copy.gcs.GcsConfig; +import io.airbyte.integrations.destination.jdbc.copy.gcs.GcsStreamCopier; +import io.airbyte.protocol.models.DestinationSyncMode; +import java.sql.SQLException; + +public class SnowflakeGcsStreamCopier extends GcsStreamCopier { + + public SnowflakeGcsStreamCopier(String stagingFolder, + DestinationSyncMode destSyncMode, + String schema, + String streamName, + Storage storageClient, + JdbcDatabase db, + GcsConfig gcsConfig, + ExtendedNameTransformer nameTransformer, + SqlOperations sqlOperations) { + super(stagingFolder, destSyncMode, schema, streamName, storageClient, db, gcsConfig, nameTransformer, sqlOperations); + } + + @Override + public void copyGcsCsvFileIntoTable(JdbcDatabase database, + String gcsFileLocation, + String schema, + String tableName, + GcsConfig gcsConfig) + throws SQLException { + final var copyQuery = String.format( + "COPY INTO %s.%s FROM '%s' storage_integration = gcs_airbyte_integration file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"');", + schema, + tableName, + gcsFileLocation); + + database.execute(copyQuery); + } + +} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStreamCopierFactory.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStreamCopierFactory.java new file mode 100644 index 00000000000..ed4976cea37 --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStreamCopierFactory.java @@ -0,0 +1,61 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.snowflake; + +import com.google.cloud.storage.Storage; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; +import io.airbyte.integrations.destination.jdbc.copy.gcs.GcsConfig; +import io.airbyte.integrations.destination.jdbc.copy.gcs.GcsStreamCopierFactory; +import io.airbyte.protocol.models.DestinationSyncMode; + +public class SnowflakeGcsStreamCopierFactory extends GcsStreamCopierFactory { + + @Override + public StreamCopier create(String stagingFolder, + DestinationSyncMode syncMode, + String schema, + String streamName, + Storage storageClient, + JdbcDatabase db, + GcsConfig gcsConfig, + ExtendedNameTransformer nameTransformer, + SqlOperations sqlOperations) + throws Exception { + return new SnowflakeGcsStreamCopier( + stagingFolder, + syncMode, + schema, + streamName, + storageClient, + db, + gcsConfig, + nameTransformer, + sqlOperations); + } + +} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json index 4e4355b490f..57739221737 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json @@ -145,6 +145,36 @@ "order": 3 } } + }, + { + "title": "GCS Staging", + "additionalProperties": false, + "description": "Writes large batches of records to a file, uploads the file to GCS, then uses
COPY INTO table
to upload the file. Recommended for large production workloads for better speed and scalability.", + "required": ["project_id", "bucket_name", "credentials_json"], + "properties": { + "project_id": { + "title": "GCP Project ID", + "type": "string", + "description": "The name of the GCP project ID for your credentials.", + "examples": ["my-project"], + "order": 0 + }, + "bucket_name": { + "title": "GCS Bucket Name", + "type": "string", + "description": "The name of the staging GCS bucket. Airbyte will write files to this bucket and read them via
COPY
statements on Snowflake.", + "examples": ["airbyte-staging"], + "order": 1 + }, + "credentials_json": { + "title": "Google Application Credentials", + "type": "string", + "description": "The contents of the JSON key file that has read/write permissions to the staging GCS bucket. You will separately need to grant bucket access to your Snowflake GCP service account. See the GCP docs for more information on how to generate a JSON key for your service account.", + "airbyte_secret": true, + "multiline": true, + "order": 3 + } + } } ] }, diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsCopyIntegrationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsCopyIntegrationTest.java new file mode 100644 index 00000000000..64a17c4f028 --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsCopyIntegrationTest.java @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.snowflake; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import java.nio.file.Path; + +public class SnowflakeGcsCopyIntegrationTest extends SnowflakeInsertIntegrationTest { + + @Override + public JsonNode getStaticConfig() { + final JsonNode copyConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/copy_gcs_config.json"))); + Preconditions.checkArgument(SnowflakeDestination.isGcsCopy(copyConfig)); + Preconditions.checkArgument(!SnowflakeDestination.isS3Copy(copyConfig)); + return copyConfig; + } + +} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertIntegrationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertIntegrationTest.java index b096fd8ff60..742de5ed153 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertIntegrationTest.java @@ -60,7 +60,8 @@ public class SnowflakeInsertIntegrationTest extends TestDestination { public JsonNode getStaticConfig() { final JsonNode insertConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/insert_config.json"))); - Preconditions.checkArgument(!SnowflakeDestination.isCopy(insertConfig)); + Preconditions.checkArgument(!SnowflakeDestination.isS3Copy(insertConfig)); + Preconditions.checkArgument(!SnowflakeDestination.isGcsCopy(insertConfig)); return insertConfig; } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3CopyIntegrationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3CopyIntegrationTest.java index 4749b086fdf..90c57b2b20b 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3CopyIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3CopyIntegrationTest.java @@ -35,7 +35,8 @@ public class SnowflakeS3CopyIntegrationTest extends SnowflakeInsertIntegrationTe @Override public JsonNode getStaticConfig() { final JsonNode copyConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/copy_s3_config.json"))); - Preconditions.checkArgument(SnowflakeDestination.isCopy(copyConfig)); + Preconditions.checkArgument(SnowflakeDestination.isS3Copy(copyConfig)); + Preconditions.checkArgument(!SnowflakeDestination.isGcsCopy(copyConfig)); return copyConfig; } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java index 9d2f417b122..7b1a704d165 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java @@ -37,7 +37,7 @@ public class SnowflakeDestinationTest { @Test @DisplayName("When given S3 credentials should use COPY") - public void useCopyStrategyTest() { + public void useS3CopyStrategyTest() { var stubLoadingMethod = mapper.createObjectNode(); stubLoadingMethod.put("s3_bucket_name", "fake-bucket"); stubLoadingMethod.put("access_key_id", "test"); @@ -46,7 +46,21 @@ public class SnowflakeDestinationTest { var stubConfig = mapper.createObjectNode(); stubConfig.set("loading_method", stubLoadingMethod); - assertTrue(SnowflakeDestination.isCopy(stubConfig)); + assertTrue(SnowflakeDestination.isS3Copy(stubConfig)); + } + + @Test + @DisplayName("When given GCS credentials should use COPY") + public void useGcsCopyStrategyTest() { + var stubLoadingMethod = mapper.createObjectNode(); + stubLoadingMethod.put("project_id", "my-project"); + stubLoadingMethod.put("bucket_name", "my-bucket"); + stubLoadingMethod.put("credentials_json", "hunter2"); + + var stubConfig = mapper.createObjectNode(); + stubConfig.set("loading_method", stubLoadingMethod); + + assertTrue(SnowflakeDestination.isGcsCopy(stubConfig)); } @Test @@ -55,7 +69,7 @@ public class SnowflakeDestinationTest { var stubLoadingMethod = mapper.createObjectNode(); var stubConfig = mapper.createObjectNode(); stubConfig.set("loading_method", stubLoadingMethod); - assertFalse(SnowflakeDestination.isCopy(stubConfig)); + assertFalse(SnowflakeDestination.isS3Copy(stubConfig)); } } diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index 9437389bf4d..8ec210b6b42 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -143,3 +143,42 @@ When an identifier is double-quoted, it is stored and resolved exactly as entere Therefore, Airbyte Snowflake destination will create tables and schemas using the Unquoted identifiers when possible or fallback to Quoted Identifiers if the names are containing special characters. +## Cloud Storage Staging +By default, Airbyte uses batches of `INSERT` commands to add data to a temporary table before copying it over to the final table in Snowflake. This is too slow for larger/multi-GB replications. For those larger replications we recommend configuring using cloud storage to allow batch writes and loading. + +### AWS S3 + +For AWS S3, you will need to create a bucket and provide credentials to access the bucket. We recommend creating a bucket that is only used for Airbyte to stage data to Snowflake. Airbyte needs read/write access to interact with this bucket. + +### Google Cloud Storage (GCS) + +First you will need to create a GCS bucket. + +Then you will need to run the script below: +* You must run the script as the account admin for Snowflake. +* You should replace `AIRBYTE_ROLE` with the role you used for Airbyte's Snowflake configuration. +* Replace `YOURBUCKETNAME` with your bucket name +* The stage name can be modified to any valid name. +* `gcs_airbyte_integration` must be used + +The script: +``` +create storage INTEGRATION gcs_airbyte_integration + TYPE = EXTERNAL_STAGE + STORAGE_PROVIDER = GCS + ENABLED = TRUE + STORAGE_ALLOWED_LOCATIONS = ('gcs://YOURBUCKETNAME'); + +create stage gcs_airbyte_stage + url = 'gcs://io_airbyte_test_staging' + storage_integration = gcs_airbyte_integration; + +GRANT USAGE ON integration gcs_airbyte_integration TO ROLE AIRBYTE_ROLE; +GRANT USAGE ON stage gcs_airbyte_stage TO ROLE AIRBYTE_ROLE; + +DESC STORAGE INTEGRATION gcs_airbyte_integration; +``` + +The final query should show a `STORAGE_GCP_SERVICE_ACCOUNT` property with an email as the property value. + +Finally, you need to add read/write permissions to your bucket with that email. diff --git a/tools/bin/ci_credentials.sh b/tools/bin/ci_credentials.sh index fe023c7c230..898cdfc2ab2 100755 --- a/tools/bin/ci_credentials.sh +++ b/tools/bin/ci_credentials.sh @@ -24,6 +24,7 @@ function write_standard_creds() { write_standard_creds destination-bigquery "$BIGQUERY_INTEGRATION_TEST_CREDS" "credentials.json" write_standard_creds destination-snowflake "$SNOWFLAKE_INTEGRATION_TEST_CREDS" "insert_config.json" write_standard_creds destination-snowflake "$SNOWFLAKE_S3_COPY_INTEGRATION_TEST_CREDS" "copy_s3_config.json" +write_standard_creds destination-snowflake "$SNOWFLAKE_GCS_COPY_INTEGRATION_TEST_CREDS" "copy_gcs_config.json" write_standard_creds destination-redshift "$AWS_REDSHIFT_INTEGRATION_TEST_CREDS" write_standard_creds base-normalization "$BIGQUERY_INTEGRATION_TEST_CREDS" "bigquery.json"