From f3799280f22b5b81df1857fa123bf64df9a34ae8 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Fri, 14 Apr 2023 10:23:59 -0700 Subject: [PATCH] connector builder: Emit message at start of slice (#25180) * Move condition for yielding the slice message to an overwritable method * Automated Commit - Formatting Changes * yield the slice log messages * same for incremental * refactor * Revert "refactor" This reverts commit c594365bd82c059df9d3d41656d58d426278aa78. * move flag from factory to source * set the flag * remove debug print * halfmock * clean up * Add a test for a single page * Add another test * Pass the flag * rename --------- Co-authored-by: girarda --- .../connector_builder_handler.py | 4 +- .../airbyte_cdk/sources/abstract_source.py | 12 +- .../manifest_declarative_source.py | 22 ++- .../test_connector_builder_handler.py | 163 +++++++++++++++--- 4 files changed, 173 insertions(+), 28 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py index 76909c8ca6d..5637b0955e3 100644 --- a/airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte-cdk/python/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -45,7 +45,9 @@ def get_limits(config: Mapping[str, Any]) -> TestReadLimits: def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> ManifestDeclarativeSource: manifest = config["__injected_declarative_manifest"] return ManifestDeclarativeSource( - source_config=manifest, component_factory=ModelToComponentFactory( + emit_connector_builder_messages=True, + source_config=manifest, + component_factory=ModelToComponentFactory( emit_connector_builder_messages=True, limit_pages_fetched_per_slice=limits.max_pages_per_slice, limit_slices_fetched=limits.max_slices) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py 
b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index 4beec964909..8cfcb0b5bea 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -237,7 +237,7 @@ class AbstractSource(Source, ABC): has_slices = False for _slice in slices: has_slices = True - if logger.isEnabledFor(logging.DEBUG): + if self.should_log_slice_message(logger): yield AirbyteMessage( type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"), @@ -277,6 +277,14 @@ class AbstractSource(Source, ABC): checkpoint = self._checkpoint_state(stream_instance, stream_state, state_manager) yield checkpoint + def should_log_slice_message(self, logger: logging.Logger): + """ + Return whether a SLICE_LOG_PREFIX log message should be yielded for each slice; subclasses may override this. + :param logger: logger whose enabled level is checked + :return: True if the logger is enabled for logging.DEBUG + """ + return logger.isEnabledFor(logging.DEBUG) + def _read_full_refresh( self, logger: logging.Logger, @@ -290,7 +298,7 @@ class AbstractSource(Source, ABC): ) total_records_counter = 0 for _slice in slices: - if logger.isEnabledFor(logging.DEBUG): + if self.should_log_slice_message(logger): yield AirbyteMessage( type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"), diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 9ddb5c8819b..82fbb1dfb36 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -36,7 +36,13 @@ class ManifestDeclarativeSource(DeclarativeSource): VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "schemas", "spec", "streams", "type", "version"} - def __init__(self, source_config: ConnectionDefinition, debug: bool = False, component_factory: ModelToComponentFactory = None): + def __init__( + self, 
+ source_config: ConnectionDefinition, + debug: bool = False, + emit_connector_builder_messages: bool = False, + component_factory: ModelToComponentFactory = None, + ): """ :param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector :param debug(bool): True if debug mode is enabled @@ -53,7 +59,8 @@ class ManifestDeclarativeSource(DeclarativeSource): propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters("", resolved_source_config, {}) self._source_config = propagated_source_config self._debug = debug - self._constructor = component_factory if component_factory else ModelToComponentFactory() + self._emit_connector_builder_messages = emit_connector_builder_messages + self._constructor = component_factory if component_factory else ModelToComponentFactory(emit_connector_builder_messages) self._validate_source() @@ -66,7 +73,9 @@ class ManifestDeclarativeSource(DeclarativeSource): check = self._source_config["check"] if "type" not in check: check["type"] = "CheckStream" - check_stream = self._constructor.create_component(CheckStreamModel, check, dict()) + check_stream = self._constructor.create_component( + CheckStreamModel, check, dict(), emit_connector_builder_messages=self._emit_connector_builder_messages + ) if isinstance(check_stream, ConnectionChecker): return check_stream else: @@ -76,7 +85,9 @@ class ManifestDeclarativeSource(DeclarativeSource): self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}) source_streams = [ - self._constructor.create_component(DeclarativeStreamModel, stream_config, config) + self._constructor.create_component( + DeclarativeStreamModel, stream_config, config, emit_connector_builder_messages=self._emit_connector_builder_messages + ) for stream_config in self._stream_configs(self._source_config) ] @@ -118,6 +129,9 @@ class ManifestDeclarativeSource(DeclarativeSource): 
self._configure_logger_level(logger) yield from super().read(logger, config, catalog, state) + def should_log_slice_message(self, logger: logging.Logger): + return self._emit_connector_builder_messages or super().should_log_slice_message(logger) + def _configure_logger_level(self, logger: logging.Logger): """ Set the log level to logging.DEBUG if debug mode is enabled diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py index 0ebadda7948..2ac37f24c6c 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -9,6 +9,7 @@ from unittest import mock from unittest.mock import patch import pytest +import requests from airbyte_cdk import connector_builder from airbyte_cdk.connector_builder.connector_builder_handler import ( DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, @@ -22,10 +23,22 @@ from airbyte_cdk.connector_builder.connector_builder_handler import ( ) from airbyte_cdk.connector_builder.main import handle_connector_builder_request, handle_request, read_stream from airbyte_cdk.connector_builder.models import LogMessage, StreamRead, StreamReadSlicesInner, StreamReadSlicesInnerPagesInner -from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, ConfiguredAirbyteCatalog +from airbyte_cdk.models import ( + AirbyteLogMessage, + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + Level, + SyncMode, +) +from airbyte_cdk.models import Type from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from airbyte_cdk.sources.declarative.retrievers import
SimpleRetrieverTestReadDecorator from airbyte_cdk.sources.streams.core import Stream from airbyte_cdk.sources.streams.http import HttpStream from unit_tests.connector_builder.utils import create_configured_catalog @@ -34,23 +47,29 @@ _stream_name = "stream_with_custom_requester" _stream_primary_key = "id" _stream_url_base = "https://api.sendgrid.com" _stream_options = {"name": _stream_name, "primary_key": _stream_primary_key, "url_base": _stream_url_base} +_page_size = 2 MANIFEST = { "version": "0.30.3", "definitions": { - "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, "retriever": { "paginator": { "type": "DefaultPaginator", - "page_size": 10, + "page_size": _page_size, "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, "page_token_option": {"inject_into": "path", "type": "RequestPath"}, - "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": 10}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": _page_size}, }, + "partition_router": { + "type": "ListPartitionRouter", + "values": ["0", "1", "2", "3", "4", "5", "6", "7"], + "cursor_field": "item_id" + }, + "" "requester": { "path": "/v3/marketing/lists", "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, - "request_parameters": {"page_size": "10"}, + "request_parameters": {"a_param": "10"}, }, "record_selector": {"extractor": {"field_path": ["result"]}}, }, @@ -59,7 +78,6 @@ MANIFEST = { { "type": "DeclarativeStream", "$parameters": _stream_options, - "schema_loader": {"$ref": "#/definitions/schema_loader"}, "retriever": "#/definitions/retriever", }, ], @@ -169,19 +187,23 @@ def test_resolve_manifest(valid_resolve_manifest_config_file): "type": "DeclarativeSource", "version": "0.30.3", "definitions": { - "schema_loader": {"name": "{{ 
options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, "retriever": { "paginator": { "type": "DefaultPaginator", - "page_size": 10, + "page_size": _page_size, "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, "page_token_option": {"inject_into": "path", "type": "RequestPath"}, - "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": 10}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}", "page_size": _page_size}, + }, + "partition_router": { + "type": "ListPartitionRouter", + "values": ["0", "1", "2", "3", "4", "5", "6", "7"], + "cursor_field": "item_id", }, "requester": { "path": "/v3/marketing/lists", "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, - "request_parameters": {"page_size": "10"}, + "request_parameters": {"a_param": "10"}, }, "record_selector": {"extractor": {"field_path": ["result"]}}, }, @@ -189,19 +211,11 @@ def test_resolve_manifest(valid_resolve_manifest_config_file): "streams": [ { "type": "DeclarativeStream", - "schema_loader": { - "type": "JsonFileSchemaLoader", - "name": "{{ options.stream_name }}", - "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml", - "primary_key": _stream_primary_key, - "url_base": _stream_url_base, - "$parameters": _stream_options, - }, "retriever": { "type": "SimpleRetriever", "paginator": { "type": "DefaultPaginator", - "page_size": 10, + "page_size": _page_size, "page_size_option": { "type": "RequestOption", "inject_into": "request_parameter", @@ -226,7 +240,7 @@ def test_resolve_manifest(valid_resolve_manifest_config_file): "primary_key": _stream_primary_key, "url_base": _stream_url_base, "$parameters": _stream_options, - "page_size": 10, + "page_size": _page_size, }, "name": _stream_name, "primary_key": _stream_primary_key, @@ -244,7 +258,16 @@ def 
test_resolve_manifest(valid_resolve_manifest_config_file): "url_base": _stream_url_base, "$parameters": _stream_options, }, - "request_parameters": {"page_size": "10"}, + "request_parameters": {"a_param": "10"}, + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "partition_router": { + "type": "ListPartitionRouter", + "values": ["0", "1", "2", "3", "4", "5", "6", "7"], + "cursor_field": "item_id", "name": _stream_name, "primary_key": _stream_primary_key, "url_base": _stream_url_base, @@ -512,3 +535,101 @@ def test_create_source(): assert isinstance(source, ManifestDeclarativeSource) assert source._constructor._limit_pages_fetched_per_slice == limits.max_pages_per_slice assert source._constructor._limit_slices_fetched == limits.max_slices + + +def request_log_message(request: dict) -> AirbyteMessage: + return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"request:{json.dumps(request)}")) + + +def response_log_message(response: dict) -> AirbyteMessage: + return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"response:{json.dumps(response)}")) + + +def _create_request(): + url = "https://example.com/api" + headers = {'Content-Type': 'application/json'} + return requests.Request('POST', url, headers=headers, json={"key": "value"}).prepare() + + +def _create_response(body): + response = requests.Response() + response.status_code = 200 + response._content = bytes(json.dumps(body), "utf-8") + response.headers["Content-Type"] = "application/json" + return response + + +def _create_page(response_body): + return _create_request(), _create_response(response_body) + + +@patch.object(HttpStream, "_fetch_next_page", side_effect=(_create_page({"result": [{"id": 0}, {"id": 1}],"_metadata": {"next": "next"}}), _create_page({"result": [{"id": 2}],"_metadata": {"next": "next"}})) * 10) +def test_read_source(mock_http_stream): + """ + 
This test sort of acts as an integration test for the connector builder. + + Each slice has two pages + The first page has two records + The second page one record + + The response._metadata.next field in the first page tells the paginator to fetch the next page. + """ + max_records = 100 + max_pages_per_slice = 2 + max_slices = 3 + limits = TestReadLimits(max_records, max_pages_per_slice, max_slices) + + catalog = ConfiguredAirbyteCatalog(streams=[ + ConfiguredAirbyteStream(stream=AirbyteStream(name=_stream_name, json_schema={}, supported_sync_modes=[SyncMode.full_refresh]), sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.append) + ]) + + config = {"__injected_declarative_manifest": MANIFEST} + + source = create_source(config, limits) + + output_data = read_stream(source, config, catalog, limits).record.data + slices = output_data["slices"] + + assert len(slices) == max_slices + for s in slices: + pages = s["pages"] + assert len(pages) == max_pages_per_slice + + first_page, second_page = pages[0], pages[1] + assert len(first_page["records"]) == _page_size + assert len(second_page["records"]) == 1 + + streams = source.streams(config) + for s in streams: + assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator) + + +@patch.object(HttpStream, "_fetch_next_page", side_effect=(_create_page({"result": [{"id": 0}, {"id": 1}],"_metadata": {"next": "next"}}), _create_page({"result": [{"id": 2}],"_metadata": {"next": "next"}}))) +def test_read_source_single_page_single_slice(mock_http_stream): + max_records = 100 + max_pages_per_slice = 1 + max_slices = 1 + limits = TestReadLimits(max_records, max_pages_per_slice, max_slices) + + catalog = ConfiguredAirbyteCatalog(streams=[ + ConfiguredAirbyteStream(stream=AirbyteStream(name=_stream_name, json_schema={}, supported_sync_modes=[SyncMode.full_refresh]), sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.append) + ]) + + config = {"__injected_declarative_manifest": 
MANIFEST} + + source = create_source(config, limits) + + output_data = read_stream(source, config, catalog, limits).record.data + slices = output_data["slices"] + + assert len(slices) == max_slices + for s in slices: + pages = s["pages"] + assert len(pages) == max_pages_per_slice + + first_page = pages[0] + assert len(first_page["records"]) == _page_size + + streams = source.streams(config) + for s in streams: + assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator)