1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Handle ints and longs in normalization (#14362)

* generate airbyte_type:integer

* normalization accepts `airbyte_type: integer`

* handles ints+longs

* update avro for consistency

* delete long type for now, treat all ints as longs

* update avro type mappings

{type:number, airbyte_type:integer} -> long
{type:number, airbyte_type:big_integer} -> string (i.e. "unbounded integer")

* fix test

* remove long handling

* Revert "remove long handling"

This reverts commit 33ade8d2831e675c3545ac6019d200ec312e54d9.

* Revert "update avro type mappings"

This reverts commit 5b0349badad7545efe8e1191291a628445fe1c84.

* Revert "delete long type for now, treat all ints as longs"

This reverts commit 018efd4a5d0c59f392fd8e3b0d0967c666b72947.

* Revert "update avro for consistency"

This reverts commit bcf47c6799b5906deb4f219d7f6e64ea73b41b74.

* newline@eof

* update test

* slightly better local tests

* fix test

* missed a few cases

* postgres tests use correct hostnames

* fix normalization

* fix int macro

* add test case

* normalization test output

* handle int/long correctly

* fix types for other DBs

* uint32 -> bigint; tests

* add type value assertions

* more test updates

* regenerate output

* reconcile big_integer to match docs

* update comment

* fix type

* fix mysql constructor call

* bigint only has 38 digits

* fix s3 ints, fix DAT test case

* big_integer should be string

* reduce to 28 digit big_ints

* fix test setup, mysql

* kill big_integer tests

* regenerate output

* version bumps

* auto-bump connector version [ci skip]

* auto-bump connector version [ci skip]

* auto-bump connector version [ci skip]

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
Edward Gao
2022-07-26 16:40:14 -07:00
committed by GitHub
parent d465461c13
commit b2dd470d3d
143 changed files with 1247 additions and 615 deletions

View File

@@ -96,7 +96,7 @@
- name: BigQuery
sourceDefinitionId: bfd1ddf8-ae8a-4620-b1d7-55597d2ba08c
dockerRepository: airbyte/source-bigquery
dockerImageTag: 0.1.9
dockerImageTag: 0.2.0
documentationUrl: https://docs.airbyte.io/integrations/sources/bigquery
icon: bigquery.svg
sourceType: database
@@ -604,7 +604,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.5.17
dockerImageTag: 0.6.0
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
@@ -754,7 +754,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.4.37
dockerImageTag: 0.4.38
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
@@ -961,7 +961,7 @@
- name: TiDB
sourceDefinitionId: 0dad1a35-ccf8-4d03-b73e-6788c00b13ae
dockerRepository: airbyte/source-tidb
dockerImageTag: 0.1.5
dockerImageTag: 0.2.0
documentationUrl: https://docs.airbyte.io/integrations/sources/tidb
icon: tidb.svg
sourceType: database

View File

@@ -829,7 +829,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-bigquery:0.1.9"
- dockerImage: "airbyte/source-bigquery:0.2.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/bigquery"
connectionSpecification:
@@ -5755,7 +5755,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:0.5.17"
- dockerImage: "airbyte/source-mysql:0.6.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
connectionSpecification:
@@ -7040,7 +7040,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:0.4.37"
- dockerImage: "airbyte/source-postgres:0.4.38"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
@@ -9343,7 +9343,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-tidb:0.1.5"
- dockerImage: "airbyte/source-tidb:0.2.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/tidb"
connectionSpecification:

View File

@@ -119,7 +119,8 @@ public class BigQuerySourceOperations implements SourceOperations<BigQueryResult
public JsonSchemaType getJsonType(final StandardSQLTypeName bigQueryType) {
return switch (bigQueryType) {
case BOOL -> JsonSchemaType.BOOLEAN;
case INT64, FLOAT64, NUMERIC, BIGNUMERIC -> JsonSchemaType.NUMBER;
case INT64 -> JsonSchemaType.INTEGER;
case FLOAT64, NUMERIC, BIGNUMERIC -> JsonSchemaType.NUMBER;
case STRING, BYTES, TIMESTAMP, DATE, TIME, DATETIME -> JsonSchemaType.STRING;
case ARRAY -> JsonSchemaType.ARRAY;
case STRUCT -> JsonSchemaType.OBJECT;

View File

@@ -102,12 +102,12 @@ public class JdbcSourceOperations extends AbstractJdbcCompatibleSourceOperations
}
@Override
public JsonSchemaType getJsonType(JDBCType jdbcType) {
public JsonSchemaType getJsonType(final JDBCType jdbcType) {
return switch (jdbcType) {
case BIT, BOOLEAN -> JsonSchemaType.BOOLEAN;
case TINYINT, SMALLINT -> JsonSchemaType.NUMBER;
case INTEGER -> JsonSchemaType.NUMBER;
case BIGINT -> JsonSchemaType.NUMBER;
case TINYINT, SMALLINT -> JsonSchemaType.INTEGER;
case INTEGER -> JsonSchemaType.INTEGER;
case BIGINT -> JsonSchemaType.INTEGER;
case FLOAT, DOUBLE -> JsonSchemaType.NUMBER;
case REAL -> JsonSchemaType.NUMBER;
case NUMERIC, DECIMAL -> JsonSchemaType.NUMBER;

View File

@@ -301,9 +301,9 @@ class TestJdbcUtils {
final Map<String, JsonSchemaType> expected = ImmutableMap.<String, JsonSchemaType>builder()
.put("bit", JsonSchemaType.BOOLEAN)
.put("boolean", JsonSchemaType.BOOLEAN)
.put("smallint", JsonSchemaType.NUMBER)
.put("int", JsonSchemaType.NUMBER)
.put("bigint", JsonSchemaType.NUMBER)
.put("smallint", JsonSchemaType.INTEGER)
.put("int", JsonSchemaType.INTEGER)
.put("bigint", JsonSchemaType.INTEGER)
.put("float", JsonSchemaType.NUMBER)
.put("double", JsonSchemaType.NUMBER)
.put("real", JsonSchemaType.NUMBER)

View File

@@ -28,5 +28,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]
LABEL io.airbyte.version=0.2.11
LABEL io.airbyte.name=airbyte/normalization
LABEL io.airbyte.version=0.2.12
LABEL io.airbyte.name=airbyte/normalization

View File

@@ -80,6 +80,10 @@
{# int ------------------------------------------------- #}
{% macro default__type_int() %}
int
{% endmacro %}
{% macro mysql__type_int() %}
signed
{% endmacro %}
@@ -116,6 +120,48 @@
{% endmacro %}
{# very_large_integer --------------------------------------- --#}
{#
Most databases don't have a true unbounded numeric datatype, so we use a really big numeric field.
Our type terminology unfortunately collides with DB terminology (i.e. "big_integer" means different things in different contexts)
so this macro needs to be called very_large_integer.
#}
{%- macro type_very_large_integer() -%}
{{ adapter.dispatch('type_very_large_integer')() }}
{%- endmacro -%}
{% macro default__type_very_large_integer() %}
numeric
{% endmacro %}
{% macro snowflake__type_very_large_integer() %}
numeric
{% endmacro %}
{% macro mysql__type_very_large_integer() %}
decimal(38, 0)
{% endmacro %}
{% macro clickhouse__type_very_large_integer() %}
decimal128(0)
{% endmacro %}
{# timestamp ------------------------------------------------- --#}
{% macro mysql__type_timestamp() %}
time
{% endmacro %}
{%- macro sqlserver__type_timestamp() -%}
{#-- in TSQL timestamp is really datetime --#}
{#-- https://docs.microsoft.com/en-us/sql/t-sql/functions/date-and-time-data-types-and-functions-transact-sql?view=sql-server-ver15#DateandTimeDataTypes --#}
datetime
{%- endmacro -%}
{% macro clickhouse__type_timestamp() %}
DateTime64
{% endmacro %}
{# timestamp with time zone ------------------------------------------------- #}
{%- macro type_timestamp_with_timezone() -%}

View File

@@ -83,3 +83,8 @@ vars:
multiple_column_names_conflicts_stg: test_normalization._airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts_scd: test_normalization._airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts: test_normalization._airbyte_raw_multiple_column_names_conflicts
types_testing_ab1: test_normalization._airbyte_raw_types_testing
types_testing_ab2: test_normalization._airbyte_raw_types_testing
types_testing_stg: test_normalization._airbyte_raw_types_testing
types_testing_scd: test_normalization._airbyte_raw_types_testing
types_testing: test_normalization._airbyte_raw_types_testing

View File

@@ -13,3 +13,4 @@ sources:
- name: _airbyte_raw_multiple_column_names_conflicts
- name: _airbyte_raw_pos_dedup_cdcx
- name: _airbyte_raw_renamed_dedup_cdc_excluded
- name: _airbyte_raw_types_testing

View File

@@ -1,25 +1,25 @@
name: airbyte_utils
version: "1.0"
version: '1.0'
config-version: 2
profile: normalize
model-paths:
- models
- models
docs-paths:
- docs
- docs
analysis-paths:
- analysis
- analysis
test-paths:
- tests
- tests
seed-paths:
- data
- data
macro-paths:
- macros
- macros
target-path: ../build
log-path: ../logs
packages-install-path: /dbt
clean-targets:
- build
- dbt_modules
- build
- dbt_modules
quoting:
database: true
schema: false
@@ -42,10 +42,10 @@ models:
+tags: airbyte_internal_views
+materialized: view
dispatch:
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
vars:
json_column: _airbyte_data
models_to_source:
@@ -83,3 +83,8 @@ vars:
multiple_column_names_conflicts_stg: test_normalization._airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts_scd: test_normalization._airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts: test_normalization._airbyte_raw_multiple_column_names_conflicts
types_testing_ab1: test_normalization._airbyte_raw_types_testing
types_testing_ab2: test_normalization._airbyte_raw_types_testing
types_testing_stg: test_normalization._airbyte_raw_types_testing
types_testing_scd: test_normalization._airbyte_raw_types_testing
types_testing: test_normalization._airbyte_raw_types_testing

View File

@@ -13,3 +13,4 @@ sources:
- name: _airbyte_raw_multiple_column_names_conflicts
- name: _airbyte_raw_pos_dedup_cdcx
- name: _airbyte_raw_renamed_dedup_cdc_excluded
- name: _airbyte_raw_types_testing

View File

@@ -79,6 +79,10 @@ vars:
unnest_alias_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias: test_normalization._airbyte_raw_unnest_alias
arrays_ab1: test_normalization._airbyte_raw_arrays
arrays_ab2: test_normalization._airbyte_raw_arrays
arrays_ab3: test_normalization._airbyte_raw_arrays
arrays: test_normalization._airbyte_raw_arrays
nested_stream_with_co_2g_names_partition_ab1: test_normalization._airbyte_raw_nested_s__lting_into_long_names
nested_stream_with_co_2g_names_partition_ab2: test_normalization._airbyte_raw_nested_s__lting_into_long_names
nested_stream_with_co_2g_names_partition_ab3: test_normalization._airbyte_raw_nested_s__lting_into_long_names
@@ -91,6 +95,10 @@ vars:
unnest_alias_children_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children: test_normalization._airbyte_raw_unnest_alias
arrays_nested_array_parent_ab1: test_normalization._airbyte_raw_arrays
arrays_nested_array_parent_ab2: test_normalization._airbyte_raw_arrays
arrays_nested_array_parent_ab3: test_normalization._airbyte_raw_arrays
arrays_nested_array_parent: test_normalization._airbyte_raw_arrays
nested_stream_with_co_3double_array_data_ab1: test_normalization._airbyte_raw_nested_s__lting_into_long_names
nested_stream_with_co_3double_array_data_ab2: test_normalization._airbyte_raw_nested_s__lting_into_long_names
nested_stream_with_co_3double_array_data_ab3: test_normalization._airbyte_raw_nested_s__lting_into_long_names

View File

@@ -6,6 +6,7 @@ sources:
schema: false
identifier: false
tables:
- name: _airbyte_raw_arrays
- name: _airbyte_raw_conflict_stream_array
- name: _airbyte_raw_conflict_stream_name
- name: _airbyte_raw_conflict_stream_scalar

View File

@@ -79,3 +79,8 @@ vars:
multiple_column_names_conflicts_stg: test_normalization._airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts_scd: test_normalization._airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts: test_normalization._airbyte_raw_multiple_column_names_conflicts
types_testing_ab1: test_normalization._airbyte_raw_types_testing
types_testing_ab2: test_normalization._airbyte_raw_types_testing
types_testing_stg: test_normalization._airbyte_raw_types_testing
types_testing_scd: test_normalization._airbyte_raw_types_testing
types_testing: test_normalization._airbyte_raw_types_testing

View File

@@ -3,41 +3,41 @@
create table
test_normalization.`exchange_rate__dbt_tmp`
as (
with __dbt__cte__exchange_rate_ab1 as (
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: test_normalization._airbyte_raw_exchange_rate
select
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."id"' RETURNING CHAR) as id,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."currency"' RETURNING CHAR) as currency,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."date"' RETURNING CHAR) as `date`,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."timestamp_col"' RETURNING CHAR) as timestamp_col,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."HKD@spéçiäl & characters"' RETURNING CHAR) as `HKD@spéçiäl & characters`,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."HKD_special___characters"' RETURNING CHAR) as hkd_special___characters,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."NZD"' RETURNING CHAR) as nzd,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."USD"' RETURNING CHAR) as usd,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."column___with__quotes"' RETURNING CHAR) as `column__'with"_quotes`,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."datetime_tz"' RETURNING CHAR) as datetime_tz,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."datetime_no_tz"' RETURNING CHAR) as datetime_no_tz,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."time_tz"' RETURNING CHAR) as time_tz,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."time_no_tz"' RETURNING CHAR) as time_no_tz,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP
as _airbyte_normalized_at
from test_normalization._airbyte_raw_exchange_rate as table_alias
@@ -48,7 +48,7 @@ where 1 = 1
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__cte__exchange_rate_ab1
select
cast(id as
cast(id as
signed
) as id,
cast(currency as char(1024)) as currency,
@@ -57,14 +57,14 @@ select
end as `date`
,
cast(nullif(timestamp_col, '') as char(1024)) as timestamp_col,
cast(`HKD@spéçiäl & characters` as
cast(`HKD@spéçiäl & characters` as
float
) as `HKD@spéçiäl & characters`,
cast(hkd_special___characters as char(1024)) as hkd_special___characters,
cast(nzd as
cast(nzd as
float
) as nzd,
cast(usd as
cast(usd as
float
) as usd,
cast(`column__'with"_quotes` as char(1024)) as `column__'with"_quotes`,
@@ -74,12 +74,12 @@ select
end as datetime_no_tz
,
nullif(cast(time_tz as char(1024)), "") as time_tz,
nullif(cast(time_no_tz as
nullif(cast(time_no_tz as
time
), "") as time_no_tz,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP
as _airbyte_normalized_at
from __dbt__cte__exchange_rate_ab1
@@ -113,7 +113,7 @@ select
time_no_tz,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP
as _airbyte_normalized_at,
_airbyte_exchange_rate_hashid

View File

@@ -13,3 +13,4 @@ sources:
- name: _airbyte_raw_multiple_column_names_conflicts
- name: _airbyte_raw_pos_dedup_cdcx
- name: _airbyte_raw_renamed_dedup_cdc_excluded
- name: _airbyte_raw_types_testing

View File

@@ -3,41 +3,41 @@
create table
test_normalization.`exchange_rate__dbt_tmp`
as (
with __dbt__cte__exchange_rate_ab1 as (
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: test_normalization._airbyte_raw_exchange_rate
select
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."id"' RETURNING CHAR) as id,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."currency"' RETURNING CHAR) as currency,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."date"' RETURNING CHAR) as `date`,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."timestamp_col"' RETURNING CHAR) as timestamp_col,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."HKD@spéçiäl & characters"' RETURNING CHAR) as `HKD@spéçiäl & characters`,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."HKD_special___characters"' RETURNING CHAR) as hkd_special___characters,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."NZD"' RETURNING CHAR) as nzd,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."USD"' RETURNING CHAR) as usd,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."column___with__quotes"' RETURNING CHAR) as `column__'with"_quotes`,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."datetime_tz"' RETURNING CHAR) as datetime_tz,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."datetime_no_tz"' RETURNING CHAR) as datetime_no_tz,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."time_tz"' RETURNING CHAR) as time_tz,
json_value(_airbyte_data,
json_value(_airbyte_data,
'$."time_no_tz"' RETURNING CHAR) as time_no_tz,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP
as _airbyte_normalized_at
from test_normalization._airbyte_raw_exchange_rate as table_alias
@@ -48,7 +48,7 @@ where 1 = 1
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__cte__exchange_rate_ab1
select
cast(id as
cast(id as
signed
) as id,
cast(currency as char(1024)) as currency,
@@ -57,14 +57,14 @@ select
end as `date`
,
cast(nullif(timestamp_col, '') as char(1024)) as timestamp_col,
cast(`HKD@spéçiäl & characters` as
cast(`HKD@spéçiäl & characters` as
float
) as `HKD@spéçiäl & characters`,
cast(hkd_special___characters as char(1024)) as hkd_special___characters,
cast(nzd as
cast(nzd as
float
) as nzd,
cast(usd as
cast(usd as
float
) as usd,
cast(`column__'with"_quotes` as char(1024)) as `column__'with"_quotes`,
@@ -74,12 +74,12 @@ select
end as datetime_no_tz
,
nullif(cast(time_tz as char(1024)), "") as time_tz,
nullif(cast(time_no_tz as
nullif(cast(time_no_tz as
time
), "") as time_no_tz,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP
as _airbyte_normalized_at
from __dbt__cte__exchange_rate_ab1
@@ -113,7 +113,7 @@ select
time_no_tz,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP
as _airbyte_normalized_at,
_airbyte_exchange_rate_hashid

View File

@@ -1,25 +1,25 @@
name: airbyte_utils
version: "1.0"
version: '1.0'
config-version: 2
profile: normalize
source-paths:
- models
- models
docs-paths:
- docs
- docs
analysis-paths:
- analysis
- analysis
test-paths:
- tests
- tests
data-paths:
- data
- data
macro-paths:
- macros
- macros
target-path: ../build
log-path: ../logs
modules-path: /dbt
clean-targets:
- build
- dbt_modules
- build
- dbt_modules
quoting:
database: false
schema: false
@@ -42,7 +42,7 @@ models:
+materialized: view
vars:
dbt_utils_dispatch_list:
- airbyte_utils
- airbyte_utils
json_column: _airbyte_data
models_to_source:
exchange_rate_ab1: test_normalization.airbyte_raw_exchange_rate
@@ -79,3 +79,8 @@ vars:
multiple_column_names_conflicts_stg: test_normalization.airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts_scd: test_normalization.airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts: test_normalization.airbyte_raw_multiple_column_names_conflicts
types_testing_ab1: test_normalization.airbyte_raw_types_testing
types_testing_ab2: test_normalization.airbyte_raw_types_testing
types_testing_stg: test_normalization.airbyte_raw_types_testing
types_testing_scd: test_normalization.airbyte_raw_types_testing
types_testing: test_normalization.airbyte_raw_types_testing

View File

@@ -13,3 +13,4 @@ sources:
- name: airbyte_raw_multiple_column_names_conflicts
- name: airbyte_raw_pos_dedup_cdcx
- name: airbyte_raw_renamed_dedup_cdc_excluded
- name: airbyte_raw_types_testing

View File

@@ -1,25 +1,25 @@
name: airbyte_utils
version: "1.0"
version: '1.0'
config-version: 2
profile: normalize
model-paths:
- modified_models
- modified_models
docs-paths:
- docs
- docs
analysis-paths:
- analysis
- analysis
test-paths:
- tests
- tests
seed-paths:
- data
- data
macro-paths:
- macros
- macros
target-path: ../build
log-path: ../logs
packages-install-path: /dbt
clean-targets:
- build
- dbt_modules
- build
- dbt_modules
quoting:
database: true
schema: false
@@ -42,10 +42,10 @@ models:
+tags: airbyte_internal_views
+materialized: view
dispatch:
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
vars:
json_column: _airbyte_data
models_to_source:

View File

@@ -1,25 +1,25 @@
name: airbyte_utils
version: "1.0"
version: '1.0'
config-version: 2
profile: normalize
model-paths:
- models
- models
docs-paths:
- docs
- docs
analysis-paths:
- analysis
- analysis
test-paths:
- tests
- tests
seed-paths:
- data
- data
macro-paths:
- macros
- macros
target-path: ../build
log-path: ../logs
packages-install-path: /dbt
clean-targets:
- build
- dbt_modules
- build
- dbt_modules
quoting:
database: true
schema: false
@@ -42,10 +42,10 @@ models:
+tags: airbyte_internal_views
+materialized: view
dispatch:
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
vars:
json_column: _airbyte_data
models_to_source:
@@ -83,3 +83,8 @@ vars:
multiple_column_names_conflicts_stg: test_normalization._airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts_scd: test_normalization._airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts: test_normalization._airbyte_raw_multiple_column_names_conflicts
types_testing_ab1: test_normalization._airbyte_raw_types_testing
types_testing_ab2: test_normalization._airbyte_raw_types_testing
types_testing_stg: test_normalization._airbyte_raw_types_testing
types_testing_scd: test_normalization._airbyte_raw_types_testing
types_testing: test_normalization._airbyte_raw_types_testing

View File

@@ -0,0 +1,73 @@
create table "postgres".test_normalization."types_testing_scd"
as (
-- depends_on: ref('types_testing_stg')
with
input_data as (
select *
from "postgres"._airbyte_test_normalization."types_testing_stg"
-- types_testing from "postgres".test_normalization._airbyte_raw_types_testing
),
scd_data as (
-- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key
select
md5(cast(coalesce(cast("id" as text), '') as text)) as _airbyte_unique_key,
"id",
airbyte_integer_column,
nullable_airbyte_integer_column,
_airbyte_emitted_at as _airbyte_start_at,
lag(_airbyte_emitted_at) over (
partition by "id"
order by
_airbyte_emitted_at is null asc,
_airbyte_emitted_at desc,
_airbyte_emitted_at desc
) as _airbyte_end_at,
case when row_number() over (
partition by "id"
order by
_airbyte_emitted_at is null asc,
_airbyte_emitted_at desc,
_airbyte_emitted_at desc
) = 1 then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_types_testing_hashid
from input_data
),
dedup_data as (
select
-- we need to ensure de-duplicated rows for merge/update queries
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by
_airbyte_unique_key,
_airbyte_start_at,
_airbyte_emitted_at
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
md5(cast(coalesce(cast(_airbyte_unique_key as text), '') || '-' || coalesce(cast(_airbyte_start_at as text), '') || '-' || coalesce(cast(_airbyte_emitted_at as text), '') as text)) as _airbyte_unique_key_scd,
scd_data.*
from scd_data
)
select
_airbyte_unique_key,
_airbyte_unique_key_scd,
"id",
airbyte_integer_column,
nullable_airbyte_integer_column,
_airbyte_start_at,
_airbyte_end_at,
_airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at,
_airbyte_types_testing_hashid
from dedup_data where _airbyte_row_num = 1
);

View File

@@ -0,0 +1,24 @@
create table "postgres".test_normalization."types_testing"
as (
-- Final base SQL model
-- depends_on: "postgres".test_normalization."types_testing_scd"
select
_airbyte_unique_key,
"id",
airbyte_integer_column,
nullable_airbyte_integer_column,
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at,
_airbyte_types_testing_hashid
from "postgres".test_normalization."types_testing_scd"
-- types_testing from "postgres".test_normalization._airbyte_raw_types_testing
where 1 = 1
and _airbyte_active_row = 1
);

View File

@@ -0,0 +1,53 @@
create table "postgres"._airbyte_test_normalization."types_testing_stg"
as (
with __dbt__cte__types_testing_ab1 as (
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "postgres".test_normalization._airbyte_raw_types_testing
select
jsonb_extract_path_text(_airbyte_data, 'id') as "id",
jsonb_extract_path_text(_airbyte_data, 'airbyte_integer_column') as airbyte_integer_column,
jsonb_extract_path_text(_airbyte_data, 'nullable_airbyte_integer_column') as nullable_airbyte_integer_column,
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at
from "postgres".test_normalization._airbyte_raw_types_testing as table_alias
-- types_testing
where 1 = 1
), __dbt__cte__types_testing_ab2 as (
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__cte__types_testing_ab1
select
cast("id" as
bigint
) as "id",
cast(airbyte_integer_column as
bigint
) as airbyte_integer_column,
cast(nullable_airbyte_integer_column as
bigint
) as nullable_airbyte_integer_column,
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at
from __dbt__cte__types_testing_ab1
-- types_testing
where 1 = 1
)-- SQL model to build a hash column based on the values of this record
-- depends_on: __dbt__cte__types_testing_ab2
select
md5(cast(coalesce(cast("id" as text), '') || '-' || coalesce(cast(airbyte_integer_column as text), '') || '-' || coalesce(cast(nullable_airbyte_integer_column as text), '') as text)) as _airbyte_types_testing_hashid,
tmp.*
from __dbt__cte__types_testing_ab2 tmp
-- types_testing
where 1 = 1
);

View File

@@ -0,0 +1,20 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ source('test_normalization', '_airbyte_raw_types_testing') }}
select
{{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as {{ adapter.quote('id') }},
{{ json_extract_scalar('_airbyte_data', ['airbyte_integer_column'], ['airbyte_integer_column']) }} as airbyte_integer_column,
{{ json_extract_scalar('_airbyte_data', ['nullable_airbyte_integer_column'], ['nullable_airbyte_integer_column']) }} as nullable_airbyte_integer_column,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at
from {{ source('test_normalization', '_airbyte_raw_types_testing') }} as table_alias
-- types_testing
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at', this) }}

View File

@@ -0,0 +1,20 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: {{ ref('types_testing_ab1') }}
select
cast({{ adapter.quote('id') }} as {{ dbt_utils.type_bigint() }}) as {{ adapter.quote('id') }},
cast(airbyte_integer_column as {{ dbt_utils.type_bigint() }}) as airbyte_integer_column,
cast(nullable_airbyte_integer_column as {{ dbt_utils.type_bigint() }}) as nullable_airbyte_integer_column,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at
from {{ ref('types_testing_ab1') }}
-- types_testing
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at', this) }}

View File

@@ -0,0 +1,163 @@
{{ config(
indexes = [{'columns':['_airbyte_active_row','_airbyte_unique_key_scd','_airbyte_emitted_at'],'type': 'btree'}],
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
post_hook = ["
{%
set final_table_relation = adapter.get_relation(
database=this.database,
schema=this.schema,
identifier='types_testing'
)
%}
{#
If the final table doesn't exist, then obviously we can't delete anything from it.
Also, after a reset, the final table is created without the _airbyte_unique_key column (this column is created during the first sync)
So skip this deletion if the column doesn't exist. (in this case, the table is guaranteed to be empty anyway)
#}
{%
if final_table_relation is not none and '_airbyte_unique_key' in adapter.get_columns_in_relation(final_table_relation)|map(attribute='name')
%}
-- Delete records which are no longer active:
-- This query is equivalent, but the left join version is more performant:
-- delete from final_table where unique_key in (
-- select unique_key from scd_table where 1 = 1 <incremental_clause(normalized_at, final_table)>
-- ) and unique_key not in (
-- select unique_key from scd_table where active_row = 1 <incremental_clause(normalized_at, final_table)>
-- )
-- We're incremental against normalized_at rather than emitted_at because we need to fetch the SCD
-- entries that were _updated_ recently. This is because a deleted record will have an SCD record
-- which was emitted a long time ago, but recently re-normalized to have active_row = 0.
delete from {{ final_table_relation }} where {{ final_table_relation }}._airbyte_unique_key in (
select recent_records.unique_key
from (
select distinct _airbyte_unique_key as unique_key
from {{ this }}
where 1=1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('types_testing')) }}
) recent_records
left join (
select _airbyte_unique_key as unique_key, count(_airbyte_unique_key) as active_count
from {{ this }}
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('types_testing')) }}
group by _airbyte_unique_key
) active_counts
on recent_records.unique_key = active_counts.unique_key
where active_count is null or active_count = 0
)
{% else %}
-- We have to have a non-empty query, so just do a noop delete
delete from {{ this }} where 1=0
{% endif %}
","delete from _airbyte_test_normalization.types_testing_stg where _airbyte_emitted_at != (select max(_airbyte_emitted_at) from _airbyte_test_normalization.types_testing_stg)"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('types_testing_stg')
with
{% if is_incremental() %}
new_data as (
-- retrieve incremental "new" data
select
*
from {{ ref('types_testing_stg') }}
-- types_testing from {{ source('test_normalization', '_airbyte_raw_types_testing') }}
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at', this) }}
),
new_data_ids as (
-- build a subset of _airbyte_unique_key from rows that are new
select distinct
{{ dbt_utils.surrogate_key([
adapter.quote('id'),
]) }} as _airbyte_unique_key
from new_data
),
empty_new_data as (
-- build an empty table to only keep the table's column types
select * from new_data where 1 = 0
),
previous_active_scd_data as (
-- retrieve "incomplete old" data that needs to be updated with an end date because of new changes
select
{{ star_intersect(ref('types_testing_stg'), this, from_alias='inc_data', intersect_alias='this_data') }}
from {{ this }} as this_data
-- make a join with new_data using primary key to filter active data that need to be updated only
join new_data_ids on this_data._airbyte_unique_key = new_data_ids._airbyte_unique_key
-- force left join to NULL values (we just need to transfer column types only for the star_intersect macro on schema changes)
left join empty_new_data as inc_data on this_data._airbyte_ab_id = inc_data._airbyte_ab_id
where _airbyte_active_row = 1
),
input_data as (
select {{ dbt_utils.star(ref('types_testing_stg')) }} from new_data
union all
select {{ dbt_utils.star(ref('types_testing_stg')) }} from previous_active_scd_data
),
{% else %}
input_data as (
select *
from {{ ref('types_testing_stg') }}
-- types_testing from {{ source('test_normalization', '_airbyte_raw_types_testing') }}
),
{% endif %}
scd_data as (
-- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key
select
{{ dbt_utils.surrogate_key([
adapter.quote('id'),
]) }} as _airbyte_unique_key,
{{ adapter.quote('id') }},
airbyte_integer_column,
nullable_airbyte_integer_column,
_airbyte_emitted_at as _airbyte_start_at,
lag(_airbyte_emitted_at) over (
partition by {{ adapter.quote('id') }}
order by
_airbyte_emitted_at is null asc,
_airbyte_emitted_at desc,
_airbyte_emitted_at desc
) as _airbyte_end_at,
case when row_number() over (
partition by {{ adapter.quote('id') }}
order by
_airbyte_emitted_at is null asc,
_airbyte_emitted_at desc,
_airbyte_emitted_at desc
) = 1 then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_types_testing_hashid
from input_data
),
dedup_data as (
select
-- we need to ensure de-duplicated rows for merge/update queries
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by
_airbyte_unique_key,
_airbyte_start_at,
_airbyte_emitted_at
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
{{ dbt_utils.surrogate_key([
'_airbyte_unique_key',
'_airbyte_start_at',
'_airbyte_emitted_at'
]) }} as _airbyte_unique_key_scd,
scd_data.*
from scd_data
)
select
_airbyte_unique_key,
_airbyte_unique_key_scd,
{{ adapter.quote('id') }},
airbyte_integer_column,
nullable_airbyte_integer_column,
_airbyte_start_at,
_airbyte_end_at,
_airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at,
_airbyte_types_testing_hashid
from dedup_data where _airbyte_row_num = 1

View File

@@ -0,0 +1,23 @@
{{ config(
indexes = [{'columns':['_airbyte_unique_key'],'unique':True}],
unique_key = "_airbyte_unique_key",
schema = "test_normalization",
tags = [ "top-level" ]
) }}
-- Final base SQL model
-- depends_on: {{ ref('types_testing_scd') }}
select
_airbyte_unique_key,
{{ adapter.quote('id') }},
airbyte_integer_column,
nullable_airbyte_integer_column,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at,
_airbyte_types_testing_hashid
from {{ ref('types_testing_scd') }}
-- types_testing from {{ source('test_normalization', '_airbyte_raw_types_testing') }}
where 1 = 1
and _airbyte_active_row = 1
{{ incremental_clause('_airbyte_emitted_at', this) }}

View File

@@ -0,0 +1,20 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to build a hash column based on the values of this record
-- depends_on: {{ ref('types_testing_ab2') }}
select
{{ dbt_utils.surrogate_key([
adapter.quote('id'),
'airbyte_integer_column',
'nullable_airbyte_integer_column',
]) }} as _airbyte_types_testing_hashid,
tmp.*
from {{ ref('types_testing_ab2') }} tmp
-- types_testing
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at', this) }}

View File

@@ -13,3 +13,4 @@ sources:
- name: _airbyte_raw_multiple_column_names_conflicts
- name: _airbyte_raw_pos_dedup_cdcx
- name: _airbyte_raw_renamed_dedup_cdc_excluded
- name: _airbyte_raw_types_testing

View File

@@ -1,12 +1,12 @@
version: 2
sources:
- name: test_normalization
quoting:
database: true
schema: false
identifier: false
tables:
- name: _airbyte_raw_dedup_cdc_excluded
- name: _airbyte_raw_dedup_exchange_rate
- name: _airbyte_raw_exchange_rate
- name: _airbyte_raw_renamed_dedup_cdc_excluded
- name: test_normalization
quoting:
database: true
schema: false
identifier: false
tables:
- name: _airbyte_raw_dedup_cdc_excluded
- name: _airbyte_raw_dedup_exchange_rate
- name: _airbyte_raw_exchange_rate
- name: _airbyte_raw_renamed_dedup_cdc_excluded

View File

@@ -0,0 +1,15 @@
delete from "postgres".test_normalization."types_testing_scd"
where (_airbyte_unique_key_scd) in (
select (_airbyte_unique_key_scd)
from "types_testing_scd__dbt_tmp"
);
insert into "postgres".test_normalization."types_testing_scd" ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "airbyte_integer_column", "nullable_airbyte_integer_column", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_types_testing_hashid")
(
select "_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "airbyte_integer_column", "nullable_airbyte_integer_column", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_types_testing_hashid"
from "types_testing_scd__dbt_tmp"
)

View File

@@ -0,0 +1,15 @@
delete from "postgres".test_normalization."types_testing"
where (_airbyte_unique_key) in (
select (_airbyte_unique_key)
from "types_testing__dbt_tmp"
);
insert into "postgres".test_normalization."types_testing" ("_airbyte_unique_key", "id", "airbyte_integer_column", "nullable_airbyte_integer_column", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_types_testing_hashid")
(
select "_airbyte_unique_key", "id", "airbyte_integer_column", "nullable_airbyte_integer_column", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_types_testing_hashid"
from "types_testing__dbt_tmp"
)

View File

@@ -0,0 +1,15 @@
delete from "postgres"._airbyte_test_normalization."types_testing_stg"
where (_airbyte_ab_id) in (
select (_airbyte_ab_id)
from "types_testing_stg__dbt_tmp"
);
insert into "postgres"._airbyte_test_normalization."types_testing_stg" ("_airbyte_types_testing_hashid", "id", "airbyte_integer_column", "nullable_airbyte_integer_column", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at")
(
select "_airbyte_types_testing_hashid", "id", "airbyte_integer_column", "nullable_airbyte_integer_column", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at"
from "types_testing_stg__dbt_tmp"
)

View File

@@ -1,25 +1,25 @@
name: airbyte_utils
version: "1.0"
version: '1.0'
config-version: 2
profile: normalize
model-paths:
- models
- models
docs-paths:
- docs
- docs
analysis-paths:
- analysis
- analysis
test-paths:
- tests
- tests
seed-paths:
- data
- data
macro-paths:
- macros
- macros
target-path: ../build
log-path: ../logs
packages-install-path: /dbt
clean-targets:
- build
- dbt_modules
- build
- dbt_modules
quoting:
database: true
schema: false
@@ -44,76 +44,84 @@ models:
+tags: airbyte_internal_views
+materialized: view
dispatch:
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
vars:
json_column: _airbyte_data
models_to_source:
nested_stream_with_complex_columns_resulting_into_long_names_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_stg: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_scd: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
non_nested_stream_without_namespace_resulting_into_long_names_ab1: test_normalization._airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
non_nested_stream_without_namespace_resulting_into_long_names_ab2: test_normalization._airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
non_nested_stream_without_namespace_resulting_into_long_names_ab3: test_normalization._airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
non_nested_stream_without_namespace_resulting_into_long_names: test_normalization._airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
some_stream_that_was_empty_ab1: test_normalization._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty_ab2: test_normalization._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty_stg: test_normalization._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty_scd: test_normalization._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty: test_normalization._airbyte_raw_some_stream_that_was_empty
nested_stream_with_complex_columns_resulting_into_long_names_ab1: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_ab2: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_stg: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_scd: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
non_nested_stream_without_namespace_resulting_into_long_names_ab1: test_normalization_xjvlg._airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
non_nested_stream_without_namespace_resulting_into_long_names_ab2: test_normalization_xjvlg._airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
non_nested_stream_without_namespace_resulting_into_long_names_ab3: test_normalization_xjvlg._airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
non_nested_stream_without_namespace_resulting_into_long_names: test_normalization_xjvlg._airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
some_stream_that_was_empty_ab1: test_normalization_xjvlg._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty_ab2: test_normalization_xjvlg._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty_stg: test_normalization_xjvlg._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty_scd: test_normalization_xjvlg._airbyte_raw_some_stream_that_was_empty
some_stream_that_was_empty: test_normalization_xjvlg._airbyte_raw_some_stream_that_was_empty
simple_stream_with_namespace_resulting_into_long_names_ab1: test_normalization_namespace._airbyte_raw_simple_stream_with_namespace_resulting_into_long_names
simple_stream_with_namespace_resulting_into_long_names_ab2: test_normalization_namespace._airbyte_raw_simple_stream_with_namespace_resulting_into_long_names
simple_stream_with_namespace_resulting_into_long_names_ab3: test_normalization_namespace._airbyte_raw_simple_stream_with_namespace_resulting_into_long_names
simple_stream_with_namespace_resulting_into_long_names: test_normalization_namespace._airbyte_raw_simple_stream_with_namespace_resulting_into_long_names
conflict_stream_name_ab1: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_ab2: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_ab3: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_scalar_ab1: test_normalization._airbyte_raw_conflict_stream_scalar
conflict_stream_scalar_ab2: test_normalization._airbyte_raw_conflict_stream_scalar
conflict_stream_scalar_ab3: test_normalization._airbyte_raw_conflict_stream_scalar
conflict_stream_scalar: test_normalization._airbyte_raw_conflict_stream_scalar
conflict_stream_array_ab1: test_normalization._airbyte_raw_conflict_stream_array
conflict_stream_array_ab2: test_normalization._airbyte_raw_conflict_stream_array
conflict_stream_array_ab3: test_normalization._airbyte_raw_conflict_stream_array
conflict_stream_array: test_normalization._airbyte_raw_conflict_stream_array
unnest_alias_ab1: test_normalization._airbyte_raw_unnest_alias
unnest_alias_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias: test_normalization._airbyte_raw_unnest_alias
nested_stream_with_complex_columns_resulting_into_long_names_partition_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
conflict_stream_name_conflict_stream_name_ab1: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_ab2: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_ab3: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name: test_normalization._airbyte_raw_conflict_stream_name
unnest_alias_children_ab1: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children: test_normalization._airbyte_raw_unnest_alias
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_data_ab1: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_data_ab2: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_data_ab3: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_data: test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
conflict_stream_name_conflict_stream_name_conflict_stream_name_ab1: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_conflict_stream_name_ab2: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_conflict_stream_name_ab3: test_normalization._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_conflict_stream_name: test_normalization._airbyte_raw_conflict_stream_name
unnest_alias_children_owner_ab1: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_column___with__quotes_ab1: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_column___with__quotes_ab2: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_column___with__quotes_ab3: test_normalization._airbyte_raw_unnest_alias
unnest_alias_children_owner_column___with__quotes: test_normalization._airbyte_raw_unnest_alias
conflict_stream_name_ab1: test_normalization_xjvlg._airbyte_raw_conflict_stream_name
conflict_stream_name_ab2: test_normalization_xjvlg._airbyte_raw_conflict_stream_name
conflict_stream_name_ab3: test_normalization_xjvlg._airbyte_raw_conflict_stream_name
conflict_stream_name: test_normalization_xjvlg._airbyte_raw_conflict_stream_name
conflict_stream_scalar_ab1: test_normalization_xjvlg._airbyte_raw_conflict_stream_scalar
conflict_stream_scalar_ab2: test_normalization_xjvlg._airbyte_raw_conflict_stream_scalar
conflict_stream_scalar_ab3: test_normalization_xjvlg._airbyte_raw_conflict_stream_scalar
conflict_stream_scalar: test_normalization_xjvlg._airbyte_raw_conflict_stream_scalar
conflict_stream_array_ab1: test_normalization_xjvlg._airbyte_raw_conflict_stream_array
conflict_stream_array_ab2: test_normalization_xjvlg._airbyte_raw_conflict_stream_array
conflict_stream_array_ab3: test_normalization_xjvlg._airbyte_raw_conflict_stream_array
conflict_stream_array: test_normalization_xjvlg._airbyte_raw_conflict_stream_array
unnest_alias_ab1: test_normalization_xjvlg._airbyte_raw_unnest_alias
unnest_alias_ab2: test_normalization_xjvlg._airbyte_raw_unnest_alias
unnest_alias_ab3: test_normalization_xjvlg._airbyte_raw_unnest_alias
unnest_alias: test_normalization_xjvlg._airbyte_raw_unnest_alias
arrays_ab1: test_normalization_xjvlg._airbyte_raw_arrays
arrays_ab2: test_normalization_xjvlg._airbyte_raw_arrays
arrays_ab3: test_normalization_xjvlg._airbyte_raw_arrays
arrays: test_normalization_xjvlg._airbyte_raw_arrays
nested_stream_with_complex_columns_resulting_into_long_names_partition_ab1: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_ab2: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_ab3: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
conflict_stream_name_conflict_stream_name_ab1: test_normalization_xjvlg._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_ab2: test_normalization_xjvlg._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_ab3: test_normalization_xjvlg._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name: test_normalization_xjvlg._airbyte_raw_conflict_stream_name
unnest_alias_children_ab1: test_normalization_xjvlg._airbyte_raw_unnest_alias
unnest_alias_children_ab2: test_normalization_xjvlg._airbyte_raw_unnest_alias
unnest_alias_children_ab3: test_normalization_xjvlg._airbyte_raw_unnest_alias
unnest_alias_children: test_normalization_xjvlg._airbyte_raw_unnest_alias
arrays_nested_array_parent_ab1: test_normalization_xjvlg._airbyte_raw_arrays
arrays_nested_array_parent_ab2: test_normalization_xjvlg._airbyte_raw_arrays
arrays_nested_array_parent_ab3: test_normalization_xjvlg._airbyte_raw_arrays
arrays_nested_array_parent: test_normalization_xjvlg._airbyte_raw_arrays
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab1: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab2: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab3: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_data_ab1: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_data_ab2: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_data_ab3: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
nested_stream_with_complex_columns_resulting_into_long_names_partition_data: test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
conflict_stream_name_conflict_stream_name_conflict_stream_name_ab1: test_normalization_xjvlg._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_conflict_stream_name_ab2: test_normalization_xjvlg._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_conflict_stream_name_ab3: test_normalization_xjvlg._airbyte_raw_conflict_stream_name
conflict_stream_name_conflict_stream_name_conflict_stream_name: test_normalization_xjvlg._airbyte_raw_conflict_stream_name
unnest_alias_children_owner_ab1: test_normalization_xjvlg._airbyte_raw_unnest_alias
unnest_alias_children_owner_ab2: test_normalization_xjvlg._airbyte_raw_unnest_alias
unnest_alias_children_owner_ab3: test_normalization_xjvlg._airbyte_raw_unnest_alias
unnest_alias_children_owner: test_normalization_xjvlg._airbyte_raw_unnest_alias
unnest_alias_children_owner_column___with__quotes_ab1: test_normalization_xjvlg._airbyte_raw_unnest_alias
unnest_alias_children_owner_column___with__quotes_ab2: test_normalization_xjvlg._airbyte_raw_unnest_alias
unnest_alias_children_owner_column___with__quotes_ab3: test_normalization_xjvlg._airbyte_raw_unnest_alias
unnest_alias_children_owner_column___with__quotes: test_normalization_xjvlg._airbyte_raw_unnest_alias

View File

@@ -2,7 +2,7 @@
create table
"integrationtests"."test_normalization"."nested_stream_with_complex_columns_resulting_into_long_names_scd__dbt_tmp"
"integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_scd"
compound sortkey(_airbyte_active_row,_airbyte_unique_key_scd,_airbyte_emitted_at)
@@ -14,8 +14,8 @@ with
input_data as (
select *
from "integrationtests"._airbyte_test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_stg"
-- nested_stream_with_complex_columns_resulting_into_long_names from "integrationtests".test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
from "integrationtests"._airbyte_test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_stg"
-- nested_stream_with_complex_columns_resulting_into_long_names from "integrationtests".test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
),
scd_data as (

View File

@@ -2,7 +2,7 @@
create table
"integrationtests"."test_normalization"."nested_stream_with_complex_columns_resulting_into_long_names__dbt_tmp"
"integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names"
compound sortkey(_airbyte_unique_key,_airbyte_emitted_at)
@@ -10,7 +10,7 @@
as (
-- Final base SQL model
-- depends_on: "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_scd"
-- depends_on: "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_scd"
select
_airbyte_unique_key,
id,
@@ -20,8 +20,8 @@ select
_airbyte_emitted_at,
getdate() as _airbyte_normalized_at,
_airbyte_nested_stream_with_complex_columns_resulting_into_long_names_hashid
from "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_scd"
-- nested_stream_with_complex_columns_resulting_into_long_names from "integrationtests".test_normalization._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
from "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_scd"
-- nested_stream_with_complex_columns_resulting_into_long_names from "integrationtests".test_normalization_xjvlg._airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
where 1 = 1
and _airbyte_active_row = 1

View File

@@ -2,7 +2,7 @@
create table
"integrationtests"."test_normalization"."nested_stream_with_complex_columns_resulting_into_long_names_partition__dbt_tmp"
"integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_partition"
compound sortkey(_airbyte_emitted_at)
@@ -12,7 +12,7 @@
with __dbt__cte__nested_stream_with_complex_columns_resulting_into_long_names_partition_ab1 as (
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_scd"
-- depends_on: "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_scd"
select
_airbyte_nested_stream_with_complex_columns_resulting_into_long_names_hashid,
"partition"."double_array_data" as double_array_data,
@@ -20,7 +20,7 @@ select
_airbyte_ab_id,
_airbyte_emitted_at,
getdate() as _airbyte_normalized_at
from "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_scd" as table_alias
from "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_scd" as table_alias
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition
where 1 = 1
and "partition" is not null
@@ -62,7 +62,7 @@ select
getdate() as _airbyte_normalized_at,
_airbyte_partition_hashid
from __dbt__cte__nested_stream_with_complex_columns_resulting_into_long_names_partition_ab3
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition from "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_scd"
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition from "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_scd"
where 1 = 1
);

View File

@@ -2,7 +2,7 @@
create table
"integrationtests"."test_normalization"."nested_stream_with_complex_columns_resulting_into_long_names_partition_data__dbt_tmp"
"integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_partition_data"
compound sortkey(_airbyte_emitted_at)
@@ -12,13 +12,13 @@
with __dbt__cte__nested_stream_with_complex_columns_resulting_into_long_names_partition_data_ab1 as (
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_partition"
-- depends_on: "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_partition"
with joined as (
select
table_alias._airbyte_partition_hashid as _airbyte_hashid,
_airbyte_nested_data
from "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_partition" as table_alias, table_alias.data as _airbyte_nested_data
from "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_partition" as table_alias, table_alias.data as _airbyte_nested_data
)
select
_airbyte_partition_hashid,
@@ -26,7 +26,7 @@ select
_airbyte_ab_id,
_airbyte_emitted_at,
getdate() as _airbyte_normalized_at
from "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_partition" as table_alias
from "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_partition" as table_alias
-- data at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA
left join joined on _airbyte_partition_hashid = joined._airbyte_hashid
where 1 = 1
@@ -67,7 +67,7 @@ select
getdate() as _airbyte_normalized_at,
_airbyte_data_hashid
from __dbt__cte__nested_stream_with_complex_columns_resulting_into_long_names_partition_data_ab3
-- data at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA from "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_partition"
-- data at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA from "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_partition"
where 1 = 1
);

View File

@@ -2,7 +2,7 @@
create table
"integrationtests"."test_normalization"."nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data__dbt_tmp"
"integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data"
compound sortkey(_airbyte_emitted_at)
@@ -12,13 +12,13 @@
with __dbt__cte__nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab1 as (
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_partition"
-- depends_on: "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_partition"
with joined as (
select
table_alias._airbyte_partition_hashid as _airbyte_hashid,
_airbyte_nested_data
from "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_partition" as table_alias, table_alias.double_array_data as _airbyte_nested_data
from "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_partition" as table_alias, table_alias.double_array_data as _airbyte_nested_data
)
select
_airbyte_partition_hashid,
@@ -26,7 +26,7 @@ select
_airbyte_ab_id,
_airbyte_emitted_at,
getdate() as _airbyte_normalized_at
from "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_partition" as table_alias
from "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_partition" as table_alias
-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data
left join joined on _airbyte_partition_hashid = joined._airbyte_hashid
where 1 = 1
@@ -67,7 +67,7 @@ select
getdate() as _airbyte_normalized_at,
_airbyte_double_array_data_hashid
from __dbt__cte__nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab3
-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data from "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_partition"
-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data from "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_partition"
where 1 = 1
);

View File

@@ -1,11 +1,11 @@
{{ config(
sort = "_airbyte_emitted_at",
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
schema = "_airbyte_test_normalization_xjvlg",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ source('test_normalization', '_airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names') }}
-- depends_on: {{ source('test_normalization_xjvlg', '_airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names') }}
select
{{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as id,
{{ json_extract_scalar('_airbyte_data', ['date'], ['date']) }} as date,
@@ -13,7 +13,7 @@ select
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at
from {{ source('test_normalization', '_airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names') }} as table_alias
from {{ source('test_normalization_xjvlg', '_airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names') }} as table_alias
-- nested_stream_with_complex_columns_resulting_into_long_names
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at', this) }}

View File

@@ -1,7 +1,7 @@
{{ config(
sort = "_airbyte_emitted_at",
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
schema = "_airbyte_test_normalization_xjvlg",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type

View File

@@ -1,6 +1,6 @@
{{ config(
sort = "_airbyte_emitted_at",
schema = "_airbyte_test_normalization",
schema = "_airbyte_test_normalization_xjvlg",
tags = [ "nested-intermediate" ]
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema

View File

@@ -1,6 +1,6 @@
{{ config(
sort = "_airbyte_emitted_at",
schema = "_airbyte_test_normalization",
schema = "_airbyte_test_normalization_xjvlg",
tags = [ "nested-intermediate" ]
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema

View File

@@ -1,6 +1,6 @@
{{ config(
sort = "_airbyte_emitted_at",
schema = "_airbyte_test_normalization",
schema = "_airbyte_test_normalization_xjvlg",
tags = [ "nested-intermediate" ]
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema

View File

@@ -1,7 +1,7 @@
{{ config(
sort = ["_airbyte_active_row", "_airbyte_unique_key_scd", "_airbyte_emitted_at"],
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
schema = "test_normalization_xjvlg",
post_hook = ["
{%
set final_table_relation = adapter.get_relation(
@@ -48,7 +48,7 @@
-- We have to have a non-empty query, so just do a noop delete
delete from {{ this }} where 1=0
{% endif %}
","drop view _airbyte_test_normalization.nested_stream_with_complex_columns_resulting_into_long_names_stg"],
","drop view _airbyte_test_normalization_xjvlg.nested_stream_with_complex_columns_resulting_into_long_names_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('nested_stream_with_complex_columns_resulting_into_long_names_stg')
@@ -59,7 +59,7 @@ new_data as (
select
*
from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_stg') }}
-- nested_stream_with_complex_columns_resulting_into_long_names from {{ source('test_normalization', '_airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names') }}
-- nested_stream_with_complex_columns_resulting_into_long_names from {{ source('test_normalization_xjvlg', '_airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names') }}
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at', this) }}
),
@@ -95,7 +95,7 @@ input_data as (
input_data as (
select *
from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_stg') }}
-- nested_stream_with_complex_columns_resulting_into_long_names from {{ source('test_normalization', '_airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names') }}
-- nested_stream_with_complex_columns_resulting_into_long_names from {{ source('test_normalization_xjvlg', '_airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names') }}
),
{% endif %}
scd_data as (

View File

@@ -1,7 +1,7 @@
{{ config(
sort = ["_airbyte_unique_key", "_airbyte_emitted_at"],
unique_key = "_airbyte_unique_key",
schema = "test_normalization",
schema = "test_normalization_xjvlg",
tags = [ "top-level" ]
) }}
-- Final base SQL model
@@ -16,7 +16,7 @@ select
{{ current_timestamp() }} as _airbyte_normalized_at,
_airbyte_nested_stream_with_complex_columns_resulting_into_long_names_hashid
from {{ ref('nested_stream_with_complex_columns_resulting_into_long_names_scd') }}
-- nested_stream_with_complex_columns_resulting_into_long_names from {{ source('test_normalization', '_airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names') }}
-- nested_stream_with_complex_columns_resulting_into_long_names from {{ source('test_normalization_xjvlg', '_airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names') }}
where 1 = 1
and _airbyte_active_row = 1
{{ incremental_clause('_airbyte_emitted_at', this) }}

View File

@@ -1,6 +1,6 @@
{{ config(
sort = "_airbyte_emitted_at",
schema = "test_normalization",
schema = "test_normalization_xjvlg",
tags = [ "nested" ]
) }}
-- Final base SQL model

View File

@@ -1,6 +1,6 @@
{{ config(
sort = "_airbyte_emitted_at",
schema = "test_normalization",
schema = "test_normalization_xjvlg",
tags = [ "nested" ]
) }}
-- Final base SQL model

View File

@@ -1,6 +1,6 @@
{{ config(
sort = "_airbyte_emitted_at",
schema = "test_normalization",
schema = "test_normalization_xjvlg",
tags = [ "nested" ]
) }}
-- Final base SQL model

View File

@@ -1,18 +1,5 @@
version: 2
sources:
- name: test_normalization
quoting:
database: true
schema: false
identifier: false
tables:
- name: _airbyte_raw_conflict_stream_array
- name: _airbyte_raw_conflict_stream_name
- name: _airbyte_raw_conflict_stream_scalar
- name: _airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
- name: _airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
- name: _airbyte_raw_some_stream_that_was_empty
- name: _airbyte_raw_unnest_alias
- name: test_normalization_namespace
quoting:
database: true
@@ -20,3 +7,17 @@ sources:
identifier: false
tables:
- name: _airbyte_raw_simple_stream_with_namespace_resulting_into_long_names
- name: test_normalization_xjvlg
quoting:
database: true
schema: false
identifier: false
tables:
- name: _airbyte_raw_arrays
- name: _airbyte_raw_conflict_stream_array
- name: _airbyte_raw_conflict_stream_name
- name: _airbyte_raw_conflict_stream_scalar
- name: _airbyte_raw_nested_stream_with_complex_columns_resulting_into_long_names
- name: _airbyte_raw_non_nested_stream_without_namespace_resulting_into_long_names
- name: _airbyte_raw_some_stream_that_was_empty
- name: _airbyte_raw_unnest_alias

View File

@@ -1,13 +1,13 @@
delete from "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_scd"
delete from "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_scd"
where (_airbyte_unique_key_scd) in (
select (_airbyte_unique_key_scd)
from "nested_stream_with_complex_columns_resulti__dbt_tmp"
);
insert into "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_scd" ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "date", "partition", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_nested_stream_with_complex_columns_resulting_into_long_names_hashid")
insert into "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_scd" ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "date", "partition", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_nested_stream_with_complex_columns_resulting_into_long_names_hashid")
(
select "_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "date", "partition", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_nested_stream_with_complex_columns_resulting_into_long_names_hashid"
from "nested_stream_with_complex_columns_resulti__dbt_tmp"

View File

@@ -1,13 +1,13 @@
delete from "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names"
delete from "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names"
where (_airbyte_unique_key) in (
select (_airbyte_unique_key)
from "nested_stream_with_complex_columns_resulti__dbt_tmp"
);
insert into "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names" ("_airbyte_unique_key", "id", "date", "partition", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_nested_stream_with_complex_columns_resulting_into_long_names_hashid")
insert into "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names" ("_airbyte_unique_key", "id", "date", "partition", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_nested_stream_with_complex_columns_resulting_into_long_names_hashid")
(
select "_airbyte_unique_key", "id", "date", "partition", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_nested_stream_with_complex_columns_resulting_into_long_names_hashid"
from "nested_stream_with_complex_columns_resulti__dbt_tmp"

View File

@@ -1,7 +1,7 @@
insert into "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_partition" ("_airbyte_nested_stream_with_complex_columns_resulting_into_long_names_hashid", "double_array_data", "data", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_partition_hashid")
insert into "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_partition" ("_airbyte_nested_stream_with_complex_columns_resulting_into_long_names_hashid", "double_array_data", "data", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_partition_hashid")
(
select "_airbyte_nested_stream_with_complex_columns_resulting_into_long_names_hashid", "double_array_data", "data", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_partition_hashid"
from "nested_stream_with_complex_columns_resulti__dbt_tmp"

View File

@@ -1,7 +1,7 @@
insert into "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_partition_data" ("_airbyte_partition_hashid", "currency", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_data_hashid")
insert into "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_partition_data" ("_airbyte_partition_hashid", "currency", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_data_hashid")
(
select "_airbyte_partition_hashid", "currency", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_data_hashid"
from "nested_stream_with_complex_columns_resulti__dbt_tmp"

View File

@@ -1,7 +1,7 @@
insert into "integrationtests".test_normalization."nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data" ("_airbyte_partition_hashid", "id", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_double_array_data_hashid")
insert into "integrationtests".test_normalization_xjvlg."nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data" ("_airbyte_partition_hashid", "id", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_double_array_data_hashid")
(
select "_airbyte_partition_hashid", "id", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_double_array_data_hashid"
from "nested_stream_with_complex_columns_resulti__dbt_tmp"

View File

@@ -1,25 +1,25 @@
name: airbyte_utils
version: "1.0"
version: '1.0'
config-version: 2
profile: normalize
model-paths:
- modified_models
- modified_models
docs-paths:
- docs
- docs
analysis-paths:
- analysis
- analysis
test-paths:
- tests
- tests
seed-paths:
- data
- data
macro-paths:
- macros
- macros
target-path: ../build
log-path: ../logs
packages-install-path: /dbt
clean-targets:
- build
- dbt_modules
- build
- dbt_modules
quoting:
database: true
schema: false
@@ -44,29 +44,29 @@ models:
+tags: airbyte_internal_views
+materialized: view
dispatch:
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
vars:
json_column: _airbyte_data
models_to_source:
exchange_rate_ab1: test_normalization._airbyte_raw_exchange_rate
exchange_rate_ab2: test_normalization._airbyte_raw_exchange_rate
exchange_rate_ab3: test_normalization._airbyte_raw_exchange_rate
exchange_rate: test_normalization._airbyte_raw_exchange_rate
dedup_exchange_rate_ab1: test_normalization._airbyte_raw_dedup_exchange_rate
dedup_exchange_rate_ab2: test_normalization._airbyte_raw_dedup_exchange_rate
dedup_exchange_rate_stg: test_normalization._airbyte_raw_dedup_exchange_rate
dedup_exchange_rate_scd: test_normalization._airbyte_raw_dedup_exchange_rate
dedup_exchange_rate: test_normalization._airbyte_raw_dedup_exchange_rate
renamed_dedup_cdc_excluded_ab1: test_normalization._airbyte_raw_renamed_dedup_cdc_excluded
renamed_dedup_cdc_excluded_ab2: test_normalization._airbyte_raw_renamed_dedup_cdc_excluded
renamed_dedup_cdc_excluded_stg: test_normalization._airbyte_raw_renamed_dedup_cdc_excluded
renamed_dedup_cdc_excluded_scd: test_normalization._airbyte_raw_renamed_dedup_cdc_excluded
renamed_dedup_cdc_excluded: test_normalization._airbyte_raw_renamed_dedup_cdc_excluded
dedup_cdc_excluded_ab1: test_normalization._airbyte_raw_dedup_cdc_excluded
dedup_cdc_excluded_ab2: test_normalization._airbyte_raw_dedup_cdc_excluded
dedup_cdc_excluded_stg: test_normalization._airbyte_raw_dedup_cdc_excluded
dedup_cdc_excluded_scd: test_normalization._airbyte_raw_dedup_cdc_excluded
dedup_cdc_excluded: test_normalization._airbyte_raw_dedup_cdc_excluded
exchange_rate_ab1: test_normalization_bhhpj._airbyte_raw_exchange_rate
exchange_rate_ab2: test_normalization_bhhpj._airbyte_raw_exchange_rate
exchange_rate_ab3: test_normalization_bhhpj._airbyte_raw_exchange_rate
exchange_rate: test_normalization_bhhpj._airbyte_raw_exchange_rate
dedup_exchange_rate_ab1: test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate
dedup_exchange_rate_ab2: test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate
dedup_exchange_rate_stg: test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate
dedup_exchange_rate_scd: test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate
dedup_exchange_rate: test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate
renamed_dedup_cdc_excluded_ab1: test_normalization_bhhpj._airbyte_raw_renamed_dedup_cdc_excluded
renamed_dedup_cdc_excluded_ab2: test_normalization_bhhpj._airbyte_raw_renamed_dedup_cdc_excluded
renamed_dedup_cdc_excluded_stg: test_normalization_bhhpj._airbyte_raw_renamed_dedup_cdc_excluded
renamed_dedup_cdc_excluded_scd: test_normalization_bhhpj._airbyte_raw_renamed_dedup_cdc_excluded
renamed_dedup_cdc_excluded: test_normalization_bhhpj._airbyte_raw_renamed_dedup_cdc_excluded
dedup_cdc_excluded_ab1: test_normalization_bhhpj._airbyte_raw_dedup_cdc_excluded
dedup_cdc_excluded_ab2: test_normalization_bhhpj._airbyte_raw_dedup_cdc_excluded
dedup_cdc_excluded_stg: test_normalization_bhhpj._airbyte_raw_dedup_cdc_excluded
dedup_cdc_excluded_scd: test_normalization_bhhpj._airbyte_raw_dedup_cdc_excluded
dedup_cdc_excluded: test_normalization_bhhpj._airbyte_raw_dedup_cdc_excluded

View File

@@ -1,25 +1,25 @@
name: airbyte_utils
version: "1.0"
version: '1.0'
config-version: 2
profile: normalize
model-paths:
- models
- models
docs-paths:
- docs
- docs
analysis-paths:
- analysis
- analysis
test-paths:
- tests
- tests
seed-paths:
- data
- data
macro-paths:
- macros
- macros
target-path: ../build
log-path: ../logs
packages-install-path: /dbt
clean-targets:
- build
- dbt_modules
- build
- dbt_modules
quoting:
database: true
schema: false
@@ -44,44 +44,49 @@ models:
+tags: airbyte_internal_views
+materialized: view
dispatch:
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
vars:
json_column: _airbyte_data
models_to_source:
exchange_rate_ab1: test_normalization._airbyte_raw_exchange_rate
exchange_rate_ab2: test_normalization._airbyte_raw_exchange_rate
exchange_rate_ab3: test_normalization._airbyte_raw_exchange_rate
exchange_rate: test_normalization._airbyte_raw_exchange_rate
dedup_exchange_rate_ab1: test_normalization._airbyte_raw_dedup_exchange_rate
dedup_exchange_rate_ab2: test_normalization._airbyte_raw_dedup_exchange_rate
dedup_exchange_rate_stg: test_normalization._airbyte_raw_dedup_exchange_rate
dedup_exchange_rate_scd: test_normalization._airbyte_raw_dedup_exchange_rate
dedup_exchange_rate: test_normalization._airbyte_raw_dedup_exchange_rate
renamed_dedup_cdc_excluded_ab1: test_normalization._airbyte_raw_renamed_dedup_cdc_excluded
renamed_dedup_cdc_excluded_ab2: test_normalization._airbyte_raw_renamed_dedup_cdc_excluded
renamed_dedup_cdc_excluded_stg: test_normalization._airbyte_raw_renamed_dedup_cdc_excluded
renamed_dedup_cdc_excluded_scd: test_normalization._airbyte_raw_renamed_dedup_cdc_excluded
renamed_dedup_cdc_excluded: test_normalization._airbyte_raw_renamed_dedup_cdc_excluded
dedup_cdc_excluded_ab1: test_normalization._airbyte_raw_dedup_cdc_excluded
dedup_cdc_excluded_ab2: test_normalization._airbyte_raw_dedup_cdc_excluded
dedup_cdc_excluded_stg: test_normalization._airbyte_raw_dedup_cdc_excluded
dedup_cdc_excluded_scd: test_normalization._airbyte_raw_dedup_cdc_excluded
dedup_cdc_excluded: test_normalization._airbyte_raw_dedup_cdc_excluded
pos_dedup_cdcx_ab1: test_normalization._airbyte_raw_pos_dedup_cdcx
pos_dedup_cdcx_ab2: test_normalization._airbyte_raw_pos_dedup_cdcx
pos_dedup_cdcx_stg: test_normalization._airbyte_raw_pos_dedup_cdcx
pos_dedup_cdcx_scd: test_normalization._airbyte_raw_pos_dedup_cdcx
pos_dedup_cdcx: test_normalization._airbyte_raw_pos_dedup_cdcx
1_prefix_startwith_number_ab1: test_normalization._airbyte_raw_1_prefix_startwith_number
1_prefix_startwith_number_ab2: test_normalization._airbyte_raw_1_prefix_startwith_number
1_prefix_startwith_number_stg: test_normalization._airbyte_raw_1_prefix_startwith_number
1_prefix_startwith_number_scd: test_normalization._airbyte_raw_1_prefix_startwith_number
1_prefix_startwith_number: test_normalization._airbyte_raw_1_prefix_startwith_number
multiple_column_names_conflicts_ab1: test_normalization._airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts_ab2: test_normalization._airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts_stg: test_normalization._airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts_scd: test_normalization._airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts: test_normalization._airbyte_raw_multiple_column_names_conflicts
exchange_rate_ab1: test_normalization_bhhpj._airbyte_raw_exchange_rate
exchange_rate_ab2: test_normalization_bhhpj._airbyte_raw_exchange_rate
exchange_rate_ab3: test_normalization_bhhpj._airbyte_raw_exchange_rate
exchange_rate: test_normalization_bhhpj._airbyte_raw_exchange_rate
dedup_exchange_rate_ab1: test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate
dedup_exchange_rate_ab2: test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate
dedup_exchange_rate_stg: test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate
dedup_exchange_rate_scd: test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate
dedup_exchange_rate: test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate
renamed_dedup_cdc_excluded_ab1: test_normalization_bhhpj._airbyte_raw_renamed_dedup_cdc_excluded
renamed_dedup_cdc_excluded_ab2: test_normalization_bhhpj._airbyte_raw_renamed_dedup_cdc_excluded
renamed_dedup_cdc_excluded_stg: test_normalization_bhhpj._airbyte_raw_renamed_dedup_cdc_excluded
renamed_dedup_cdc_excluded_scd: test_normalization_bhhpj._airbyte_raw_renamed_dedup_cdc_excluded
renamed_dedup_cdc_excluded: test_normalization_bhhpj._airbyte_raw_renamed_dedup_cdc_excluded
dedup_cdc_excluded_ab1: test_normalization_bhhpj._airbyte_raw_dedup_cdc_excluded
dedup_cdc_excluded_ab2: test_normalization_bhhpj._airbyte_raw_dedup_cdc_excluded
dedup_cdc_excluded_stg: test_normalization_bhhpj._airbyte_raw_dedup_cdc_excluded
dedup_cdc_excluded_scd: test_normalization_bhhpj._airbyte_raw_dedup_cdc_excluded
dedup_cdc_excluded: test_normalization_bhhpj._airbyte_raw_dedup_cdc_excluded
pos_dedup_cdcx_ab1: test_normalization_bhhpj._airbyte_raw_pos_dedup_cdcx
pos_dedup_cdcx_ab2: test_normalization_bhhpj._airbyte_raw_pos_dedup_cdcx
pos_dedup_cdcx_stg: test_normalization_bhhpj._airbyte_raw_pos_dedup_cdcx
pos_dedup_cdcx_scd: test_normalization_bhhpj._airbyte_raw_pos_dedup_cdcx
pos_dedup_cdcx: test_normalization_bhhpj._airbyte_raw_pos_dedup_cdcx
1_prefix_startwith_number_ab1: test_normalization_bhhpj._airbyte_raw_1_prefix_startwith_number
1_prefix_startwith_number_ab2: test_normalization_bhhpj._airbyte_raw_1_prefix_startwith_number
1_prefix_startwith_number_stg: test_normalization_bhhpj._airbyte_raw_1_prefix_startwith_number
1_prefix_startwith_number_scd: test_normalization_bhhpj._airbyte_raw_1_prefix_startwith_number
1_prefix_startwith_number: test_normalization_bhhpj._airbyte_raw_1_prefix_startwith_number
multiple_column_names_conflicts_ab1: test_normalization_bhhpj._airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts_ab2: test_normalization_bhhpj._airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts_stg: test_normalization_bhhpj._airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts_scd: test_normalization_bhhpj._airbyte_raw_multiple_column_names_conflicts
multiple_column_names_conflicts: test_normalization_bhhpj._airbyte_raw_multiple_column_names_conflicts
types_testing_ab1: test_normalization_bhhpj._airbyte_raw_types_testing
types_testing_ab2: test_normalization_bhhpj._airbyte_raw_types_testing
types_testing_stg: test_normalization_bhhpj._airbyte_raw_types_testing
types_testing_scd: test_normalization_bhhpj._airbyte_raw_types_testing
types_testing: test_normalization_bhhpj._airbyte_raw_types_testing

View File

@@ -2,7 +2,7 @@
create table
"integrationtests"."test_normalization"."dedup_exchange_rate_scd__dbt_tmp"
"integrationtests".test_normalization_bhhpj."dedup_exchange_rate_scd"
compound sortkey(_airbyte_active_row,_airbyte_unique_key_scd,_airbyte_emitted_at)
@@ -14,8 +14,8 @@ with
input_data as (
select *
from "integrationtests"._airbyte_test_normalization."dedup_exchange_rate_stg"
-- dedup_exchange_rate from "integrationtests".test_normalization._airbyte_raw_dedup_exchange_rate
from "integrationtests"._airbyte_test_normalization_bhhpj."dedup_exchange_rate_stg"
-- dedup_exchange_rate from "integrationtests".test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate
),
scd_data as (

View File

@@ -2,7 +2,7 @@
create table
"integrationtests"."test_normalization"."dedup_exchange_rate__dbt_tmp"
"integrationtests".test_normalization_bhhpj."dedup_exchange_rate"
compound sortkey(_airbyte_unique_key,_airbyte_emitted_at)
@@ -10,7 +10,7 @@
as (
-- Final base SQL model
-- depends_on: "integrationtests".test_normalization."dedup_exchange_rate_scd"
-- depends_on: "integrationtests".test_normalization_bhhpj."dedup_exchange_rate_scd"
select
_airbyte_unique_key,
id,
@@ -25,8 +25,8 @@ select
_airbyte_emitted_at,
getdate() as _airbyte_normalized_at,
_airbyte_dedup_exchange_rate_hashid
from "integrationtests".test_normalization."dedup_exchange_rate_scd"
-- dedup_exchange_rate from "integrationtests".test_normalization._airbyte_raw_dedup_exchange_rate
from "integrationtests".test_normalization_bhhpj."dedup_exchange_rate_scd"
-- dedup_exchange_rate from "integrationtests".test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate
where 1 = 1
and _airbyte_active_row = 1

View File

@@ -1,7 +1,7 @@
create table
"integrationtests".test_normalization."exchange_rate__dbt_tmp"
"integrationtests".test_normalization_bhhpj."exchange_rate__dbt_tmp"
compound sortkey(_airbyte_emitted_at)
@@ -11,7 +11,7 @@
with __dbt__cte__exchange_rate_ab1 as (
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "integrationtests".test_normalization._airbyte_raw_exchange_rate
-- depends_on: "integrationtests".test_normalization_bhhpj._airbyte_raw_exchange_rate
select
case when _airbyte_data."id" != '' then _airbyte_data."id" end as id,
case when _airbyte_data."currency" != '' then _airbyte_data."currency" end as currency,
@@ -25,7 +25,7 @@ select
_airbyte_ab_id,
_airbyte_emitted_at,
getdate() as _airbyte_normalized_at
from "integrationtests".test_normalization._airbyte_raw_exchange_rate as table_alias
from "integrationtests".test_normalization_bhhpj._airbyte_raw_exchange_rate as table_alias
-- exchange_rate
where 1 = 1
), __dbt__cte__exchange_rate_ab2 as (
@@ -87,6 +87,6 @@ select
getdate() as _airbyte_normalized_at,
_airbyte_exchange_rate_hashid
from __dbt__cte__exchange_rate_ab3
-- exchange_rate from "integrationtests".test_normalization._airbyte_raw_exchange_rate
-- exchange_rate from "integrationtests".test_normalization_bhhpj._airbyte_raw_exchange_rate
where 1 = 1
);

View File

@@ -1,11 +1,11 @@
create view "integrationtests"._airbyte_test_normalization."dedup_exchange_rate_stg__dbt_tmp" as (
create view "integrationtests"._airbyte_test_normalization_bhhpj."dedup_exchange_rate_stg__dbt_tmp" as (
with __dbt__cte__dedup_exchange_rate_ab1 as (
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "integrationtests".test_normalization._airbyte_raw_dedup_exchange_rate
-- depends_on: "integrationtests".test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate
select
case when _airbyte_data."id" != '' then _airbyte_data."id" end as id,
case when _airbyte_data."currency" != '' then _airbyte_data."currency" end as currency,
@@ -18,7 +18,7 @@ select
_airbyte_ab_id,
_airbyte_emitted_at,
getdate() as _airbyte_normalized_at
from "integrationtests".test_normalization._airbyte_raw_dedup_exchange_rate as table_alias
from "integrationtests".test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate as table_alias
-- dedup_exchange_rate
where 1 = 1

View File

@@ -1,11 +1,11 @@
create view "integrationtests"._airbyte_test_normalization."multiple_column_names_conflicts_stg__dbt_tmp" as (
create view "integrationtests"._airbyte_test_normalization_bhhpj."multiple_column_names_conflicts_stg__dbt_tmp" as (
with __dbt__cte__multiple_column_names_conflicts_ab1 as (
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "integrationtests".test_normalization._airbyte_raw_multiple_column_names_conflicts
-- depends_on: "integrationtests".test_normalization_bhhpj._airbyte_raw_multiple_column_names_conflicts
select
case when _airbyte_data."id" != '' then _airbyte_data."id" end as id,
case when _airbyte_data."User Id" != '' then _airbyte_data."User Id" end as "user id",
@@ -17,7 +17,7 @@ select
_airbyte_ab_id,
_airbyte_emitted_at,
getdate() as _airbyte_normalized_at
from "integrationtests".test_normalization._airbyte_raw_multiple_column_names_conflicts as table_alias
from "integrationtests".test_normalization_bhhpj._airbyte_raw_multiple_column_names_conflicts as table_alias
-- multiple_column_names_conflicts
where 1 = 1

View File

@@ -1,11 +1,11 @@
{{ config(
sort = "_airbyte_emitted_at",
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
schema = "_airbyte_test_normalization_bhhpj",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }}
-- depends_on: {{ source('test_normalization_bhhpj', '_airbyte_raw_dedup_exchange_rate') }}
select
{{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as id,
{{ json_extract_scalar('_airbyte_data', ['currency'], ['currency']) }} as currency,
@@ -18,7 +18,7 @@ select
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at
from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }} as table_alias
from {{ source('test_normalization_bhhpj', '_airbyte_raw_dedup_exchange_rate') }} as table_alias
-- dedup_exchange_rate
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at', this) }}

View File

@@ -1,7 +1,7 @@
{{ config(
sort = "_airbyte_emitted_at",
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
schema = "_airbyte_test_normalization_bhhpj",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type

View File

@@ -1,7 +1,7 @@
{{ config(
sort = ["_airbyte_active_row", "_airbyte_unique_key_scd", "_airbyte_emitted_at"],
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
schema = "test_normalization_bhhpj",
post_hook = ["
{%
set final_table_relation = adapter.get_relation(
@@ -48,7 +48,7 @@
-- We have to have a non-empty query, so just do a noop delete
delete from {{ this }} where 1=0
{% endif %}
","drop view _airbyte_test_normalization.dedup_exchange_rate_stg"],
","drop view _airbyte_test_normalization_bhhpj.dedup_exchange_rate_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('dedup_exchange_rate_stg')
@@ -59,7 +59,7 @@ new_data as (
select
*
from {{ ref('dedup_exchange_rate_stg') }}
-- dedup_exchange_rate from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }}
-- dedup_exchange_rate from {{ source('test_normalization_bhhpj', '_airbyte_raw_dedup_exchange_rate') }}
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at', this) }}
),
@@ -97,7 +97,7 @@ input_data as (
input_data as (
select *
from {{ ref('dedup_exchange_rate_stg') }}
-- dedup_exchange_rate from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }}
-- dedup_exchange_rate from {{ source('test_normalization_bhhpj', '_airbyte_raw_dedup_exchange_rate') }}
),
{% endif %}
scd_data as (

View File

@@ -1,7 +1,7 @@
{{ config(
sort = ["_airbyte_unique_key", "_airbyte_emitted_at"],
unique_key = "_airbyte_unique_key",
schema = "test_normalization",
schema = "test_normalization_bhhpj",
tags = [ "top-level" ]
) }}
-- Final base SQL model
@@ -21,7 +21,7 @@ select
{{ current_timestamp() }} as _airbyte_normalized_at,
_airbyte_dedup_exchange_rate_hashid
from {{ ref('dedup_exchange_rate_scd') }}
-- dedup_exchange_rate from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }}
-- dedup_exchange_rate from {{ source('test_normalization_bhhpj', '_airbyte_raw_dedup_exchange_rate') }}
where 1 = 1
and _airbyte_active_row = 1
{{ incremental_clause('_airbyte_emitted_at', this) }}

View File

@@ -1,7 +1,7 @@
{{ config(
sort = "_airbyte_emitted_at",
unique_key = '_airbyte_ab_id',
schema = "test_normalization",
schema = "test_normalization_bhhpj",
tags = [ "top-level" ]
) }}
-- Final base SQL model
@@ -21,6 +21,6 @@ select
{{ current_timestamp() }} as _airbyte_normalized_at,
_airbyte_exchange_rate_hashid
from {{ ref('exchange_rate_ab3') }}
-- exchange_rate from {{ source('test_normalization', '_airbyte_raw_exchange_rate') }}
-- exchange_rate from {{ source('test_normalization_bhhpj', '_airbyte_raw_exchange_rate') }}
where 1 = 1

View File

@@ -1,7 +1,7 @@
{{ config(
sort = "_airbyte_emitted_at",
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
schema = "_airbyte_test_normalization_bhhpj",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to build a hash column based on the values of this record

View File

@@ -1,6 +1,6 @@
version: 2
sources:
- name: test_normalization
- name: test_normalization_bhhpj
quoting:
database: true
schema: false
@@ -13,3 +13,4 @@ sources:
- name: _airbyte_raw_multiple_column_names_conflicts
- name: _airbyte_raw_pos_dedup_cdcx
- name: _airbyte_raw_renamed_dedup_cdc_excluded
- name: _airbyte_raw_types_testing

View File

@@ -1,11 +1,11 @@
{{ config(
sort = "_airbyte_emitted_at",
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
schema = "_airbyte_test_normalization_bhhpj",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }}
-- depends_on: {{ source('test_normalization_bhhpj', '_airbyte_raw_dedup_exchange_rate') }}
select
{{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as id,
{{ json_extract_scalar('_airbyte_data', ['currency'], ['currency']) }} as currency,
@@ -18,7 +18,7 @@ select
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at
from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }} as table_alias
from {{ source('test_normalization_bhhpj', '_airbyte_raw_dedup_exchange_rate') }} as table_alias
-- dedup_exchange_rate
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at', this) }}

View File

@@ -1,7 +1,7 @@
{{ config(
sort = "_airbyte_emitted_at",
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
schema = "_airbyte_test_normalization_bhhpj",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type

View File

@@ -1,7 +1,7 @@
{{ config(
sort = ["_airbyte_active_row", "_airbyte_unique_key_scd", "_airbyte_emitted_at"],
unique_key = "_airbyte_unique_key_scd",
schema = "test_normalization",
schema = "test_normalization_bhhpj",
post_hook = ["
{%
set final_table_relation = adapter.get_relation(
@@ -48,7 +48,7 @@
-- We have to have a non-empty query, so just do a noop delete
delete from {{ this }} where 1=0
{% endif %}
","drop view _airbyte_test_normalization.dedup_exchange_rate_stg"],
","drop view _airbyte_test_normalization_bhhpj.dedup_exchange_rate_stg"],
tags = [ "top-level" ]
) }}
-- depends_on: ref('dedup_exchange_rate_stg')
@@ -59,7 +59,7 @@ new_data as (
select
*
from {{ ref('dedup_exchange_rate_stg') }}
-- dedup_exchange_rate from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }}
-- dedup_exchange_rate from {{ source('test_normalization_bhhpj', '_airbyte_raw_dedup_exchange_rate') }}
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at', this) }}
),
@@ -97,7 +97,7 @@ input_data as (
input_data as (
select *
from {{ ref('dedup_exchange_rate_stg') }}
-- dedup_exchange_rate from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }}
-- dedup_exchange_rate from {{ source('test_normalization_bhhpj', '_airbyte_raw_dedup_exchange_rate') }}
),
{% endif %}
scd_data as (

View File

@@ -1,7 +1,7 @@
{{ config(
sort = ["_airbyte_unique_key", "_airbyte_emitted_at"],
unique_key = "_airbyte_unique_key",
schema = "test_normalization",
schema = "test_normalization_bhhpj",
tags = [ "top-level" ]
) }}
-- Final base SQL model
@@ -21,7 +21,7 @@ select
{{ current_timestamp() }} as _airbyte_normalized_at,
_airbyte_dedup_exchange_rate_hashid
from {{ ref('dedup_exchange_rate_scd') }}
-- dedup_exchange_rate from {{ source('test_normalization', '_airbyte_raw_dedup_exchange_rate') }}
-- dedup_exchange_rate from {{ source('test_normalization_bhhpj', '_airbyte_raw_dedup_exchange_rate') }}
where 1 = 1
and _airbyte_active_row = 1
{{ incremental_clause('_airbyte_emitted_at', this) }}

View File

@@ -1,7 +1,7 @@
{{ config(
sort = "_airbyte_emitted_at",
unique_key = '_airbyte_ab_id',
schema = "test_normalization",
schema = "test_normalization_bhhpj",
tags = [ "top-level" ]
) }}
-- Final base SQL model
@@ -21,6 +21,6 @@ select
{{ current_timestamp() }} as _airbyte_normalized_at,
_airbyte_exchange_rate_hashid
from {{ ref('exchange_rate_ab3') }}
-- exchange_rate from {{ source('test_normalization', '_airbyte_raw_exchange_rate') }}
-- exchange_rate from {{ source('test_normalization_bhhpj', '_airbyte_raw_exchange_rate') }}
where 1 = 1

View File

@@ -1,7 +1,7 @@
{{ config(
sort = "_airbyte_emitted_at",
unique_key = '_airbyte_ab_id',
schema = "_airbyte_test_normalization",
schema = "_airbyte_test_normalization_bhhpj",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to build a hash column based on the values of this record

View File

@@ -1,12 +1,12 @@
version: 2
sources:
- name: test_normalization
quoting:
database: true
schema: false
identifier: false
tables:
- name: _airbyte_raw_dedup_cdc_excluded
- name: _airbyte_raw_dedup_exchange_rate
- name: _airbyte_raw_exchange_rate
- name: _airbyte_raw_renamed_dedup_cdc_excluded
- name: test_normalization_bhhpj
quoting:
database: true
schema: false
identifier: false
tables:
- name: _airbyte_raw_dedup_cdc_excluded
- name: _airbyte_raw_dedup_exchange_rate
- name: _airbyte_raw_exchange_rate
- name: _airbyte_raw_renamed_dedup_cdc_excluded

View File

@@ -1,13 +1,13 @@
delete from "integrationtests".test_normalization."dedup_exchange_rate_scd"
delete from "integrationtests".test_normalization_bhhpj."dedup_exchange_rate_scd"
where (_airbyte_unique_key_scd) in (
select (_airbyte_unique_key_scd)
from "dedup_exchange_rate_scd__dbt_tmp"
);
insert into "integrationtests".test_normalization."dedup_exchange_rate_scd" ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "currency", "date", "timestamp_col", "hkd@spéçiäl & characters", "hkd_special___characters", "nzd", "usd", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid")
insert into "integrationtests".test_normalization_bhhpj."dedup_exchange_rate_scd" ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "currency", "date", "timestamp_col", "hkd@spéçiäl & characters", "hkd_special___characters", "nzd", "usd", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid")
(
select "_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "currency", "date", "timestamp_col", "hkd@spéçiäl & characters", "hkd_special___characters", "nzd", "usd", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid"
from "dedup_exchange_rate_scd__dbt_tmp"

View File

@@ -1,13 +1,13 @@
delete from "integrationtests".test_normalization."dedup_exchange_rate"
delete from "integrationtests".test_normalization_bhhpj."dedup_exchange_rate"
where (_airbyte_unique_key) in (
select (_airbyte_unique_key)
from "dedup_exchange_rate__dbt_tmp"
);
insert into "integrationtests".test_normalization."dedup_exchange_rate" ("_airbyte_unique_key", "id", "currency", "date", "timestamp_col", "hkd@spéçiäl & characters", "hkd_special___characters", "nzd", "usd", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid")
insert into "integrationtests".test_normalization_bhhpj."dedup_exchange_rate" ("_airbyte_unique_key", "id", "currency", "date", "timestamp_col", "hkd@spéçiäl & characters", "hkd_special___characters", "nzd", "usd", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid")
(
select "_airbyte_unique_key", "id", "currency", "date", "timestamp_col", "hkd@spéçiäl & characters", "hkd_special___characters", "nzd", "usd", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid"
from "dedup_exchange_rate__dbt_tmp"

View File

@@ -1,7 +1,7 @@
create table
"integrationtests".test_normalization."exchange_rate__dbt_tmp"
"integrationtests".test_normalization_bhhpj."exchange_rate__dbt_tmp"
compound sortkey(_airbyte_emitted_at)
@@ -11,7 +11,7 @@
with __dbt__cte__exchange_rate_ab1 as (
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "integrationtests".test_normalization._airbyte_raw_exchange_rate
-- depends_on: "integrationtests".test_normalization_bhhpj._airbyte_raw_exchange_rate
select
case when _airbyte_data."id" != '' then _airbyte_data."id" end as id,
case when _airbyte_data."currency" != '' then _airbyte_data."currency" end as currency,
@@ -25,7 +25,7 @@ select
_airbyte_ab_id,
_airbyte_emitted_at,
getdate() as _airbyte_normalized_at
from "integrationtests".test_normalization._airbyte_raw_exchange_rate as table_alias
from "integrationtests".test_normalization_bhhpj._airbyte_raw_exchange_rate as table_alias
-- exchange_rate
where 1 = 1
), __dbt__cte__exchange_rate_ab2 as (
@@ -87,6 +87,6 @@ select
getdate() as _airbyte_normalized_at,
_airbyte_exchange_rate_hashid
from __dbt__cte__exchange_rate_ab3
-- exchange_rate from "integrationtests".test_normalization._airbyte_raw_exchange_rate
-- exchange_rate from "integrationtests".test_normalization_bhhpj._airbyte_raw_exchange_rate
where 1 = 1
);

View File

@@ -1,11 +1,11 @@
create view "integrationtests"._airbyte_test_normalization."dedup_exchange_rate_stg__dbt_tmp" as (
create view "integrationtests"._airbyte_test_normalization_bhhpj."dedup_exchange_rate_stg__dbt_tmp" as (
with __dbt__cte__dedup_exchange_rate_ab1 as (
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "integrationtests".test_normalization._airbyte_raw_dedup_exchange_rate
-- depends_on: "integrationtests".test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate
select
case when _airbyte_data."id" != '' then _airbyte_data."id" end as id,
case when _airbyte_data."currency" != '' then _airbyte_data."currency" end as currency,
@@ -18,7 +18,7 @@ select
_airbyte_ab_id,
_airbyte_emitted_at,
getdate() as _airbyte_normalized_at
from "integrationtests".test_normalization._airbyte_raw_dedup_exchange_rate as table_alias
from "integrationtests".test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate as table_alias
-- dedup_exchange_rate
where 1 = 1

View File

@@ -1,13 +1,13 @@
delete from "integrationtests".test_normalization."dedup_exchange_rate_scd"
delete from "integrationtests".test_normalization_bhhpj."dedup_exchange_rate_scd"
where (_airbyte_unique_key_scd) in (
select (_airbyte_unique_key_scd)
from "dedup_exchange_rate_scd__dbt_tmp"
);
insert into "integrationtests".test_normalization."dedup_exchange_rate_scd" ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "currency", "new_column", "date", "timestamp_col", "hkd@spéçiäl & characters", "nzd", "usd", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid")
insert into "integrationtests".test_normalization_bhhpj."dedup_exchange_rate_scd" ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "currency", "new_column", "date", "timestamp_col", "hkd@spéçiäl & characters", "nzd", "usd", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid")
(
select "_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "currency", "new_column", "date", "timestamp_col", "hkd@spéçiäl & characters", "nzd", "usd", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid"
from "dedup_exchange_rate_scd__dbt_tmp"

View File

@@ -1,13 +1,13 @@
delete from "integrationtests".test_normalization."dedup_exchange_rate"
delete from "integrationtests".test_normalization_bhhpj."dedup_exchange_rate"
where (_airbyte_unique_key) in (
select (_airbyte_unique_key)
from "dedup_exchange_rate__dbt_tmp"
);
insert into "integrationtests".test_normalization."dedup_exchange_rate" ("_airbyte_unique_key", "id", "currency", "new_column", "date", "timestamp_col", "hkd@spéçiäl & characters", "nzd", "usd", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid")
insert into "integrationtests".test_normalization_bhhpj."dedup_exchange_rate" ("_airbyte_unique_key", "id", "currency", "new_column", "date", "timestamp_col", "hkd@spéçiäl & characters", "nzd", "usd", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid")
(
select "_airbyte_unique_key", "id", "currency", "new_column", "date", "timestamp_col", "hkd@spéçiäl & characters", "nzd", "usd", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid"
from "dedup_exchange_rate__dbt_tmp"

View File

@@ -1,7 +1,7 @@
create table
"integrationtests".test_normalization."exchange_rate__dbt_tmp"
"integrationtests".test_normalization_bhhpj."exchange_rate__dbt_tmp"
compound sortkey(_airbyte_emitted_at)
@@ -11,7 +11,7 @@
with __dbt__cte__exchange_rate_ab1 as (
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "integrationtests".test_normalization._airbyte_raw_exchange_rate
-- depends_on: "integrationtests".test_normalization_bhhpj._airbyte_raw_exchange_rate
select
case when _airbyte_data."id" != '' then _airbyte_data."id" end as id,
case when _airbyte_data."currency" != '' then _airbyte_data."currency" end as currency,
@@ -25,7 +25,7 @@ select
_airbyte_ab_id,
_airbyte_emitted_at,
getdate() as _airbyte_normalized_at
from "integrationtests".test_normalization._airbyte_raw_exchange_rate as table_alias
from "integrationtests".test_normalization_bhhpj._airbyte_raw_exchange_rate as table_alias
-- exchange_rate
where 1 = 1
), __dbt__cte__exchange_rate_ab2 as (
@@ -89,6 +89,6 @@ select
getdate() as _airbyte_normalized_at,
_airbyte_exchange_rate_hashid
from __dbt__cte__exchange_rate_ab3
-- exchange_rate from "integrationtests".test_normalization._airbyte_raw_exchange_rate
-- exchange_rate from "integrationtests".test_normalization_bhhpj._airbyte_raw_exchange_rate
where 1 = 1
);

View File

@@ -1,11 +1,11 @@
create view "integrationtests"._airbyte_test_normalization."dedup_exchange_rate_stg__dbt_tmp" as (
create view "integrationtests"._airbyte_test_normalization_bhhpj."dedup_exchange_rate_stg__dbt_tmp" as (
with __dbt__cte__dedup_exchange_rate_ab1 as (
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "integrationtests".test_normalization._airbyte_raw_dedup_exchange_rate
-- depends_on: "integrationtests".test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate
select
case when _airbyte_data."id" != '' then _airbyte_data."id" end as id,
case when _airbyte_data."currency" != '' then _airbyte_data."currency" end as currency,
@@ -18,7 +18,7 @@ select
_airbyte_ab_id,
_airbyte_emitted_at,
getdate() as _airbyte_normalized_at
from "integrationtests".test_normalization._airbyte_raw_dedup_exchange_rate as table_alias
from "integrationtests".test_normalization_bhhpj._airbyte_raw_dedup_exchange_rate as table_alias
-- dedup_exchange_rate
where 1 = 1

View File

@@ -1,25 +1,25 @@
name: airbyte_utils
version: "1.0"
version: '1.0'
config-version: 2
profile: normalize
model-paths:
- models
- models
docs-paths:
- docs
- docs
analysis-paths:
- analysis
- analysis
test-paths:
- tests
- tests
seed-paths:
- data
- data
macro-paths:
- macros
- macros
target-path: ../build
log-path: ../logs
packages-install-path: /dbt
clean-targets:
- build
- dbt_modules
- build
- dbt_modules
quoting:
database: true
schema: false
@@ -43,10 +43,10 @@ models:
+tags: airbyte_internal_views
+materialized: view
dispatch:
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
vars:
json_column: _airbyte_data
models_to_source:
@@ -84,6 +84,10 @@ vars:
UNNEST_ALIAS_AB2: TEST_NORMALIZATION._AIRBYTE_RAW_UNNEST_ALIAS
UNNEST_ALIAS_AB3: TEST_NORMALIZATION._AIRBYTE_RAW_UNNEST_ALIAS
UNNEST_ALIAS: TEST_NORMALIZATION._AIRBYTE_RAW_UNNEST_ALIAS
ARRAYS_AB1: TEST_NORMALIZATION._AIRBYTE_RAW_ARRAYS
ARRAYS_AB2: TEST_NORMALIZATION._AIRBYTE_RAW_ARRAYS
ARRAYS_AB3: TEST_NORMALIZATION._AIRBYTE_RAW_ARRAYS
ARRAYS: TEST_NORMALIZATION._AIRBYTE_RAW_ARRAYS
NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES_PARTITION_AB1: TEST_NORMALIZATION._AIRBYTE_RAW_NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES
NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES_PARTITION_AB2: TEST_NORMALIZATION._AIRBYTE_RAW_NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES
NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES_PARTITION_AB3: TEST_NORMALIZATION._AIRBYTE_RAW_NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES
@@ -96,6 +100,10 @@ vars:
UNNEST_ALIAS_CHILDREN_AB2: TEST_NORMALIZATION._AIRBYTE_RAW_UNNEST_ALIAS
UNNEST_ALIAS_CHILDREN_AB3: TEST_NORMALIZATION._AIRBYTE_RAW_UNNEST_ALIAS
UNNEST_ALIAS_CHILDREN: TEST_NORMALIZATION._AIRBYTE_RAW_UNNEST_ALIAS
ARRAYS_NESTED_ARRAY_PARENT_AB1: TEST_NORMALIZATION._AIRBYTE_RAW_ARRAYS
ARRAYS_NESTED_ARRAY_PARENT_AB2: TEST_NORMALIZATION._AIRBYTE_RAW_ARRAYS
ARRAYS_NESTED_ARRAY_PARENT_AB3: TEST_NORMALIZATION._AIRBYTE_RAW_ARRAYS
ARRAYS_NESTED_ARRAY_PARENT: TEST_NORMALIZATION._AIRBYTE_RAW_ARRAYS
NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES_PARTITION_DOUBLE_ARRAY_DATA_AB1: TEST_NORMALIZATION._AIRBYTE_RAW_NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES
NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES_PARTITION_DOUBLE_ARRAY_DATA_AB2: TEST_NORMALIZATION._AIRBYTE_RAW_NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES
NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES_PARTITION_DOUBLE_ARRAY_DATA_AB3: TEST_NORMALIZATION._AIRBYTE_RAW_NESTED_STREAM_WITH_COMPLEX_COLUMNS_RESULTING_INTO_LONG_NAMES

View File

@@ -6,6 +6,7 @@ sources:
schema: false
identifier: false
tables:
- name: _AIRBYTE_RAW_ARRAYS
- name: _AIRBYTE_RAW_CONFLICT_STREAM_ARRAY
- name: _AIRBYTE_RAW_CONFLICT_STREAM_NAME
- name: _AIRBYTE_RAW_CONFLICT_STREAM_SCALAR

View File

@@ -1,25 +1,25 @@
name: airbyte_utils
version: "1.0"
version: '1.0'
config-version: 2
profile: normalize
model-paths:
- models
- models
docs-paths:
- docs
- docs
analysis-paths:
- analysis
- analysis
test-paths:
- tests
- tests
seed-paths:
- data
- data
macro-paths:
- macros
- macros
target-path: ../build
log-path: ../logs
packages-install-path: /dbt
clean-targets:
- build
- dbt_modules
- build
- dbt_modules
quoting:
database: true
schema: false
@@ -43,10 +43,10 @@ models:
+tags: airbyte_internal_views
+materialized: view
dispatch:
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
- macro_namespace: dbt_utils
search_order:
- airbyte_utils
- dbt_utils
vars:
json_column: _airbyte_data
models_to_source:
@@ -84,3 +84,8 @@ vars:
MULTIPLE_COLUMN_NAMES_CONFLICTS_STG: TEST_NORMALIZATION._AIRBYTE_RAW_MULTIPLE_COLUMN_NAMES_CONFLICTS
MULTIPLE_COLUMN_NAMES_CONFLICTS_SCD: TEST_NORMALIZATION._AIRBYTE_RAW_MULTIPLE_COLUMN_NAMES_CONFLICTS
MULTIPLE_COLUMN_NAMES_CONFLICTS: TEST_NORMALIZATION._AIRBYTE_RAW_MULTIPLE_COLUMN_NAMES_CONFLICTS
TYPES_TESTING_AB1: TEST_NORMALIZATION._AIRBYTE_RAW_TYPES_TESTING
TYPES_TESTING_AB2: TEST_NORMALIZATION._AIRBYTE_RAW_TYPES_TESTING
TYPES_TESTING_STG: TEST_NORMALIZATION._AIRBYTE_RAW_TYPES_TESTING
TYPES_TESTING_SCD: TEST_NORMALIZATION._AIRBYTE_RAW_TYPES_TESTING
TYPES_TESTING: TEST_NORMALIZATION._AIRBYTE_RAW_TYPES_TESTING

View File

@@ -13,3 +13,4 @@ sources:
- name: _AIRBYTE_RAW_MULTIPLE_COLUMN_NAMES_CONFLICTS
- name: _AIRBYTE_RAW_POS_DEDUP_CDCX
- name: _AIRBYTE_RAW_RENAMED_DEDUP_CDC_EXCLUDED
- name: _AIRBYTE_RAW_TYPES_TESTING

View File

@@ -262,6 +262,31 @@
"cursor_field": [],
"destination_sync_mode": "append_dedup",
"primary_key": [["id"]]
},
{
"stream": {
"name": "types_testing",
"json_schema": {
"type": ["null", "object"],
"properties": {
"id": {
"type": "integer"
},
"airbyte_integer_column": {
"type": "number",
"airbyte_type": "integer"
},
"nullable_airbyte_integer_column": {
"type": ["null", "number"],
"airbyte_type": "integer"
}
}
}
},
"sync_mode": "full_refresh",
"cursor_field": [],
"destination_sync_mode": "append_dedup",
"primary_key": [["id"]]
}
]
}

View File

@@ -54,3 +54,14 @@
{"type": "RECORD", "record": {"stream": "1_prefix_startwith_number", "emitted_at": 1602637990800, "data": { "id": 2, "date": "2020-09-01", "text": "hi 4"}}}
{"type":"RECORD","record":{"stream":"multiple_column_names_conflicts","data":{"id":1,"User Id":"chris","user_id":42,"User id":300,"user id": 102,"UserId":101},"emitted_at":1623959926}}
# These records are verified in types_testing_incorrect_values.sql. If you add/remove entries, make sure to update that file as well.
# IMPORTANT: big_integer_column and nullable_big_integer_column were removed from catalog.json because of difficulties in implementing NUMERIC support.
# This is fine, because no major sources currently produce big_integer fields.
# After that functionality is completed, we should restore their entries to catalog.json.
# Verify max value for int64, and a 28-digit value for big_integer. (28 is larger than an int64 can handle, but still within bounds for a BigQuery NUMERIC column)
{"type":"RECORD","record":{"stream":"types_testing","data":{"id":1,"airbyte_integer_column":9223372036854775807,"nullable_airbyte_integer_column":9223372036854775807,"big_integer_column":"1234567890123456789012345678","nullable_big_integer_column":"1234567890123456789012345678"},"emitted_at":1623959926}}
# Verify max value for int64, and a negative 28-digit value for big_integer
{"type":"RECORD","record":{"stream":"types_testing","data":{"id":2,"airbyte_integer_column":-9223372036854775808,"nullable_airbyte_integer_column":-9223372036854775808,"big_integer_column":"-1234567890123456789012345678","nullable_big_integer_column":"-1234567890123456789012345678"},"emitted_at":1623959926}}
# Verify nullable values
{"type":"RECORD","record":{"stream":"types_testing","data":{"id":3,"airbyte_integer_column":0,"big_integer_column":0},"emitted_at":1623959926}}

View File

@@ -25,6 +25,9 @@
},
{
"SIMPLE_STREAMS_SECOND_RUN_ROW_COUNTS": "simple_streams_second_run_row_counts"
},
{
"TYPES_TESTING_INCORRECT_VALUES": "types_testing_incorrect_values"
}
],
"redshift": [],

View File

@@ -37,6 +37,10 @@ union all
union all
select distinct 'pos_dedup_cdcx' as label, count(*) as row_count, 3 as expected_count
from {{ ref('pos_dedup_cdcx') }}
union all
select distinct 'types_testing' as label, count(*) as row_count, 3 as expected_count
from {{ ref('types_testing') }}
)
select *
from table_row_counts

View File

@@ -0,0 +1,34 @@
-- Note that we cast columns_column to string to avoid any weird numeric equality nonsense.
-- For example, in Postgres, this query returns `true`, even though the two numbers are different: (9223372036854775807 is the max value of a signed 64-bit int)
-- select (9223372036854775807 :: double precision) = (9223372036854775806 :: double precision)
-- Because a double has only 15 decimals of precision, so both values are rounded off to 9.223372036854776e+18
select * from {{ ref('types_testing') }} where
(
id = 1 and (
cast(airbyte_integer_column as {{ dbt_utils.type_string() }}) != '9223372036854775807'
or cast(nullable_airbyte_integer_column as {{ dbt_utils.type_string() }}) != '9223372036854775807'
{#
or cast(big_integer_column as {{ dbt_utils.type_string() }}) != '1234567890123456789012345678'
or cast(nullable_big_integer_column as {{ dbt_utils.type_string() }}) != '1234567890123456789012345678'
#}
)
) or (
id = 2 and (
cast(airbyte_integer_column as {{ dbt_utils.type_string() }}) != '-9223372036854775808'
or cast(nullable_airbyte_integer_column as {{ dbt_utils.type_string() }}) != '-9223372036854775808'
{#
or cast(big_integer_column as {{ dbt_utils.type_string() }}) != '-1234567890123456789012345678'
or cast(nullable_big_integer_column as {{ dbt_utils.type_string() }}) != '-1234567890123456789012345678'
#}
)
) or (
id = 3 and (
cast(airbyte_integer_column as {{ dbt_utils.type_string() }}) != '0'
or nullable_airbyte_integer_column is not null
{#
or cast(big_integer_column as {{ dbt_utils.type_string() }}) != '0'
or nullable_big_integer_column is not null
#}
)
)

View File

@@ -17,13 +17,14 @@ from normalization.transform_catalog.table_name_registry import TableNameRegistr
from normalization.transform_catalog.utils import (
is_airbyte_column,
is_array,
is_big_integer,
is_boolean,
is_combining_node,
is_date,
is_datetime,
is_datetime_with_timezone,
is_datetime_without_timezone,
is_integer,
is_long,
is_number,
is_object,
is_simple_property,
@@ -458,11 +459,11 @@ where 1 = 1
if "type" in definition:
if is_array(definition["type"]):
json_extract = jinja_call(f"json_extract_array({json_column_name}, {json_path}, {normalized_json_path})")
if is_simple_property(definition.get("items", {"type": "object"}).get("type", "object")):
if is_simple_property(definition.get("items", {"type": "object"})):
json_extract = jinja_call(f"json_extract_string_array({json_column_name}, {json_path}, {normalized_json_path})")
elif is_object(definition["type"]):
json_extract = jinja_call(f"json_extract('{table_alias}', {json_column_name}, {json_path}, {normalized_json_path})")
elif is_simple_property(definition["type"]):
elif is_simple_property(definition):
json_extract = jinja_call(f"json_extract_scalar({json_column_name}, {json_path}, {normalized_json_path})")
return f"{json_extract} as {column_name}"
@@ -511,10 +512,12 @@ where 1 = 1
elif is_object(definition["type"]):
sql_type = jinja_call("type_json()")
# Treat simple types from narrower to wider scope type: boolean < integer < number < string
elif is_boolean(definition["type"]):
elif is_boolean(definition["type"], definition):
cast_operation = jinja_call(f"cast_to_boolean({jinja_column})")
return f"{cast_operation} as {column_name}"
elif is_integer(definition["type"]):
elif is_big_integer(definition):
sql_type = jinja_call("type_very_large_integer()")
elif is_long(definition["type"], definition):
sql_type = jinja_call("dbt_utils.type_bigint()")
elif is_number(definition["type"]):
sql_type = jinja_call("dbt_utils.type_float()")
@@ -713,7 +716,7 @@ where 1 = 1
if "type" not in definition:
col = column_name
elif is_boolean(definition["type"]):
elif is_boolean(definition["type"], definition):
col = f"boolean_to_string({column_name})"
elif is_array(definition["type"]):
col = f"array_to_string({column_name})"
@@ -1460,7 +1463,7 @@ def find_properties_object(path: List[str], field: str, properties) -> Dict[str,
elif "properties" in properties:
# we found a properties object
return {current: properties["properties"]}
elif "type" in properties and is_simple_property(properties["type"]):
elif "type" in properties and is_simple_property(properties):
# we found a basic type
return {current: {}}
elif isinstance(properties, dict):

View File

@@ -63,15 +63,26 @@ def is_number(property_type) -> bool:
return property_type == "number" or "number" in property_type
def is_integer(property_type) -> bool:
def is_big_integer(definition: dict) -> bool:
return "airbyte_type" in definition and definition["airbyte_type"] == "big_integer"
def is_long(property_type, definition: dict) -> bool:
# Check specifically for {type: number, airbyte_type: integer}
if (
(property_type == "number" or "number" in property_type)
and "airbyte_type" in definition
and definition["airbyte_type"] == "integer"
):
return True
if is_string(property_type) or is_number(property_type):
# Handle union type, give priority to wider scope types
return False
return property_type == "integer" or "integer" in property_type
def is_boolean(property_type) -> bool:
if is_string(property_type) or is_number(property_type) or is_integer(property_type):
def is_boolean(property_type, definition: dict) -> bool:
if is_string(property_type) or is_number(property_type) or is_big_integer(definition) or is_long(property_type, definition):
# Handle union type, give priority to wider scope types
return False
return property_type == "boolean" or "boolean" in property_type
@@ -89,8 +100,18 @@ def is_airbyte_column(name: str) -> bool:
return name.startswith("_airbyte_")
def is_simple_property(property_type) -> bool:
return is_string(property_type) or is_integer(property_type) or is_number(property_type) or is_boolean(property_type)
def is_simple_property(definition: dict) -> bool:
if "type" not in definition:
property_type = "object"
else:
property_type = definition["type"]
return (
is_string(property_type)
or is_big_integer(definition)
or is_long(property_type, definition)
or is_number(property_type)
or is_boolean(property_type, definition)
)
def is_combining_node(properties: dict) -> Set[str]:

View File

@@ -89,8 +89,8 @@ public abstract class CdcSourceTest {
CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME,
MODELS_SCHEMA,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_MAKE_ID, JsonSchemaType.NUMBER),
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_MAKE_ID, JsonSchemaType.INTEGER),
Field.of(COL_MODEL, JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID)))));
@@ -474,8 +474,8 @@ public abstract class CdcSourceTest {
.withStream(CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_2",
MODELS_SCHEMA,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_MAKE_ID, JsonSchemaType.NUMBER),
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_MAKE_ID, JsonSchemaType.INTEGER),
Field.of(COL_MODEL, JsonSchemaType.STRING))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
@@ -599,8 +599,8 @@ public abstract class CdcSourceTest {
final AirbyteStream streamWithoutPK = CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_2",
MODELS_SCHEMA,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_MAKE_ID, JsonSchemaType.NUMBER),
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_MAKE_ID, JsonSchemaType.INTEGER),
Field.of(COL_MODEL, JsonSchemaType.STRING));
streamWithoutPK.setSourceDefinedPrimaryKey(Collections.emptyList());
streamWithoutPK.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH));
@@ -609,8 +609,8 @@ public abstract class CdcSourceTest {
final AirbyteStream randomStream = CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_random",
MODELS_SCHEMA + "_random",
Field.of(COL_ID + "_random", JsonSchemaType.NUMBER),
Field.of(COL_MAKE_ID + "_random", JsonSchemaType.NUMBER),
Field.of(COL_ID + "_random", JsonSchemaType.INTEGER),
Field.of(COL_MAKE_ID + "_random", JsonSchemaType.INTEGER),
Field.of(COL_MODEL + "_random", JsonSchemaType.STRING))
.withSourceDefinedCursor(true)
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))

View File

@@ -31,10 +31,6 @@
"airbyte_type": "big_number"
},
"property_integer": { "type": "integer" },
"property_big_integer": {
"type": "string",
"airbyte_type": "big_integer"
},
"property_boolean": { "type": "boolean" }
}
}

View File

@@ -1,2 +1,2 @@
{"type": "RECORD", "record": {"stream": "object_array_test_1", "emitted_at": 1602637589100, "data": { "property_string" : "qqq", "property_array" : [ { "property_string": "foo bar", "property_date": "2021-01-23", "property_timestamp_with_timezone": "2022-11-22T01:23:45+00:00", "property_timestamp_without_timezone": "2022-11-22T01:23:45", "property_number": 56.78, "property_big_number": "100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.1234", "property_integer": 42, "property_big_integer": "123141241234124123141241234124123141241234124123141241234124123141241234124", "property_boolean": true } ] }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "RECORD", "record": {"stream": "object_array_test_1", "emitted_at": 1602637589100, "data": { "property_string" : "qqq", "property_array" : [ { "property_string": "foo bar", "property_date": "2021-01-23", "property_timestamp_with_timezone": "2022-11-22T01:23:45+00:00", "property_timestamp_without_timezone": "2022-11-22T01:23:45", "property_number": 56.78, "property_big_number": "100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.1234", "property_integer": 42, "property_boolean": true } ] }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}

Some files were not shown because too many files have changed in this diff Show More