
Destination BigQuery: Adapt to newer interface for Sync operations (#38132)

Authored by Gireesh Sreepathi on 2024-05-22 15:34:51 -07:00; committed by GitHub
parent 90cb262d7f
commit 5b7873a0de
32 changed files with 990 additions and 1573 deletions


@@ -3,7 +3,7 @@ plugins {
}
airbyteJavaConnector {
cdkVersionRequired = '0.35.0'
cdkVersionRequired = '0.35.7'
features = [
'db-destinations',
'datastore-bigquery',
@@ -22,7 +22,7 @@ java {
}
application {
mainClass = 'io.airbyte.integrations.destination.bigquery.BigQueryDestination'
mainClass = 'io.airbyte.integrations.destination.bigquery.BigQueryDestinationKt'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0',
'-XX:NativeMemoryTracking=detail', '-XX:+UnlockDiagnosticVMOptions',
'-XX:GCLockerRetryAllocationCount=100',
@@ -37,6 +37,5 @@ application {
}
dependencies {
implementation 'com.codepoetics:protonpack:1.13'
implementation 'org.apache.commons:commons-text:1.10.0'
}
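The new mainClass points at a Kotlin file facade: when BigQueryDestination.kt declares a top-level main function, the Kotlin compiler emits a JVM class named BigQueryDestinationKt that hosts it. A rough, purely illustrative Java-eye view of what the application plugin now launches (not the actual generated code):

// Illustration only: approximate JVM shape of the Kotlin file facade referenced by mainClass.
public final class BigQueryDestinationKt {
    public static void main(final String[] args) throws Exception {
        // The real Kotlin entrypoint delegates to IntegrationRunner, like the old Java main shown later in this diff.
    }
}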


@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.4.20
dockerImageTag: 2.5.0
dockerRepository: airbyte/destination-bigquery
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
githubIssueLabel: destination-bigquery


@@ -1,105 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery;
import com.google.cloud.bigquery.TableId;
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns;
import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction;
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer;
import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.Map;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
/**
* Async flushing logic. Flushing async prevents backpressure and is the superior flushing strategy.
*/
@Slf4j
class BigQueryAsyncFlush implements DestinationFlushFunction {
private final Map<StreamDescriptor, StreamConfig> streamConfigMap;
private final BigQueryGcsOperations stagingOperations;
private final ConfiguredAirbyteCatalog catalog;
public BigQueryAsyncFlush(
final Map<StreamDescriptor, StreamConfig> streamConfigMap,
final BigQueryGcsOperations stagingOperations,
final ConfiguredAirbyteCatalog catalog) {
this.streamConfigMap = streamConfigMap;
this.stagingOperations = stagingOperations;
this.catalog = catalog;
}
@Override
public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessage> stream) throws Exception {
final SerializableBuffer writer;
try {
writer = new CsvSerializedBuffer(
new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX),
new StagingDatabaseCsvSheetGenerator(DestinationColumns.V2_WITHOUT_META),
true);
stream.forEach(record -> {
try {
writer.accept(record.getSerialized(), Jsons.serialize(record.getRecord().getMeta()), record.getRecord().getEmittedAt());
} catch (final Exception e) {
throw new RuntimeException(e);
}
});
} catch (final Exception e) {
throw new RuntimeException(e);
}
writer.flush();
log.info("Flushing CSV buffer for stream {} ({}) to staging", decs.getName(), FileUtils.byteCountToDisplaySize(writer.getByteCount()));
if (!streamConfigMap.containsKey(decs)) {
throw new IllegalArgumentException(
String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s", Jsons.serialize(catalog)));
}
final StreamId streamId = streamConfigMap.get(decs).getId();
try {
final String stagedFileName = stagingOperations.uploadRecordsToStage(streamId.getRawNamespace(), streamId.getOriginalName(), writer);
stagingOperations.copyIntoTableFromStage(
streamId.getRawNamespace(),
streamId.getOriginalName(),
TableId.of(streamId.getRawNamespace(), streamId.getRawName()),
BigQueryRecordFormatter.SCHEMA_V2,
stagedFileName);
} catch (final Exception e) {
log.error("Failed to flush and commit buffer data into destination's raw table", e);
throw new RuntimeException("Failed to upload buffer to stage and commit to destination", e);
}
writer.close();
}
@Override
public long getOptimalBatchSizeBytes() {
// Chosen arbitrarily (mostly to match legacy behavior). We have no reason to believe a larger
// number would be worse.
// This was previously set to 25MB, which ran into rate-limiting issues:
// https://cloud.google.com/bigquery/quotas#standard_tables
// > Your project can make up to 1,500 table modifications per table per day
return 200 * 1024 * 1024;
}
@Override
public long getQueueFlushThresholdBytes() {
return 200 * 1024 * 1024;
}
}
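To make the quota reasoning in getOptimalBatchSizeBytes concrete, here is a back-of-the-envelope sketch. The 100 GiB/day figure is an assumed workload for illustration, not a measured number; each flush issues one load job against the raw table, so the batch size directly bounds the number of daily table modifications.

// Illustrative arithmetic only; 100 GiB/day is an assumption, the 1,500 modifications/table/day quota is from the comment above.
final long bytesPerDay = 100L * 1024 * 1024 * 1024;
final long oldBatchSize = 25L * 1024 * 1024;   // previous 25 MB batches
final long newBatchSize = 200L * 1024 * 1024;  // current 200 MB batches
System.out.println(bytesPerDay / oldBatchSize); // 4096 load jobs/day -> over the quota
System.out.println(bytesPerDay / newBatchSize); // 512 load jobs/day  -> comfortably within it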


@@ -1,59 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery;
import com.google.common.util.concurrent.RateLimiter;
import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction;
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class BigQueryAsyncStandardFlush implements DestinationFlushFunction {
// TODO remove this once the async framework supports rate-limiting/backpressuring
private static final RateLimiter rateLimiter = RateLimiter.create(0.07);
private final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap;
public BigQueryAsyncStandardFlush(final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap) {
this.uploaderMap = uploaderMap;
}
@Override
public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessage> stream) throws Exception {
rateLimiter.acquire();
final ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader> uploaderMapSupplied = uploaderMap.get();
final AtomicInteger recordCount = new AtomicInteger();
stream.forEach(airbyteMessage -> {
try {
final AirbyteStreamNameNamespacePair sd = new AirbyteStreamNameNamespacePair(airbyteMessage.getRecord().getStream(),
airbyteMessage.getRecord().getNamespace());
uploaderMapSupplied.get(sd).upload(airbyteMessage);
recordCount.getAndIncrement();
} catch (final Exception e) {
log.error("An error happened while trying to flush a record to big query", e);
throw e;
}
});
uploaderMapSupplied.values().forEach(uploader -> uploader.closeAfterPush());
}
@Override
public long getOptimalBatchSizeBytes() {
// todo(ryankfu): this should be per-destination specific. currently this is for Snowflake.
// The size chosen is currently for improving the performance of low memory connectors. With 1 Gi of
// resource the connector will usually at most fill up around 150 MB in a single queue. By lowering
// the batch size, the AsyncFlusher will flush in smaller batches which allows for memory to be
// freed earlier similar to a sliding window effect
return Double.valueOf(Runtime.getRuntime().maxMemory() * 0.2).longValue();
}
}
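For context, Guava's RateLimiter.create takes a permits-per-second rate, so the 0.07 above throttles flushes to roughly one every 14 seconds across all streams. A minimal sketch using the same com.google.common.util.concurrent.RateLimiter imported above:

// 0.07 permits/second means about one permit every 1 / 0.07 ≈ 14.3 seconds.
final RateLimiter limiter = RateLimiter.create(0.07);
limiter.acquire(); // the first permit is typically available immediately
limiter.acquire(); // later acquires block until the next permit accrues (~14 s)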


@@ -1,39 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery;
public class BigQueryConsts {
public static final int MiB = 1024 * 1024;
public static final String CONFIG_DATASET_ID = "dataset_id";
public static final String CONFIG_PROJECT_ID = "project_id";
public static final String CONFIG_DATASET_LOCATION = "dataset_location";
public static final String CONFIG_CREDS = "credentials_json";
public static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb";
public static final String LOADING_METHOD = "loading_method";
public static final String METHOD = "method";
public static final String GCS_STAGING = "GCS Staging";
public static final String GCS_BUCKET_NAME = "gcs_bucket_name";
public static final String GCS_BUCKET_PATH = "gcs_bucket_path";
public static final String GCS_BUCKET_REGION = "gcs_bucket_region";
public static final String CREDENTIAL = "credential";
public static final String FORMAT = "format";
public static final String KEEP_GCS_FILES = "keep_files_in_gcs-bucket";
public static final String KEEP_GCS_FILES_VAL = "Keep all tmp files in GCS";
public static final String DISABLE_TYPE_DEDUPE = "disable_type_dedupe";
public static final String NAMESPACE_PREFIX = "n";
// tests
public static final String BIGQUERY_BASIC_CONFIG = "basic_bigquery_config";
public static final String GCS_CONFIG = "gcs_config";
public static final String CREDENTIAL_TYPE = "credential_type";
public static final String HMAC_KEY_ACCESS_ID = "hmac_key_access_id";
public static final String HMAC_KEY_ACCESS_SECRET = "hmac_key_secret";
}
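These keys mirror the connector's JSON configuration. The sketch below shows how the GCS staging portion of a config lines up with the constants; the bucket, path, project, and dataset values are made up, and the overall shape is inferred from how checkGcsPermission and the loading-method helpers read the config elsewhere in this diff. Plain Jackson is used here so the sketch stays self-contained.

// Hypothetical values; the key names come from the constants above.
// Requires com.fasterxml.jackson.databind.ObjectMapper and com.fasterxml.jackson.databind.node.ObjectNode.
final ObjectMapper mapper = new ObjectMapper();
final ObjectNode loadingMethod = mapper.createObjectNode()
    .put(BigQueryConsts.METHOD, BigQueryConsts.GCS_STAGING)
    .put(BigQueryConsts.GCS_BUCKET_NAME, "my-staging-bucket")
    .put(BigQueryConsts.GCS_BUCKET_PATH, "data_sync");
final ObjectNode config = mapper.createObjectNode()
    .put(BigQueryConsts.CONFIG_PROJECT_ID, "my-gcp-project")
    .put(BigQueryConsts.CONFIG_DATASET_ID, "my_dataset");
config.set(BigQueryConsts.LOADING_METHOD, loadingMethod);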


@@ -1,473 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery;
import com.codepoetics.protonpack.StreamUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.base.Charsets;
import io.airbyte.cdk.integrations.BaseConnector;
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
import io.airbyte.cdk.integrations.destination.gcs.BaseGcsDestination;
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig;
import io.airbyte.cdk.integrations.destination.gcs.GcsNameTransformer;
import io.airbyte.cdk.integrations.destination.gcs.GcsStorageOperations;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser;
import io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV1V2Migrator;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2TableMigrator;
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader;
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory;
import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.jetbrains.annotations.NotNull;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BigQueryDestination extends BaseConnector implements Destination {
private static final String RAW_DATA_DATASET = "raw_data_dataset";
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestination.class);
private static final List<String> REQUIRED_PERMISSIONS = List.of(
"storage.multipartUploads.abort",
"storage.multipartUploads.create",
"storage.objects.create",
"storage.objects.delete",
"storage.objects.get",
"storage.objects.list");
protected final BigQuerySQLNameTransformer namingResolver;
public BigQueryDestination() {
namingResolver = new BigQuerySQLNameTransformer();
}
@Override
public AirbyteConnectionStatus check(final JsonNode config) {
try {
final String datasetId = BigQueryUtils.getDatasetId(config);
final String datasetLocation = BigQueryUtils.getDatasetLocation(config);
final BigQuery bigquery = getBigQuery(config);
final UploadingMethod uploadingMethod = BigQueryUtils.getLoadingMethod(config);
BigQueryUtils.checkHasCreateAndDeleteDatasetRole(bigquery, datasetId, datasetLocation);
final Dataset dataset = BigQueryUtils.getOrCreateDataset(bigquery, datasetId, datasetLocation);
if (!dataset.getLocation().equals(datasetLocation)) {
throw new ConfigErrorException("Actual dataset location doesn't match the location from the config");
}
final QueryJobConfiguration queryConfig = QueryJobConfiguration
.newBuilder(String.format("SELECT * FROM `%s.INFORMATION_SCHEMA.TABLES` LIMIT 1;", datasetId))
.setUseLegacySql(false)
.build();
if (UploadingMethod.GCS.equals(uploadingMethod)) {
final AirbyteConnectionStatus status = checkGcsPermission(config);
if (!status.getStatus().equals(Status.SUCCEEDED)) {
return status;
}
}
final ImmutablePair<Job, String> result = BigQueryUtils.executeQuery(bigquery, queryConfig);
if (result.getLeft() != null) {
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} else {
throw new ConfigErrorException(result.getRight());
}
} catch (final Exception e) {
LOGGER.error("Check failed.", e);
throw new ConfigErrorException(e.getMessage() != null ? e.getMessage() : e.toString());
}
}
/**
* This method does two checks: 1) permissions related to the bucket, and 2) the ability to create
* and delete an actual file. The latter is important because even if the service account may have
* the proper permissions, the HMAC keys can only be verified by running the actual GCS check.
*/
private AirbyteConnectionStatus checkGcsPermission(final JsonNode config) {
final JsonNode loadingMethod = config.get(BigQueryConsts.LOADING_METHOD);
final String bucketName = loadingMethod.get(BigQueryConsts.GCS_BUCKET_NAME).asText();
final List<String> missingPermissions = new ArrayList<>();
try {
final GoogleCredentials credentials = getServiceAccountCredentials(config);
final Storage storage = StorageOptions.newBuilder()
.setProjectId(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText())
.setCredentials(credentials)
.setHeaderProvider(BigQueryUtils.getHeaderProvider())
.build().getService();
final List<Boolean> permissionsCheckStatusList = storage.testIamPermissions(bucketName, REQUIRED_PERMISSIONS);
missingPermissions.addAll(StreamUtils
.zipWithIndex(permissionsCheckStatusList.stream())
.filter(i -> !i.getValue())
.map(i -> REQUIRED_PERMISSIONS.get(Math.toIntExact(i.getIndex())))
.toList());
final BaseGcsDestination gcsDestination = new BaseGcsDestination() {};
final JsonNode gcsJsonNodeConfig = BigQueryUtils.getGcsJsonNodeConfig(config);
return gcsDestination.check(gcsJsonNodeConfig);
} catch (final Exception e) {
final StringBuilder message = new StringBuilder("Cannot access the GCS bucket.");
if (!missingPermissions.isEmpty()) {
message.append(" The following permissions are missing on the service account: ")
.append(String.join(", ", missingPermissions))
.append(".");
}
message.append(" Please make sure the service account can access the bucket path, and the HMAC keys are correct.");
LOGGER.error(message.toString(), e);
throw new ConfigErrorException("Could not access the GCS bucket with the provided configuration.\n", e);
}
}
public static BigQuery getBigQuery(final JsonNode config) {
final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
try {
final BigQueryOptions.Builder bigQueryBuilder = BigQueryOptions.newBuilder();
final GoogleCredentials credentials = getServiceAccountCredentials(config);
return bigQueryBuilder
.setProjectId(projectId)
.setCredentials(credentials)
.setHeaderProvider(BigQueryUtils.getHeaderProvider())
.build()
.getService();
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
public static GoogleCredentials getServiceAccountCredentials(final JsonNode config) throws IOException {
final JsonNode serviceAccountKey = config.get(BigQueryConsts.CONFIG_CREDS);
// Follows this order of resolution:
// https://cloud.google.com/java/docs/reference/google-auth-library/latest/com.google.auth.oauth2.GoogleCredentials#com_google_auth_oauth2_GoogleCredentials_getApplicationDefault
if (serviceAccountKey == null) {
LOGGER.info("No service account key json is provided. It is required if you are using Airbyte cloud.");
LOGGER.info("Using the default service account credential from environment.");
return GoogleCredentials.getApplicationDefault();
}
// The JSON credential can either be a raw JSON object, or a serialized JSON object.
final String credentialsString = serviceAccountKey.isObject()
? Jsons.serialize(serviceAccountKey)
: serviceAccountKey.asText();
return GoogleCredentials.fromStream(
new ByteArrayInputStream(credentialsString.getBytes(Charsets.UTF_8)));
}
/**
* Returns a {@link AirbyteMessageConsumer} based on whether the uploading mode is STANDARD INSERTS
* or using STAGING
*
* @param config - integration-specific configuration object as json. e.g. { "username": "airbyte",
* "password": "super secure" }
* @param catalog - schema of the incoming messages.
*/
@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
throw new UnsupportedOperationException("Should use getSerializedMessageConsumer");
}
@Override
@SuppressWarnings("deprecation")
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final @NotNull JsonNode config,
final @NotNull ConfiguredAirbyteCatalog catalog,
final @NotNull Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
final UploadingMethod uploadingMethod = BigQueryUtils.getLoadingMethod(config);
final String defaultNamespace = BigQueryUtils.getDatasetId(config);
setDefaultStreamNamespace(catalog, defaultNamespace);
final boolean disableTypeDedupe = BigQueryUtils.getDisableTypeDedupFlag(config);
final String datasetLocation = BigQueryUtils.getDatasetLocation(config);
final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation);
final Optional<String> rawNamespaceOverride = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET);
final ParsedCatalog parsedCatalog = parseCatalog(sqlGenerator, defaultNamespace,
rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE), catalog);
final BigQuery bigquery = getBigQuery(config);
final TyperDeduper typerDeduper =
buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe);
AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(config);
final JsonNode serviceAccountKey = config.get(BigQueryConsts.CONFIG_CREDS);
if (serviceAccountKey != null) {
// If the service account key is a non-null string, we will try to
// deserialize it. Otherwise, we will let the Google library find it in
// the environment during the client initialization.
if (serviceAccountKey.isTextual()) {
// There are cases where we fail to deserialize the service account key. In these cases, we
// shouldn't do anything.
// Google's creds library is more lenient with JSON-parsing than Jackson, and I'd rather just let it
// go.
Jsons.tryDeserialize(serviceAccountKey.asText())
.ifPresent(AirbyteExceptionHandler::addAllStringsInConfigForDeinterpolation);
} else {
AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(serviceAccountKey);
}
}
if (uploadingMethod == UploadingMethod.STANDARD) {
LOGGER.warn("The \"standard\" upload mode is not performant, and is not recommended for production. " +
"Please use the GCS upload mode if you are syncing a large amount of data.");
return getStandardRecordConsumer(bigquery, config, catalog, parsedCatalog, outputRecordCollector, typerDeduper);
}
final StandardNameTransformer gcsNameTransformer = new GcsNameTransformer();
final GcsDestinationConfig gcsConfig = BigQueryUtils.getGcsCsvDestinationConfig(config);
final UUID stagingId = UUID.randomUUID();
final DateTime syncDatetime = DateTime.now(DateTimeZone.UTC);
final boolean keepStagingFiles = BigQueryUtils.isKeepFilesInGcs(config);
final GcsStorageOperations gcsOperations = new GcsStorageOperations(gcsNameTransformer, gcsConfig.getS3Client(), gcsConfig);
final BigQueryGcsOperations bigQueryGcsOperations = new BigQueryGcsOperations(
bigquery,
gcsNameTransformer,
gcsConfig,
gcsOperations,
datasetLocation,
stagingId,
syncDatetime,
keepStagingFiles);
return new BigQueryStagingConsumerFactory().createAsync(
catalog,
outputRecordCollector,
bigQueryGcsOperations,
typerDeduper,
parsedCatalog,
BigQueryUtils.getDatasetId(config));
}
protected Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> getUploaderMap(
final BigQuery bigquery,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final ParsedCatalog parsedCatalog)
throws IOException {
return () -> {
final ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader> uploaderMap = new ConcurrentHashMap<>();
for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) {
final AirbyteStream stream = configStream.getStream();
final StreamConfig parsedStream;
final String targetTableName;
parsedStream = parsedCatalog.getStream(stream.getNamespace(), stream.getName());
targetTableName = parsedStream.getId().getRawName();
final UploaderConfig uploaderConfig = UploaderConfig
.builder()
.bigQuery(bigquery)
.configStream(configStream)
.parsedStream(parsedStream)
.config(config)
.formatterMap(getFormatterMap())
.targetTableName(targetTableName)
// This refers to whether this is BQ denormalized or not
.isDefaultAirbyteTmpSchema(isDefaultAirbyteTmpTableSchema())
.build();
try {
putStreamIntoUploaderMap(stream, uploaderConfig, uploaderMap);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
return uploaderMap;
};
}
protected void putStreamIntoUploaderMap(final AirbyteStream stream,
final UploaderConfig uploaderConfig,
final Map<AirbyteStreamNameNamespacePair, BigQueryDirectUploader> uploaderMap)
throws IOException {
uploaderMap.put(
AirbyteStreamNameNamespacePair.fromAirbyteStream(stream),
BigQueryUploaderFactory.getUploader(uploaderConfig));
}
/**
* BigQuery might use a different structure for the temporary table. If this method returns TRUE, the
* temporary table will have only the three common Airbyte attributes. If it returns FALSE, the
* temporary table structure will follow the Airbyte message JsonSchema.
*
* @return use default AirbyteSchema or build using JsonSchema
*/
protected boolean isDefaultAirbyteTmpTableSchema() {
return true;
}
protected Map<UploaderType, BigQueryRecordFormatter> getFormatterMap() {
return Map.of(
UploaderType.STANDARD, new BigQueryRecordFormatter(namingResolver),
UploaderType.CSV, new BigQueryRecordFormatter(namingResolver));
}
private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuery bigquery,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final ParsedCatalog parsedCatalog,
final Consumer<AirbyteMessage> outputRecordCollector,
final TyperDeduper typerDeduper)
throws Exception {
final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> writeConfigs = getUploaderMap(
bigquery,
config,
catalog,
parsedCatalog);
final String bqNamespace = BigQueryUtils.getDatasetId(config);
return new BigQueryRecordStandardConsumer(
outputRecordCollector,
() -> {
typerDeduper.prepareSchemasAndRunMigrations();
// Set up our raw tables
writeConfigs.get().forEach((streamId, uploader) -> {
final StreamConfig stream = parsedCatalog.getStream(streamId);
if (stream.getDestinationSyncMode() == DestinationSyncMode.OVERWRITE) {
// For streams in overwrite mode, truncate the raw table.
// non-1s1t syncs actually overwrite the raw table at the end of the sync, so we only do this in
// 1s1t mode.
final TableId rawTableId = TableId.of(stream.getId().getRawNamespace(), stream.getId().getRawName());
LOGGER.info("Deleting Raw table {}", rawTableId);
if (!bigquery.delete(rawTableId)) {
LOGGER.info("Raw table {} not found, continuing with creation", rawTableId);
}
LOGGER.info("Creating table {}", rawTableId);
BigQueryUtils.createPartitionedTableIfNotExists(bigquery, rawTableId, BigQueryRecordFormatter.SCHEMA_V2);
} else {
uploader.createRawTable();
}
});
typerDeduper.prepareFinalTables();
},
(hasFailed, streamSyncSummaries) -> {
try {
Thread.sleep(30 * 1000);
typerDeduper.typeAndDedupe(streamSyncSummaries);
typerDeduper.commitFinalTables();
typerDeduper.cleanup();
} catch (final Exception e) {
throw new RuntimeException(e);
}
},
catalog,
bqNamespace,
writeConfigs);
}
private void setDefaultStreamNamespace(final ConfiguredAirbyteCatalog catalog, final String namespace) {
// Set the default originalNamespace on streams with null originalNamespace. This means we don't
// need to repeat this
// logic in the rest of the connector.
// (record messages still need to handle null namespaces though, which currently happens in e.g.
// AsyncStreamConsumer#accept)
// This probably should be shared logic amongst destinations eventually.
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
if (StringUtils.isEmpty(stream.getStream().getNamespace())) {
stream.getStream().withNamespace(namespace);
}
}
}
private ParsedCatalog parseCatalog(final BigQuerySqlGenerator sqlGenerator,
final String defaultNamespace,
final String rawNamespaceOverride,
final ConfiguredAirbyteCatalog catalog) {
final CatalogParser catalogParser = new CatalogParser(sqlGenerator, rawNamespaceOverride);
return catalogParser.parseCatalog(catalog);
}
private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator,
final ParsedCatalog parsedCatalog,
final BigQuery bigquery,
final String datasetLocation,
final boolean disableTypeDedupe) {
final BigQueryV1V2Migrator migrator = new BigQueryV1V2Migrator(bigquery, namingResolver);
final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery);
final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(
bigquery,
datasetLocation);
if (disableTypeDedupe) {
return new NoOpTyperDeduperWithV1V2Migrations<>(
sqlGenerator, destinationHandler, parsedCatalog, migrator, v2RawTableMigrator, List.of());
}
return new DefaultTyperDeduper<>(
sqlGenerator,
destinationHandler,
parsedCatalog,
migrator,
v2RawTableMigrator,
List.of());
}
@Override
public boolean isV2Destination() {
return true;
}
public static void main(final String[] args) throws Exception {
AirbyteExceptionHandler.addThrowableForDeinterpolation(BigQueryException.class);
final Destination destination = new BigQueryDestination();
new IntegrationRunner(destination).run(args);
}
}
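As a side note on getServiceAccountCredentials: credentials_json is accepted either as a nested JSON object or as a string containing serialized JSON. A minimal sketch of the two shapes, with placeholder values and using the same Jsons helper imported in this file:

// Both shapes resolve to the same GoogleCredentials; the values are placeholders.
final JsonNode asObject = Jsons.deserialize(
    "{\"credentials_json\": {\"type\": \"service_account\", \"project_id\": \"my-gcp-project\"}}");
final JsonNode asString = Jsons.deserialize(
    "{\"credentials_json\": \"{\\\"type\\\": \\\"service_account\\\", \\\"project_id\\\": \\\"my-gcp-project\\\"}\"}");
// isObject() -> re-serialized via Jsons.serialize() before parsing; isTextual() -> used as-is.
assert asObject.get(BigQueryConsts.CONFIG_CREDS).isObject();
assert asString.get(BigQueryConsts.CONFIG_CREDS).isTextual();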


@@ -1,206 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.WriteDisposition;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableId;
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig;
import io.airbyte.cdk.integrations.destination.gcs.GcsStorageOperations;
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil;
import io.airbyte.commons.exceptions.ConfigErrorException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BigQueryGcsOperations {
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryGcsOperations.class);
private final BigQuery bigQuery;
private final StandardNameTransformer gcsNameTransformer;
private final GcsDestinationConfig gcsConfig;
private final GcsStorageOperations gcsStorageOperations;
private final String datasetLocation;
private final UUID randomStagingId;
private final DateTime syncDatetime;
private final boolean keepStagingFiles;
private final Set<String> existingSchemas = new HashSet<>();
public BigQueryGcsOperations(final BigQuery bigQuery,
final StandardNameTransformer gcsNameTransformer,
final GcsDestinationConfig gcsConfig,
final GcsStorageOperations gcsStorageOperations,
final String datasetLocation,
final UUID randomStagingId,
final DateTime syncDatetime,
final boolean keepStagingFiles) {
this.bigQuery = bigQuery;
this.gcsNameTransformer = gcsNameTransformer;
this.gcsConfig = gcsConfig;
this.gcsStorageOperations = gcsStorageOperations;
this.datasetLocation = datasetLocation;
this.randomStagingId = randomStagingId;
this.syncDatetime = syncDatetime;
this.keepStagingFiles = keepStagingFiles;
}
/**
* @return {@code <bucket-path>/<dataset-id>_<stream-name>}
*/
private String getStagingRootPath(final String datasetId, final String stream) {
return gcsNameTransformer.applyDefaultCase(String.format("%s/%s_%s",
gcsConfig.getBucketPath(),
gcsNameTransformer.convertStreamName(datasetId),
gcsNameTransformer.convertStreamName(stream)));
}
/**
* @return {@code <bucket-path>/<dataset-id>_<stream-name>/<year>/<month>/<day>/<hour>/<uuid>/}
*/
public String getStagingFullPath(final String datasetId, final String stream) {
return gcsNameTransformer.applyDefaultCase(String.format("%s/%s/%02d/%02d/%02d/%s/",
getStagingRootPath(datasetId, stream),
syncDatetime.year().get(),
syncDatetime.monthOfYear().get(),
syncDatetime.dayOfMonth().get(),
syncDatetime.hourOfDay().get(),
randomStagingId));
}
public void createSchemaIfNotExists(final String datasetId) {
if (!existingSchemas.contains(datasetId)) {
LOGGER.info("Creating dataset {}", datasetId);
try {
BigQueryUtils.getOrCreateDataset(bigQuery, datasetId, datasetLocation);
} catch (final BigQueryException e) {
if (ConnectorExceptionUtil.HTTP_AUTHENTICATION_ERROR_CODES.contains(e.getCode())) {
throw new ConfigErrorException(e.getMessage(), e);
} else {
throw e;
}
}
existingSchemas.add(datasetId);
}
}
public void createTableIfNotExists(final TableId tableId, final Schema tableSchema) {
LOGGER.info("Creating target table {}", tableId);
BigQueryUtils.createPartitionedTableIfNotExists(bigQuery, tableId, tableSchema);
}
public void createStageIfNotExists(final String datasetId, final String stream) {
final String objectPath = getStagingFullPath(datasetId, stream);
LOGGER.info("Creating staging path for stream {} (dataset {}): {}", stream, datasetId, objectPath);
gcsStorageOperations.createBucketIfNotExists();
}
public String uploadRecordsToStage(final String datasetId, final String stream, final SerializableBuffer writer) {
final String objectPath = getStagingFullPath(datasetId, stream);
LOGGER.info("Uploading records to staging for stream {} (dataset {}): {}", stream, datasetId, objectPath);
return gcsStorageOperations.uploadRecordsToBucket(writer, datasetId, objectPath);
}
/**
* Similar to COPY INTO within
* {@link io.airbyte.cdk.integrations.destination.staging.StagingOperations} which loads the data
* stored in the stage area into a target table in the destination
*
* Reference
* https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html
*/
public void copyIntoTableFromStage(final String datasetId,
final String stream,
final TableId tableId,
final Schema tableSchema,
final String stagedFileName) {
LOGGER.info("Uploading records from staging files to target table {} (dataset {}): {}",
tableId, datasetId, stagedFileName);
final String fullFilePath = String.format("gs://%s/%s%s", gcsConfig.getBucketName(), getStagingFullPath(datasetId, stream), stagedFileName);
LOGGER.info("Uploading staged file: {}", fullFilePath);
final LoadJobConfiguration configuration = LoadJobConfiguration.builder(tableId, fullFilePath)
.setFormatOptions(FormatOptions.csv())
.setSchema(tableSchema)
.setWriteDisposition(WriteDisposition.WRITE_APPEND)
.setJobTimeoutMs(600000L) // 10 min
.build();
final Job loadJob = this.bigQuery.create(JobInfo.of(configuration));
LOGGER.info("[{}] Created a new job to upload record(s) to target table {} (dataset {}): {}", loadJob.getJobId(),
tableId, datasetId, loadJob);
try {
BigQueryUtils.waitForJobFinish(loadJob);
LOGGER.info("[{}] Target table {} (dataset {}) is successfully appended with staging files", loadJob.getJobId(),
tableId, datasetId);
} catch (final BigQueryException | InterruptedException e) {
throw new RuntimeException(
String.format("[%s] Failed to upload staging files to destination table %s (%s)", loadJob.getJobId(),
tableId, datasetId),
e);
}
}
@Deprecated
public void cleanUpStage(final String datasetId, final String stream, final List<String> stagedFiles) {
if (keepStagingFiles) {
return;
}
LOGGER.info("Deleting staging files for stream {} (dataset {}): {}", stream, datasetId, stagedFiles);
gcsStorageOperations.cleanUpBucketObject(getStagingRootPath(datasetId, stream), stagedFiles);
}
public void dropTableIfExists(final String datasetId, final TableId tableId) {
LOGGER.info("Deleting target table {} (dataset {})", tableId, datasetId);
bigQuery.delete(tableId);
}
public void dropStageIfExists(final String datasetId, final String stream) {
if (keepStagingFiles) {
return;
}
final String stagingDatasetPath = getStagingRootPath(datasetId, stream);
LOGGER.info("Cleaning up staging path for stream {} (dataset {}): {}", stream, datasetId, stagingDatasetPath);
gcsStorageOperations.dropBucketObject(stagingDatasetPath);
}
/**
* "Truncates" the table. This is a workaround for the issue with TRUNCATE TABLE in BigQuery, where the
* table's partition filter must be turned off to truncate. Since deleting a table is a free
* operation, this option re-uses functions that already exist.
*
* <p>
* See: https://cloud.google.com/bigquery/pricing#free
* </p>
*
* @param datasetId equivalent to schema name
* @param tableId table name
* @param schema schema of the table to be deleted/created
*/
public void truncateTableIfExists(final String datasetId,
final TableId tableId,
final Schema schema) {
LOGGER.info("Truncating target table {} (dataset {})", tableId, datasetId);
dropTableIfExists(datasetId, tableId);
createTableIfNotExists(tableId, schema);
}
}
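A worked example of the staging layout produced by getStagingRootPath and getStagingFullPath, with made-up bucket path, dataset, stream, sync time, and staging id, and ignoring the name-transformer normalization for brevity:

// Hypothetical values; mirrors the "%s/%s_%s" and "%s/%s/%02d/%02d/%02d/%s/" formats used above.
final String bucketPath = "data_sync";
final String datasetId = "my_dataset";
final String stream = "users";
final UUID stagingId = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
final String root = String.format("%s/%s_%s", bucketPath, datasetId, stream);
final String full = String.format("%s/%s/%02d/%02d/%02d/%s/", root, 2024, 5, 22, 15, stagingId);
// root == "data_sync/my_dataset_users"
// full == "data_sync/my_dataset_users/2024/05/22/15/123e4567-e89b-12d3-a456-426614174000/"
// copyIntoTableFromStage then loads "gs://<bucket-name>/" + full + stagedFileName into the raw table.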


@@ -1,44 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery;
import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer;
import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager;
import io.airbyte.cdk.integrations.destination.async.state.FlushFailure;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction;
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@SuppressWarnings("try")
public class BigQueryRecordStandardConsumer extends AsyncStreamConsumer {
public BigQueryRecordStandardConsumer(Consumer<AirbyteMessage> outputRecordCollector,
OnStartFunction onStart,
OnCloseFunction onClose,
ConfiguredAirbyteCatalog catalog,
String defaultNamespace,
Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap) {
super(outputRecordCollector,
onStart,
onClose,
new BigQueryAsyncStandardFlush(uploaderMap),
catalog,
new BufferManager((long) (Runtime.getRuntime().maxMemory() * 0.5)),
Optional.ofNullable(defaultNamespace),
new FlushFailure(),
Executors.newFixedThreadPool(2));
}
}
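For the standard-inserts path, the heap budget is split between buffering and flushing: the BufferManager above is sized at 50% of max heap, while BigQueryAsyncStandardFlush targets batches of roughly 20% of max heap. A quick illustration assuming a 2 GiB heap (an arbitrary example value):

// Assumed heap size for illustration; the real values come from Runtime.getRuntime().maxMemory().
final long maxMemory = 2L * 1024 * 1024 * 1024;           // pretend -Xmx2g
final long bufferMemoryLimit = (long) (maxMemory * 0.5);  // ~1 GiB handed to BufferManager
final long optimalBatchSize = (long) (maxMemory * 0.2);   // ~410 MiB per flush batch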


@@ -1,158 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
import com.google.cloud.bigquery.TableId;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer;
import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager;
import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class mimics the same functionality as
* {@link io.airbyte.cdk.integrations.destination.staging.StagingConsumerFactory} which likely
* should be placed into a commons package to be utilized across all ConsumerFactories
*/
public class BigQueryStagingConsumerFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryStagingConsumerFactory.class);
public SerializedAirbyteMessageConsumer createAsync(
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector,
final BigQueryGcsOperations bigQueryGcsOperations,
final TyperDeduper typerDeduper,
final ParsedCatalog parsedCatalog,
final String defaultNamespace) {
final Map<StreamDescriptor, StreamConfig> streamConfigMap = createWriteConfigs(
catalog,
parsedCatalog);
final DestinationFlushFunction flusher = new BigQueryAsyncFlush(streamConfigMap, bigQueryGcsOperations, catalog);
return new AsyncStreamConsumer(
outputRecordCollector,
onStartFunction(bigQueryGcsOperations, parsedCatalog.getStreams(), typerDeduper),
onCloseFunction(bigQueryGcsOperations, parsedCatalog.getStreams(), typerDeduper),
flusher,
catalog,
new BufferManager(getBigQueryBufferMemoryLimit()),
Optional.ofNullable(defaultNamespace));
}
/**
* Our BigQuery uploader threads use a fair amount of memory. We believe this is largely due to
* the sdk client we use.
*
* @return number of bytes to make available for message buffering.
*/
private long getBigQueryBufferMemoryLimit() {
return (long) (Runtime.getRuntime().maxMemory() * 0.4);
}
private Map<StreamDescriptor, StreamConfig> createWriteConfigs(final ConfiguredAirbyteCatalog catalog,
final ParsedCatalog parsedCatalog) {
return catalog.getStreams().stream()
.map(configuredStream -> {
Preconditions.checkNotNull(configuredStream.getDestinationSyncMode(), "Undefined destination sync mode");
final AirbyteStream stream = configuredStream.getStream();
return parsedCatalog.getStream(stream.getNamespace(), stream.getName());
})
.collect(Collectors.toMap(
c -> new StreamDescriptor().withName(c.getId().getOriginalName()).withNamespace(c.getId().getOriginalNamespace()),
Functions.identity()));
}
/**
* @param bigQueryGcsOperations collection of Google Cloud Storage Operations
* @param streamConfigs configuration settings used to describe how to write data and where it
* exists
*/
private OnStartFunction onStartFunction(final BigQueryGcsOperations bigQueryGcsOperations,
final List<StreamConfig> streamConfigs,
final TyperDeduper typerDeduper) {
return () -> {
LOGGER.info("Preparing airbyte_raw tables in destination started for {} streams", streamConfigs.size());
typerDeduper.prepareSchemasAndRunMigrations();
for (final StreamConfig streamConfig : streamConfigs) {
final var tableId = TableId.of(streamConfig.getId().getRawNamespace(), streamConfig.getId().getRawName());
LOGGER.info("Preparing staging area in destination for schema: {}, stream: {}, target table: {}, stage: {}",
BigQueryRecordFormatter.SCHEMA_V2, streamConfig.getId().getOriginalName(),
tableId, streamConfig.getId().getOriginalName());
// In Destinations V2, we will always use the 'airbyte_internal' schema/originalNamespace for raw
// tables
final String rawDatasetId = DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
// Regardless, ensure the schema the customer wants to write to exists
bigQueryGcsOperations.createSchemaIfNotExists(streamConfig.getId().getRawNamespace());
// Schema used for raw and airbyte internal tables
bigQueryGcsOperations.createSchemaIfNotExists(rawDatasetId);
// Customer's destination schema
// With checkpointing, we will be creating the target table earlier in the setup such that
// the data can be immediately loaded from the staging area
bigQueryGcsOperations.createTableIfNotExists(tableId, BigQueryRecordFormatter.SCHEMA_V2);
bigQueryGcsOperations.createStageIfNotExists(rawDatasetId, streamConfig.getId().getOriginalName());
// When OVERWRITE mode, truncate the destination's raw table prior to syncing data
if (streamConfig.getDestinationSyncMode() == DestinationSyncMode.OVERWRITE) {
// TODO: this might need special handling during the migration
bigQueryGcsOperations.truncateTableIfExists(rawDatasetId, tableId, BigQueryRecordFormatter.SCHEMA_V2);
}
}
typerDeduper.prepareFinalTables();
LOGGER.info("Preparing tables in destination completed.");
};
}
/**
* Tear down process, will attempt to clean out any staging area
*
* @param bigQueryGcsOperations collection of staging operations
* @param streamConfigs configuration settings used to describe how to write data and where it
* exists
*/
private OnCloseFunction onCloseFunction(final BigQueryGcsOperations bigQueryGcsOperations,
final List<StreamConfig> streamConfigs,
final TyperDeduper typerDeduper) {
return (hasFailed, streamSyncSummaries) -> {
/*
* Previously the hasFailed value was used to commit any remaining staged files into destination,
* however, with the changes to checkpointing this will no longer be necessary since despite partial
* successes, we'll be committing the target table (aka airbyte_raw) throughout the sync
*/
typerDeduper.typeAndDedupe(streamSyncSummaries);
LOGGER.info("Cleaning up destination started for {} streams", streamConfigs.size());
for (final StreamConfig streamConfig : streamConfigs) {
bigQueryGcsOperations.dropStageIfExists(streamConfig.getId().getRawNamespace(), streamConfig.getId().getOriginalName());
}
typerDeduper.commitFinalTables();
typerDeduper.cleanup();
LOGGER.info("Cleaning up destination completed.");
};
}
}


@@ -4,10 +4,9 @@
package io.airbyte.integrations.destination.bigquery;
import static io.airbyte.integrations.destination.bigquery.helpers.LoggerHelper.getJobErrorMessage;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
@@ -40,12 +39,14 @@ import io.airbyte.commons.json.Jsons;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;
public class BigQueryUtils {
@@ -85,16 +86,6 @@ public class BigQueryUtils {
}
}
public static void createSchemaAndTableIfNeeded(final BigQuery bigquery,
final Set<String> existingSchemas,
final String schemaName,
final String datasetLocation) {
if (!existingSchemas.contains(schemaName)) {
getOrCreateDataset(bigquery, schemaName, datasetLocation);
existingSchemas.add(schemaName);
}
}
public static Dataset getOrCreateDataset(final BigQuery bigquery, final String datasetId, final String datasetLocation) {
Dataset dataset = bigquery.getDataset(datasetId);
if (dataset == null || !dataset.exists()) {
@@ -185,7 +176,7 @@ public class BigQueryUtils {
* @return Table BigQuery table object to be referenced for deleting, otherwise empty meaning table
* was not successfully created
*/
static void createPartitionedTableIfNotExists(final BigQuery bigquery, final TableId tableId, final Schema schema) {
public static void createPartitionedTableIfNotExists(final BigQuery bigquery, final TableId tableId, final Schema schema) {
try {
final var chunkingColumn = JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT;
final TimePartitioning partitioning = TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
@@ -214,7 +205,8 @@ public class BigQueryUtils {
}
} catch (final BigQueryException e) {
LOGGER.error("Partitioned table was not created: " + tableId, e);
LOGGER.error("Partitioned table was not created: {}", tableId, e);
throw e;
}
}
@@ -316,17 +308,41 @@ public class BigQueryUtils {
if (job != null) {
AirbyteExceptionHandler.addStringForDeinterpolation(job.getEtag());
try {
LOGGER.info("Waiting for job finish {}. Status: {}", job, job.getStatus());
job.waitFor();
LOGGER.info("Job finish {} with status {}", job, job.getStatus());
LOGGER.info("Waiting for Job {} to finish. Status: {}", job.getJobId(), job.getStatus());
// Default totalTimeout is 12 Hours, 30 minutes seems reasonable
final Job completedJob = job.waitFor(RetryOption.totalTimeout(Duration.ofMinutes(30)));
if (completedJob == null) {
// job no longer exists
LOGGER.warn("Job {} no longer exists", job.getJobId());
} else if (completedJob.getStatus().getError() != null) {
// job failed, handle error
LOGGER.error("Job {} failed with errors {}", completedJob.getJobId(), completedJob.getStatus().getError().toString());
throw new RuntimeException(
"Failed to complete a load job in BigQuery, Job id: " + completedJob.getJobId() +
", with error: " + completedJob.getStatus().getError());
} else {
// job completed successfully
LOGGER.info("Job {} completed successfully, job info {}", completedJob.getJobId(), completedJob);
}
} catch (final BigQueryException e) {
final String errorMessage = getJobErrorMessage(e.getErrors(), job);
LOGGER.error(errorMessage);
throw new BigQueryException(e.getCode(), errorMessage, e);
}
} else {
LOGGER.warn("Received null value for Job, nothing to waitFor");
}
}
private static String getJobErrorMessage(List<BigQueryError> errors, Job job) {
if (errors == null || errors.isEmpty()) {
return StringUtils.EMPTY;
}
return String.format("An error occurred during execution of job: %s, \n For more details see Big Query Error collection: %s:", job,
errors.stream().map(BigQueryError::toString).collect(Collectors.joining(",\n ")));
}
public static HeaderProvider getHeaderProvider() {
final String connectorName = getConnectorNameOrDefault();
return () -> ImmutableMap.of("user-agent", String.format(USER_AGENT_FORMAT, connectorName));
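A sketch of how the reworked waitForJobFinish is exercised on the staging path; the table id, GCS object, and the in-scope bigquery client here are placeholders, and the real call site is BigQueryGcsOperations.copyIntoTableFromStage shown earlier:

// Placeholder names; mirrors the load-job flow from copyIntoTableFromStage.
final Job loadJob = bigquery.create(JobInfo.of(
    LoadJobConfiguration.builder(
        TableId.of("airbyte_internal", "my_dataset_raw__stream_users"),
        "gs://my-staging-bucket/data_sync/my_dataset_users/2024/05/22/15/some-uuid/file.csv.gz")
        .setFormatOptions(FormatOptions.csv())
        .setSchema(BigQueryRecordFormatter.SCHEMA_V2)
        .setWriteDisposition(WriteDisposition.WRITE_APPEND)
        .build()));
// Blocks for at most 30 minutes (the totalTimeout above) and throws if the completed job reports errors.
BigQueryUtils.waitForJobFinish(loadJob);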


@@ -9,15 +9,12 @@ import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage;
import io.airbyte.commons.json.Jsons;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The class formats incoming JsonSchema and AirbyteRecord in order to be inline with a
@@ -30,13 +27,8 @@ public class BigQueryRecordFormatter {
Field.of(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, StandardSQLTypeName.TIMESTAMP),
Field.of(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, StandardSQLTypeName.TIMESTAMP),
Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING));
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryRecordFormatter.class);
protected final StandardNameTransformer namingResolver;
public BigQueryRecordFormatter(final StandardNameTransformer namingResolver) {
this.namingResolver = namingResolver;
}
public BigQueryRecordFormatter() {}
public String formatRecord(PartialAirbyteMessage recordMessage) {
// Map.of has a @NonNull requirement, so creating a new Hash map


@@ -1,41 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery.helpers;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.Job;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LoggerHelper {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerHelper.class);
private LoggerHelper() {}
public static void printHeapMemoryConsumption() {
final int mb = 1024 * 1024;
final MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
final long xmx = memoryBean.getHeapMemoryUsage().getMax() / mb;
final long xms = memoryBean.getHeapMemoryUsage().getInit() / mb;
LOGGER.info("Initial Memory (xms) mb = {}", xms);
LOGGER.info("Max Memory (xmx) : mb = {}", xmx);
}
public static String getJobErrorMessage(List<BigQueryError> errors, Job job) {
if (errors == null || errors.isEmpty()) {
return StringUtils.EMPTY;
}
return String.format("An error occurred during execution of job: %s, \n For more details see Big Query Error collection: %s:", job,
errors.stream().map(BigQueryError::toString).collect(Collectors.joining(",\n ")));
}
}


@@ -33,6 +33,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Streams;
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
@@ -42,8 +44,8 @@ import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState.Impl;
import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDestinationState;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
@@ -58,11 +60,11 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.text.StringSubstitutor;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// TODO this stuff almost definitely exists somewhere else in our codebase.
public class BigQueryDestinationHandler implements DestinationHandler<MinimumDestinationState.Impl> {
public class BigQueryDestinationHandler implements DestinationHandler<BigQueryDestinationState> {
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationHandler.class);
@@ -98,7 +100,7 @@ public class BigQueryDestinationHandler implements DestinationHandler<MinimumDes
FROM ${raw_table}
WHERE _airbyte_loaded_at IS NULL
"""))
.build()).iterateAll().iterator().next().get(0);
.build()).iterateAll().iterator().next().getFirst();
// If this value is null, then there are no records with null loaded_at.
// If it's not null, then we can return immediately - we've found some unprocessed records and their
// timestamp.
@@ -112,7 +114,7 @@ public class BigQueryDestinationHandler implements DestinationHandler<MinimumDes
SELECT MAX(_airbyte_extracted_at)
FROM ${raw_table}
"""))
.build()).iterateAll().iterator().next().get(0);
.build()).iterateAll().iterator().next().getFirst();
// We know (from the previous query) that all records have been processed by T+D already.
// So we just need to get the timestamp of the most recent record.
if (loadedRecordTimestamp.isNull()) {
@@ -192,8 +194,8 @@ public class BigQueryDestinationHandler implements DestinationHandler<MinimumDes
}
@Override
public List<DestinationInitialStatus<Impl>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
final List<DestinationInitialStatus<MinimumDestinationState.Impl>> initialStates = new ArrayList<>();
public List<DestinationInitialStatus<BigQueryDestinationState>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
final List<DestinationInitialStatus<BigQueryDestinationState>> initialStates = new ArrayList<>();
for (final StreamConfig streamConfig : streamConfigs) {
final StreamId id = streamConfig.getId();
final Optional<TableDefinition> finalTable = findExistingTable(id);
@@ -205,13 +207,13 @@ public class BigQueryDestinationHandler implements DestinationHandler<MinimumDes
finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()),
finalTable.isEmpty() || isFinalTableEmpty(id),
// Return a default state blob since we don't actually track state.
new MinimumDestinationState.Impl(false)));
new BigQueryDestinationState(false)));
}
return initialStates;
}
@Override
public void commitDestinationStates(Map<StreamId, ? extends MinimumDestinationState.Impl> destinationStates) throws Exception {
public void commitDestinationStates(Map<StreamId, ? extends BigQueryDestinationState> destinationStates) throws Exception {
// Intentionally do nothing. Bigquery doesn't actually support destination states.
}
@@ -321,4 +323,22 @@ public class BigQueryDestinationHandler implements DestinationHandler<MinimumDes
: Collections.emptySet();
}
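// Create the datasets (BigQuery's equivalent of schemas) required by this sync, if they don't already exist.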
@Override
public void createNamespaces(@NotNull Set<String> schemas) {
schemas.forEach(this::createDataset);
}
private void createDataset(final String dataset) {
LOGGER.info("Creating dataset if not present {}", dataset);
try {
BigQueryUtils.getOrCreateDataset(bq, dataset, datasetLocation);
} catch (BigQueryException e) {
if (ConnectorExceptionUtil.HTTP_AUTHENTICATION_ERROR_CODES.contains(e.getCode())) {
throw new ConfigErrorException(e.getMessage(), e);
} else {
throw e;
}
}
}
}

View File

@@ -6,7 +6,7 @@ package io.airbyte.integrations.destination.bigquery.typing_deduping;
import static io.airbyte.integrations.base.destination.typing_deduping.Sql.separately;
import static io.airbyte.integrations.base.destination.typing_deduping.Sql.transactionally;
import static io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeTransaction.SOFT_RESET_SUFFIX;
import static io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.SOFT_RESET_SUFFIX;
import static java.util.stream.Collectors.joining;
import com.google.cloud.bigquery.StandardSQLTypeName;
@@ -183,13 +183,12 @@ public class BigQuerySqlGenerator implements SqlGenerator {
// the SQLGenerator?
public static StandardSQLTypeName toDialectType(final AirbyteProtocolType airbyteProtocolType) {
return switch (airbyteProtocolType) {
case STRING -> StandardSQLTypeName.STRING;
case STRING, TIME_WITH_TIMEZONE -> StandardSQLTypeName.STRING;
case NUMBER -> StandardSQLTypeName.NUMERIC;
case INTEGER -> StandardSQLTypeName.INT64;
case BOOLEAN -> StandardSQLTypeName.BOOL;
case TIMESTAMP_WITH_TIMEZONE -> StandardSQLTypeName.TIMESTAMP;
case TIMESTAMP_WITHOUT_TIMEZONE -> StandardSQLTypeName.DATETIME;
case TIME_WITH_TIMEZONE -> StandardSQLTypeName.STRING;
case TIME_WITHOUT_TIMEZONE -> StandardSQLTypeName.TIME;
case DATE -> StandardSQLTypeName.DATE;
case UNKNOWN -> StandardSQLTypeName.JSON;

View File

@@ -1,81 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery.typing_deduping;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.V2TableMigrator;
import java.util.Map;
import org.apache.commons.text.StringSubstitutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BigQueryV2TableMigrator implements V2TableMigrator {
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryV2TableMigrator.class);
private final BigQuery bq;
public BigQueryV2TableMigrator(final BigQuery bq) {
this.bq = bq;
}
@Override
public void migrateIfNecessary(final StreamConfig streamConfig) throws InterruptedException {
final Table rawTable = bq.getTable(TableId.of(streamConfig.getId().getRawNamespace(), streamConfig.getId().getRawName()));
if (rawTable != null && rawTable.exists()) {
final Schema existingRawSchema = rawTable.getDefinition().getSchema();
final FieldList fields = existingRawSchema.getFields();
if (fields.stream().noneMatch(f -> JavaBaseConstants.COLUMN_NAME_DATA.equals(f.getName()))) {
throw new IllegalStateException(
"Table does not have a column named _airbyte_data. We are likely colliding with a completely different table.");
}
final Field dataColumn = fields.get(JavaBaseConstants.COLUMN_NAME_DATA);
if (dataColumn.getType() == LegacySQLTypeName.JSON) {
LOGGER.info("Raw table has _airbyte_data of type JSON. Migrating to STRING.");
final String tmpRawTableId = BigQuerySqlGenerator.QUOTE + streamConfig.getId().getRawNamespace() + BigQuerySqlGenerator.QUOTE + "."
+ BigQuerySqlGenerator.QUOTE + streamConfig.getId().getRawName() + "_airbyte_tmp" + BigQuerySqlGenerator.QUOTE;
bq.query(QueryJobConfiguration.of(
new StringSubstitutor(Map.of(
"raw_table", streamConfig.getId().rawTableId(BigQuerySqlGenerator.QUOTE),
"tmp_raw_table", tmpRawTableId,
"real_raw_table", BigQuerySqlGenerator.QUOTE + streamConfig.getId().getRawName() + BigQuerySqlGenerator.QUOTE)).replace(
// In full refresh / append mode, standard inserts is creating a non-partitioned raw table.
// (possibly also in overwrite mode?).
// We can't just CREATE OR REPLACE the table because bigquery will complain that we're trying to
// change the partitioning scheme.
// Do an explicit CREATE tmp + DROP + RENAME, similar to how we overwrite the final tables in
// OVERWRITE mode.
"""
CREATE TABLE ${tmp_raw_table}
PARTITION BY DATE(_airbyte_extracted_at)
CLUSTER BY _airbyte_extracted_at
AS (
SELECT
_airbyte_raw_id,
_airbyte_extracted_at,
_airbyte_loaded_at,
to_json_string(_airbyte_data) as _airbyte_data
FROM ${raw_table}
);
DROP TABLE IF EXISTS ${raw_table};
ALTER TABLE ${tmp_raw_table} RENAME TO ${real_raw_table};
""")));
LOGGER.info("Completed Data column Migration for stream {}", streamConfig.getId().getRawName());
} else {
LOGGER.info("No Data column Migration Required for stream {}", streamConfig.getId().getRawName());
}
}
}
}

View File

@@ -1,81 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery.uploader;
import static io.airbyte.integrations.destination.bigquery.helpers.LoggerHelper.printHeapMemoryConsumption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BigQueryDirectUploader {
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDirectUploader.class);
protected final TableId table;
protected final BigQueryTableWriter writer;
protected final BigQuery bigQuery;
protected final BigQueryRecordFormatter recordFormatter;
BigQueryDirectUploader(final TableId table,
final BigQueryTableWriter writer,
final BigQuery bigQuery,
final BigQueryRecordFormatter recordFormatter) {
this.table = table;
this.writer = writer;
this.bigQuery = bigQuery;
this.recordFormatter = recordFormatter;
}
public void upload(final PartialAirbyteMessage airbyteMessage) {
try {
writer.write(recordFormatter.formatRecord(airbyteMessage));
} catch (final IOException | RuntimeException e) {
LOGGER.error("Got an error while writing message: {}", e.getMessage(), e);
LOGGER.error(String.format(
"Failed to process a message for job: %s",
writer.toString()));
printHeapMemoryConsumption();
throw new RuntimeException(e);
}
}
public void closeAfterPush() {
try {
this.writer.close();
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
public void createRawTable() {
// Ensure that this table exists.
final Table rawTable = bigQuery.getTable(table);
if (rawTable == null) {
LOGGER.info("Creating raw table {}.", table);
bigQuery.create(TableInfo.newBuilder(table, StandardTableDefinition.of(BigQueryRecordFormatter.SCHEMA_V2)).build());
} else {
LOGGER.info("Found raw table {}.", rawTable.getTableId());
}
}
@Override
public String toString() {
return "BigQueryDirectUploader{" +
"table=" + table.getTable() +
", writer=" + writer.getClass() +
", recordFormatter=" + recordFormatter.getClass() +
'}';
}
}

View File

@@ -1,121 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery.uploader;
import static io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig;
import io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BigQueryUploaderFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryUploaderFactory.class);
private static final int HTTP_STATUS_CODE_FORBIDDEN = 403;
private static final int HTTP_STATUS_CODE_NOT_FOUND = 404;
private static final String CONFIG_ERROR_MSG = """
Failed to write to destination schema.
1. Make sure you have all required permissions for writing to the schema.
2. Make sure that the actual destination schema's location corresponds to location provided
in connector's config.
3. Try to change the "Destination schema" from "Mirror Source Structure" (if it's set) to the
"Destination Default" option.
More details:
""";
public static BigQueryDirectUploader getUploader(final UploaderConfig uploaderConfig)
throws IOException {
final String dataset = uploaderConfig.getParsedStream().getId().getRawNamespace();
final String datasetLocation = BigQueryUtils.getDatasetLocation(uploaderConfig.getConfig());
final Set<String> existingDatasets = new HashSet<>();
final BigQueryRecordFormatter recordFormatter = uploaderConfig.getFormatter();
final TableId targetTable = TableId.of(dataset, uploaderConfig.getTargetTableName());
BigQueryUtils.createSchemaAndTableIfNeeded(
uploaderConfig.getBigQuery(),
existingDatasets,
dataset,
datasetLocation);
return getBigQueryDirectUploader(
uploaderConfig.getConfig(),
targetTable,
uploaderConfig.getBigQuery(),
datasetLocation,
recordFormatter);
}
private static BigQueryDirectUploader getBigQueryDirectUploader(
final JsonNode config,
final TableId targetTable,
final BigQuery bigQuery,
final String datasetLocation,
final BigQueryRecordFormatter formatter) {
// https://cloud.google.com/bigquery/docs/loading-data-local#loading_data_from_a_local_data_source
LOGGER.info("Will write raw data to {} with schema {}", targetTable, SCHEMA_V2);
final WriteChannelConfiguration writeChannelConfiguration =
WriteChannelConfiguration.newBuilder(targetTable)
.setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
.setSchema(SCHEMA_V2)
.setFormatOptions(FormatOptions.json())
.build(); // new-line delimited json.
final JobId job = JobId.newBuilder()
.setRandomJob()
.setLocation(datasetLocation)
.setProject(bigQuery.getOptions().getProjectId())
.build();
final TableDataWriteChannel writer;
try {
writer = bigQuery.writer(job, writeChannelConfiguration);
} catch (final BigQueryException e) {
if (e.getCode() == HTTP_STATUS_CODE_FORBIDDEN || e.getCode() == HTTP_STATUS_CODE_NOT_FOUND) {
throw new ConfigErrorException(CONFIG_ERROR_MSG + e);
} else {
throw new BigQueryException(e.getCode(), e.getMessage());
}
}
// This is an optional value. If not set, use the client's default value (15 MiB).
final Integer bigQueryClientChunkSizeFomConfig =
BigQueryUtils.getBigQueryClientChunkSize(config);
if (bigQueryClientChunkSizeFomConfig != null) {
writer.setChunkSize(bigQueryClientChunkSizeFomConfig);
}
return new BigQueryDirectUploader(
targetTable,
new BigQueryTableWriter(writer),
bigQuery,
formatter);
}
}

View File

@@ -1,11 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery.uploader;
public enum UploaderType {
STANDARD,
AVRO,
CSV
}

View File

@@ -1,50 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery.uploader.config;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.BigQuery;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
import io.airbyte.integrations.destination.bigquery.UploadingMethod;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.util.Map;
import lombok.Builder;
import lombok.Getter;
@Builder
@Getter
public class UploaderConfig {
private JsonNode config;
/**
* Taken directly from the {@link ConfiguredAirbyteStream}, except if the originalNamespace was
* null, we set it to the destination default originalNamespace.
*/
private ConfiguredAirbyteStream configStream;
/**
* Parsed directly from {@link #configStream}.
*/
private StreamConfig parsedStream;
private String targetTableName;
private BigQuery bigQuery;
private Map<UploaderType, BigQueryRecordFormatter> formatterMap;
private boolean isDefaultAirbyteTmpSchema;
public boolean isGcsUploadingMode() {
return BigQueryUtils.getLoadingMethod(config) == UploadingMethod.GCS;
}
public UploaderType getUploaderType() {
return (isGcsUploadingMode() ? UploaderType.CSV : UploaderType.STANDARD);
}
public BigQueryRecordFormatter getFormatter() {
return formatterMap.get(getUploaderType());
}
}

View File

@@ -1,38 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery.writer;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.common.base.Charsets;
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public record BigQueryTableWriter(TableDataWriteChannel writeChannel) {
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTableWriter.class);
public void write(final String formattedData) throws IOException {
writeChannel.write(ByteBuffer.wrap((formattedData + "\n").getBytes(Charsets.UTF_8)));
}
public void close() throws IOException {
this.writeChannel.close();
try {
final Job job = writeChannel.getJob();
if (job != null && job.getStatus().getError() != null) {
AirbyteExceptionHandler.addStringForDeinterpolation(job.getEtag());
throw new RuntimeException("Fail to complete a load job in big query, Job id: " + writeChannel.getJob().getJobId() +
", with error: " + writeChannel.getJob().getStatus().getError());
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,27 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery
object BigQueryConsts {
const val MiB: Int = 1024 * 1024
const val CONFIG_DATASET_ID: String = "dataset_id"
const val CONFIG_PROJECT_ID: String = "project_id"
const val CONFIG_DATASET_LOCATION: String = "dataset_location"
const val CONFIG_CREDS: String = "credentials_json"
const val BIG_QUERY_CLIENT_CHUNK_SIZE: String = "big_query_client_buffer_size_mb"
const val LOADING_METHOD: String = "loading_method"
const val METHOD: String = "method"
const val GCS_STAGING: String = "GCS Staging"
const val GCS_BUCKET_NAME: String = "gcs_bucket_name"
const val GCS_BUCKET_PATH: String = "gcs_bucket_path"
const val GCS_BUCKET_REGION: String = "gcs_bucket_region"
const val CREDENTIAL: String = "credential"
const val FORMAT: String = "format"
const val KEEP_GCS_FILES: String = "keep_files_in_gcs-bucket"
const val KEEP_GCS_FILES_VAL: String = "Keep all tmp files in GCS"
const val DISABLE_TYPE_DEDUPE: String = "disable_type_dedupe"
const val RAW_DATA_DATASET = "raw_data_dataset"
const val NAMESPACE_PREFIX: String = "n"
}

View File

@@ -0,0 +1,67 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery
import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer
import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
import io.airbyte.integrations.base.destination.operation.DefaultFlush
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import java.util.*
import java.util.function.Consumer
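/**
 * Builds the [AsyncStreamConsumer] used by the connector: one flavor for GCS staging and one for
 * direct (standard inserts) uploads. Both delegate flushing and stream finalization to the
 * provided [SyncOperation]; they differ only in buffer sizing and flush thresholds.
 */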
object BigQueryConsumerFactory {
fun createStagingConsumer(
outputRecordCollector: Consumer<AirbyteMessage>,
syncOperation: SyncOperation,
catalog: ConfiguredAirbyteCatalog,
defaultNamespace: String
): AsyncStreamConsumer {
// These values are carried over from some older code.
// TODO: Find out why the max memory ratio is capped at 0.4.
return AsyncStreamConsumer(
outputRecordCollector = outputRecordCollector,
onStart = {},
onClose = { _, streamSyncSummaries ->
syncOperation.finalizeStreams(streamSyncSummaries)
},
onFlush = DefaultFlush(200 * 1024 * 1024, syncOperation),
catalog = catalog,
bufferManager =
BufferManager(
(Runtime.getRuntime().maxMemory() * 0.4).toLong(),
),
defaultNamespace = Optional.of(defaultNamespace),
)
}
fun createDirectUploadConsumer(
outputRecordCollector: Consumer<AirbyteMessage>,
syncOperation: SyncOperation,
catalog: ConfiguredAirbyteCatalog,
defaultNamespace: String
): AsyncStreamConsumer {
// TODO: Why does the standard-inserts consumer operate at a memory ratio of 0.5,
// with a max of 2 threads and 20% of max memory as the default flush size?
return AsyncStreamConsumer(
outputRecordCollector = outputRecordCollector,
onStart = {},
onClose = { _, streamSyncSummaries ->
syncOperation.finalizeStreams(streamSyncSummaries)
},
onFlush =
DefaultFlush((Runtime.getRuntime().maxMemory() * 0.2).toLong(), syncOperation),
catalog = catalog,
bufferManager =
BufferManager(
(Runtime.getRuntime().maxMemory() * 0.5).toLong(),
),
defaultNamespace = Optional.of(defaultNamespace),
)
}
}

View File

@@ -0,0 +1,397 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery
import com.fasterxml.jackson.databind.JsonNode
import com.google.auth.oauth2.GoogleCredentials
import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.BigQueryException
import com.google.cloud.bigquery.BigQueryOptions
import com.google.cloud.bigquery.QueryJobConfiguration
import com.google.cloud.storage.Storage
import com.google.cloud.storage.StorageOptions
import com.google.common.base.Charsets
import io.airbyte.cdk.integrations.BaseConnector
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addAllStringsInConfigForDeinterpolation
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addThrowableForDeinterpolation
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer
import io.airbyte.cdk.integrations.base.Destination
import io.airbyte.cdk.integrations.base.IntegrationRunner
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns.*
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag.getRawNamespaceOverride
import io.airbyte.cdk.integrations.destination.gcs.BaseGcsDestination
import io.airbyte.cdk.integrations.destination.gcs.GcsNameTransformer
import io.airbyte.cdk.integrations.destination.gcs.GcsStorageOperations
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.staging.operation.StagingStreamOperations
import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.commons.json.Jsons.serialize
import io.airbyte.commons.json.Jsons.tryDeserialize
import io.airbyte.integrations.base.destination.operation.DefaultSyncOperation
import io.airbyte.integrations.base.destination.operation.StandardStreamOperation
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
import io.airbyte.integrations.destination.bigquery.BigQueryConsts as bqConstants
import io.airbyte.integrations.destination.bigquery.BigQueryConsumerFactory.createDirectUploadConsumer
import io.airbyte.integrations.destination.bigquery.BigQueryConsumerFactory.createStagingConsumer
import io.airbyte.integrations.destination.bigquery.BigQueryUtils.*
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDV2Migration
import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDestinationState
import io.airbyte.integrations.destination.bigquery.operation.BigQueryDirectLoadingStorageOperation
import io.airbyte.integrations.destination.bigquery.operation.BigQueryGcsStorageOperation
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.github.oshai.kotlinlogging.KotlinLogging
import java.io.ByteArrayInputStream
import java.io.IOException
import java.util.*
import java.util.function.Consumer
import org.apache.commons.lang3.StringUtils
private val log = KotlinLogging.logger {}
class BigQueryDestination : BaseConnector(), Destination {
override fun check(config: JsonNode): AirbyteConnectionStatus? {
try {
val datasetId = getDatasetId(config)
val datasetLocation = getDatasetLocation(config)
val bigquery = getBigQuery(config)
val uploadingMethod = getLoadingMethod(config)
checkHasCreateAndDeleteDatasetRole(bigquery, datasetId, datasetLocation)
val dataset = getOrCreateDataset(bigquery, datasetId, datasetLocation)
if (dataset.location != datasetLocation) {
throw ConfigErrorException(
"Actual dataset location doesn't match to location from config",
)
}
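// Run a trivial query against INFORMATION_SCHEMA to verify that the dataset is queryable.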
val queryConfig =
QueryJobConfiguration.newBuilder(
String.format(
"SELECT * FROM `%s.INFORMATION_SCHEMA.TABLES` LIMIT 1;",
datasetId,
),
)
.setUseLegacySql(false)
.build()
if (UploadingMethod.GCS == uploadingMethod) {
val status = checkGcsPermission(config)
if (status!!.status != AirbyteConnectionStatus.Status.SUCCEEDED) {
return status
}
}
val result = executeQuery(bigquery, queryConfig)
if (result.getLeft() != null) {
return AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.SUCCEEDED)
} else {
throw ConfigErrorException(result.getRight())
}
} catch (e: Exception) {
log.error(e) { "Check failed." }
throw ConfigErrorException((if (e.message != null) e.message else e.toString())!!)
}
}
/**
* This method does two checks: 1) permissions related to the bucket, and 2) the ability to
* create and delete an actual file. The latter matters because even if the service account has
* the proper permissions, the HMAC keys can only be verified by running the actual GCS check.
*/
private fun checkGcsPermission(config: JsonNode): AirbyteConnectionStatus? {
val loadingMethod = config[bqConstants.LOADING_METHOD]
val bucketName = loadingMethod[bqConstants.GCS_BUCKET_NAME].asText()
val missingPermissions: MutableList<String> = ArrayList()
try {
val credentials = getServiceAccountCredentials(config)
val storage: Storage =
StorageOptions.newBuilder()
.setProjectId(config[bqConstants.CONFIG_PROJECT_ID].asText())
.setCredentials(credentials)
.setHeaderProvider(getHeaderProvider())
.build()
.service
val permissionsCheckStatusList: List<Boolean> =
storage.testIamPermissions(bucketName, REQUIRED_PERMISSIONS)
// testIamPermissions returns a list of booleans in the same order as the requested permissions list.
missingPermissions.addAll(
permissionsCheckStatusList
.asSequence()
.withIndex()
.filter { !it.value }
.map { REQUIRED_PERMISSIONS[it.index] }
.toList(),
)
val gcsDestination: BaseGcsDestination = object : BaseGcsDestination() {}
val gcsJsonNodeConfig = getGcsJsonNodeConfig(config)
return gcsDestination.check(gcsJsonNodeConfig)
} catch (e: Exception) {
val message = StringBuilder("Cannot access the GCS bucket.")
if (!missingPermissions.isEmpty()) {
message
.append(" The following permissions are missing on the service account: ")
.append(java.lang.String.join(", ", missingPermissions))
.append(".")
}
message.append(
" Please make sure the service account can access the bucket path, and the HMAC keys are correct.",
)
log.error(e) { message.toString() }
throw ConfigErrorException(
"Could not access the GCS bucket with the provided configuration.\n",
e,
)
}
}
/**
* Returns a [AirbyteMessageConsumer] based on whether the uploading mode is STANDARD INSERTS or
* using STAGING
*
* @param config
* - integration-specific configuration object as json. e.g. { "username": "airbyte",
* "password": "super secure" }
* @param catalog
* - schema of the incoming messages.
*/
override fun getConsumer(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog,
outputRecordCollector: Consumer<AirbyteMessage>
): AirbyteMessageConsumer? {
throw UnsupportedOperationException("Should use getSerializedMessageConsumer")
}
@Throws(Exception::class)
override fun getSerializedMessageConsumer(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog,
outputRecordCollector: Consumer<AirbyteMessage>
): SerializedAirbyteMessageConsumer {
val uploadingMethod = getLoadingMethod(config)
val defaultNamespace = getDatasetId(config)
setDefaultStreamNamespace(catalog, defaultNamespace)
val disableTypeDedupe = getDisableTypeDedupFlag(config)
val datasetLocation = getDatasetLocation(config)
val projectId = config[bqConstants.CONFIG_PROJECT_ID].asText()
val bigquery = getBigQuery(config)
val rawNamespaceOverride = getRawNamespaceOverride(bqConstants.RAW_DATA_DATASET)
addAllStringsInConfigForDeinterpolation(config)
val serviceAccountKey = config[bqConstants.CONFIG_CREDS]
if (serviceAccountKey != null) {
// If the service account key is a non-null string, we will try to
// deserialize it. Otherwise, we will let the Google library find it in
// the environment during the client initialization.
if (serviceAccountKey.isTextual) {
// There are cases where we fail to deserialize the service account key. In these cases,
// we shouldn't do anything. Google's creds library is more lenient with JSON parsing than
// Jackson, and I'd rather just let it go.
tryDeserialize(serviceAccountKey.asText()).ifPresent { obj: JsonNode ->
addAllStringsInConfigForDeinterpolation(obj)
}
} else {
addAllStringsInConfigForDeinterpolation(serviceAccountKey)
}
}
val sqlGenerator = BigQuerySqlGenerator(projectId, datasetLocation)
val parsedCatalog =
parseCatalog(
sqlGenerator,
rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE),
catalog,
)
val destinationHandler = BigQueryDestinationHandler(bigquery, datasetLocation)
val migrations = listOf(BigQueryDV2Migration(sqlGenerator, bigquery))
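// Standard inserts: records are written straight into the raw table through a BigQuery write
// channel (see BigQueryDirectLoadingStorageOperation), with no intermediate staging.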
if (uploadingMethod == UploadingMethod.STANDARD) {
val bigQueryClientChunkSize = getBigQueryClientChunkSize(config)
val bigQueryLoadingStorageOperation =
BigQueryDirectLoadingStorageOperation(
bigquery,
bigQueryClientChunkSize,
BigQueryRecordFormatter(),
sqlGenerator,
destinationHandler,
datasetLocation,
)
val syncOperation =
DefaultSyncOperation<BigQueryDestinationState>(
parsedCatalog,
destinationHandler,
defaultNamespace,
{ initialStatus: DestinationInitialStatus<BigQueryDestinationState>, disableTD
->
StandardStreamOperation(
bigQueryLoadingStorageOperation,
initialStatus,
disableTD
)
},
migrations,
disableTypeDedupe,
)
return createDirectUploadConsumer(
outputRecordCollector,
syncOperation,
catalog,
defaultNamespace,
)
}
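// GCS staging: records are buffered to CSV files in GCS and then copied into the raw table
// with a load job (see BigQueryGcsStorageOperation).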
val gcsNameTransformer = GcsNameTransformer()
val gcsConfig = getGcsCsvDestinationConfig(config)
val keepStagingFiles = isKeepFilesInGcs(config)
val gcsOperations =
GcsStorageOperations(gcsNameTransformer, gcsConfig.getS3Client(), gcsConfig)
val bigQueryGcsStorageOperations =
BigQueryGcsStorageOperation(
gcsOperations,
gcsConfig,
gcsNameTransformer,
keepStagingFiles,
bigquery,
sqlGenerator,
destinationHandler,
)
val syncOperation: SyncOperation =
DefaultSyncOperation<BigQueryDestinationState>(
parsedCatalog,
destinationHandler,
defaultNamespace,
{ initialStatus: DestinationInitialStatus<BigQueryDestinationState>, disableTD ->
StagingStreamOperations(
bigQueryGcsStorageOperations,
initialStatus,
FileUploadFormat.CSV,
V2_WITHOUT_META,
disableTD
)
},
migrations,
disableTypeDedupe,
)
return createStagingConsumer(
outputRecordCollector,
syncOperation,
catalog,
defaultNamespace,
)
}
private fun setDefaultStreamNamespace(catalog: ConfiguredAirbyteCatalog, namespace: String) {
// Set the default namespace on streams whose namespace is null, so we don't need to repeat
// this logic in the rest of the connector. (Record messages still need to handle null
// namespaces though, which currently happens in e.g. AsyncStreamConsumer#accept.)
// This should probably be shared logic amongst destinations eventually.
for (stream in catalog.streams) {
if (StringUtils.isEmpty(stream.stream.namespace)) {
stream.stream.withNamespace(namespace)
}
}
}
private fun parseCatalog(
sqlGenerator: BigQuerySqlGenerator,
rawNamespaceOverride: String,
catalog: ConfiguredAirbyteCatalog
): ParsedCatalog {
val catalogParser = CatalogParser(sqlGenerator, rawNamespaceOverride)
return catalogParser.parseCatalog(catalog)
}
override val isV2Destination: Boolean
get() = true
companion object {
private val REQUIRED_PERMISSIONS =
listOf(
"storage.multipartUploads.abort",
"storage.multipartUploads.create",
"storage.objects.create",
"storage.objects.delete",
"storage.objects.get",
"storage.objects.list",
)
@JvmStatic
fun getBigQuery(config: JsonNode): BigQuery {
val projectId = config[bqConstants.CONFIG_PROJECT_ID].asText()
try {
val bigQueryBuilder = BigQueryOptions.newBuilder()
val credentials = getServiceAccountCredentials(config)
return bigQueryBuilder
.setProjectId(projectId)
.setCredentials(credentials)
.setHeaderProvider(getHeaderProvider())
.build()
.service
} catch (e: IOException) {
throw RuntimeException(e)
}
}
@JvmStatic
@Throws(IOException::class)
fun getServiceAccountCredentials(config: JsonNode): GoogleCredentials {
val serviceAccountKey = config[bqConstants.CONFIG_CREDS]
// Follows this order of resolution:
// https://cloud.google.com/java/docs/reference/google-auth-library/latest/com.google.auth.oauth2.GoogleCredentials#com_google_auth_oauth2_GoogleCredentials_getApplicationDefault
if (serviceAccountKey == null) {
log.info {
"No service account key json is provided. It is required if you are using Airbyte cloud."
}
log.info { "Using the default service account credential from environment." }
return GoogleCredentials.getApplicationDefault()
}
// The JSON credential can either be a raw JSON object, or a serialized JSON object.
val credentialsString =
if (serviceAccountKey.isObject) serialize(serviceAccountKey)
else serviceAccountKey.asText()
return GoogleCredentials.fromStream(
ByteArrayInputStream(credentialsString.toByteArray(Charsets.UTF_8)),
)
}
}
}
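/**
 * Connector entrypoint: registers BigQueryException for error-message de-interpolation, then
 * delegates to the IntegrationRunner.
 */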
fun main(args: Array<String>) {
addThrowableForDeinterpolation(BigQueryException::class.java)
val destination: Destination = BigQueryDestination()
log.info { "Starting Destination : ${destination.javaClass}" }
IntegrationRunner(destination).run(args)
log.info { "Completed Destination : ${destination.javaClass}" }
}

View File

@@ -0,0 +1,33 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery.migrators
import com.google.cloud.bigquery.BigQuery
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
import io.airbyte.integrations.destination.bigquery.BigQuerySQLNameTransformer
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV1V2Migrator
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlin.math.log
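/**
 * Runs the legacy V1 -> V2 raw-table migrator as a standard [Migration] step before each sync.
 */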
class BigQueryDV2Migration(private val sqlGenerator: BigQuerySqlGenerator, bigQuery: BigQuery) :
Migration<BigQueryDestinationState> {
private val log = KotlinLogging.logger {}
private val legacyV1V2migrator = BigQueryV1V2Migrator(bigQuery, BigQuerySQLNameTransformer())
override fun migrateIfNecessary(
destinationHandler: DestinationHandler<BigQueryDestinationState>,
stream: StreamConfig,
state: DestinationInitialStatus<BigQueryDestinationState>
): Migration.MigrationResult<BigQueryDestinationState> {
log.info { "Initializing DV2 Migration check" }
legacyV1V2migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream)
// Invalidate the state because rawTableExists could be false, but we don't use it for anything yet.
return Migration.MigrationResult(BigQueryDestinationState(false), true)
}
}

View File

@@ -0,0 +1,18 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery.migrators
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
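/** Minimal per-stream destination state for BigQuery; it only tracks whether a soft reset is needed. */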
data class BigQueryDestinationState(private val needsSoftReset: Boolean) : MinimumDestinationState {
override fun needsSoftReset(): Boolean {
return needsSoftReset
}
@Suppress("UNCHECKED_CAST")
override fun <T : MinimumDestinationState> withSoftReset(needsSoftReset: Boolean): T {
return copy(needsSoftReset = needsSoftReset) as T
}
}

View File

@@ -0,0 +1,112 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery.operation
import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.BigQueryException
import com.google.cloud.bigquery.FormatOptions
import com.google.cloud.bigquery.JobId
import com.google.cloud.bigquery.JobInfo
import com.google.cloud.bigquery.TableDataWriteChannel
import com.google.cloud.bigquery.TableId
import com.google.cloud.bigquery.WriteChannelConfiguration
import com.google.common.util.concurrent.RateLimiter
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.integrations.destination.bigquery.BigQueryUtils
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter.SCHEMA_V2
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator
import io.github.oshai.kotlinlogging.KotlinLogging
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.stream.Stream
private val log = KotlinLogging.logger {}
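/**
 * Storage operation for the standard-inserts path: formats each record as newline-delimited JSON
 * and writes it to the raw table through a [TableDataWriteChannel].
 */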
class BigQueryDirectLoadingStorageOperation(
bigquery: BigQuery,
private val bigQueryClientChunkSize: Int?,
private val bigQueryRecordFormatter: BigQueryRecordFormatter,
sqlGenerator: BigQuerySqlGenerator,
destinationHandler: BigQueryDestinationHandler,
datasetLocation: String
) :
BigQueryStorageOperation<Stream<PartialAirbyteMessage>>(
bigquery,
sqlGenerator,
destinationHandler,
datasetLocation,
) {
private val rateLimiter: RateLimiter = RateLimiter.create(0.07)
companion object {
private const val HTTP_STATUS_CODE_FORBIDDEN = 403
private const val HTTP_STATUS_CODE_NOT_FOUND = 404
private val CONFIG_ERROR_MSG =
"""
|Failed to write to destination schema.
| 1. Make sure you have all required permissions for writing to the schema.
| 2. Make sure that the actual destination schema's location corresponds to location provided in connector's config.
| 3. Try to change the "Destination schema" from "Mirror Source Structure" (if it's set) to the "Destination Default" option.
|More details:
|""".trimMargin()
}
override fun writeToStage(streamId: StreamId, data: Stream<PartialAirbyteMessage>) {
// TODO: Figure out why we need a rate limiter here; RateLimiter is an unstable (beta) API from Google's Guava.
rateLimiter.acquire()
val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
log.info { "Writing data to table $tableId with schema $SCHEMA_V2" }
val writeChannel = initWriteChannel(tableId)
writeChannel.use {
data.forEach { record ->
val byteArray =
"${bigQueryRecordFormatter.formatRecord(record)} ${System.lineSeparator()}".toByteArray(
StandardCharsets.UTF_8,
)
it.write(ByteBuffer.wrap(byteArray))
}
}
log.info { "Writing to channel completed for $tableId" }
val job = writeChannel.job
BigQueryUtils.waitForJobFinish(job)
}
private fun initWriteChannel(tableId: TableId): TableDataWriteChannel {
val writeChannelConfiguration =
WriteChannelConfiguration.newBuilder(tableId)
.setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
.setSchema(SCHEMA_V2)
.setFormatOptions(FormatOptions.json())
.build() // new-line delimited json.
val job =
JobId.newBuilder()
.setRandomJob()
.setLocation(datasetLocation)
.setProject(bigquery.options.projectId)
.build()
val writer: TableDataWriteChannel
try {
writer = bigquery.writer(job, writeChannelConfiguration)
} catch (e: BigQueryException) {
if (e.code == HTTP_STATUS_CODE_FORBIDDEN || e.code == HTTP_STATUS_CODE_NOT_FOUND) {
throw ConfigErrorException(CONFIG_ERROR_MSG + e)
} else {
throw BigQueryException(e.code, e.message)
}
}
// This is an optional value. If not set, use the client's default value (15 MiB).
if (bigQueryClientChunkSize != null) {
writer.setChunkSize(bigQueryClientChunkSize)
}
return writer
}
}

View File

@@ -0,0 +1,143 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery.operation
import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.BigQueryException
import com.google.cloud.bigquery.FormatOptions
import com.google.cloud.bigquery.Job
import com.google.cloud.bigquery.JobInfo
import com.google.cloud.bigquery.LoadJobConfiguration
import com.google.cloud.bigquery.TableId
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.GcsNameTransformer
import io.airbyte.cdk.integrations.destination.gcs.GcsStorageOperations
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.integrations.destination.bigquery.BigQueryUtils
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.*
import org.joda.time.DateTime
import org.joda.time.DateTimeZone
private val log = KotlinLogging.logger {}
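/**
 * Storage operation for the GCS staging path: uploads serialized record buffers to a staging path
 * in GCS, then copies them into the raw table with a CSV load job. Staged objects are removed on
 * cleanup unless keepStagingFiles is set.
 */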
class BigQueryGcsStorageOperation(
private val gcsStorageOperations: GcsStorageOperations,
private val gcsConfig: GcsDestinationConfig,
private val gcsNameTransformer: GcsNameTransformer,
private val keepStagingFiles: Boolean,
bigquery: BigQuery,
sqlGenerator: BigQuerySqlGenerator,
destinationHandler: BigQueryDestinationHandler
) :
BigQueryStorageOperation<SerializableBuffer>(
bigquery,
sqlGenerator,
destinationHandler,
datasetLocation = gcsConfig.bucketRegion!!
) {
private val connectionId = UUID.randomUUID()
private val syncDateTime = DateTime.now(DateTimeZone.UTC)
override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
super.prepareStage(streamId, destinationSyncMode)
// prepare staging bucket
log.info { "Creating bucket ${gcsConfig.bucketName}" }
gcsStorageOperations.createBucketIfNotExists()
}
override fun cleanupStage(streamId: StreamId) {
if (keepStagingFiles) return
val stagingRootPath = stagingRootPath(streamId)
log.info { "Cleaning up staging path at $stagingRootPath" }
gcsStorageOperations.dropBucketObject(stagingRootPath)
}
override fun writeToStage(streamId: StreamId, data: SerializableBuffer) {
val stagedFileName: String = uploadRecordsToStage(streamId, data)
copyIntoTableFromStage(streamId, stagedFileName)
}
private fun uploadRecordsToStage(streamId: StreamId, buffer: SerializableBuffer): String {
val objectPath: String = stagingFullPath(streamId)
log.info {
"Uploading records to for ${streamId.rawNamespace}.${streamId.rawName} to path $objectPath"
}
return gcsStorageOperations.uploadRecordsToBucket(buffer, streamId.rawNamespace, objectPath)
}
private fun copyIntoTableFromStage(streamId: StreamId, stagedFileName: String) {
val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
val stagingPath = stagingFullPath(streamId)
val fullFilePath = "gs://${gcsConfig.bucketName}/$stagingPath$stagedFileName"
log.info { "Uploading records from file $fullFilePath to target Table $tableId" }
val configuration =
LoadJobConfiguration.builder(tableId, fullFilePath)
.setFormatOptions(FormatOptions.csv())
.setSchema(BigQueryRecordFormatter.SCHEMA_V2)
.setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
.setJobTimeoutMs(600000L) // 10 min
.build()
val loadJob: Job = this.bigquery.create(JobInfo.of(configuration))
log.info {
"[${loadJob.jobId}] Created a new job to upload record(s) to target table $tableId: $loadJob"
}
try {
BigQueryUtils.waitForJobFinish(loadJob)
log.info {
"[${loadJob.jobId}] Target table $tableId is successfully appended with staging files"
}
} catch (e: BigQueryException) {
throw RuntimeException(
String.format(
"[%s] Failed to upload staging files to destination table %s",
loadJob.jobId,
tableId
),
e
)
} catch (e: InterruptedException) {
throw RuntimeException(
String.format(
"[%s] Failed to upload staging files to destination table %s",
loadJob.jobId,
tableId
),
e
)
}
}
private fun stagingFullPath(streamId: StreamId): String {
return gcsNameTransformer.applyDefaultCase(
String.format(
"%s/%s/%02d/%02d/%02d/%s/",
stagingRootPath(streamId),
syncDateTime.year().get(),
syncDateTime.monthOfYear().get(),
syncDateTime.dayOfMonth().get(),
syncDateTime.hourOfDay().get(),
connectionId
)
)
}
private fun stagingRootPath(streamId: StreamId): String {
return gcsNameTransformer.applyDefaultCase(
String.format(
"%s/%s_%s",
gcsConfig.bucketPath,
gcsNameTransformer.convertStreamName(streamId.rawNamespace),
gcsNameTransformer.convertStreamName(streamId.rawName)
)
)
}
}

View File

@@ -0,0 +1,114 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.bigquery.operation
import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.TableId
import io.airbyte.integrations.base.destination.operation.StorageOperation
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil
import io.airbyte.integrations.destination.bigquery.BigQueryUtils
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Instant
import java.util.*
import java.util.concurrent.ConcurrentHashMap
private val log = KotlinLogging.logger {}
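/**
 * Base class shared by the direct-load and GCS-staging operations: manages the raw (staging)
 * table lifecycle and delegates final-table work (create, soft reset, overwrite, type-and-dedupe)
 * to the SQL generator and destination handler.
 */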
abstract class BigQueryStorageOperation<Data>(
protected val bigquery: BigQuery,
private val sqlGenerator: BigQuerySqlGenerator,
private val destinationHandler: BigQueryDestinationHandler,
protected val datasetLocation: String
) : StorageOperation<Data> {
private val existingSchemas = ConcurrentHashMap.newKeySet<String>()
override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
// Prepare staging table. For overwrite, it does drop-create so we can skip explicit create.
if (destinationSyncMode == DestinationSyncMode.OVERWRITE) {
truncateStagingTable(streamId)
} else {
createStagingTable(streamId)
}
}
private fun createStagingTable(streamId: StreamId) {
val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
BigQueryUtils.createPartitionedTableIfNotExists(
bigquery,
tableId,
BigQueryRecordFormatter.SCHEMA_V2
)
}
private fun dropStagingTable(streamId: StreamId) {
val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
bigquery.delete(tableId)
}
/**
* "Truncates" table, this is a workaround to the issue with TRUNCATE TABLE in BigQuery where
* the table's partition filter must be turned off to truncate. Since deleting a table is a free
* operation this option re-uses functions that already exist
*/
private fun truncateStagingTable(streamId: StreamId) {
val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
log.info { "Truncating raw table $tableId" }
dropStagingTable(streamId)
createStagingTable(streamId)
}
override fun cleanupStage(streamId: StreamId) {
log.info { "Nothing to cleanup in stage for Streaming inserts" }
}
abstract override fun writeToStage(streamId: StreamId, data: Data)
override fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean) {
destinationHandler.execute(sqlGenerator.createTable(streamConfig, suffix, replace))
}
override fun softResetFinalTable(streamConfig: StreamConfig) {
TyperDeduperUtil.executeSoftReset(
sqlGenerator = sqlGenerator,
destinationHandler = destinationHandler,
streamConfig,
)
}
override fun overwriteFinalTable(streamConfig: StreamConfig, tmpTableSuffix: String) {
if (tmpTableSuffix.isNotBlank()) {
log.info {
"Overwriting table ${streamConfig.id.finalTableId(BigQuerySqlGenerator.QUOTE)} with ${
streamConfig.id.finalTableId(
BigQuerySqlGenerator.QUOTE,
tmpTableSuffix,
)
}"
}
destinationHandler.execute(
sqlGenerator.overwriteFinalTable(streamConfig.id, tmpTableSuffix)
)
}
}
override fun typeAndDedupe(
streamConfig: StreamConfig,
maxProcessedTimestamp: Optional<Instant>,
finalTableSuffix: String
) {
TyperDeduperUtil.executeTypeAndDedupe(
sqlGenerator = sqlGenerator,
destinationHandler = destinationHandler,
streamConfig,
maxProcessedTimestamp,
finalTableSuffix,
)
}
}

View File

@@ -135,12 +135,6 @@ class BigQueryDestinationTest {
private AmazonS3 s3Client;
/*
* TODO: Migrate all BigQuery Destination configs (GCS, Denormalized, Normalized) to no longer use
* #partitionIfUnpartitioned then recombine Base Provider. The reason for breaking this method into
* a base class is because #testWritePartitionOverUnpartitioned is no longer used only in GCS
* Staging
*/
private Stream<Arguments> successTestConfigProviderBase() {
return Stream.of(
Arguments.of("config"),

View File

@@ -91,12 +91,8 @@ public abstract class AbstractBigQueryTypingDedupingTest extends BaseTypingDedup
return new BigQuerySqlGenerator(getConfig().get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), null);
}
/**
* Run a sync using 1.9.0 (which is the highest version that still creates v2 raw tables with JSON
* _airbyte_data). Then run a sync using our current version.
*/
@Test
public void testRawTableJsonToStringMigration() throws Exception {
public void testV1V2Migration() throws Exception {
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.FULL_REFRESH)
@@ -109,23 +105,19 @@ public abstract class AbstractBigQueryTypingDedupingTest extends BaseTypingDedup
// First sync
final List<AirbyteMessage> messages1 = readMessages("dat/sync1_messages.jsonl");
runSync(catalog, messages1, "airbyte/destination-bigquery:1.9.0", config -> {
runSync(catalog, messages1, "airbyte/destination-bigquery:1.10.2", config -> {
// Defensive to avoid weird behaviors or test failures if the original config is being altered by
// another thread, thanks jackson for a mutable JsonNode
JsonNode copiedConfig = Jsons.clone(config);
if (config instanceof ObjectNode) {
// Add opt-in T+D flag for older version. this is removed in newer version of the spec.
((ObjectNode) copiedConfig).put("use_1s1t_format", true);
// Opt out of T+D to run old V1 sync
((ObjectNode) copiedConfig).put("use_1s1t_format", false);
}
return copiedConfig;
});
// 1.9.0 is known-good, but we might as well check that we're in good shape before continuing.
// If this starts erroring out because we added more test records and 1.9.0 had a latent bug,
// just delete these three lines :P
final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison());
// The record differ code is already adapted to the V2 column format; use the post-V2 sync
// to verify that append mode preserved all the raw records and final records.
// Second sync
final List<AirbyteMessage> messages2 = readMessages("dat/sync2_messages.jsonl");

View File

@@ -33,9 +33,9 @@ import io.airbyte.integrations.base.destination.typing_deduping.BaseSqlGenerator
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.integrations.destination.bigquery.BigQueryConsts;
import io.airbyte.integrations.destination.bigquery.BigQueryDestination;
import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDestinationState;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -57,7 +57,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Execution(ExecutionMode.CONCURRENT)
public class BigQuerySqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest<MinimumDestinationState.Impl> {
public class BigQuerySqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest<BigQueryDestinationState> {
private static final Logger LOGGER = LoggerFactory.getLogger(BigQuerySqlGeneratorIntegrationTest.class);

View File

@@ -220,6 +220,7 @@ tutorials:
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.5.0 | 2024-05-17 | [38132](https://github.com/airbytehq/airbyte/pull/38132) | Major rewrite of existing code, adapting to CDK changes introduced in [38107](https://github.com/airbytehq/airbyte/pull/38107) |
| 2.4.20 | 2024-05-13 | [38131](https://github.com/airbytehq/airbyte/pull/38131) | Cleanup `BigQueryWriteConfig` and reuse `StreamConfig`; Adapt to `StreamConfig` signature changes |
| 2.4.19 | 2024-05-10 | [38125](https://github.com/airbytehq/airbyte/pull/38125) | adopt latest CDK code |
| 2.4.18 | 2024-05-10 | [38111](https://github.com/airbytehq/airbyte/pull/38111) | No functional changes, deleting unused code |