destination-postgres: add refreshes (#42540)
This is the original PR, with all known bugs fixed. Some tests were added in #42514, and some were parameterized here. This has also been tested with our test connection, and the following was run in our postgres cluster: before the version upgrade: ``` postgres=> select 'products' stream, (select count(*) from airbyte_internal.dest_v2_perf_raw__stream_products ) raw_count, (select count(*) from dest_v2_perf.products) final_count, (select max(_airbyte_extracted_at) from dest_v2_perf.products ) max_extracted_at union all select 'purchases' stream, (select count(*) from airbyte_internal.dest_v2_perf_raw__stream_purchases ) raw_count, (select count(*) from dest_v2_perf.purchases) final_count, (select max(_airbyte_extracted_at) from dest_v2_perf.purchases ) max_extracted_at union all select 'users' stream, (select count(*) from airbyte_internal.dest_v2_perf_raw__stream_users ) raw_count, (select count(*) from dest_v2_perf.users) final_count, (select max(_airbyte_extracted_at) from dest_v2_perf.users ) max_extracted_at; stream | raw_count | final_count | max_extracted_at -----------+-----------+-------------+---------------------------- products | 700 | 700 | 2024-07-30 01:03:07.739+00 purchases | 700 | 100 | 2024-07-30 01:03:07.769+00 users | 100 | 100 | 2024-07-30 01:03:07.695+00 (3 rows) ``` after the version upgrade (sync1): ``` postgres=> select 'products' stream, (select count(*) from airbyte_internal.dest_v2_perf_raw__stream_products ) raw_count, (select count(*) from dest_v2_perf.products) final_count, (select max(_airbyte_extracted_at) from dest_v2_perf.products ) max_extracted_at union all select 'purchases' stream, (select count(*) from airbyte_internal.dest_v2_perf_raw__stream_purchases ) raw_count, (select count(*) from dest_v2_perf.purchases) final_count, (select max(_airbyte_extracted_at) from dest_v2_perf.purchases ) max_extracted_at union all select 'users' stream, (select count(*) from airbyte_internal.dest_v2_perf_raw__stream_users ) raw_count, (select count(*) from dest_v2_perf.users) final_count, (select max(_airbyte_extracted_at) from dest_v2_perf.users ) max_extracted_at; stream | raw_count | final_count | max_extracted_at -----------+-----------+-------------+---------------------------- products | 800 | 800 | 2024-07-30 01:07:44.094+00 purchases | 800 | 100 | 2024-07-30 01:07:44.128+00 users | 100 | 100 | 2024-07-30 01:07:44.052+00 (3 rows) postgres=> select 'users' name, count(*) ct, _airbyte_generation_id gen_id from dest_v2_perf.users group by 1, 3 union all select 'purchases', count(*), _airbyte_generation_id from dest_v2_perf.purchases group by 1, 3 union all select 'products' name, count(*) ct, _airbyte_generation_id gen_id from dest_v2_perf.products group by 1, 3 ; name | ct | gen_id -----------+-----+-------- users | 100 | 8 purchases | 100 | 0 products | 700 | products | 100 | 0 (4 rows) postgres=> select 'users' name, count(*) ct, _airbyte_generation_id gen_id from airbyte_internal.dest_v2_perf_raw__stream_users group by 1, 3 union all select 'purchases', count(*), _airbyte_generation_id from airbyte_internal.dest_v2_perf_raw__stream_purchases group by 1, 3 union all select 'products' name, count(*) ct, _airbyte_generation_id gen_id from airbyte_internal.dest_v2_perf_raw__stream_products group by 1, 3 ; name | ct | gen_id -----------+-----+-------- users | 100 | 8 purchases | 700 | purchases | 100 | 0 products | 100 | 0 products | 700 | (5 rows) postgres=> ``` after the connector upgrade (sync2): ``` postgres=> select 'products' stream, (select count(*) from airbyte_internal.dest_v2_perf_raw__stream_products ) raw_count, (select count(*) from dest_v2_perf.products) final_count, (select max(_airbyte_extracted_at) from dest_v2_perf.products ) max_extracted_at union all select 'purchases' stream, (select count(*) from airbyte_internal.dest_v2_perf_raw__stream_purchases ) raw_count, (select count(*) from dest_v2_perf.purchases) final_count, (select max(_airbyte_extracted_at) from dest_v2_perf.purchases ) max_extracted_at union all select 'users' stream, (select count(*) from airbyte_internal.dest_v2_perf_raw__stream_users ) raw_count, (select count(*) from dest_v2_perf.users) final_count, (select max(_airbyte_extracted_at) from dest_v2_perf.users ) max_extracted_at; stream | raw_count | final_count | max_extracted_at -----------+-----------+-------------+---------------------------- products | 900 | 900 | 2024-07-30 01:12:57.448+00 purchases | 900 | 100 | 2024-07-30 01:12:57.475+00 users | 100 | 100 | 2024-07-30 01:12:57.421+00 (3 rows) postgres=> select 'users' name, count(*) ct, _airbyte_generation_id gen_id from dest_v2_perf.users group by 1, 3 union all select 'purchases', count(*), _airbyte_generation_id from dest_v2_perf.purchases group by 1, 3 union all select 'products' name, count(*) ct, _airbyte_generation_id gen_id from dest_v2_perf.products group by 1, 3 ; name | ct | gen_id -----------+-----+-------- users | 100 | 9 purchases | 100 | 0 products | 200 | 0 products | 700 | (4 rows) postgres=> select 'users' name, count(*) ct, _airbyte_generation_id gen_id from airbyte_internal.dest_v2_perf_raw__stream_users group by 1, 3 union all select 'purchases', count(*), _airbyte_generation_id from airbyte_internal.dest_v2_perf_raw__stream_purchases group by 1, 3 union all select 'products' name, count(*) ct, _airbyte_generation_id gen_id from airbyte_internal.dest_v2_perf_raw__stream_products group by 1, 3 ; name | ct | gen_id -----------+-----+-------- users | 100 | 9 purchases | 700 | purchases | 200 | 0 products | 200 | 0 products | 700 | (5 rows) postgres=> ```
This commit is contained in:
@@ -267,6 +267,7 @@ _where_ it is deployed.
|
||||
|
||||
| Version | Date | Pull Request | Subject |
|
||||
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------|
|
||||
| 2.3.0 | 2024-07-22 | [\#41954](https://github.com/airbytehq/airbyte/pull/41954) | Support for [refreshes](../../operator-guides/refreshes.md) and resumable full refresh. WARNING: You must upgrade to platform 0.63.7 before upgrading to this connector version. |
|
||||
| 2.2.1 | 2024-07-22 | [\#42423](https://github.com/airbytehq/airbyte/pull/42423) | no-op. Bumping to a clean image |
|
||||
| 2.2.0 | 2024-07-22 | [\#42423](https://github.com/airbytehq/airbyte/pull/42423) | Revert refreshes support |
|
||||
| 2.1.1 | 2024-07-22 | [\#42415](https://github.com/airbytehq/airbyte/pull/42415) | fixing PostgresSqlOperations.isOtherGenerationIdInTable to close the streams coming from JdbcDatabase.unsafeQuery |
|
||||
|
||||
Reference in New Issue
Block a user