🐛 Destination Bigquery - fix migration logic (#29461)
JavaBaseConstants.java

@@ -41,6 +41,6 @@ public final class JavaBaseConstants {
       COLUMN_NAME_AB_EXTRACTED_AT,
       COLUMN_NAME_AB_META);
 
-  public static final String AIRBYTE_NAMESPACE_SCHEMA = "airbyte";
+  public static final String DEFAULT_AIRBYTE_INTERNAL_NAMESPACE = "airbyte_internal";
 
 }
BaseDestinationV1V2Migrator.java

@@ -15,7 +15,7 @@ import org.slf4j.LoggerFactory;
 
 public abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> implements DestinationV1V2Migrator {
 
-  Logger LOGGER = LoggerFactory.getLogger(BaseDestinationV1V2Migrator.class);
+  protected static final Logger LOGGER = LoggerFactory.getLogger(BaseDestinationV1V2Migrator.class);
 
   @Override
   public void migrateIfNecessary(
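Editor's note: the modifier change above is more than style. A minimal standalone sketch of the difference (class names here are illustrative, not the real ones):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// With no modifiers, the old `Logger LOGGER = ...` was a package-private *instance*
// field: one Logger allocated per object, and invisible to subclasses in other
// packages. `protected static final` creates a single shared Logger that concrete
// migrators (e.g. the BigQuery subclass later in this diff) can use directly.
abstract class BaseMigratorSketch {

  protected static final Logger LOGGER = LoggerFactory.getLogger(BaseMigratorSketch.class);
}

class ConcreteMigratorSketch extends BaseMigratorSketch {

  void run() {
    LOGGER.info("Now visible to subclasses");
  }
}
```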
@@ -23,10 +23,14 @@ public abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> implem
       final DestinationHandler destinationHandler,
       final StreamConfig streamConfig)
       throws TableNotMigratedException, UnexpectedSchemaException {
+    LOGGER.info("Assessing whether migration is necessary for stream {}", streamConfig.id().finalName());
     if (shouldMigrate(streamConfig)) {
       LOGGER.info("Starting v2 Migration for stream {}", streamConfig.id().finalName());
       migrate(sqlGenerator, destinationHandler, streamConfig);
+    } else {
+      LOGGER.info("No Migration Required for stream: {}", streamConfig.id().finalName());
     }
+
   }
 
   /**
@@ -37,9 +41,13 @@ public abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> implem
    */
   protected boolean shouldMigrate(final StreamConfig streamConfig) {
     final var v1RawTable = convertToV1RawName(streamConfig);
-    return isMigrationRequiredForSyncMode(streamConfig.destinationSyncMode())
-        && !doesValidV2RawTableAlreadyExist(streamConfig)
-        && doesValidV1RawTableExist(v1RawTable.namespace(), v1RawTable.tableName());
+    LOGGER.info("Checking whether v1 raw table {} in dataset {} exists", v1RawTable.tableName(), v1RawTable.namespace());
+    final var syncModeNeedsMigration = isMigrationRequiredForSyncMode(streamConfig.destinationSyncMode());
+    final var noValidV2RawTableExists = !doesValidV2RawTableAlreadyExist(streamConfig);
+    final var aValidV1RawTableExists = doesValidV1RawTableExist(v1RawTable.namespace(), v1RawTable.tableName());
+    LOGGER.info("Migration Info: Required for Sync mode: {}, No existing v2 raw tables: {}, A v1 raw table exists: {}",
+        syncModeNeedsMigration, noValidV2RawTableExists, aValidV1RawTableExists);
+    return syncModeNeedsMigration && noValidV2RawTableExists && aValidV1RawTableExists;
   }
 
   /**
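The rewrite of shouldMigrate trades short-circuit evaluation for observability: every predicate now runs and is logged before the combined result is returned. A trivial standalone sketch of the same shape, with boolean stand-ins for the three real checks:

```java
public class ShouldMigrateSketch {

  public static void main(String[] args) {
    // Stand-ins for isMigrationRequiredForSyncMode(...), !doesValidV2RawTableAlreadyExist(...),
    // and doesValidV1RawTableExist(...). In the old && chain, the later checks were
    // skipped as soon as one failed, so their outcome never appeared in the logs.
    final boolean syncModeNeedsMigration = true;
    final boolean noValidV2RawTableExists = true;
    final boolean aValidV1RawTableExists = false;
    System.out.printf("Migration Info: Required for Sync mode: %s, No existing v2 raw tables: %s, A v1 raw table exists: %s%n",
        syncModeNeedsMigration, noValidV2RawTableExists, aValidV1RawTableExists);
    System.out.println("shouldMigrate = " + (syncModeNeedsMigration && noValidV2RawTableExists && aValidV1RawTableExists));
  }
}
```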
@@ -55,7 +63,7 @@ public abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> implem
       final StreamConfig streamConfig)
       throws TableNotMigratedException {
     final var namespacedTableName = convertToV1RawName(streamConfig);
-    final var migrateAndReset = String.join("\n",
+    final var migrateAndReset = String.join("\n\n",
         sqlGenerator.migrateFromV1toV2(streamConfig.id(), namespacedTableName.namespace(),
             namespacedTableName.tableName()),
         sqlGenerator.softReset(streamConfig));
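A small illustration of the join change (the SQL strings are placeholders, not the generator's real output): with "\n" the two generated scripts abut on consecutive lines, while "\n\n" leaves a blank line between them, so the boundary stays unambiguous when the combined script is logged or executed as one block:

```java
public class JoinSketch {

  public static void main(String[] args) {
    final String migrate = "-- migrate v1 raw table\nINSERT INTO v2_raw SELECT * FROM v1_raw;";
    final String softReset = "-- soft reset\nTRUNCATE TABLE v2_tmp;";
    // Joining with "\n\n" keeps a blank line between the two scripts.
    System.out.println(String.join("\n\n", migrate, softReset));
  }
}
```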
CatalogParser.java

@@ -4,6 +4,8 @@
 
 package io.airbyte.integrations.base.destination.typing_deduping;
 
+import static io.airbyte.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
+
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
 import java.util.ArrayList;
@@ -15,12 +17,11 @@ import org.apache.commons.codec.digest.DigestUtils;
 
 public class CatalogParser {
 
-  public static final String DEFAULT_RAW_TABLE_NAMESPACE = "airbyte_internal";
   private final SqlGenerator<?> sqlGenerator;
   private final String rawNamespace;
 
   public CatalogParser(final SqlGenerator<?> sqlGenerator) {
-    this(sqlGenerator, DEFAULT_RAW_TABLE_NAMESPACE);
+    this(sqlGenerator, DEFAULT_AIRBYTE_INTERNAL_NAMESPACE);
   }
 
   public CatalogParser(final SqlGenerator<?> sqlGenerator, final String rawNamespace) {
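With the class-local DEFAULT_RAW_TABLE_NAMESPACE removed, the "airbyte_internal" default has a single source of truth in JavaBaseConstants. The delegation shape, reduced to a standalone sketch (names are illustrative):

```java
public class ParserSketch {

  static final String DEFAULT_AIRBYTE_INTERNAL_NAMESPACE = "airbyte_internal"; // one shared constant

  private final String rawNamespace;

  ParserSketch() {
    this(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE); // delegate rather than repeat the literal
  }

  ParserSketch(final String rawNamespace) {
    this.rawNamespace = rawNamespace;
  }
}
```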
DestinationV1V2MigratorTest.java

@@ -73,7 +73,8 @@ public class DestinationV1V2MigratorTest {
     final var sqlGenerator = new MockSqlGenerator();
     final StreamConfig stream = new StreamConfig(STREAM_ID, null, DestinationSyncMode.APPEND_DEDUP, null, null, null);
     final DestinationHandler<String> handler = Mockito.mock(DestinationHandler.class);
-    final var sql = String.join("\n", sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table"), sqlGenerator.softReset(stream));
+    final var sql = String.join("\n\n", sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table"),
+        sqlGenerator.softReset(stream));
     // All is well
     final var migrator = noIssuesMigrator();
     migrator.migrate(sqlGenerator, handler, stream);
destination-bigquery Dockerfile

@@ -47,7 +47,7 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery
 
 COPY --from=build /airbyte /airbyte
 
-LABEL io.airbyte.version=1.7.7
+LABEL io.airbyte.version=1.7.8
 LABEL io.airbyte.name=airbyte/destination-bigquery
 
 ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
destination-bigquery metadata.yaml

@@ -2,7 +2,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
-  dockerImageTag: 1.7.7
+  dockerImageTag: 1.7.8
   dockerRepository: airbyte/destination-bigquery
   githubIssueLabel: destination-bigquery
   icon: bigquery.svg
BigQueryStagingConsumerFactory.java

@@ -4,7 +4,7 @@
 
 package io.airbyte.integrations.destination.bigquery;
 
-import static io.airbyte.integrations.base.JavaBaseConstants.AIRBYTE_NAMESPACE_SCHEMA;
+import static io.airbyte.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Functions;
@@ -154,11 +154,15 @@ public class BigQueryStagingConsumerFactory {
       final TyperDeduper typerDeduper) {
     return () -> {
       LOGGER.info("Preparing airbyte_raw tables in destination started for {} streams", writeConfigs.size());
+      if (TypingAndDedupingFlag.isDestinationV2()) {
+        typerDeduper.prepareTables();
+      }
       for (final BigQueryWriteConfig writeConfig : writeConfigs.values()) {
         LOGGER.info("Preparing staging are in destination for schema: {}, stream: {}, target table: {}, stage: {}",
-            writeConfig.tableSchema(), writeConfig.streamName(), writeConfig.targetTableId(), writeConfig.streamName());
+            writeConfig.tableSchema(), writeConfig.streamName(), writeConfig.targetTableId(), writeConfig.streamName()
+        );
         // In Destinations V2, we will always use the 'airbyte' schema/namespace for raw tables
-        final String rawDatasetId = TypingAndDedupingFlag.isDestinationV2() ? AIRBYTE_NAMESPACE_SCHEMA : writeConfig.datasetId();
+        final String rawDatasetId = TypingAndDedupingFlag.isDestinationV2() ? DEFAULT_AIRBYTE_INTERNAL_NAMESPACE : writeConfig.datasetId();
         // Regardless, ensure the schema the customer wants to write to exists
         bigQueryGcsOperations.createSchemaIfNotExists(writeConfig.datasetId(), writeConfig.datasetLocation());
         // Schema used for raw and airbyte internal tables
@@ -174,7 +178,6 @@ public class BigQueryStagingConsumerFactory {
           bigQueryGcsOperations.truncateTableIfExists(rawDatasetId, writeConfig.targetTableId(), writeConfig.tableSchema());
         }
       }
-      typerDeduper.prepareTables();
       LOGGER.info("Preparing tables in destination completed.");
     };
   }
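These two hunks are the headline fix named in the changelog: typerDeduper.prepareTables(), which drives the v1 -> v2 migration, now runs before the per-stream loop that creates and truncates raw tables rather than after it. Since shouldMigrate() bails out as soon as a valid v2 raw table exists, the old ordering could create an empty v2 raw table first and silently skip the migration. An order-of-operations sketch with Runnable stand-ins (not the real classes):

```java
public class OrderingSketch {

  public static void main(String[] args) {
    final Runnable prepareTables = () -> System.out.println("1. typerDeduper.prepareTables(): run v1 -> v2 migration");
    final Runnable createRawTables = () -> System.out.println("2. create/truncate per-stream raw tables");
    prepareTables.run();   // now first; previously ran last, after the tables already existed
    createRawTables.run();
  }
}
```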
BigQueryV1V2Migrator.java

@@ -54,8 +54,8 @@ public class BigQueryV1V2Migrator extends BaseDestinationV1V2Migrator<TableDefin
   @Override
   protected NamespacedTableName convertToV1RawName(StreamConfig streamConfig) {
     return new NamespacedTableName(
-        this.nameTransformer.getRawTableName(streamConfig.id().originalName()),
-        this.nameTransformer.getNamespace(streamConfig.id().originalNamespace())
+        this.nameTransformer.getNamespace(streamConfig.id().originalNamespace()),
+        this.nameTransformer.getRawTableName(streamConfig.id().originalName())
     );
   }
 }
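The swap above is the subtle bug: NamespacedTableName takes (namespace, tableName), but the old code passed (tableName, namespace). Because both parameters are Strings, the swapped order compiled cleanly. A self-contained sketch, assuming NamespacedTableName is a simple pair as its accessors elsewhere in this diff (v1RawTable.namespace(), v1RawTable.tableName()) indicate:

```java
public class ArgOrderSketch {

  record NamespacedTableName(String namespace, String tableName) {}

  public static void main(String[] args) {
    final var swapped = new NamespacedTableName("_airbyte_raw_users", "my_dataset"); // the old bug
    final var correct = new NamespacedTableName("my_dataset", "_airbyte_raw_users");
    // With the swapped pair, the v1-table existence check would probe a dataset named
    // "_airbyte_raw_users" for a table named "my_dataset" and find nothing to migrate.
    System.out.println("swapped namespace: " + swapped.namespace() + ", correct namespace: " + correct.namespace());
  }
}
```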
AbstractBigQueryTypingDedupingTest.java

@@ -7,9 +7,8 @@ import com.google.cloud.bigquery.DatasetId;
 import com.google.cloud.bigquery.QueryJobConfiguration;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableResult;
-import io.airbyte.integrations.base.TypingAndDedupingFlag;
+import io.airbyte.integrations.base.JavaBaseConstants;
 import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest;
-import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
 import io.airbyte.integrations.destination.bigquery.BigQueryDestination;
 import io.airbyte.integrations.destination.bigquery.BigQueryDestinationTestUtils;
@@ -71,6 +70,6 @@ public abstract class AbstractBigQueryTypingDedupingTest extends BaseTypingDedup
    * Subclasses using a config with a nonstandard raw table dataset should override this method.
    */
   protected String getRawDataset() {
-    return CatalogParser.DEFAULT_RAW_TABLE_NAMESPACE;
+    return JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
   }
 }
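The test hook in miniature (illustrative names): the base test resolves the raw dataset through an overridable method that now defaults to the shared constant, and a subclass exercising a nonstandard config overrides it:

```java
abstract class BaseTypingDedupingSketch {

  protected String getRawDataset() {
    return "airbyte_internal"; // i.e. JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE
  }
}

class CustomRawDatasetSketch extends BaseTypingDedupingSketch {

  @Override
  protected String getRawDataset() {
    return "my_custom_raw_dataset"; // hypothetical nonstandard raw dataset
  }
}
```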
destination-snowflake Dockerfile

@@ -49,7 +49,7 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1
 ENV ENABLE_SENTRY true
 
 
-LABEL io.airbyte.version=1.3.2
+LABEL io.airbyte.version=1.3.3
 LABEL io.airbyte.name=airbyte/destination-snowflake
 
 ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
destination-snowflake metadata.yaml

@@ -2,7 +2,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-  dockerImageTag: 1.3.2
+  dockerImageTag: 1.3.3
   dockerRepository: airbyte/destination-snowflake
   githubIssueLabel: destination-snowflake
   icon: snowflake.svg
AbstractSnowflakeTypingDedupingTest.java

@@ -7,8 +7,8 @@ import io.airbyte.commons.json.Jsons;
 import io.airbyte.db.factory.DataSourceFactory;
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.db.jdbc.JdbcUtils;
+import io.airbyte.integrations.base.JavaBaseConstants;
 import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest;
-import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
 import io.airbyte.integrations.destination.snowflake.OssCloudEnvVarConsts;
 import io.airbyte.integrations.destination.snowflake.SnowflakeDatabase;
@@ -86,7 +86,7 @@ public abstract class AbstractSnowflakeTypingDedupingTest extends BaseTypingDedu
    * Subclasses using a config with a nonstandard raw table schema should override this method.
    */
   protected String getRawSchema() {
-    return CatalogParser.DEFAULT_RAW_TABLE_NAMESPACE;
+    return JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
   }
 
   private String getDefaultSchema() {
docs/integrations/destinations/bigquery.md

@@ -135,6 +135,7 @@ Now that you have set up the BigQuery destination connector, check out the follo
 
 | Version | Date       | Pull Request                                               | Subject                                                                             |
 |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------|
+| 1.7.8   | 2023-08-15 | [\#29461](https://github.com/airbytehq/airbyte/pull/29461) | Migration BugFix - ensure migration happens before table creation for GCS staging. |
 | 1.7.7   | 2023-08-11 | [\#29381](https://github.com/airbytehq/airbyte/pull/29381) | Destinations v2: Add support for streams with no columns                            |
 | 1.7.6   | 2023-08-04 | [\#28894](https://github.com/airbytehq/airbyte/pull/28894) | Destinations v2: Add v1 -> v2 migration Logic                                       |
 | 1.7.5   | 2023-08-04 | [\#29106](https://github.com/airbytehq/airbyte/pull/29106) | Destinations v2: handle unusual CDC deletion edge case                              |
docs/integrations/destinations/snowflake.md

@@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n
 
 | Version | Date       | Pull Request                                               | Subject                                                   |
 |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------|
+| 1.3.3   | 2023-08-15 | [\#29461](https://github.com/airbytehq/airbyte/pull/29461) | Changing a static constant reference                      |
 | 1.3.2   | 2023-08-11 | [\#29381](https://github.com/airbytehq/airbyte/pull/29381) | Destinations v2: Add support for streams with no columns  |
 | 1.3.1   | 2023-08-04 | [\#28894](https://github.com/airbytehq/airbyte/pull/28894) | Destinations v2: Update SqlGenerator                      |
 | 1.3.0   | 2023-08-07 | [\#29174](https://github.com/airbytehq/airbyte/pull/29174) | Destinations v2: early access release                     |