
🐛 Destination-Snowflake: updated check method to handle more possible S3 and GCS staging issues (#21450)

* [18312] Destination-Snowflake: updated check method to handle more possible S3 and GCS staging issues
Eugene
2023-01-21 00:37:07 +02:00
committed by GitHub
parent 5257f69e04
commit 2d65d62a65
12 changed files with 115 additions and 19 deletions

View File

@@ -348,7 +348,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
-  dockerImageTag: 0.4.42
+  dockerImageTag: 0.4.43
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
icon: snowflake.svg
normalizationConfig:

View File

@@ -6109,7 +6109,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
-- dockerImage: "airbyte/destination-snowflake:0.4.42"
+- dockerImage: "airbyte/destination-snowflake:0.4.43"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake"
connectionSpecification:

View File

@@ -31,9 +31,9 @@ public class CsvSerializedBuffer extends BaseSerializedBuffer {
private CSVPrinter csvPrinter;
private CSVFormat csvFormat;
-  protected CsvSerializedBuffer(final BufferStorage bufferStorage,
-                                final CsvSheetGenerator csvSheetGenerator,
-                                final boolean compression)
+  public CsvSerializedBuffer(final BufferStorage bufferStorage,
+                             final CsvSheetGenerator csvSheetGenerator,
+                             final boolean compression)
throws Exception {
super(bufferStorage);
this.csvSheetGenerator = csvSheetGenerator;

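Widening the constructor from protected to public lets callers outside the factory build the buffer directly, which the Snowflake S3 check further down in this commit relies on. For illustration, direct construction now looks like this (imports as added in the S3 staging file below):

import io.airbyte.integrations.destination.record_buffer.InMemoryBuffer;
import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer;
import io.airbyte.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator;

// Direct construction, now possible outside the package; the final flag enables compression.
final CsvSerializedBuffer buffer = new CsvSerializedBuffer(
    new InMemoryBuffer(".csv"),
    new StagingDatabaseCsvSheetGenerator(),
    true);
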
View File

@@ -97,7 +97,7 @@ public abstract class CopyDestination extends BaseConnector implements Destinati
final JdbcDatabase database,
final NamingConventionTransformer nameTransformer)
throws Exception {
-    AbstractJdbcDestination.attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, getSqlOperations());
+    AbstractJdbcDestination.attemptTableOperations(outputSchema, database, nameTransformer, getSqlOperations(), true);
}
}

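attemptTableOperations with the trailing flag set to true goes further than the old create-and-drop probe: it also attempts a dummy INSERT, so a role that can create tables but not write to them fails at check time instead of mid-sync. A minimal sketch of that probe order in plain JDBC (the table name and DDL here are placeholders, not the connector's actual statements):

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

public class TableOperationsProbe {

  // Probes DDL and DML permissions in one pass: create a throwaway table,
  // optionally insert a dummy row, and always drop the table afterwards.
  public static void attemptTableOperations(final Connection connection,
                                            final String schema,
                                            final boolean attemptInsert) throws SQLException {
    final String table = schema + "._airbyte_connection_test";
    try (Statement stmt = connection.createStatement()) {
      stmt.execute("CREATE TABLE IF NOT EXISTS " + table + " (id VARCHAR(36))");
      try {
        if (attemptInsert) {
          // A failed INSERT here means the role can create tables but not write to them.
          stmt.execute("INSERT INTO " + table + " (id) VALUES ('check')");
        }
      } finally {
        stmt.execute("DROP TABLE IF EXISTS " + table);
      }
    }
  }
}
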
View File

@@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1
ENV ENABLE_SENTRY true
-LABEL io.airbyte.version=0.4.42
+LABEL io.airbyte.version=0.4.43
LABEL io.airbyte.name=airbyte/destination-snowflake

View File

@@ -5,9 +5,11 @@
package io.airbyte.integrations.destination.snowflake;
import static io.airbyte.integrations.destination.snowflake.SnowflakeS3StagingDestination.isPurgeStagingData;
+import static java.nio.charset.StandardCharsets.UTF_8;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
@@ -29,6 +31,7 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
@@ -83,12 +86,19 @@ public class SnowflakeGcsStagingDestination extends AbstractJdbcDestination impl
}
private static void attemptWriteAndDeleteGcsObject(final GcsConfig gcsConfig, final String outputTableName) throws IOException {
-    final var storage = getStorageClient(gcsConfig);
-    final var blobId = BlobId.of(gcsConfig.getBucketName(), "check-content/" + outputTableName);
-    final var blobInfo = BlobInfo.newBuilder(blobId).build();
-    storage.create(blobInfo, "".getBytes(StandardCharsets.UTF_8));
-    storage.delete(blobId);
+    final Storage storageClient = getStorageClient(gcsConfig);
+    final BlobId blobId = BlobId.of(gcsConfig.getBucketName(), "check-content/" + outputTableName);
+    final BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
+    storageClient.create(blobInfo);
+    try (WriteChannel writer = storageClient.writer(blobInfo)) {
+      // Try to write a dummy message to make sure the user has all required permissions
+      final byte[] content = "Hello, World!".getBytes(UTF_8);
+      writer.write(ByteBuffer.wrap(content, 0, content.length));
+    } finally {
+      storageClient.delete(blobId);
+    }
}
public static Storage getStorageClient(final GcsConfig gcsConfig) throws IOException {

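The reworked GCS check creates the blob, streams a few bytes through a WriteChannel to exercise the storage.objects.create permission, and deletes the object in a finally block so nothing is left behind. A self-contained sketch of the same probe against the google-cloud-storage client (the credentials path and bucket name are placeholders, not values from this commit):

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class GcsWriteProbe {

  // Writes and deletes a throwaway object to verify the service account can stage data.
  public static void probe(final Storage storage, final String bucketName) throws Exception {
    final BlobId blobId = BlobId.of(bucketName, "check-content/probe");
    final BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
    storage.create(blobInfo); // fails fast if storage.objects.create is missing
    try (WriteChannel writer = storage.writer(blobInfo)) {
      final byte[] content = "Hello, World!".getBytes(StandardCharsets.UTF_8);
      writer.write(ByteBuffer.wrap(content));
    } finally {
      storage.delete(blobId); // clean up even if the write fails
    }
  }

  public static void main(final String[] args) throws Exception {
    // Hypothetical credentials path and bucket, for illustration only.
    final GoogleCredentials credentials =
        GoogleCredentials.fromStream(new FileInputStream("/path/to/service-account.json"));
    final Storage storage = StorageOptions.newBuilder().setCredentials(credentials).build().getService();
    probe(storage, "my-staging-bucket");
  }
}
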
View File

@@ -134,7 +134,8 @@ public class SnowflakeInternalStagingSqlOperations extends SnowflakeSqlOperation
}
/**
-   * Creates a SQL query to create a staging folder. This query will create a staging folder if one previously did not exist
+   * Creates a SQL query to create a staging folder. This query will create a staging folder if one
+   * previously did not exist
*
* @param stageName name of the staging folder
* @return SQL query string
@@ -157,8 +158,8 @@ public class SnowflakeInternalStagingSqlOperations extends SnowflakeSqlOperation
}
/**
-   * Creates a SQL query to bulk copy data into fully qualified destination table
-   * See https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html for more context
+   * Creates a SQL query to bulk copy data into fully qualified destination table See
+   * https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html for more context
*
* @param stageName name of staging folder
* @param stagingPath path of staging folder to data files
@@ -200,8 +201,8 @@ public class SnowflakeInternalStagingSqlOperations extends SnowflakeSqlOperation
}
/**
-   * Creates a SQL query used to remove staging files that were just staged
-   * See https://docs.snowflake.com/en/sql-reference/sql/remove.html for more context
+   * Creates a SQL query used to remove staging files that were just staged See
+   * https://docs.snowflake.com/en/sql-reference/sql/remove.html for more context
*
* @param stageName name of staging folder
* @return SQL query string

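For context on the three Javadoc'd helpers above, the statements they describe look roughly like this (simplified from the linked Snowflake docs; the connector's actual query strings may differ in options and quoting):

public class SnowflakeStagingSqlSketch {

  // CREATE STAGE is idempotent with IF NOT EXISTS, matching "if one previously did not exist".
  static String createStageQuery(final String stageName) {
    return String.format("CREATE STAGE IF NOT EXISTS %s;", stageName);
  }

  // COPY INTO bulk-loads every staged file under the given path into the destination table.
  static String copyQuery(final String stageName, final String stagingPath, final String dstTable) {
    return String.format("COPY INTO %s FROM '@%s/%s' FILE_FORMAT = (TYPE = CSV COMPRESSION = GZIP);",
        dstTable, stageName, stagingPath);
  }

  // REMOVE deletes the staged files once they have been copied.
  static String removeQuery(final String stageName) {
    return String.format("REMOVE @%s;", stageName);
  }
}
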
View File

@@ -13,15 +13,18 @@ import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
+import io.airbyte.integrations.destination.record_buffer.InMemoryBuffer;
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption;
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption.KeyType;
import io.airbyte.integrations.destination.s3.EncryptionConfig;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
+import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer;
+import io.airbyte.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator;
import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteMessage;
+import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.util.Collections;
import java.util.Map;
@@ -90,7 +93,26 @@ public class SnowflakeS3StagingDestination extends AbstractJdbcDestination imple
final String outputTableName = namingResolver.getIdentifier("_airbyte_connection_test_" + UUID.randomUUID());
final String stageName = sqlOperations.getStageName(outputSchema, outputTableName);
sqlOperations.createStageIfNotExists(database, stageName);
-    sqlOperations.dropStageIfExists(database, stageName);
+    // try a test write to make sure we have the required role
+    try {
+      final CsvSerializedBuffer csvSerializedBuffer = new CsvSerializedBuffer(
+          new InMemoryBuffer(".csv"),
+          new StagingDatabaseCsvSheetGenerator(),
+          true);
+      // create a dummy stream/records that will be used to test uploading
+      csvSerializedBuffer.accept(new AirbyteRecordMessage()
+          .withData(Jsons.jsonNode(Map.of("testKey", "testValue")))
+          .withEmittedAt(System.currentTimeMillis()));
+      csvSerializedBuffer.flush();
+      sqlOperations.uploadRecordsToStage(database, csvSerializedBuffer, outputSchema, stageName,
+          stageName.endsWith("/") ? stageName : stageName + "/");
+    } finally {
+      // drop the created tmp stage
+      sqlOperations.dropStageIfExists(database, stageName);
+    }
}
@Override

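The dummy record above is serialized by CsvSerializedBuffer into a gzip-compressed CSV before the staging upload. As a rough standalone approximation of the payload that ends up in the stage (the column order and timestamp formatting of StagingDatabaseCsvSheetGenerator are assumptions here, not confirmed by this diff):

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.zip.GZIPOutputStream;

public class DummyStagingPayload {

  // Approximates the check's staged payload: one gzip-compressed CSV row of
  // (random id, CSV-escaped JSON data, emitted-at). Column order is assumed.
  public static byte[] buildTestCsv() throws Exception {
    final String row = String.join(",",
        UUID.randomUUID().toString(),
        "\"{\"\"testKey\"\":\"\"testValue\"\"}\"",
        String.valueOf(System.currentTimeMillis()));
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
      gzip.write((row + "\n").getBytes(StandardCharsets.UTF_8));
    }
    return bytes.toByteArray();
  }

  public static void main(final String[] args) throws Exception {
    System.out.printf("staged payload: %d bytes%n", buildTestCsv().length);
  }
}
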
View File

@@ -4,14 +4,23 @@
package io.airbyte.integrations.destination.snowflake;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
+import io.airbyte.config.StandardCheckConnectionOutput;
+import io.airbyte.config.StandardCheckConnectionOutput.Status;
import java.nio.file.Path;
+import org.junit.jupiter.api.Test;
public class SnowflakeGcsCopyDestinationAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest {
+  private static final String NO_GCS_PRIVILEGES_ERR_MSG =
+      "Permission 'storage.objects.create' denied on resource (or it may not exist).";
@Override
public JsonNode getStaticConfig() {
final JsonNode copyConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/copy_gcs_config.json")));
@@ -20,4 +29,16 @@ public class SnowflakeGcsCopyDestinationAcceptanceTest extends SnowflakeInsertDe
return copyConfig;
}
+  @Test
+  public void testCheckWithNoProperGcsPermissionConnection() throws Exception {
+    // Config whose creds lack the required GCS permissions
+    final JsonNode config = Jsons.deserialize(IOs.readFile(
+        Path.of("secrets/copy_insufficient_gcs_roles_config.json")));
+    final StandardCheckConnectionOutput standardCheckConnectionOutput = runCheck(config);
+    assertEquals(Status.FAILED, standardCheckConnectionOutput.getStatus());
+    assertThat(standardCheckConnectionOutput.getMessage()).contains(NO_GCS_PRIVILEGES_ERR_MSG);
+  }
}

View File

@@ -4,14 +4,22 @@
package io.airbyte.integrations.destination.snowflake;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
+import io.airbyte.config.StandardCheckConnectionOutput;
+import io.airbyte.config.StandardCheckConnectionOutput.Status;
import java.nio.file.Path;
+import org.junit.jupiter.api.Test;
public class SnowflakeS3CopyEncryptedDestinationAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest {
+  private static final String NO_S3_PRIVILEGES_ERR_MSG = "Could not connect with provided configuration.";
@Override
public JsonNode getStaticConfig() {
final JsonNode copyConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/copy_s3_encrypted_config.json")));
@@ -20,4 +28,16 @@ public class SnowflakeS3CopyEncryptedDestinationAcceptanceTest extends Snowflake
return copyConfig;
}
+  @Test
+  public void testCheckWithNoProperS3PermissionConnection() throws Exception {
+    // Config whose creds lack the required S3 permissions
+    final JsonNode config = Jsons.deserialize(IOs.readFile(
+        Path.of("secrets/copy_s3_wrong_location_config.json")));
+    final StandardCheckConnectionOutput standardCheckConnectionOutput = runCheck(config);
+    assertEquals(Status.FAILED, standardCheckConnectionOutput.getStatus());
+    assertThat(standardCheckConnectionOutput.getMessage()).contains(NO_S3_PRIVILEGES_ERR_MSG);
+  }
}

View File

@@ -4,6 +4,9 @@
package io.airbyte.integrations.destination.snowflake;
+import static io.airbyte.db.jdbc.DateTimeConverter.putJavaSQLDate;
+import static io.airbyte.db.jdbc.DateTimeConverter.putJavaSQLTime;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTestUtils;
@@ -17,4 +20,22 @@ public class SnowflakeTestSourceOperations extends JdbcSourceOperations {
DestinationAcceptanceTestUtils.putStringIntoJson(resultSet.getString(index), columnName, node);
}
+  @Override
+  protected void putDate(final ObjectNode node,
+                         final String columnName,
+                         final ResultSet resultSet,
+                         final int index)
+      throws SQLException {
+    putJavaSQLDate(node, columnName, resultSet, index);
+  }
+
+  @Override
+  protected void putTime(final ObjectNode node,
+                         final String columnName,
+                         final ResultSet resultSet,
+                         final int index)
+      throws SQLException {
+    putJavaSQLTime(node, columnName, resultSet, index);
+  }
}

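putJavaSQLDate and putJavaSQLTime come from Airbyte's DateTimeConverter; conceptually each override reads the JDBC value and stores an ISO-8601 string on the JSON node. A simplified stand-in (not the real helpers) showing the shape of such overrides:

import com.fasterxml.jackson.databind.node.ObjectNode;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.format.DateTimeFormatter;

public class DateTimePutSketch {

  // Roughly what a putDate override does: ISO-format the SQL DATE into the JSON node.
  static void putDate(final ObjectNode node, final String columnName,
                      final ResultSet resultSet, final int index) throws SQLException {
    node.put(columnName, resultSet.getDate(index).toLocalDate()
        .format(DateTimeFormatter.ISO_LOCAL_DATE));
  }

  // And the TIME counterpart.
  static void putTime(final ObjectNode node, final String columnName,
                      final ResultSet resultSet, final int index) throws SQLException {
    node.put(columnName, resultSet.getTime(index).toLocalTime()
        .format(DateTimeFormatter.ISO_LOCAL_TIME));
  }
}
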
View File

@@ -277,7 +277,8 @@ Now that you have set up the Snowflake destination connector, check out the foll
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
-| 0.4.41 | 2023-01-12 | [\#21342](https://github.com/airbytehq/airbyte/pull/21342) | Better handling for conflicting destination streams |
+| 0.4.43 | 2023-01-20 | [\#21450](https://github.com/airbytehq/airbyte/pull/21450) | Updated check methods to handle more possible S3 and GCS staging issues |
+| 0.4.42 | 2023-01-12 | [\#21342](https://github.com/airbytehq/airbyte/pull/21342) | Better handling for conflicting destination streams |
| 0.4.41 | 2022-12-16 | [\#20566](https://github.com/airbytehq/airbyte/pull/20566) | Improve spec to adhere to standards |
| 0.4.40 | 2022-11-11 | [\#19302](https://github.com/airbytehq/airbyte/pull/19302) | Set jdbc application env variable depends on env - airbyte_oss or airbyte_cloud |
| 0.4.39 | 2022-11-09 | [\#18970](https://github.com/airbytehq/airbyte/pull/18970) | Updated "check" connection method to handle more errors |