Add normalization test cases (#2992)
* Add normalization test cases
* Fix new normalization test on name collisions
@@ -613,18 +613,25 @@ from {{ from_table }}
        """
        tables_registry = union_registries(self.tables_registry, self.local_registry)
        new_table_name = self.normalized_stream_name()
        schema = self.get_schema(is_intermediate)
        if not is_intermediate and self.parent is None:
            if suffix:
                norm_suffix = suffix if not suffix or suffix.startswith("_") else f"_{suffix}"
                new_table_name = new_table_name + norm_suffix
            # Top-level stream has priority on table_names
            if self.schema in tables_registry and new_table_name in tables_registry[self.schema]:
                raise ValueError(f"Conflict: Table name {new_table_name} in schema {self.schema} already exists!")
        elif not self.parent:
            new_table_name = get_table_name(self.name_transformer, "", new_table_name, suffix, self.json_path)
        else:
            new_table_name = get_table_name(self.name_transformer, "_".join(self.json_path[:-1]), new_table_name, suffix, self.json_path)
        new_table_name = self.name_transformer.normalize_table_name(new_table_name, False, False)

        if schema in tables_registry and new_table_name in tables_registry[schema]:
            # Check if new_table_name already exists. If yes, then add hash of the stream name to it
            new_table_name = self.name_transformer.normalize_table_name(f"{new_table_name}_{hash_name(self.stream_name)}", False, False)
            if new_table_name in tables_registry[schema]:
                raise ValueError(
                    f"Conflict: Table name {new_table_name} in schema {schema} already exists! (is there a hashing collision or duplicate streams?)"
                )

        if not is_intermediate:
            self.final_table_name = new_table_name
        return new_table_name
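For readers skimming the hunk, the collision handling above boils down to: the table registry is a dict keyed by schema, each value being the set of table names already claimed; when a candidate name is taken, a 3-character hash of the stream name is appended, and only a second collision is a hard error. A minimal, self-contained sketch of that flow (the resolve_table_name helper and the literal registry contents are illustrative, not part of this commit; only hash_name is copied from the next hunk):

import hashlib
from typing import Dict, Set


def hash_name(input: str) -> str:
    # Same helper as in the next hunk: first 3 hex characters of a SHA-1 digest
    # (so at most 16 ** 3 = 4096 distinct suffixes, hence the explicit
    # "hashing collision" error above).
    h = hashlib.sha1()
    h.update(input.encode("utf-8"))
    return h.hexdigest()[:3]


def resolve_table_name(registry: Dict[str, Set[str]], schema: str, candidate: str, stream_name: str) -> str:
    # Illustrative stand-in for the collision branch of generate_new_table_name.
    if schema in registry and candidate in registry[schema]:
        candidate = f"{candidate}_{hash_name(stream_name)}"
        if candidate in registry[schema]:
            raise ValueError(f"Conflict: Table name {candidate} in schema {schema} already exists!")
    return candidate


registry = {"schema_name": {"stream_name"}}
# Per the new test_collisions_generate_new_table_name expectations below, this yields "stream_name_485".
print(resolve_table_name(registry, "schema_name", "stream_name", "stream_name"))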
@@ -754,9 +761,12 @@ def find_properties_object(path: List[str], field: str, properties) -> Dict[str,


def hash_json_path(json_path: List[str]) -> str:
    lineage = "&airbyte&".join(json_path)          # old line, removed by this commit
    return hash_name("&airbyte&".join(json_path))  # new line, delegates to hash_name()


def hash_name(input: str) -> str:
    h = hashlib.sha1()
    h.update(lineage.encode("utf-8"))  # old line, removed by this commit
    h.update(input.encode("utf-8"))    # new line
    return h.hexdigest()[:3]

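The refactor above extracts the SHA-1 truncation into hash_name so it can be reused for stream names as well as JSON paths; hash_json_path still joins the path segments with the "&airbyte&" separator before hashing. A small self-contained usage sketch (the example path is made up; only the separator and the 3-character digest length come from the code above):

import hashlib
from typing import List


def hash_name(input: str) -> str:
    # Copied from the hunk above.
    h = hashlib.sha1()
    h.update(input.encode("utf-8"))
    return h.hexdigest()[:3]


def hash_json_path(json_path: List[str]) -> str:
    # Copied from the hunk above: the "&airbyte&" separator keeps distinct nestings
    # from producing the same joined string before hashing.
    return hash_name("&airbyte&".join(json_path))


# Hypothetical nested stream path; prints a stable 3-character suffix.
print(hash_json_path(["parent_stream", "children"]))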
@@ -0,0 +1,38 @@
{
  "streams": [
    {
      "stream": {
        "name": "postgres_has_a_64_characters_limit_to_table_names_but_other_destinations_are_fine",
        "json_schema": {
          "type": ["null", "object"],
          "properties": {
            "id": { "type": ["null", "string"] }
          }
        },
        "supported_sync_modes": ["incremental"],
        "source_defined_cursor": true,
        "default_cursor_field": []
      },
      "sync_mode": "incremental",
      "cursor_field": [],
      "destination_sync_mode": "append"
    },
    {
      "stream": {
        "name": "postgres_has_a_64_characters_and_not_more_limit_to_table_names_but_other_destinations_are_fine",
        "json_schema": {
          "type": ["null", "object"],
          "properties": {
            "id": { "type": ["null", "string"] }
          }
        },
        "supported_sync_modes": ["incremental"],
        "source_defined_cursor": true,
        "default_cursor_field": []
      },
      "sync_mode": "incremental",
      "cursor_field": [],
      "destination_sync_mode": "append"
    }
  ]
}
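This is the new edge-case catalog in standard Airbyte configured-catalog form: two streams whose names differ only in the middle, which is exactly what the Postgres truncation fixture further down needs. A quick way to pull the stream names out of it, assuming the file is saved as edge_cases_catalog.json (the path is illustrative):

import json

# Illustrative path; the commit adds this catalog next to the other test catalogs.
with open("edge_cases_catalog.json") as f:
    catalog = json.load(f)

stream_names = [configured_stream["stream"]["name"] for configured_stream in catalog["streams"]]
print(stream_names)
# ['postgres_has_a_64_characters_limit_to_table_names_but_other_destinations_are_fine',
#  'postgres_has_a_64_characters_and_not_more_limit_to_table_names_but_other_destinations_are_fine']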
@@ -0,0 +1,3 @@
{
  "tables": []
}
@@ -0,0 +1,13 @@
{
  "tables": [
    "postgres_has_a_64_characters_and_not_more_limit_to_table_names_but_other_destinations_are_fine",
    "postgres_has_a_64_characters_and_not_more_limit_to_table_names_but_other_destinations_are_fine_ab1",
    "postgres_has_a_64_characters_and_not_more_limit_to_table_names_but_other_destinations_are_fine_ab2",
    "postgres_has_a_64_characters_and_not_more_limit_to_table_names_but_other_destinations_are_fine_ab3",

    "postgres_has_a_64_characters_limit_to_table_names_but_other_destinations_are_fine",
    "postgres_has_a_64_characters_limit_to_table_names_but_other_destinations_are_fine_ab1",
    "postgres_has_a_64_characters_limit_to_table_names_but_other_destinations_are_fine_ab2",
    "postgres_has_a_64_characters_limit_to_table_names_but_other_destinations_are_fine_ab3"
  ]
}
@@ -0,0 +1,12 @@
{
  "tables": [
    "postgres_has_a_64_ch__destinations_are_fine",
    "postgres_has_a_64_ch__destinations_are_fine_611",
    "postgres_has_a_64_ch__inations_are_fine_ab1",
    "postgres_has_a_64_ch__inations_are_fine_ab1_611",
    "postgres_has_a_64_ch__inations_are_fine_ab2",
    "postgres_has_a_64_ch__inations_are_fine_ab2_611",
    "postgres_has_a_64_ch__inations_are_fine_ab3",
    "postgres_has_a_64_ch__inations_are_fine_ab3_611"
  ]
}
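The Postgres fixture above is where the two concerns of this commit meet: both long stream names truncate to the same identifier, so one of them also picks up the 3-character hash suffix (_611 here). The truncation itself keeps the head and tail of the name around a "__" marker; a minimal sketch inferred from these expectations and from the column-name test cases in the next hunk (the 43-character budget, the 20/21 head/tail split, and the truncate_identifier name are assumptions read off the test data, not the actual DestinationNameTransformer code):

def truncate_identifier(name: str, limit: int = 43) -> str:
    # Keep the start and end of an over-long identifier, marking the cut with "__".
    if len(name) <= limit:
        return name
    middle = "__"
    keep = limit - len(middle)
    head = keep // 2          # 20 characters
    tail = keep - head        # 21 characters
    return name[:head] + middle + name[len(name) - tail:]


print(truncate_identifier("postgres_has_a_64_characters_limit_to_table_names_but_other_destinations_are_fine"))
# -> postgres_has_a_64_ch__destinations_are_fine
print(truncate_identifier("postgres_has_a_64_characters_limit_to_table_names_but_other_destinations_are_fine_ab1"))
# -> postgres_has_a_64_ch__inations_are_fine_ab1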
@@ -171,6 +171,7 @@ def test_normalize_column_name(input_str: str, destination_type: str, expected:
        ("Aaaa_Bbbb_Cccc_Dddd_Eeee_Ffff_Gggg_Hhhh_Iii", "Aaaa_Bbbb_Cccc_Dddd_Eeee_Ffff_Gggg_Hhhh_Iii"),
        # over the limit
        ("Aaaa_Bbbb_Cccc_Dddd_Eeee_Ffff_Gggg_Hhhh_Iiii", "Aaaa_Bbbb_Cccc_Dddd___e_Ffff_Gggg_Hhhh_Iiii"),
        ("Aaaa_Bbbb_Cccc_Dddd_a_very_long_name_Ffff_Gggg_Hhhh_Iiii", "Aaaa_Bbbb_Cccc_Dddd___e_Ffff_Gggg_Hhhh_Iiii"),
        ("Aaaa_Bbbb_Cccc_Dddd_Eeee_Ffff_Gggg_Hhhh_Iiii_Jjjj_Kkkk", "Aaaa_Bbbb_Cccc_Dddd___g_Hhhh_Iiii_Jjjj_Kkkk"),
        ("ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz_0123456789", "ABCDEFGHIJKLMNOPQRST__qrstuvwxyz_0123456789"),
    ],
@@ -44,6 +44,7 @@ def setup_test_path():
@pytest.mark.parametrize(
    "catalog_file",
    [
        "edge_cases_catalog",
        "hubspot_catalog",
        "facebook_catalog",
        "stripe_catalog",
@@ -179,7 +180,43 @@ def test_generate_new_table_name(stream_name: str, is_intermediate: bool, suffix
        primary_key=[],
        json_column_name="json_column_name",
        properties=[],
        tables_registry=set(),   # old line, removed: the registry used to be a flat set
        tables_registry=dict(),  # new line: the registry is now a dict keyed by schema
        from_table="",
    )
    assert stream_processor.generate_new_table_name(is_intermediate=is_intermediate, suffix=suffix) == expected
    assert stream_processor.final_table_name == expected_final_name


@pytest.mark.parametrize(
    "stream_name, is_intermediate, suffix, expected, expected_final_name",
    [
        ("stream_name", False, "", "stream_name_485", "stream_name_485"),
        ("stream_name", False, "suffix", "stream_name_suffix_485", "stream_name_suffix_485"),
        ("stream_name", False, "_suffix", "stream_name_suffix_485", "stream_name_suffix_485"),
        ("stream_name", True, "suffix", "stream_name_suffix_485", ""),
        ("stream_name", True, "_suffix", "stream_name_suffix_485", ""),
    ],
)
def test_collisions_generate_new_table_name(stream_name: str, is_intermediate: bool, suffix: str, expected: str, expected_final_name: str):
    # fill test_registry with the same stream names as if it was already used so there would be collisions...
    test_registry = dict()
    test_registry["schema_name"] = set()
    test_registry["schema_name"].add("stream_name")
    test_registry["schema_name"].add("stream_name_suffix")
    test_registry["raw_schema"] = set()
    test_registry["raw_schema"].add("stream_name_suffix")
    stream_processor = StreamProcessor.create(
        stream_name=stream_name,
        destination_type=DestinationType.POSTGRES,
        raw_schema="raw_schema",
        schema="schema_name",
        source_sync_mode=SyncMode.full_refresh,
        destination_sync_mode=DestinationSyncMode.append_dedup,
        cursor_field=[],
        primary_key=[],
        json_column_name="json_column_name",
        properties=[],
        tables_registry=test_registry,
        from_table="",
    )
    assert stream_processor.generate_new_table_name(is_intermediate=is_intermediate, suffix=suffix) == expected
@@ -208,7 +245,7 @@ def test_nested_generate_new_table_name(stream_name: str, is_intermediate: bool,
        primary_key=[],
        json_column_name="json_column_name",
        properties=[],
        tables_registry=set(),   # old line, removed
        tables_registry=dict(),  # new line
        from_table="",
    )
    nested_stream_processor = StreamProcessor.create_from_parent(