diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.java index b211d0add96..4d227883139 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.java @@ -17,6 +17,7 @@ import static org.jooq.impl.DSL.selectOne; import static org.jooq.impl.DSL.table; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition; import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition; @@ -217,7 +218,14 @@ public abstract class JdbcDestinationHandler implements Destin field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)), field(quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE))).from(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME)) .getSQL()) - .stream().collect(toMap( + .stream() + .peek(recordJson -> { + // Forcibly downcase all key names. + // This is to handle any destinations that upcase the column names. + // For example - Snowflake with QUOTED_IDENTIFIERS_IGNORE_CASE=TRUE. + ObjectNode record = (ObjectNode) recordJson; + record.fieldNames().forEachRemaining(fieldName -> record.set(fieldName.toLowerCase(), record.get(fieldName))); + }).collect(toMap( record -> { final JsonNode nameNode = record.get(DESTINATION_STATE_TABLE_COLUMN_NAME); final JsonNode namespaceNode = record.get(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE); diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index 09ded383dc3..3a6d0f3156a 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -5,7 +5,7 @@ plugins { airbyteJavaConnector { cdkVersionRequired = '0.23.18' features = ['db-destinations', 's3-destinations', 'typing-deduping'] - useLocalCdk = false + useLocalCdk = true } java { diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index 86f9941e848..a39797486b0 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 3.6.3 + dockerImageTag: 3.6.4 dockerRepository: airbyte/destination-snowflake documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake githubIssueLabel: destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java index 29eb9175e98..b8ffc8be2a0 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java @@ -20,8 +20,10 @@ import io.airbyte.cdk.integrations.destination.staging.StagingConsumerFactory; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser; import io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations; import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; +import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve; import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper; import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration; @@ -45,7 +47,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination implements Destination { +public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination implements Destination { private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingDestination.class); public static final String RAW_SCHEMA_OVERRIDE = "raw_data_schema"; @@ -139,6 +141,14 @@ public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination throw new UnsupportedOperationException("Snowflake does not yet use the native JDBC DV2 interface"); } + @Override + protected List> getMigrations(JdbcDatabase database, + String databaseName, + SqlGenerator sqlGenerator, + DestinationHandler destinationHandler) { + return List.of(); + } + @Override public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, @@ -169,7 +179,7 @@ public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination final SnowflakeV1V2Migrator migrator = new SnowflakeV1V2Migrator(getNamingResolver(), database, databaseName); final SnowflakeV2TableMigrator v2TableMigrator = new SnowflakeV2TableMigrator(database, databaseName, sqlGenerator, snowflakeDestinationHandler); final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false); - final List> migrations = List.of(); + final List> migrations = getMigrations(database, databaseName, sqlGenerator, snowflakeDestinationHandler); if (disableTypeDedupe) { typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator, migrations); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java index 6e8f888ef39..cf996fbb0f5 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java @@ -30,6 +30,10 @@ public class SnowflakeInternalStagingSqlOperations extends SnowflakeSqlStagingOp private static final String PUT_FILE_QUERY = "PUT file://%s @%s/%s PARALLEL = %d;"; private static final String LIST_STAGE_QUERY = "LIST @%s/%s/%s;"; // the 1s1t copy query explicitly quotes the raw table+schema name. + // we set error_on_column_count_mismatch because (at time of writing), we haven't yet added + // the airbyte_meta column to the raw table. + // See also https://github.com/airbytehq/airbyte/issues/36410 for improved error handling. + // TODO remove error_on_column_count_mismatch once snowflake has airbyte_meta in raw data. private static final String COPY_QUERY_1S1T = """ COPY INTO "%s"."%s" FROM '@%s/%s' @@ -40,6 +44,7 @@ public class SnowflakeInternalStagingSqlOperations extends SnowflakeSqlStagingOp skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '"' NULL_IF=('') + error_on_column_count_mismatch=false )"""; private static final String DROP_STAGE_QUERY = "DROP STAGE IF EXISTS %s;"; private static final String REMOVE_QUERY = "REMOVE @%s;"; diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java index be9ff16282f..b45a7863876 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java @@ -7,10 +7,10 @@ package io.airbyte.integrations.destination.snowflake; import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.base.JavaBaseConstants; +import io.airbyte.cdk.integrations.destination.async.partial_messages.PartialAirbyteMessage; import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations; import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations; import io.airbyte.cdk.integrations.destination.jdbc.SqlOperationsUtils; -import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.commons.exceptions.ConfigErrorException; import java.sql.SQLException; import java.util.List; diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/alltypes_v1v2_expectedrecords_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/alltypes_v1v2_expectedrecords_final.jsonl new file mode 100644 index 00000000000..6a9cb026453 --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/alltypes_v1v2_expectedrecords_final.jsonl @@ -0,0 +1,9 @@ +// Snowflake doesn't yet support the new airbyte_meta format / copying airbyte_meta from raw table, so we still have the old `Problem with...` format. +{"ID1": 1, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "ARRAY": ["foo"], "STRUCT": {"foo": "bar"}, "STRING": "foo", "NUMBER": 42.1, "INTEGER": 42, "BOOLEAN": true, "TIMESTAMP_WITH_TIMEZONE": "2023-01-23T12:34:56.000000000Z", "TIMESTAMP_WITHOUT_TIMEZONE": "2023-01-23T12:34:56.000000000", "TIME_WITH_TIMEZONE": "12:34:56Z", "TIME_WITHOUT_TIMEZONE": "12:34:56.000000000", "DATE": "2023-01-23", "UNKNOWN": {}, "_AIRBYTE_EXTRACTED_AT": "2023-01-01T00:00:00.000000000Z", "_AIRBYTE_META": {"errors": []}} +{"ID1": 2, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "UNKNOWN": null, "_AIRBYTE_EXTRACTED_AT": "2023-01-01T00:00:00.000000000Z", "_AIRBYTE_META": {"errors": []}} +{"ID1": 3, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "_AIRBYTE_EXTRACTED_AT": "2023-01-01T00:00:00.000000000Z", "_AIRBYTE_META": {"errors": []}} +{"ID1": 4, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "UNKNOWN": null, "_AIRBYTE_EXTRACTED_AT": "2023-01-01T00:00:00.000000000Z", "_AIRBYTE_META": {"errors":["Problem with `struct`","Problem with `array`","Problem with `number`","Problem with `integer`","Problem with `boolean`","Problem with `timestamp_with_timezone`","Problem with `timestamp_without_timezone`","Problem with `time_with_timezone`","Problem with `time_without_timezone`","Problem with `date`"]}} +// Note: no loss of precision on the `number` column anywhere. +{"ID1": 5, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "NUMBER": 67.174118, "STRUCT": {"nested_number": 67.174118}, "ARRAY": [67.174118], "UNKNOWN": 67.174118, "_AIRBYTE_EXTRACTED_AT": "2023-01-01T00:00:00.000000000Z", "_AIRBYTE_META": {"errors": []}} +// Note that we unconditionally upcase IAmACaseSensitiveColumnName +{"ID1": 6, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "IAMACASESENSITIVECOLUMNNAME": "Case senstive value", "_AIRBYTE_EXTRACTED_AT": "2023-01-01T00:00:00.000000000Z", "_AIRBYTE_META": {"errors": []}} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/alltypes_v1v2_expectedrecords_raw.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/alltypes_v1v2_expectedrecords_raw.jsonl new file mode 100644 index 00000000000..d2a23f103ed --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/alltypes_v1v2_expectedrecords_raw.jsonl @@ -0,0 +1,6 @@ +{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}}} +{"_airbyte_raw_id": "53ce75a5-5bcc-47a3-b45c-96c2015cfe35", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": null, "struct": null, "string": null, "number": null, "integer": null, "boolean": null, "timestamp_with_timezone": null, "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, "date": null, "unknown": null}} +{"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fbe", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z"}} +{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": null, "number": "foo", "integer": "bar", "boolean": "fizz", "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": "airbyte", "unknown": null}} +{"_airbyte_raw_id": "a4a783b5-7729-4d0b-b659-48ceb08713f1", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118}} +{"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fce", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {"id1": 6, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "IamACaseSensitiveColumnName": "Case senstive value"}} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java index 8b15f514090..220369768f1 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java @@ -10,7 +10,7 @@ import static org.junit.jupiter.params.provider.Arguments.arguments; import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.integrations.base.DestinationConfig; import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; -import io.airbyte.cdk.integrations.destination_async.AsyncStreamConsumer; +import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.integrations.destination.snowflake.SnowflakeDestination.DestinationType; @@ -90,7 +90,7 @@ public class SnowflakeDestinationTest { void testWriteSnowflakeInternal() throws Exception { final JsonNode config = Jsons.deserialize(MoreResources.readResource("internal_staging_config.json"), JsonNode.class); final SerializedAirbyteMessageConsumer consumer = new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS) - .getSerializedMessageConsumer(config, new ConfiguredAirbyteCatalog(), null); + .getSerializedMessageConsumer(config, new ConfiguredAirbyteCatalog(), message -> {}); assertEquals(AsyncStreamConsumer.class, consumer.getClass()); } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperationsTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperationsTest.java index 1d8f16da008..f158e79148f 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperationsTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperationsTest.java @@ -63,6 +63,7 @@ class SnowflakeInternalStagingSqlOperationsTest { skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '"' NULL_IF=('') + error_on_column_count_mismatch=false ) files = ('filename1','filename2');"""; final String actualCopyQuery = snowflakeStagingSqlOperations.getCopyQuery(STAGE_NAME, STAGE_PATH, List.of("filename1", "filename2"), "tableName", SCHEMA_NAME); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java index 66fa8866f23..f442034ae49 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java @@ -14,7 +14,7 @@ import static org.mockito.Mockito.verify; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.base.DestinationConfig; import io.airbyte.cdk.integrations.base.JavaBaseConstants; -import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; +import io.airbyte.cdk.integrations.destination.async.partial_messages.PartialAirbyteMessage; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; import java.sql.SQLException; diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index a9b0fed3bb3..5098c86106e 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -276,6 +276,7 @@ desired namespace. | Version | Date | Pull Request | Subject | |:----------------|:-----------|:-------------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.6.4 | 2024-03-25 | [\#36396](https://github.com/airbytehq/airbyte/pull/36396) | Handle instances with `QUOTED_IDENTIFIERS_IGNORE_CASE` enabled globally | | 3.6.3 | 2024-03-25 | [\#36452](https://github.com/airbytehq/airbyte/pull/36452) | Remove Query timeout | | 3.6.2 | 2024-03-18 | [\#36240](https://github.com/airbytehq/airbyte/pull/36240) | Hide oAuth config option | | 3.6.1 | 2024-03-07 | [\#35899](https://github.com/airbytehq/airbyte/pull/35899) | Adopt CDK 0.23.18; Null safety check in state parsing |