diff --git a/airbyte-cdk/python/connector_builder/connector_builder_handler.py b/airbyte-cdk/python/connector_builder/connector_builder_handler.py index 7efc4d9718b..2b57ca8b163 100644 --- a/airbyte-cdk/python/connector_builder/connector_builder_handler.py +++ b/airbyte-cdk/python/connector_builder/connector_builder_handler.py @@ -4,21 +4,19 @@ import dataclasses from datetime import datetime -from typing import Any, Mapping +from typing import Any, Dict, List, Mapping +from urllib.parse import urljoin from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, ConfiguredAirbyteCatalog from airbyte_cdk.models import Type from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource +from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.utils.traced_exception import AirbyteTracedException from connector_builder.message_grouper import MessageGrouper - -def list_streams() -> AirbyteMessage: - raise NotImplementedError - - DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5 DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5 DEFAULT_MAX_RECORDS = 100 @@ -58,5 +56,40 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage: return error.as_airbyte_message() +def list_streams(source: ManifestDeclarativeSource, config: Dict[str, Any]) -> AirbyteMessage: + try: + streams = [ + {"name": http_stream.name, "url": urljoin(http_stream.url_base, http_stream.path())} + for http_stream in _get_http_streams(source, config) + ] + return AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage( + data={"streams": streams}, + emitted_at=_emitted_at(), + stream="list_streams", + ), + ) + except Exception as exc: + return AirbyteTracedException.from_exception(exc, message="Error listing streams.").as_airbyte_message() + + +def _get_http_streams(source: ManifestDeclarativeSource, config: Dict[str, Any]) -> List[HttpStream]: + http_streams = [] + for stream in source.streams(config=config): + if isinstance(stream, DeclarativeStream): + if isinstance(stream.retriever, HttpStream): + http_streams.append(stream.retriever) + else: + raise TypeError( + f"A declarative stream should only have a retriever of type HttpStream, but received: {stream.retriever.__class__}" + ) + else: + raise TypeError( + f"A declarative source should only contain streams of type DeclarativeStream, but received: {stream.__class__}" + ) + return http_streams + + def _emitted_at(): return int(datetime.now().timestamp()) * 1000 diff --git a/airbyte-cdk/python/connector_builder/main.py b/airbyte-cdk/python/connector_builder/main.py index 06bf05f0a28..b64fe345139 100644 --- a/airbyte-cdk/python/connector_builder/main.py +++ b/airbyte-cdk/python/connector_builder/main.py @@ -11,7 +11,7 @@ from airbyte_cdk.entrypoint import AirbyteEntrypoint from airbyte_cdk.models import ConfiguredAirbyteCatalog from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from airbyte_cdk.utils.traced_exception import AirbyteTracedException -from connector_builder.connector_builder_handler import read_stream, resolve_manifest +from connector_builder.connector_builder_handler import list_streams, read_stream, resolve_manifest def create_source(config: Mapping[str, Any]) -> ManifestDeclarativeSource: @@ -54,6 +54,8 @@ def handle_connector_builder_request( elif command == "test_read": assert catalog is not None, "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None." return read_stream(source, config, catalog) + elif command == "list_streams": + return list_streams(source, config) else: raise ValueError(f"Unrecognized command {command}.") diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py index e402cb90073..c4d5461b3d7 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -11,8 +11,11 @@ import connector_builder import pytest from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, ConfiguredAirbyteCatalog from airbyte_cdk.models import Type as MessageType +from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource -from connector_builder.connector_builder_handler import resolve_manifest +from airbyte_cdk.sources.streams.core import Stream +from airbyte_cdk.sources.streams.http import HttpStream +from connector_builder.connector_builder_handler import list_streams, resolve_manifest from connector_builder.main import handle_connector_builder_request, handle_request, read_stream from connector_builder.models import StreamRead, StreamReadSlicesInner, StreamReadSlicesInnerPagesInner from unit_tests.connector_builder.utils import create_configured_catalog @@ -32,7 +35,7 @@ MANIFEST = { "page_size": 10, "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, "page_token_option": {"inject_into": "path", "type": "RequestPath"}, - "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": 10}, }, "requester": { "path": "/v3/marketing/lists", @@ -162,7 +165,7 @@ def test_resolve_manifest(valid_resolve_manifest_config_file): "page_size": 10, "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, "page_token_option": {"inject_into": "path", "type": "RequestPath"}, - "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": 10}, }, "requester": { "path": "/v3/marketing/lists", @@ -212,6 +215,7 @@ def test_resolve_manifest(valid_resolve_manifest_config_file): "primary_key": _stream_primary_key, "url_base": _stream_url_base, "$parameters": _stream_options, + "page_size": 10 }, "name": _stream_name, "primary_key": _stream_primary_key, @@ -364,3 +368,84 @@ def test_missing_config(valid_resolve_manifest_config_file): def test_invalid_config_command(invalid_config_file, dummy_catalog): with pytest.raises(ValueError): handle_request(["read", "--config", str(invalid_config_file), "--catalog", str(dummy_catalog)]) + + +@pytest.fixture +def manifest_declarative_source(): + return mock.Mock(spec=ManifestDeclarativeSource, autospec=True) + + +def test_list_streams(manifest_declarative_source): + manifest_declarative_source.streams.return_value = [ + create_mock_declarative_stream(create_mock_http_stream("a name", "https://a-url-base.com", "a-path")), + create_mock_declarative_stream(create_mock_http_stream("another name", "https://another-url-base.com", "another-path")), + ] + + result = list_streams(manifest_declarative_source, {}) + + assert result.type == MessageType.RECORD + assert result.record.stream == "list_streams" + assert result.record.data == {"streams": [ + {"name": "a name", "url": "https://a-url-base.com/a-path"}, + {"name": "another name", "url": "https://another-url-base.com/another-path"} + ]} + + +def test_given_stream_is_not_declarative_stream_when_list_streams_then_return_exception_message(manifest_declarative_source): + manifest_declarative_source.streams.return_value = [mock.Mock(spec=Stream)] + + error_message = list_streams(manifest_declarative_source, {}) + + assert error_message.type == MessageType.TRACE + assert "Error listing streams." == error_message.trace.error.message + assert "A declarative source should only contain streams of type DeclarativeStream" in error_message.trace.error.internal_message + + +def test_given_declarative_stream_retriever_is_not_http_when_list_streams_then_return_exception_message(manifest_declarative_source): + declarative_stream = mock.Mock(spec=DeclarativeStream) + # `spec=DeclarativeStream` is needed for `isinstance` work but `spec` does not expose dataclasses fields, so we create one ourselves + declarative_stream.retriever = mock.Mock() + manifest_declarative_source.streams.return_value = [declarative_stream] + + error_message = list_streams(manifest_declarative_source, {}) + + assert error_message.type == MessageType.TRACE + assert "Error listing streams." == error_message.trace.error.message + assert "A declarative stream should only have a retriever of type HttpStream" in error_message.trace.error.internal_message + + +def test_given_unexpected_error_when_list_streams_then_return_exception_message(manifest_declarative_source): + manifest_declarative_source.streams.side_effect = Exception("unexpected error") + + error_message = list_streams(manifest_declarative_source, {}) + + assert error_message.type == MessageType.TRACE + assert "Error listing streams." == error_message.trace.error.message + assert "unexpected error" == error_message.trace.error.internal_message + + +def test_list_streams_integration_test(): + config = copy.deepcopy(RESOLVE_MANIFEST_CONFIG) + command = "list_streams" + config["__command"] = command + source = ManifestDeclarativeSource(MANIFEST) + + list_streams = handle_connector_builder_request(source, command, config, None) + + assert list_streams.record.data == { + "streams": [{"name": "stream_with_custom_requester", "url": "https://api.sendgrid.com/v3/marketing/lists"}] + } + + +def create_mock_http_stream(name, url_base, path): + http_stream = mock.Mock(spec=HttpStream, autospec=True) + http_stream.name = name + http_stream.url_base = url_base + http_stream.path.return_value = path + return http_stream + + +def create_mock_declarative_stream(http_stream): + declarative_stream = mock.Mock(spec=DeclarativeStream, autospec=True) + declarative_stream.retriever = http_stream + return declarative_stream