1
0
mirror of synced 2025-12-25 02:09:19 -05:00

🐛 Destinations snowflake + bigquery: only parse catalog in 1s1t mode (#28976)

* only parse catalog in 1s1t mode

* one more thing?

* logistics
This commit is contained in:
Edward Gao
2023-08-02 15:19:52 -07:00
committed by GitHub
parent 991c90735a
commit 3af7f3b6fb
11 changed files with 30 additions and 11 deletions

View File

@@ -47,7 +47,7 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery
COPY --from=build /airbyte /airbyte
LABEL io.airbyte.version=1.7.1
LABEL io.airbyte.version=1.7.2
LABEL io.airbyte.name=airbyte/destination-bigquery
ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"

View File

@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 1.7.1
dockerImageTag: 1.7.2
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg

View File

@@ -235,16 +235,18 @@ public class BigQueryDestination extends BaseConnector implements Destination {
} else {
catalogParser = new CatalogParser(sqlGenerator);
}
ParsedCatalog parsedCatalog = catalogParser.parseCatalog(catalog);
final ParsedCatalog parsedCatalog;
final BigQuery bigquery = getBigQuery(config);
TyperDeduper typerDeduper;
if (TypingAndDedupingFlag.isDestinationV2()) {
parsedCatalog = catalogParser.parseCatalog(catalog);
typerDeduper = new DefaultTyperDeduper<>(
sqlGenerator,
new BigQueryDestinationHandler(bigquery, datasetLocation),
parsedCatalog);
} else {
parsedCatalog = null;
typerDeduper = new NoopTyperDeduper();
}
@@ -268,13 +270,15 @@ public class BigQueryDestination extends BaseConnector implements Destination {
final Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap = new HashMap<>();
for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) {
final AirbyteStream stream = configStream.getStream();
StreamConfig parsedStream = parsedCatalog.getStream(stream.getNamespace(), stream.getName());
final StreamConfig parsedStream;
final String streamName = stream.getName();
String targetTableName;
if (use1s1t) {
parsedStream = parsedCatalog.getStream(stream.getNamespace(), stream.getName());
targetTableName = parsedStream.id().rawName();
} else {
parsedStream = null;
targetTableName = getTargetTableName(streamName);
}

View File

@@ -107,7 +107,12 @@ public class BigQueryStagingConsumerFactory {
Preconditions.checkNotNull(configuredStream.getDestinationSyncMode(), "Undefined destination sync mode");
final AirbyteStream stream = configuredStream.getStream();
StreamConfig streamConfig = parsedCatalog.getStream(stream.getNamespace(), stream.getName());
final StreamConfig streamConfig;
if (TypingAndDedupingFlag.isDestinationV2()) {
streamConfig = parsedCatalog.getStream(stream.getNamespace(), stream.getName());
} else {
streamConfig = null;
}
final String streamName = stream.getName();
final BigQueryRecordFormatter recordFormatter = recordFormatterCreator.apply(stream.getJsonSchema());

View File

@@ -49,7 +49,7 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1
ENV ENABLE_SENTRY true
LABEL io.airbyte.version=1.2.6
LABEL io.airbyte.version=1.2.7
LABEL io.airbyte.name=airbyte/destination-snowflake
ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"

View File

@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 1.2.6
dockerImageTag: 1.2.7
dockerRepository: airbyte/destination-snowflake
githubIssueLabel: destination-snowflake
icon: snowflake.svg

View File

@@ -150,11 +150,13 @@ public class SnowflakeGcsStagingDestination extends AbstractJdbcDestination impl
final GcsConfig gcsConfig = GcsConfig.getGcsConfig(config);
SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator();
ParsedCatalog parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog);
final ParsedCatalog parsedCatalog;
TyperDeduper typerDeduper;
if (TypingAndDedupingFlag.isDestinationV2()) {
parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog);
typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, new SnowflakeDestinationHandler(getDatabase(getDataSource(config))), parsedCatalog);
} else {
parsedCatalog = null;
typerDeduper = new NoopTyperDeduper();
}

View File

@@ -124,11 +124,13 @@ public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator();
ParsedCatalog parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog);
final ParsedCatalog parsedCatalog;
TyperDeduper typerDeduper;
if (TypingAndDedupingFlag.isDestinationV2()) {
parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog);
typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, new SnowflakeDestinationHandler(getDatabase(getDataSource(config))), parsedCatalog);
} else {
parsedCatalog = null;
typerDeduper = new NoopTyperDeduper();
}
@@ -151,11 +153,13 @@ public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator();
ParsedCatalog parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog);
final ParsedCatalog parsedCatalog;
TyperDeduper typerDeduper;
if (TypingAndDedupingFlag.isDestinationV2()) {
parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog);
typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, new SnowflakeDestinationHandler(getDatabase(getDataSource(config))), parsedCatalog);
} else {
parsedCatalog = null;
typerDeduper = new NoopTyperDeduper();
}

View File

@@ -140,11 +140,13 @@ public class SnowflakeS3StagingDestination extends AbstractJdbcDestination imple
final EncryptionConfig encryptionConfig = EncryptionConfig.fromJson(config.get("loading_method").get("encryption"));
SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator();
ParsedCatalog parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog);
final ParsedCatalog parsedCatalog;
TyperDeduper typerDeduper;
if (TypingAndDedupingFlag.isDestinationV2()) {
parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog);
typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, new SnowflakeDestinationHandler(getDatabase(getDataSource(config))), parsedCatalog);
} else {
parsedCatalog = null;
typerDeduper = new NoopTyperDeduper();
}

View File

@@ -135,6 +135,7 @@ Now that you have set up the BigQuery destination connector, check out the follo
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------|
| 1.7.2 | 2023-08-02 | [\#28976](https://github.com/airbytehq/airbyte/pull/28976) | Fix composite PK handling in v1 mode |
| 1.7.1 | 2023-08-02 | [\#28959](https://github.com/airbytehq/airbyte/pull/28959) | Destinations v2: Fix CDC syncs in non-dedup mode |
| 1.7.0 | 2023-08-01 | [\#28894](https://github.com/airbytehq/airbyte/pull/28894) | Destinations v2: Open up early access program opt-in |
| 1.6.0 | 2023-07-26 | [\#28723](https://github.com/airbytehq/airbyte/pull/28723) | Destinations v2: Change raw table dataset and naming convention |

View File

@@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n
| Version | Date | Pull Request | Subject |
|:----------------|:-----------|:------------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.2.7 | 2023-08-02 | [\#28976](https://github.com/airbytehq/airbyte/pull/28976) | Fix composite PK handling in v1 mode |
| 1.2.6 | 2023-08-01 | [\#28618](https://github.com/airbytehq/airbyte/pull/28618) | Reduce logging noise |
| 1.2.5 | 2023-07-24 | [\#28618](https://github.com/airbytehq/airbyte/pull/28618) | Add hooks in preparation for destinations v2 implementation |
| 1.2.4 | 2023-07-21 | [\#28584](https://github.com/airbytehq/airbyte/pull/28584) | Install dependencies in preparation for destinations v2 work |