diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java
index cbc13448cce..f8b03cd568e 100644
--- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java
+++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java
@@ -71,7 +71,8 @@ public abstract class BaseTypingDedupingTest {
private static final RecordDiffer DIFFER = new RecordDiffer(
Pair.of("id1", AirbyteProtocolType.INTEGER),
Pair.of("id2", AirbyteProtocolType.INTEGER),
- Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE));
+ Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE),
+ Pair.of("old_cursor", AirbyteProtocolType.INTEGER));
private String randomSuffix;
private JsonNode config;
@@ -488,6 +489,55 @@ public abstract class BaseTypingDedupingTest {
// evolutions)
}
+ /**
+ * Change the cursor column in the second sync to a column that doesn't exist in the first sync.
+ * Verify that we overwrite everything correctly.
+ *
+ * This essentially verifies that the destination connector correctly recognizes NULL cursors as
+ * older than non-NULL cursors.
+ */
+ @Test
+ public void incrementalDedupChangeCursor() throws Exception {
+ JsonNode mangledSchema = SCHEMA.deepCopy();
+ ((ObjectNode) mangledSchema.get("properties")).remove("updated_at");
+ ((ObjectNode) mangledSchema.get("properties")).set(
+ "old_cursor",
+ Jsons.deserialize(
+ """
+ {"type": "integer"}
+ """));
+ ConfiguredAirbyteStream configuredStream = new ConfiguredAirbyteStream()
+ .withSyncMode(SyncMode.INCREMENTAL)
+ .withCursorField(List.of("old_cursor"))
+ .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
+ .withPrimaryKey(List.of(List.of("id1"), List.of("id2")))
+ .withStream(new AirbyteStream()
+ .withNamespace(streamNamespace)
+ .withName(streamName)
+ .withJsonSchema(mangledSchema));
+ final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(configuredStream));
+
+ // First sync
+ final List messages1 = readMessages("sync1_cursorchange_messages.jsonl");
+
+ runSync(catalog, messages1);
+
+ final List expectedRawRecords1 = readRecords("sync1_cursorchange_expectedrecords_dedup_raw.jsonl");
+ final List expectedFinalRecords1 = readRecords("sync1_cursorchange_expectedrecords_dedup_final.jsonl");
+ verifySyncResult(expectedRawRecords1, expectedFinalRecords1);
+
+ // Second sync
+ final List messages2 = readMessages("sync2_messages.jsonl");
+ configuredStream.getStream().setJsonSchema(SCHEMA);
+ configuredStream.setCursorField(List.of("updated_at"));
+
+ runSync(catalog, messages2);
+
+ final List expectedRawRecords2 = readRecords("sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl");
+ final List expectedFinalRecords2 = readRecords("sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl");
+ verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
+ }
+
@Test
@Disabled("Not yet implemented")
public void testSyncWithLargeRecordBatch() throws Exception {
diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_expectedrecords_dedup_final.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_expectedrecords_dedup_final.jsonl
new file mode 100644
index 00000000000..6ea7612c5ab
--- /dev/null
+++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_expectedrecords_dedup_final.jsonl
@@ -0,0 +1,3 @@
+{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "old_cursor": 1, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
+{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
+{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie"}
diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_expectedrecords_dedup_raw.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_expectedrecords_dedup_raw.jsonl
new file mode 100644
index 00000000000..a9bf479e4e3
--- /dev/null
+++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_expectedrecords_dedup_raw.jsonl
@@ -0,0 +1,3 @@
+{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}
+{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}
+{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_messages.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_messages.jsonl
new file mode 100644
index 00000000000..e8262c20258
--- /dev/null
+++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_messages.jsonl
@@ -0,0 +1,4 @@
+{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}}}
+{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}}
+{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}}
+{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}}
diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl
new file mode 100644
index 00000000000..7fa0d8339a6
--- /dev/null
+++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl
@@ -0,0 +1,3 @@
+{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
+// Charlie wasn't reemitted with updated_at, so it still has a null cursor
+{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "name": "Charlie"}
diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl
new file mode 100644
index 00000000000..a6bd1aee6e2
--- /dev/null
+++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl
@@ -0,0 +1,4 @@
+{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}}
+{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01:00:00:00Z"}}
+// Charlie wasn't reemitted in sync2. This record still has an old_cursor value.
+{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile
index 5da3a4a4c42..8723ee66483 100644
--- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile
+++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile
@@ -47,7 +47,7 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery
COPY --from=build /airbyte /airbyte
-LABEL io.airbyte.version=1.5.7
+LABEL io.airbyte.version=1.5.8
LABEL io.airbyte.name=airbyte/destination-bigquery
ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml
index db9015c2a05..916c43a7868 100644
--- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml
+++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml
@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
- dockerImageTag: 1.5.7
+ dockerImageTag: 1.5.8
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java
index 8ce8e4108e3..0940075d21a 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java
@@ -503,7 +503,7 @@ public class BigQuerySqlGenerator implements SqlGenerator {
`_airbyte_raw_id` IN (
SELECT `_airbyte_raw_id` FROM (
SELECT `_airbyte_raw_id`, row_number() OVER (
- PARTITION BY ${pk_list} ORDER BY ${cursor_name} DESC, `_airbyte_extracted_at` DESC
+ PARTITION BY ${pk_list} ORDER BY ${cursor_name} DESC NULLS LAST, `_airbyte_extracted_at` DESC
) as row_number FROM ${final_table_id}
)
WHERE row_number != 1
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java
index 43d365f7d8e..3c81bfeb8f8 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java
@@ -304,12 +304,17 @@ public class BigQuerySqlGeneratorIntegrationTest {
INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES
(JSON'{"id": 1, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}', 'd7b81af0-01da-4846-a650-cc398986bc99', '2023-01-01T00:00:00Z'),
(JSON'{"id": 1, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}', '80c99b54-54b4-43bd-b51b-1f67dafa2c52', '2023-01-01T00:00:00Z'),
- (JSON'{"id": 2, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}', 'ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z');
+ (JSON'{"id": 2, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}', 'ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z'),
+ (JSON'{"id": 3, "string": "Charlie", "integer": 123}', '22af9e56-7ebb-4f5f-ae6b-6ba53360e41e', '2023-01-01T00:00:00Z'),
+ (JSON'{"id": 3, "updated_at": "2023-01-01T04:00:00Z", "string": "Charlie", "integer": 456}', '0f2375ac-94c1-4be4-99d8-06db40a8ce3e', '2023-01-01T00:00:00Z');
INSERT INTO ${dataset}.users_final (_airbyte_raw_id, _airbyte_extracted_at, _airbyte_meta, `id`, `updated_at`, `string`, `struct`, `integer`) values
('d7b81af0-01da-4846-a650-cc398986bc99', '2023-01-01T00:00:00Z', JSON'{"errors":[]}', 1, '2023-01-01T01:00:00Z', 'Alice', JSON'{"city": "San Francisco", "state": "CA"}', 42),
('80c99b54-54b4-43bd-b51b-1f67dafa2c52', '2023-01-01T00:00:00Z', JSON'{"errors":[]}', 1, '2023-01-01T02:00:00Z', 'Alice', JSON'{"city": "San Diego", "state": "CA"}', 84),
- ('ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z', JSON'{"errors": ["blah blah integer"]}', 2, '2023-01-01T03:00:00Z', 'Bob', NULL, NULL);
+ ('ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z', JSON'{"errors": ["blah blah integer"]}', 2, '2023-01-01T03:00:00Z', 'Bob', NULL, NULL),
+ -- cursor=NULL should be discarded in favor of cursor=
+ ('22af9e56-7ebb-4f5f-ae6b-6ba53360e41e', '2023-01-01T00:00:00Z', JSON'{"errors": []}', 3, NULL, 'Charlie', NULL, 123),
+ ('0f2375ac-94c1-4be4-99d8-06db40a8ce3e', '2023-01-01T00:00:00Z', JSON'{"errors": []}', 3, '2023-01-01T04:00:00Z', 'Charlie', NULL, 456);
"""))
.build());
@@ -340,6 +345,17 @@ public class BigQuerySqlGeneratorIntegrationTest {
"_airbyte_extracted_at": "2023-01-01T00:00:00Z",
"_airbyte_meta": {"errors":["blah blah integer"]}
}
+ """),
+ Jsons.deserialize(
+ """
+ {
+ "id": 3,
+ "updated_at": "2023-01-01T04:00:00Z",
+ "string": "Charlie",
+ "integer": 456,
+ "_airbyte_extracted_at": "2023-01-01T00:00:00Z",
+ "_airbyte_meta": {"errors":[]}
+ }
""")),
toJsonRecords(result));
}
diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md
index 5a1413148f3..e144d8c3015 100644
--- a/docs/integrations/destinations/bigquery.md
+++ b/docs/integrations/destinations/bigquery.md
@@ -135,6 +135,7 @@ Now that you have set up the BigQuery destination connector, check out the follo
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------|
+| 1.5.8 | 2023-07-25 | [\#28721](https://github.com/airbytehq/airbyte/pull/28721) | Destinations v2: Handle cursor change across syncs |
| 1.5.7 | 2023-07-24 | [\#28625](https://github.com/airbytehq/airbyte/pull/28625) | Destinations v2: Limit Clustering Columns to 4 |
| 1.5.6 | 2023-07-21 | [\#28580](https://github.com/airbytehq/airbyte/pull/28580) | Destinations v2: Create dataset in user-specified location |
| 1.5.5 | 2023-07-20 | [\#28490](https://github.com/airbytehq/airbyte/pull/28490) | Destinations v2: Fix schema change detection in OVERWRITE mode when existing table is empty; other code refactoring |