From f790fee57cfdc13fbfd4249cfcc45c8f09e8f65e Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Mon, 9 Aug 2021 18:11:22 +0100 Subject: [PATCH] Populate the PK from the Singer discovery run (#2713) (#4789) When running Singer discovery, use the `key_properties` field to populate the `source_defined_primary_key` stream meta. --- airbyte-cdk/python/CHANGELOG.md | 8 +- .../sources/singer/singer_helpers.py | 28 ++++- .../airbyte_cdk/sources/singer/source.py | 12 +- airbyte-cdk/python/setup.py | 2 +- .../unit_tests/singer/test_singer_helpers.py | 51 +++++++- .../unit_tests/singer/test_singer_source.py | 109 +++++++++++++++++- 6 files changed, 194 insertions(+), 16 deletions(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index b553bd0a578..425948d15da 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,7 +1,13 @@ # Changelog +## 0.1.8 +Allow fetching primary key info from the Singer catalog + +## 0.1.7 +Allow using non-JSON payloads in the request body for the HTTP source + ## 0.1.6 -Add abstraction for creating destinations. +Add abstraction for creating destinations. Fix logging of the initial state. 
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/singer/singer_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/singer/singer_helpers.py index d3a1d9d258a..19a4e40fd1a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/singer/singer_helpers.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/singer/singer_helpers.py @@ -124,11 +124,14 @@ class SingerHelper: field_object["type"] = SingerHelper._parse_type(field_object["type"]) @staticmethod - def singer_catalog_to_airbyte_catalog(singer_catalog: Dict[str, any], sync_mode_overrides: Dict[str, SyncModeInfo]) -> AirbyteCatalog: + def singer_catalog_to_airbyte_catalog( + singer_catalog: Dict[str, any], sync_mode_overrides: Dict[str, SyncModeInfo], primary_key_overrides: Dict[str, List[str]] + ) -> AirbyteCatalog: """ :param singer_catalog: :param sync_mode_overrides: A dict from stream name to the sync modes it should use. Each stream in this dict must exist in the Singer catalog, but not every stream in the catalog should exist in this + :param primary_key_overrides: A dict of stream name -> list of fields to be used as PKs. 
:return: Airbyte Catalog """ airbyte_streams = [] @@ -138,28 +141,41 @@ class SingerHelper: airbyte_stream = AirbyteStream(name=name, json_schema=schema) if name in sync_mode_overrides: override_sync_modes(airbyte_stream, sync_mode_overrides[name]) - else: set_sync_modes_from_metadata(airbyte_stream, stream.get("metadata", [])) + if name in primary_key_overrides: + airbyte_stream.source_defined_primary_key = [[k] for k in primary_key_overrides[name]] + elif stream.get("key_properties"): + airbyte_stream.source_defined_primary_key = [[k] for k in stream["key_properties"]] + airbyte_streams += [airbyte_stream] return AirbyteCatalog(streams=airbyte_streams) @staticmethod - def get_catalogs(logger, shell_command: str, sync_mode_overrides: Dict[str, SyncModeInfo], excluded_streams: List) -> Catalogs: + def _read_singer_catalog(logger, shell_command: str) -> Mapping[str, Any]: completed_process = subprocess.run( shell_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True ) - for line in completed_process.stderr.splitlines(): logger.log_by_prefix(line, "ERROR") - singer_catalog = json.loads(completed_process.stdout) + return json.loads(completed_process.stdout) + + @staticmethod + def get_catalogs( + logger, + shell_command: str, + sync_mode_overrides: Dict[str, SyncModeInfo], + primary_key_overrides: Dict[str, List[str]], + excluded_streams: List, + ) -> Catalogs: + singer_catalog = SingerHelper._read_singer_catalog(logger, shell_command) streams = singer_catalog.get("streams", []) if streams and excluded_streams: singer_catalog["streams"] = [stream for stream in streams if stream["stream"] not in excluded_streams] - airbyte_catalog = SingerHelper.singer_catalog_to_airbyte_catalog(singer_catalog, sync_mode_overrides) + airbyte_catalog = SingerHelper.singer_catalog_to_airbyte_catalog(singer_catalog, sync_mode_overrides, primary_key_overrides) return Catalogs(singer_catalog=singer_catalog, airbyte_catalog=airbyte_catalog) 
@staticmethod diff --git a/airbyte-cdk/python/airbyte_cdk/sources/singer/source.py b/airbyte-cdk/python/airbyte_cdk/sources/singer/source.py index 623b264cf2f..c3a81c8f15f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/singer/source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/singer/source.py @@ -100,7 +100,9 @@ class SingerSource(Source): def _discover_internal(self, logger: AirbyteLogger, config_path: str) -> Catalogs: cmd = self.discover_cmd(logger, config_path) - catalogs = SingerHelper.get_catalogs(logger, cmd, self.get_sync_mode_overrides(), self.get_excluded_streams()) + catalogs = SingerHelper.get_catalogs( + logger, cmd, self.get_sync_mode_overrides(), self.get_primary_key_overrides(), self.get_excluded_streams() + ) return catalogs def check(self, logger: AirbyteLogger, config_container: ConfigContainer) -> AirbyteConnectionStatus: @@ -147,6 +149,14 @@ class SingerSource(Source): """ return {} + def get_primary_key_overrides(self) -> Dict[str, List[str]]: + """ + Similar to get_sync_mode_overrides but for primary keys. + + :return: A dict from stream name to the list of primary key fields for the stream. 
+ """ + return {} + def get_excluded_streams(self) -> List[str]: """ This method provide ability to exclude some streams from catalog diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 7841342744f..b554ccf9fbb 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -35,7 +35,7 @@ README = (HERE / "README.md").read_text() setup( name="airbyte-cdk", - version="0.1.7", + version="0.1.8", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/singer/test_singer_helpers.py b/airbyte-cdk/python/unit_tests/singer/test_singer_helpers.py index d7df7433824..28b01a1de6e 100644 --- a/airbyte-cdk/python/unit_tests/singer/test_singer_helpers.py +++ b/airbyte-cdk/python/unit_tests/singer/test_singer_helpers.py @@ -23,9 +23,54 @@ # +import copy + from airbyte_cdk.sources.singer import SingerHelper +basic_singer_catalog = { + "streams": [ + { + "type": "SCHEMA", + "stream": "users", + "schema": { + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "updated_at": {"type": "string", "format": "date-time"}, + } + }, + "key_properties": ["id"], + "bookmark_properties": ["updated_at"], + } + ] +} -def test_singer_helper(): - # TODO write tests. for now this just verifies the file imports correctly. 
- assert SingerHelper is not None + +def test_singer_catalog_to_airbyte_catalog(): + airbyte_catalog = SingerHelper.singer_catalog_to_airbyte_catalog( + singer_catalog=basic_singer_catalog, sync_mode_overrides={}, primary_key_overrides={} + ) + + user_stream = airbyte_catalog.streams[0] + assert user_stream.source_defined_primary_key == [["id"]] + + +def test_singer_catalog_to_airbyte_catalog_composite_pk(): + singer_catalog = copy.deepcopy(basic_singer_catalog) + singer_catalog["streams"][0]["key_properties"] = ["id", "name"] + + airbyte_catalog = SingerHelper.singer_catalog_to_airbyte_catalog( + singer_catalog=singer_catalog, sync_mode_overrides={}, primary_key_overrides={} + ) + + user_stream = airbyte_catalog.streams[0] + assert user_stream.source_defined_primary_key == [["id"], ["name"]] + + +def test_singer_catalog_to_airbyte_catalog_pk_override(): + airbyte_catalog = SingerHelper.singer_catalog_to_airbyte_catalog( + singer_catalog=basic_singer_catalog, sync_mode_overrides={}, primary_key_overrides={"users": ["name"]} + ) + + user_stream = airbyte_catalog.streams[0] + assert user_stream.source_defined_primary_key == [["name"]] diff --git a/airbyte-cdk/python/unit_tests/singer/test_singer_source.py b/airbyte-cdk/python/unit_tests/singer/test_singer_source.py index 98533a56b79..e5081a69d77 100644 --- a/airbyte-cdk/python/unit_tests/singer/test_singer_source.py +++ b/airbyte-cdk/python/unit_tests/singer/test_singer_source.py @@ -23,9 +23,110 @@ # -from airbyte_cdk.sources.singer import SingerSource +import copy +from unittest.mock import patch + +from airbyte_cdk.logger import AirbyteLogger +from airbyte_cdk.models.airbyte_protocol import SyncMode +from airbyte_cdk.sources.singer import SingerHelper, SyncModeInfo +from airbyte_cdk.sources.singer.source import BaseSingerSource, ConfigContainer + +LOGGER = AirbyteLogger() -def test_singer_source_loads(): - # TODO write tests. for now this just verifies the file imports correctly. 
- assert SingerSource() is not None +class TetsBaseSinger(BaseSingerSource): + tap_cmd = "" + + +USER_STREAM = { + "type": "SCHEMA", + "stream": "users", + "schema": { + "properties": {"id": {"type": "integer"}, "name": {"type": "string"}, "updated_at": {"type": "string", "format": "date-time"}} + }, + "key_properties": ["id"], + "bookmark_properties": ["updated_at"], +} + +ROLES_STREAM = { + "type": "SCHEMA", + "stream": "roles", + "schema": { + "properties": { + "name": {"type": "string"}, + } + }, + "key_properties": ["name"], + "bookmark_properties": ["updated_at"], + "metadata": [ + { + "metadata": { + "inclusion": "available", + "table-key-properties": ["id"], + "selected": True, + "valid-replication-keys": ["name"], + "schema-name": "roles", + }, + "breadcrumb": [], + } + ], +} + +basic_singer_catalog = {"streams": [USER_STREAM, ROLES_STREAM]} + + +@patch.object(SingerHelper, "_read_singer_catalog", return_value=basic_singer_catalog) +def test_singer_discover_single_pk(mock_read_catalog): + airbyte_catalog = TetsBaseSinger().discover(LOGGER, ConfigContainer({}, "")) + _user_stream = airbyte_catalog.streams[0] + _roles_stream = airbyte_catalog.streams[1] + assert _user_stream.source_defined_primary_key == [["id"]] + assert _roles_stream.json_schema == ROLES_STREAM["schema"] + assert _user_stream.json_schema == USER_STREAM["schema"] + + +def test_singer_discover_with_composite_pk(): + singer_catalog_composite_pk = copy.deepcopy(basic_singer_catalog) + singer_catalog_composite_pk["streams"][0]["key_properties"] = ["id", "name"] + with patch.object(SingerHelper, "_read_singer_catalog", return_value=singer_catalog_composite_pk): + airbyte_catalog = TetsBaseSinger().discover(LOGGER, ConfigContainer({}, "")) + + _user_stream = airbyte_catalog.streams[0] + _roles_stream = airbyte_catalog.streams[1] + assert _user_stream.source_defined_primary_key == [["id"], ["name"]] + assert _roles_stream.json_schema == ROLES_STREAM["schema"] + assert _user_stream.json_schema == 
USER_STREAM["schema"] + + +@patch.object(BaseSingerSource, "get_primary_key_overrides", return_value={"users": ["updated_at"]}) +@patch.object(SingerHelper, "_read_singer_catalog", return_value=basic_singer_catalog) +def test_singer_discover_pk_overrides(mock_pk_override, mock_read_catalog): + airbyte_catalog = TetsBaseSinger().discover(LOGGER, ConfigContainer({}, "")) + _user_stream = airbyte_catalog.streams[0] + _roles_stream = airbyte_catalog.streams[1] + assert _user_stream.source_defined_primary_key == [["updated_at"]] + assert _roles_stream.json_schema == ROLES_STREAM["schema"] + assert _user_stream.json_schema == USER_STREAM["schema"] + + +@patch.object(SingerHelper, "_read_singer_catalog", return_value=basic_singer_catalog) +def test_singer_discover_metadata(mock_read_catalog): + airbyte_catalog = TetsBaseSinger().discover(LOGGER, ConfigContainer({}, "")) + _user_stream = airbyte_catalog.streams[0] + _roles_stream = airbyte_catalog.streams[1] + + assert _user_stream.supported_sync_modes is None + assert _user_stream.default_cursor_field is None + assert _roles_stream.supported_sync_modes == [SyncMode.incremental] + assert _roles_stream.default_cursor_field == ["name"] + + +@patch.object(SingerHelper, "_read_singer_catalog", return_value=basic_singer_catalog) +def test_singer_discover_sync_mode_overrides(mock_read_catalog): + sync_mode_override = SyncModeInfo(supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], default_cursor_field=["name"]) + with patch.object(BaseSingerSource, "get_sync_mode_overrides", return_value={"roles": sync_mode_override}): + airbyte_catalog = TetsBaseSinger().discover(LOGGER, ConfigContainer({}, "")) + + _roles_stream = airbyte_catalog.streams[1] + assert _roles_stream.supported_sync_modes == sync_mode_override.supported_sync_modes + assert _roles_stream.default_cursor_field == sync_mode_override.default_cursor_field