🚨 Removes defunct Azure Blob Storage loading option for Snowflake 🚨 (#25739)

* Removes defunct Azure Blob Storage loading option for Snowflake
* Bumps to major version and removes documentation that references AzureBlobStorage
* Updates the destination_definitions.yaml
* Runs ProcessResources so the spec matches the bumped 1.0.0 version
* Pins urllib3 to an older version, since the 2.0 release removed classes that dockerpy depends on
@@ -377,7 +377,7 @@
 - name: Snowflake
   destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
   dockerRepository: airbyte/destination-snowflake
-  dockerImageTag: 0.4.63
+  dockerImageTag: 1.0.0
   documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
   icon: snowflake.svg
   normalizationConfig:
@@ -6604,7 +6604,7 @@
     supported_destination_sync_modes:
     - "overwrite"
     - "append"
-- dockerImage: "airbyte/destination-snowflake:0.4.63"
+- dockerImage: "airbyte/destination-snowflake:1.0.0"
  spec:
    documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake"
    connectionSpecification:
@@ -6965,58 +6965,6 @@
           airbyte_secret: true
           multiline: true
           order: 3
-      - title: "Azure Blob Storage Staging"
-        description: "Recommended for large production workloads for better speed\
-          \ and scalability."
-        required:
-        - "method"
-        - "azure_blob_storage_account_name"
-        - "azure_blob_storage_container_name"
-        - "azure_blob_storage_sas_token"
-        properties:
-          method:
-            title: ""
-            description: ""
-            type: "string"
-            enum:
-            - "Azure Blob Staging"
-            default: "Azure Blob Staging"
-            order: 0
-          azure_blob_storage_endpoint_domain_name:
-            title: "Azure Blob Storage Endpoint"
-            type: "string"
-            default: "blob.core.windows.net"
-            description: "Enter the Azure Blob Storage <a href=\"https://docs.microsoft.com/en-us/azure/storage/common/storage-account-overview#storage-account-endpoints\"\
-              >endpoint domain name</a>"
-            examples:
-            - "blob.core.windows.net"
-            order: 1
-          azure_blob_storage_account_name:
-            title: "Azure Blob Storage <a href=\"https://docs.microsoft.com/en-us/azure/storage/common/storage-account-overview#storage-account-endpoints\"\
-              >account name</a>"
-            type: "string"
-            description: "Enter your Azure Blob Storage account name"
-            examples:
-            - "airbyte5storage"
-            order: 2
-          azure_blob_storage_container_name:
-            title: "Azure Blob Storage Container Name"
-            type: "string"
-            description: "Enter your Azure Blob Storage <a href=\"https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata#container-names\"\
-              >container name</a>"
-            examples:
-            - "airbytetestcontainername"
-            order: 3
-          azure_blob_storage_sas_token:
-            title: "SAS Token"
-            type: "string"
-            airbyte_secret: true
-            description: "Enter the <a href=\"https://docs.snowflake.com/en/user-guide/data-load-azure-config.html#option-2-generating-a-sas-token\"\
-              >Shared access signature</a> (SAS) token to grant Snowflake limited\
-              \ access to objects in your Azure Blob Storage account"
-            examples:
-            - "?sv=2016-05-31&ss=b&srt=sco&sp=rwdl&se=2018-06-27T10:05:50Z&st=2017-06-27T02:05:50Z&spr=https,http&sig=bgqQwoXwxzuD2GJfagRg7VOS8hzNr3QLT7rhS8OFRLQ%3D"
-            order: 4
       file_buffer_count:
         title: "File Buffer Count"
         type: "integer"
@@ -6380,7 +6380,7 @@
   "destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
   "name": "Snowflake",
   "dockerRepository": "airbyte/destination-snowflake",
-  "dockerImageTag": "0.4.63",
+  "dockerImageTag": "1.0.0",
   "documentationUrl": "https://docs.airbyte.com/integrations/destinations/snowflake",
   "icon": "snowflake.svg",
   "spec": {
@@ -6701,50 +6701,6 @@
         "order": 3
       }
     }
-  }, {
-    "title": "Azure Blob Storage Staging",
-    "description": "Recommended for large production workloads for better speed and scalability.",
-    "required": [ "method", "azure_blob_storage_account_name", "azure_blob_storage_container_name", "azure_blob_storage_sas_token" ],
-    "properties": {
-      "method": {
-        "title": "",
-        "description": "",
-        "type": "string",
-        "enum": [ "Azure Blob Staging" ],
-        "default": "Azure Blob Staging",
-        "order": 0
-      },
-      "azure_blob_storage_endpoint_domain_name": {
-        "title": "Azure Blob Storage Endpoint",
-        "type": "string",
-        "default": "blob.core.windows.net",
-        "description": "Enter the Azure Blob Storage <a href=\"https://docs.microsoft.com/en-us/azure/storage/common/storage-account-overview#storage-account-endpoints\">endpoint domain name</a>",
-        "examples": [ "blob.core.windows.net" ],
-        "order": 1
-      },
-      "azure_blob_storage_account_name": {
-        "title": "Azure Blob Storage <a href=\"https://docs.microsoft.com/en-us/azure/storage/common/storage-account-overview#storage-account-endpoints\">account name</a>",
-        "type": "string",
-        "description": "Enter your Azure Blob Storage account name",
-        "examples": [ "airbyte5storage" ],
-        "order": 2
-      },
-      "azure_blob_storage_container_name": {
-        "title": "Azure Blob Storage Container Name",
-        "type": "string",
-        "description": "Enter your Azure Blob Storage <a href=\"https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata#container-names\">container name</a>",
-        "examples": [ "airbytetestcontainername" ],
-        "order": 3
-      },
-      "azure_blob_storage_sas_token": {
-        "title": "SAS Token",
-        "type": "string",
-        "airbyte_secret": true,
-        "description": "Enter the <a href=\"https://docs.snowflake.com/en/user-guide/data-load-azure-config.html#option-2-generating-a-sas-token\">Shared access signature</a> (SAS) token to grant Snowflake limited access to objects in your Azure Blob Storage account",
-        "examples": [ "?sv=2016-05-31&ss=b&srt=sco&sp=rwdl&se=2018-06-27T10:05:50Z&st=2017-06-27T02:05:50Z&spr=https,http&sig=bgqQwoXwxzuD2GJfagRg7VOS8hzNr3QLT7rhS8OFRLQ%3D" ],
-        "order": 4
-      }
-    }
   } ]
  },
  "file_buffer_count": {
@@ -26,6 +26,10 @@ MAIN_REQUIREMENTS = [
     "pytest-cov~=3.0.0",
     "hypothesis~=6.54.1",
     "hypothesis-jsonschema~=0.20.1",  # TODO alafanechere upgrade to latest when jsonschema lib is upgraded to >= 4.0.0 in airbyte-cdk and connector acceptance tests
+    # Pinning requests and urllib3 to avoid an issue with dockerpy and requests 2.
+    # Related issue: https://github.com/docker/docker-py/issues/3113
+    "urllib3<2.0",
+    "requests<2.29.0",
 ]
 
 setuptools.setup(
@@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1
 
 ENV ENABLE_SENTRY true
 
-LABEL io.airbyte.version=0.4.63
+LABEL io.airbyte.version=1.0.0
 LABEL io.airbyte.name=airbyte/destination-snowflake
@@ -24,17 +24,16 @@
 
 Put the contents of the following LastPass secrets into corresponding files under the `secrets` directory:
 
-| LastPass Secret | File |
-| --- | --- |
-| `destination snowflake - test creds (secrets/config.json)` | `secrets/config.json` |
-| `destination snowflake - insert test creds (secrets/insert_config.json)` | `secrets/insert_config.json` |
-| `destination snowflake - internal staging test creds (secrets/internal_staging_config.json)` | `secrets/internal_staging_config.json` |
-| `destination snowflake - internal staging key pair (secrets/config_key_pair.json)` | `secrets/config_key_pair.json` |
-| `destination snowflake - s3 staging test creds (secrets/copy_s3_config.json)` | `secrets/copy_s3_config.json` |
-| `destination snowflake - s3 staging encrypted test creds (secrets/copy_s3_encrypted_config.json)` | `secrets/copy_s3_encrypted_config.json` |
-| `destination snowflake - gcs staging test creds (secrets/copy_gcs_config.json)` | `secrets/copy_gcs_config.json` |
-| `destination snowflake - azure blob staging test creds (secrets/copy_azure_blob_config.json)` | `secrets/copy_azure_blob_config.json` |
+| LastPass Secret | File |
+|--------------------------------------------------------------------------------------------------------|------------------------------------------|
+| `destination snowflake - test creds (secrets/config.json)` | `secrets/config.json` |
+| `destination snowflake - insert test creds (secrets/insert_config.json)` | `secrets/insert_config.json` |
+| `destination snowflake - internal staging test creds (secrets/internal_staging_config.json)` | `secrets/internal_staging_config.json` |
+| `destination snowflake - internal staging key pair (secrets/config_key_pair.json)` | `secrets/config_key_pair.json` |
+| `destination snowflake - internal staging key pair encrypted (secrets/config_key_pair_encrypted.json)` | `secrets/config_key_pair_encrypted.json` |
+| `destination snowflake - s3 staging test creds (secrets/copy_s3_config.json)` | `secrets/copy_s3_config.json` |
+| `destination snowflake - s3 staging encrypted test creds (secrets/copy_s3_encrypted_config.json)` | `secrets/copy_s3_encrypted_config.json` |
+| `destination snowflake - gcs staging test creds (secrets/copy_gcs_config.json)` | `secrets/copy_gcs_config.json` |
 
 The query timeout for insert data to table has been updated from 30 minutes to 3 hours.
@@ -90,11 +89,3 @@ DROP USER IF EXISTS INTEGRATION_TEST_USER_DESTINATION;
 DROP ROLE IF EXISTS INTEGRATION_TESTER_DESTINATION;
 DROP WAREHOUSE IF EXISTS INTEGRATION_TEST_WAREHOUSE_DESTINATION;
 ```
-
-## Setting up the Azure Blob Storage test infra
-1. Follow the [Destination Azure Blob Storage](../destination-azure-blob-storage/README.md#infra-setup) setup guide, with these differences:
-   1. The container name is `snowflake-staging`
-   1. Instead of getting an access key, you need a `Shared access token`. This belongs to the container (NOT the storage account).
-      1. Give the key the appropriate permissions (which I believe is all of them)
-      1. And a distant-future expiry - there's no way to generate a non-expiring token.
-   1. The `Blob SAS token` is the `azure_blob_storage_sas_token` config field.
@@ -34,7 +34,6 @@ dependencies {
     implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2'
     implementation "io.aesy:datasize:1.0.0"
     implementation 'com.zaxxer:HikariCP:5.0.1'
-    implementation 'com.azure:azure-storage-blob:12.12.0'
 
     implementation project(':airbyte-config-oss:config-models-oss')
     implementation project(':airbyte-db:db-lib')
@@ -7,7 +7,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-  dockerImageTag: 0.4.61
+  dockerImageTag: 1.0.0
   dockerRepository: airbyte/destination-snowflake
   githubIssueLabel: destination-snowflake
   icon: snowflake.svg
@@ -1,86 +0,0 @@
-/*
- * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.integrations.destination.snowflake;
-
-import static io.airbyte.integrations.destination.snowflake.SnowflakeS3StreamCopier.MAX_FILES_PER_COPY;
-
-import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder;
-import com.google.common.collect.Lists;
-import io.airbyte.commons.lang.Exceptions;
-import io.airbyte.db.jdbc.JdbcDatabase;
-import io.airbyte.integrations.destination.StandardNameTransformer;
-import io.airbyte.integrations.destination.jdbc.SqlOperations;
-import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator;
-import io.airbyte.integrations.destination.jdbc.copy.azure.AzureBlobStorageConfig;
-import io.airbyte.integrations.destination.jdbc.copy.azure.AzureBlobStorageStreamCopier;
-import io.airbyte.protocol.models.v0.DestinationSyncMode;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SnowflakeAzureBlobStorageStreamCopier extends AzureBlobStorageStreamCopier implements SnowflakeParallelCopyStreamCopier {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeAzureBlobStorageStreamCopier.class);
-
-  public SnowflakeAzureBlobStorageStreamCopier(final String stagingFolder,
-                                               final DestinationSyncMode destSyncMode,
-                                               final String schema,
-                                               final String streamName,
-                                               final SpecializedBlobClientBuilder specializedBlobClientBuilder,
-                                               final JdbcDatabase db,
-                                               final AzureBlobStorageConfig azureBlobConfig,
-                                               final StandardNameTransformer nameTransformer,
-                                               final SqlOperations sqlOperations,
-                                               final StagingFilenameGenerator stagingFilenameGenerator) {
-    super(stagingFolder, destSyncMode, schema, streamName, specializedBlobClientBuilder, db, azureBlobConfig, nameTransformer, sqlOperations);
-    this.filenameGenerator = stagingFilenameGenerator;
-  }
-
-  @Override
-  public void copyStagingFileToTemporaryTable() throws Exception {
-    List<List<String>> partitions = Lists.partition(new ArrayList<>(azureStagingFiles), MAX_FILES_PER_COPY);
-    LOGGER.info("Starting parallel copy to tmp table: {} in destination for stream: {}, schema: {}. Chunks count {}", tmpTableName, streamName,
-        schemaName, partitions.size());
-
-    copyFilesInParallel(partitions);
-    LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
-  }
-
-  @Override
-  public void copyAzureBlobCsvFileIntoTable(JdbcDatabase database,
-                                            String snowflakeAzureExternalStageName,
-                                            String schema,
-                                            String tableName,
-                                            AzureBlobStorageConfig config)
-      throws SQLException {
-    throw new RuntimeException("Snowflake Azure Stream Copier should not copy individual files without use of a parallel copy");
-  }
-
-  @Override
-  public void copyIntoStage(List<String> files) {
-
-    final var copyQuery = String.format(
-        "COPY INTO %s.%s FROM '%s'"
-            + " credentials=(azure_sas_token='%s')"
-            + " file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"')"
-            + " files = (" + generateFilesList(files) + " );",
-        schemaName,
-        tmpTableName,
-        generateBucketPath(),
-        azureBlobConfig.getSasToken());
-
-    Exceptions.toRuntime(() -> db.execute(copyQuery));
-  }
-
-  @Override
-  public String generateBucketPath() {
-    return "azure://" + azureBlobConfig.getAccountName() + "." + azureBlobConfig.getEndpointDomainName()
-        + "/" + azureBlobConfig.getContainerName() + "/" + stagingFolder + "/" + schemaName + "/";
-  }
-
-}
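For readers unfamiliar with Snowflake's bulk-load syntax, the standalone sketch below shows roughly what the removed `copyIntoStage` rendered at runtime. It is illustrative only, not connector code; every concrete value (schema, table, container path, token, file names) is a hypothetical placeholder.

```java
// Standalone sketch: how the removed copyIntoStage assembled one COPY INTO
// statement per chunk of staged files. All values are hypothetical.
public class CopyQuerySketch {

  public static void main(String[] args) {
    final String schemaName = "AIRBYTE_SCHEMA";            // hypothetical
    final String tmpTableName = "_airbyte_tmp_abc_users";  // hypothetical
    final String bucketPath =
        "azure://myaccount.blob.core.windows.net/snowflake-staging/folder/AIRBYTE_SCHEMA/"; // hypothetical
    final String sasToken = "?sv=...redacted...";          // hypothetical
    final String filesList = "'part_0.csv', 'part_1.csv'"; // hypothetical

    // Same format string as the removed copier above.
    final String copyQuery = String.format(
        "COPY INTO %s.%s FROM '%s'"
            + " credentials=(azure_sas_token='%s')"
            + " file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"')"
            + " files = (" + filesList + " );",
        schemaName, tmpTableName, bucketPath, sasToken);

    System.out.println(copyQuery); // the statement the connector would execute via JDBC
  }
}
```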
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.integrations.destination.snowflake;
-
-import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder;
-import io.airbyte.db.jdbc.JdbcDatabase;
-import io.airbyte.integrations.destination.StandardNameTransformer;
-import io.airbyte.integrations.destination.jdbc.SqlOperations;
-import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator;
-import io.airbyte.integrations.destination.jdbc.constants.GlobalDataSizeConstants;
-import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
-import io.airbyte.integrations.destination.jdbc.copy.azure.AzureBlobStorageConfig;
-import io.airbyte.integrations.destination.jdbc.copy.azure.AzureBlobStorageStreamCopierFactory;
-import io.airbyte.protocol.models.v0.DestinationSyncMode;
-
-public class SnowflakeAzureBlobStorageStreamCopierFactory extends AzureBlobStorageStreamCopierFactory {
-
-  @Override
-  public StreamCopier create(final String stagingFolder,
-                             final DestinationSyncMode syncMode,
-                             final String schema,
-                             final String streamName,
-                             final SpecializedBlobClientBuilder specializedBlobClientBuilder,
-                             final JdbcDatabase db,
-                             final AzureBlobStorageConfig azureBlobConfig,
-                             final StandardNameTransformer nameTransformer,
-                             final SqlOperations sqlOperations)
-      throws Exception {
-    return new SnowflakeAzureBlobStorageStreamCopier(stagingFolder,
-        syncMode,
-        schema,
-        streamName,
-        specializedBlobClientBuilder,
-        db,
-        azureBlobConfig,
-        nameTransformer,
-        sqlOperations,
-        new StagingFilenameGenerator(streamName, GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES));
-  }
-
-}
@@ -1,87 +0,0 @@
-/*
- * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.integrations.destination.snowflake;
-
-import static io.airbyte.integrations.destination.jdbc.copy.azure.AzureBlobStorageConfig.getAzureBlobConfig;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import io.airbyte.db.jdbc.JdbcDatabase;
-import io.airbyte.integrations.base.AirbyteMessageConsumer;
-import io.airbyte.integrations.destination.NamingConventionTransformer;
-import io.airbyte.integrations.destination.StandardNameTransformer;
-import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
-import io.airbyte.integrations.destination.jdbc.SqlOperations;
-import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
-import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
-import io.airbyte.integrations.destination.jdbc.copy.azure.AzureBlobStorageStreamCopier;
-import io.airbyte.protocol.models.v0.AirbyteMessage;
-import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
-import java.util.function.Consumer;
-import javax.sql.DataSource;
-
-public class SnowflakeCopyAzureBlobStorageDestination extends CopyDestination {
-
-  private final String airbyteEnvironment;
-
-  public SnowflakeCopyAzureBlobStorageDestination(final String airbyteEnvironment) {
-    this.airbyteEnvironment = airbyteEnvironment;
-  }
-
-  @Override
-  public AirbyteMessageConsumer getConsumer(final JsonNode config,
-                                            final ConfiguredAirbyteCatalog catalog,
-                                            final Consumer<AirbyteMessage> outputRecordCollector) {
-    final DataSource dataSource = getDataSource(config);
-    return CopyConsumerFactory.create(
-        outputRecordCollector,
-        dataSource,
-        getDatabase(dataSource),
-        getSqlOperations(),
-        getNameTransformer(),
-        getAzureBlobConfig(config.get("loading_method")),
-        catalog,
-        new SnowflakeAzureBlobStorageStreamCopierFactory(),
-        getConfiguredSchema(config));
-  }
-
-  @Override
-  public void checkPersistence(final JsonNode config) {
-    AzureBlobStorageStreamCopier.attemptAzureBlobWriteAndDelete(getAzureBlobConfig(config.get("loading_method")));
-  }
-
-  @Override
-  public StandardNameTransformer getNameTransformer() {
-    return new SnowflakeSQLNameTransformer();
-  }
-
-  @Override
-  public DataSource getDataSource(final JsonNode config) {
-    return SnowflakeDatabase.createDataSource(config, airbyteEnvironment);
-  }
-
-  @Override
-  public JdbcDatabase getDatabase(final DataSource dataSource) {
-    return SnowflakeDatabase.getDatabase(dataSource);
-  }
-
-  @Override
-  public SqlOperations getSqlOperations() {
-    return new SnowflakeSqlOperations();
-  }
-
-  @Override
-  protected void performCreateInsertTestOnDestination(final String outputSchema,
-                                                      final JdbcDatabase database,
-                                                      final NamingConventionTransformer nameTransformer)
-      throws Exception {
-    AbstractJdbcDestination.attemptTableOperations(outputSchema, database, nameTransformer,
-        getSqlOperations(), true);
-  }
-
-  private String getConfiguredSchema(final JsonNode config) {
-    return config.get("schema").asText();
-  }
-
-}
@@ -15,7 +15,6 @@ public class SnowflakeDestination extends SwitchingDestination<SnowflakeDestinat
 
   enum DestinationType {
     COPY_S3,
     COPY_GCS,
-    COPY_AZURE_BLOB,
     INTERNAL_STAGING
   }
 
@@ -18,8 +18,6 @@ public class SnowflakeDestinationResolver {
       return DestinationType.COPY_S3;
     } else if (isGcsCopy(config)) {
       return DestinationType.COPY_GCS;
-    } else if (isAzureBlobCopy(config)) {
-      return DestinationType.COPY_AZURE_BLOB;
     } else {
       return DestinationType.INTERNAL_STAGING;
     }
@@ -33,11 +31,6 @@ public class SnowflakeDestinationResolver {
     return config.has("loading_method") && config.get("loading_method").isObject() && config.get("loading_method").has("project_id");
   }
 
-  public static boolean isAzureBlobCopy(final JsonNode config) {
-    return config.has("loading_method") && config.get("loading_method").isObject()
-        && config.get("loading_method").has("azure_blob_storage_account_name");
-  }
-
   public static int getNumberOfFileBuffers(final JsonNode config) {
     int numOfFileBuffers = FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER;
     if (config.has(FileBuffer.FILE_BUFFER_COUNT_KEY)) {
@@ -53,12 +46,10 @@ public class SnowflakeDestinationResolver {
     final SnowflakeS3StagingDestination s3StagingDestination = new SnowflakeS3StagingDestination(airbyteEnvironment);
     final SnowflakeGcsStagingDestination gcsStagingDestination = new SnowflakeGcsStagingDestination(airbyteEnvironment);
     final SnowflakeInternalStagingDestination internalStagingDestination = new SnowflakeInternalStagingDestination(airbyteEnvironment);
-    final SnowflakeCopyAzureBlobStorageDestination azureBlobStorageDestination = new SnowflakeCopyAzureBlobStorageDestination(airbyteEnvironment);
 
     return ImmutableMap.of(
         DestinationType.COPY_S3, s3StagingDestination,
         DestinationType.COPY_GCS, gcsStagingDestination,
-        DestinationType.COPY_AZURE_BLOB, azureBlobStorageDestination,
         DestinationType.INTERNAL_STAGING, internalStagingDestination);
   }
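After this change, type resolution reduces to a three-way dispatch keyed on which marker field appears under `loading_method`. The sketch below reconstructs that logic from the context lines above; it is not verbatim source. The enclosing method name, the helper, and the `access_key_id` marker for S3 are assumptions (only the GCS marker `project_id` is visible in the diff).

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Reconstructed sketch of the post-change dispatch. Method and helper names
// are assumed; the "project_id" GCS marker comes from the diff above, while
// "access_key_id" as the S3 marker is a hypothetical stand-in.
public class ResolverSketch {

  enum DestinationType { COPY_S3, COPY_GCS, INTERNAL_STAGING }

  static boolean hasLoadingMethodKey(final JsonNode config, final String key) {
    return config.has("loading_method") && config.get("loading_method").isObject()
        && config.get("loading_method").has(key);
  }

  static DestinationType getTypeFromConfig(final JsonNode config) {
    if (hasLoadingMethodKey(config, "access_key_id")) {      // assumed S3 marker key
      return DestinationType.COPY_S3;
    } else if (hasLoadingMethodKey(config, "project_id")) {  // GCS marker, shown in the diff
      return DestinationType.COPY_GCS;
    } else {
      return DestinationType.INTERNAL_STAGING;               // default: Snowflake internal stage
    }
  }

  public static void main(String[] args) throws Exception {
    final JsonNode config = new ObjectMapper()
        .readTree("{\"loading_method\": {\"project_id\": \"my-gcp-project\"}}"); // hypothetical config
    System.out.println(getTypeFromConfig(config)); // prints COPY_GCS
  }
}
```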
@@ -373,58 +373,6 @@
             "order": 3
           }
         }
-      },
-      {
-        "title": "Azure Blob Storage Staging",
-        "description": "Recommended for large production workloads for better speed and scalability.",
-        "required": [
-          "method",
-          "azure_blob_storage_account_name",
-          "azure_blob_storage_container_name",
-          "azure_blob_storage_sas_token"
-        ],
-        "properties": {
-          "method": {
-            "title": "",
-            "description": "",
-            "type": "string",
-            "enum": ["Azure Blob Staging"],
-            "default": "Azure Blob Staging",
-            "order": 0
-          },
-          "azure_blob_storage_endpoint_domain_name": {
-            "title": "Azure Blob Storage Endpoint",
-            "type": "string",
-            "default": "blob.core.windows.net",
-            "description": "Enter the Azure Blob Storage <a href=\"https://docs.microsoft.com/en-us/azure/storage/common/storage-account-overview#storage-account-endpoints\">endpoint domain name</a>",
-            "examples": ["blob.core.windows.net"],
-            "order": 1
-          },
-          "azure_blob_storage_account_name": {
-            "title": "Azure Blob Storage <a href=\"https://docs.microsoft.com/en-us/azure/storage/common/storage-account-overview#storage-account-endpoints\">account name</a>",
-            "type": "string",
-            "description": "Enter your Azure Blob Storage account name",
-            "examples": ["airbyte5storage"],
-            "order": 2
-          },
-          "azure_blob_storage_container_name": {
-            "title": "Azure Blob Storage Container Name",
-            "type": "string",
-            "description": "Enter your Azure Blob Storage <a href=\"https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata#container-names\">container name</a>",
-            "examples": ["airbytetestcontainername"],
-            "order": 3
-          },
-          "azure_blob_storage_sas_token": {
-            "title": "SAS Token",
-            "type": "string",
-            "airbyte_secret": true,
-            "description": "Enter the <a href=\"https://docs.snowflake.com/en/user-guide/data-load-azure-config.html#option-2-generating-a-sas-token\">Shared access signature</a> (SAS) token to grant Snowflake limited access to objects in your Azure Blob Storage account",
-            "examples": [
-              "?sv=2016-05-31&ss=b&srt=sco&sp=rwdl&se=2018-06-27T10:05:50Z&st=2017-06-27T02:05:50Z&spr=https,http&sig=bgqQwoXwxzuD2GJfagRg7VOS8hzNr3QLT7rhS8OFRLQ%3D"
-            ],
-            "order": 4
-          }
-        }
-      }
+      }
       ]
     },
@@ -1,24 +0,0 @@
-/*
- * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.integrations.destination.snowflake;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.api.client.util.Preconditions;
-import io.airbyte.commons.io.IOs;
-import io.airbyte.commons.json.Jsons;
-import java.nio.file.Path;
-
-public class SnowflakeAzureBlobCopyDestinationAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest {
-
-  @Override
-  public JsonNode getStaticConfig() {
-    final JsonNode copyConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/copy_azure_blob_config.json")));
-    Preconditions.checkArgument(SnowflakeDestinationResolver.isAzureBlobCopy(copyConfig));
-    Preconditions.checkArgument(!SnowflakeDestinationResolver.isS3Copy(copyConfig));
-    Preconditions.checkArgument(!SnowflakeDestinationResolver.isGcsCopy(copyConfig));
-    return copyConfig;
-  }
-
-}
@@ -1,75 +0,0 @@
-/*
- * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.integrations.destination.snowflake;
-
-import static io.airbyte.integrations.destination.jdbc.copy.gcs.GcsStreamCopier.MAX_PARTS_PER_FILE;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder;
-import com.google.common.collect.Lists;
-import io.airbyte.db.jdbc.JdbcDatabase;
-import io.airbyte.integrations.destination.StandardNameTransformer;
-import io.airbyte.integrations.destination.jdbc.SqlOperations;
-import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator;
-import io.airbyte.integrations.destination.jdbc.copy.azure.AzureBlobStorageConfig;
-import io.airbyte.protocol.models.v0.DestinationSyncMode;
-import java.util.ArrayList;
-import java.util.List;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-public class SnowflakeAzureBlobStreamCopierTest {
-
-  private final AzureBlobStorageConfig mockedAzureBlobConfig = new AzureBlobStorageConfig(
-      "fake-endpoint",
-      "fake-account",
-      "fake-container-name",
-      "fake-sas-token");
-
-  private JdbcDatabase db;
-  private SnowflakeAzureBlobStorageStreamCopier copier;
-
-  @BeforeEach
-  public void setup() {
-    SpecializedBlobClientBuilder specializedBlobClientBuilder = mock(SpecializedBlobClientBuilder.class, RETURNS_DEEP_STUBS);
-
-    db = mock(JdbcDatabase.class);
-    SqlOperations sqlOperations = mock(SqlOperations.class);
-
-    copier = new SnowflakeAzureBlobStorageStreamCopier(
-        "fake-staging-folder",
-        DestinationSyncMode.OVERWRITE,
-        "fake-schema",
-        "fake-stream",
-        specializedBlobClientBuilder,
-        db,
-        mockedAzureBlobConfig,
-        new StandardNameTransformer(),
-        sqlOperations,
-        new StagingFilenameGenerator("fake-stream", 256L));
-  }
-
-  @Test
-  public void copiesCorrectFilesToTable() throws Exception {
-    for (int i = 0; i < MAX_PARTS_PER_FILE + 1; i++) {
-      copier.prepareStagingFile();
-    }
-    copier.copyStagingFileToTemporaryTable();
-    List<List<String>> partition = Lists.partition(new ArrayList<>(copier.getAzureStagingFiles()), 1000);
-    for (List<String> files : partition) {
-      verify(db).execute(String.format(
-          "COPY INTO fake-schema.%s FROM '%s'"
-              + " credentials=(azure_sas_token='%s')"
-              + " file_format = (type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"')"
-              + " files = (" + copier.generateFilesList(files) + " );",
-          copier.getTmpTableName(),
-          copier.generateBucketPath(),
-          mockedAzureBlobConfig.getSasToken()));
-    }
-  }
-
-}
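The deleted test above pivots on one behavior: staged file names are chunked, and each chunk becomes one COPY INTO statement. Below is a minimal standalone sketch of that chunking, assuming a chunk size of 1000 (the literal the test partitions by; the file names are hypothetical).

```java
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the chunking the deleted test verified. The chunk size
// of 1000 is an assumption taken from the literal in the test above.
public class PartitionSketch {

  public static void main(String[] args) {
    final List<String> stagedFiles = new ArrayList<>();
    for (int i = 0; i < 2500; i++) {
      stagedFiles.add("staging/part_" + i + ".csv"); // hypothetical staged file names
    }
    // Guava splits the list into consecutive sublists of at most 1000 elements.
    final List<List<String>> chunks = Lists.partition(stagedFiles, 1000);
    // Three COPY INTO statements would be issued: chunks of 1000, 1000, and 500.
    System.out.println(chunks.size()); // prints 3
  }
}
```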
@@ -1,18 +0,0 @@
-{
-  "host": "testhost.snowflakecomputing.com",
-  "role": "AIRBYTE_ROLE",
-  "warehouse": "AIRBYTE_WAREHOUSE",
-  "database": "AIRBYTE_DATABASE",
-  "schema": "AIRBYTE_SCHEMA",
-  "username": "AIRBYTE_USER",
-  "credentials": {
-    "password": "test"
-  },
-  "loading_method": {
-    "method": "Azure Blob Staging",
-    "azure_blob_storage_account_name": "account",
-    "azure_blob_storage_endpoint_domain_name": "blob.core.windows.net",
-    "azure_blob_storage_container_name": "snowflake-container",
-    "azure_blob_storage_sas_token": "token"
-  }
-}
@@ -1,13 +1,13 @@
 # Snowflake
 
-Setting up the Snowflake destination connector involves setting up Snowflake entities (warehouse, database, schema, user, and role) in the Snowflake console, setting up the data loading method (internal stage, AWS S3, Google Cloud Storage bucket, or Azure Blob Storage), and configuring the Snowflake destination connector using the Airbyte UI.
+Setting up the Snowflake destination connector involves setting up Snowflake entities (warehouse, database, schema, user, and role) in the Snowflake console, setting up the data loading method (internal stage, AWS S3, or Google Cloud Storage bucket), and configuring the Snowflake destination connector using the Airbyte UI.
 
 This page describes the step-by-step process of setting up the Snowflake destination connector.
 
 ## Prerequisites
 
 - A Snowflake account with the [ACCOUNTADMIN](https://docs.snowflake.com/en/user-guide/security-access-control-considerations.html) role. If you don’t have an account with the `ACCOUNTADMIN` role, contact your Snowflake administrator to set one up for you.
-- (Optional) An AWS, Google Cloud Storage, or Azure account.
+- (Optional) An AWS or Google Cloud Storage account.
 
 ### Network policies
@@ -114,7 +114,7 @@ You can use the following script in a new [Snowflake worksheet](https://docs.sno
 
 ### Step 2: Set up a data loading method
 
-By default, Airbyte uses Snowflake’s [Internal Stage](https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage.html) to load data. You can also load data using an [Amazon S3 bucket](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html), a [Google Cloud Storage bucket](https://cloud.google.com/storage/docs/introduction), or [Azure Blob Storage](https://docs.microsoft.com/en-us/azure/storage/blobs/).
+By default, Airbyte uses Snowflake’s [Internal Stage](https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage.html) to load data. You can also load data using an [Amazon S3 bucket](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html) or a [Google Cloud Storage bucket](https://cloud.google.com/storage/docs/introduction).
 
 Make sure the database and schema have the `USAGE` privilege.
@@ -150,10 +150,6 @@ To use a Google Cloud Storage bucket:
 
 4. Navigate to the Snowflake UI and run the script as a [Snowflake account admin](https://docs.snowflake.com/en/user-guide/security-access-control-considerations.html) using the [Worksheet page](https://docs.snowflake.com/en/user-guide/ui-worksheet.html) or [Snowsight](https://docs.snowflake.com/en/user-guide/ui-snowsight-gs.html).
 
-#### Using Azure Blob Storage
-
-To use Azure Blob Storage, [create a storage account](https://docs.microsoft.com/en-us/azure/storage/common/storage-account-create?tabs=azure-portal) and [container](https://docs.microsoft.com/en-us/rest/api/storageservices/create-container), and provide a [SAS Token](https://docs.snowflake.com/en/user-guide/data-load-azure-config.html#option-2-generating-a-sas-token) to access the container. We recommend creating a dedicated container for Airbyte to stage data to Snowflake. Airbyte needs read/write access to interact with this container.
-
 
 ### Step 3: Set up Snowflake as a destination in Airbyte
@@ -231,16 +227,6 @@ To use a Google Cloud Storage bucket, enter the information for the bucket you c
 | GCP Bucket Name | The name of the staging bucket. Airbyte will write files to this bucket and read them via statements on Snowflake. (Example: `airbyte-staging`) |
 | Google Application Credentials | The contents of the JSON key file that has read/write permissions to the staging GCS bucket. You will separately need to grant bucket access to your Snowflake GCP service account. See the [Google Cloud docs](https://cloud.google.com/iam/docs/creating-managing-service-account-keys#creating_service_account_keys) for more information on how to generate a JSON key for your service account. |
 
-To use Azure Blob storage, enter the information for the storage you created in Step 2:
-
-| Field | Description |
-|--------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| Endpoint Domain Name | Leave default value `blob.core.windows.net` or [map a custom domain](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-custom-domain-name?tabs=azure-portal) to an Azure Blob Storage endpoint. |
-| Azure Blob Storage Account Name | The Azure storage account you created in Step 2. |
-| Azure blob storage container (Bucket) Name | The Azure blob storage container you created in Step 2. |
-| SAS Token | The SAS Token you provided in Step 2. |
-
 
 ## Output schema
 
 Airbyte outputs each stream into its own table with the following columns in Snowflake:
@@ -287,6 +273,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n
 
 | Version | Date | Pull Request | Subject |
 |:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
+| 1.0.0           | 2023-05-02 | [\#25739](https://github.com/airbytehq/airbyte/pull/25739)  | Removed Azure Blob Storage as a loading method                                                                                                        |
 | 0.4.63          | 2023-04-27 | [\#25346](https://github.com/airbytehq/airbyte/pull/25346)  | Added FlushBufferFunction interface                                                                                                                   |
 | 0.4.61          | 2023-03-30 | [\#24736](https://github.com/airbytehq/airbyte/pull/24736)  | Improve behavior when throttled by AWS API                                                                                                            |
 | 0.4.60          | 2023-03-30 | [\#24698](https://github.com/airbytehq/airbyte/pull/24698)  | Add option in spec to allow increasing the stream buffer size to 50                                                                                   |
@@ -50,6 +50,7 @@ setup(
         "pyyaml~=6.0",
         "analytics-python~=1.4.0",
         "python-slugify~=6.1.2",
+        "urllib3<2"
     ],
     python_requires=">=3.9.11",
     extras_require={