AirbyteLib: Case insensitive missing column checks, deterministic column ordering in duckdb inserts (#34824)
This commit is contained in:
committed by
GitHub
parent
45803a3ff9
commit
615536323c
@@ -17,6 +17,7 @@ from airbyte_lib._file_writers.base import (
|
||||
FileWriterBatchHandle,
|
||||
FileWriterConfigBase,
|
||||
)
|
||||
from airbyte_lib._util.text_util import lower_case_set
|
||||
|
||||
|
||||
class ParquetWriterConfig(FileWriterConfigBase):
|
||||
@@ -47,12 +48,19 @@ class ParquetWriter(FileWriterBase):
|
||||
stream_name: str,
|
||||
record_batch: pa.Table,
|
||||
) -> list[str]:
|
||||
"""Return a list of columns that are missing in the batch."""
|
||||
"""Return a list of columns that are missing in the batch.
|
||||
|
||||
The comparison is based on a case-insensitive comparison of the column names.
|
||||
"""
|
||||
if not self._catalog_manager:
|
||||
raise exc.AirbyteLibInternalError(message="Catalog manager should exist but does not.")
|
||||
stream = self._catalog_manager.get_stream_config(stream_name)
|
||||
stream_property_names = stream.stream.json_schema["properties"].keys()
|
||||
return [col for col in stream_property_names if col not in record_batch.schema.names]
|
||||
return [
|
||||
col
|
||||
for col in stream_property_names
|
||||
if col.lower() not in lower_case_set(record_batch.schema.names)
|
||||
]
|
||||
|
||||
@overrides
|
||||
def _write_batch(
|
||||
|
||||
15
airbyte-lib/airbyte_lib/_util/text_util.py
Normal file
15
airbyte-lib/airbyte_lib/_util/text_util.py
Normal file
@@ -0,0 +1,15 @@
|
||||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
||||
|
||||
"""Internal utility functions for dealing with text."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterable
|
||||
|
||||
|
||||
def lower_case_set(str_iter: Iterable[str]) -> set[str]:
    """Return the lower-cased form of every string in `str_iter` as a set.

    Any iterable of strings is accepted (a list, a dict keys view, a
    generator, ...), not only a list. Strings that differ only by case
    collapse into a single entry, so the result may contain fewer items
    than the input.
    """
    return {s.lower() for s in str_iter}
|
||||
@@ -29,6 +29,7 @@ from sqlalchemy.sql.elements import TextClause
|
||||
from airbyte_lib import exceptions as exc
|
||||
from airbyte_lib._file_writers.base import FileWriterBase, FileWriterBatchHandle
|
||||
from airbyte_lib._processors import BatchHandle, RecordProcessor
|
||||
from airbyte_lib._util.text_util import lower_case_set
|
||||
from airbyte_lib.caches._catalog_manager import CatalogManager
|
||||
from airbyte_lib.config import CacheConfigBase
|
||||
from airbyte_lib.datasets._sql import CachedDataset
|
||||
@@ -407,12 +408,19 @@ class SQLCacheBase(RecordProcessor):
|
||||
stream_column_names: list[str] = json_schema["properties"].keys()
|
||||
table_column_names: list[str] = self.get_sql_table(stream_name).columns.keys()
|
||||
|
||||
missing_columns: set[str] = set(stream_column_names) - set(table_column_names)
|
||||
lower_case_table_column_names = lower_case_set(table_column_names)
|
||||
missing_columns = [
|
||||
stream_col
|
||||
for stream_col in stream_column_names
|
||||
if stream_col.lower() not in lower_case_table_column_names
|
||||
]
|
||||
if missing_columns:
|
||||
if raise_on_error:
|
||||
raise exc.AirbyteLibCacheTableValidationError(
|
||||
violation="Cache table is missing expected columns.",
|
||||
context={
|
||||
"stream_column_names": stream_column_names,
|
||||
"table_column_names": table_column_names,
|
||||
"missing_columns": missing_columns,
|
||||
},
|
||||
)
|
||||
@@ -870,7 +878,7 @@ class SQLCacheBase(RecordProcessor):
|
||||
|
||||
# Define a condition that checks for records in temp_table that do not have a corresponding
|
||||
# record in final_table
|
||||
where_not_exists_clause = final_table.c.id == null()
|
||||
where_not_exists_clause = getattr(final_table.c, pk_columns[0]) == null()
|
||||
|
||||
# Select records from temp_table that are not in final_table
|
||||
select_new_records_stmt = (
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from textwrap import dedent
|
||||
from textwrap import dedent, indent
|
||||
from typing import cast
|
||||
|
||||
from overrides import overrides
|
||||
@@ -172,12 +172,20 @@ class DuckDBCache(DuckDBCacheBase):
|
||||
stream_name=stream_name,
|
||||
batch_id=batch_id,
|
||||
)
|
||||
columns_list = list(self._get_sql_column_definitions(stream_name).keys())
|
||||
columns_list_str = indent("\n, ".join(columns_list), " ")
|
||||
files_list = ", ".join([f"'{f!s}'" for f in files])
|
||||
insert_statement = dedent(
|
||||
f"""
|
||||
INSERT INTO {self.config.schema_name}.{temp_table_name}
|
||||
SELECT * FROM read_parquet(
|
||||
[{files_list}]
|
||||
(
|
||||
{columns_list_str}
|
||||
)
|
||||
SELECT
|
||||
{columns_list_str}
|
||||
FROM read_parquet(
|
||||
[{files_list}],
|
||||
union_by_name = true
|
||||
)
|
||||
"""
|
||||
)
|
||||
|
||||
@@ -76,7 +76,7 @@ def _get_secret_from_source(
|
||||
if (
|
||||
source in [SecretSource.GOOGLE_COLAB, SecretSource.ANY]
|
||||
and colab_userdata is not None
|
||||
and colab_userdata.get(secret_name, None)
|
||||
and colab_userdata.get(secret_name)
|
||||
):
|
||||
return colab_userdata.get(secret_name)
|
||||
|
||||
|
||||
@@ -26,7 +26,8 @@ from airbyte_protocol.models import (
|
||||
|
||||
from airbyte_lib import exceptions as exc
|
||||
from airbyte_lib._factories.cache_factories import get_default_cache
|
||||
from airbyte_lib._util import protocol_util # Internal utility functions
|
||||
from airbyte_lib._util import protocol_util
|
||||
from airbyte_lib._util.text_util import lower_case_set # Internal utility functions
|
||||
from airbyte_lib.datasets._lazy import LazyDataset
|
||||
from airbyte_lib.progress import progress
|
||||
from airbyte_lib.results import ReadResult
|
||||
@@ -300,13 +301,17 @@ class Source:
|
||||
) from KeyError(stream)
|
||||
|
||||
configured_stream = configured_catalog.streams[0]
|
||||
col_list = configured_stream.stream.json_schema["properties"].keys()
|
||||
all_properties = set(configured_stream.stream.json_schema["properties"].keys())
|
||||
|
||||
def _with_missing_columns(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
|
||||
"""Add missing columns to the record with null values."""
|
||||
for record in records:
|
||||
appended_columns = set(col_list) - set(record.keys())
|
||||
appended_dict = {col: None for col in appended_columns}
|
||||
existing_properties_lower = lower_case_set(record.keys())
|
||||
appended_dict = {
|
||||
prop: None
|
||||
for prop in all_properties
|
||||
if prop.lower() not in existing_properties_lower
|
||||
}
|
||||
yield {**record, **appended_dict}
|
||||
|
||||
iterator: Iterator[dict[str, Any]] = _with_missing_columns(
|
||||
|
||||
@@ -4,7 +4,7 @@ import CodeBlock from '@theme/CodeBlock';
|
||||
|
||||
/**
|
||||
* Generate a fake config based on the spec.
|
||||
*
|
||||
*
|
||||
* As our specs are not 100% consistent, errors may occur.
|
||||
* Try to generate a few times before giving up.
|
||||
*/
|
||||
@@ -40,7 +40,7 @@ export const AirbyteLibExample = ({
|
||||
|
||||
config = ${fakeConfig}
|
||||
|
||||
result = ab.get_connector(
|
||||
result = ab.get_source(
|
||||
"${connector}",
|
||||
config=config,
|
||||
).read()
|
||||
|
||||
Reference in New Issue
Block a user