From c44c3eae485390f5b4e2bd75b7fe573bf9ef83f4 Mon Sep 17 00:00:00 2001 From: midavadim Date: Fri, 23 Jun 2023 20:15:25 +0300 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20CDK:=20availability=20check=20-=20h?= =?UTF-8?q?andle=20HttpErrors=20which=20happen=20during=20slice=20extracti?= =?UTF-8?q?on=20=20(#26630)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * for availability check - handle HttError happens during slice extraction (reading of parent stream), updated reason messages, moved check availability call under common try/except which handles errors during usual stream read, moved log messages which indicate start of the stream sync before availability check in to make to understand which stream is the source of errors * why do we return here and not try next stream? * fixed bug in CheckStream, now we try to check availability for all streams --- .../airbyte_cdk/sources/abstract_source.py | 8 +- .../declarative/checks/check_stream.py | 5 +- .../streams/http/availability_strategy.py | 42 +++++---- .../declarative/checks/test_check_stream.py | 2 +- .../python/unit_tests/sources/test_source.py | 89 ++++++++++++++++++- 5 files changed, 119 insertions(+), 27 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index 6ccaa5fe484..f0578d85d35 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -110,12 +110,12 @@ class AbstractSource(Source, ABC): f"The requested stream {configured_stream.stream.name} was not found in the source." f" Available streams: {stream_instances.keys()}" ) - stream_is_available, error = stream_instance.check_availability(logger, self) - if not stream_is_available: - logger.warning(f"Skipped syncing stream '{stream_instance.name}' because it was unavailable. Error: {error}") - continue try: timer.start_event(f"Syncing stream {configured_stream.stream.name}") + stream_is_available, reason = stream_instance.check_availability(logger, self) + if not stream_is_available: + logger.warning(f"Skipped syncing stream '{stream_instance.name}' because it was unavailable. {reason}") + continue logger.info(f"Marking stream {configured_stream.stream.name} as STARTED") yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.STARTED) yield from self._read_stream( diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index b1baacde083..ac7fd6d9037 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -40,10 +40,9 @@ class CheckStream(ConnectionChecker): availability_strategy = stream.availability_strategy or HttpAvailabilityStrategy() try: stream_is_available, reason = availability_strategy.check_availability(stream, logger, source) - if stream_is_available: - return True, None - else: + if not stream_is_available: return False, reason except Exception as error: logger.error(f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}") return False, f"Unable to connect to stream {stream_name} - {error}" + return True, None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py index 939050f60ce..3f8755070c4 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py @@ -40,6 +40,11 @@ class HttpAvailabilityStrategy(AvailabilityStrategy): # without accounting for the case in which the parent stream is empty. reason = f"Cannot attempt to connect to stream {stream.name} - no stream slices were found, likely because the parent stream is empty." return False, reason + except HTTPError as error: + is_available, reason = self.handle_http_error(stream, logger, source, error) + if not is_available: + reason = f"Unable to get slices for {stream.name} stream, because of error in parent stream. {reason}" + return is_available, reason try: get_first_record_for_slice(stream, stream_slice) @@ -48,7 +53,10 @@ class HttpAvailabilityStrategy(AvailabilityStrategy): logger.info(f"Successfully connected to stream {stream.name}, but got 0 records.") return True, None except HTTPError as error: - return self.handle_http_error(stream, logger, source, error) + is_available, reason = self.handle_http_error(stream, logger, source, error) + if not is_available: + reason = f"Unable to read {stream.name} stream. {reason}" + return is_available, reason def handle_http_error( self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError @@ -69,17 +77,20 @@ class HttpAvailabilityStrategy(AvailabilityStrategy): for some reason and the str should describe what went wrong and how to resolve the unavailability, if possible. """ - try: - status_code = error.response.status_code - reason = self.reasons_for_unavailable_status_codes(stream, logger, source, error)[status_code] - response_error_message = stream.parse_response_error_message(error.response) - if response_error_message: - reason += response_error_message - return False, reason - except KeyError: - # If the HTTPError is not in the dictionary of errors we know how to handle, don't except it + status_code = error.response.status_code + known_status_codes = self.reasons_for_unavailable_status_codes(stream, logger, source, error) + known_reason = known_status_codes.get(status_code) + if not known_reason: + # If the HTTPError is not in the dictionary of errors we know how to handle, don't except raise error + doc_ref = self._visit_docs_message(logger, source) + reason = f"The endpoint {error.response.url} returned {status_code}: {error.response.reason}. {known_reason}. {doc_ref} " + response_error_message = stream.parse_response_error_message(error.response) + if response_error_message: + reason += response_error_message + return False, reason + def reasons_for_unavailable_status_codes( self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError ) -> Dict[int, str]: @@ -95,17 +106,16 @@ class HttpAvailabilityStrategy(AvailabilityStrategy): why 'status code' may have occurred and how the user can resolve that error, if applicable. """ - forbidden_error_message = f"The endpoint to access stream '{stream.name}' returned 403: Forbidden. " - forbidden_error_message += "This is most likely due to insufficient permissions on the credentials in use. " - forbidden_error_message += self._visit_docs_message(logger, source) - - reasons_for_codes: Dict[int, str] = {requests.codes.FORBIDDEN: forbidden_error_message} + reasons_for_codes: Dict[int, str] = { + requests.codes.FORBIDDEN: "This is most likely due to insufficient permissions on the credentials in use. " + "Try to grant required permissions/scopes or re-authenticate" + } return reasons_for_codes @staticmethod def _visit_docs_message(logger: logging.Logger, source: Optional["Source"]) -> str: """ - Creates a message indicicating where to look in the documentation for + Creates a message indicating where to look in the documentation for more information on a given source by checking the spec of that source (if provided) for a 'documentationUrl'. diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py index 606ceb730ef..552f810f629 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py @@ -96,7 +96,7 @@ def test_check_stream_with_no_stream_slices_aborts(): 403, False, [ - "The endpoint to access stream 'mock_http_stream' returned 403: Forbidden.", + "Unable to read mock_http_stream stream", "This is most likely due to insufficient permissions on the credentials in use.", ], ), diff --git a/airbyte-cdk/python/unit_tests/sources/test_source.py b/airbyte-cdk/python/unit_tests/sources/test_source.py index 5237c81f87e..1996c56914c 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_source.py @@ -25,7 +25,7 @@ from airbyte_cdk.models import ( from airbyte_cdk.sources import AbstractSource, Source from airbyte_cdk.sources.streams.core import Stream from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy -from airbyte_cdk.sources.streams.http.http import HttpStream +from airbyte_cdk.sources.streams.http.http import HttpStream, HttpSubStream from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from pydantic import ValidationError @@ -584,12 +584,95 @@ def test_read_default_http_availability_strategy_stream_unavailable(catalog, moc with caplog.at_level(logging.WARNING): records = [r for r in source.read(logger=logger, config={}, catalog=catalog, state={})] - # 0 for http stream, 3 for non http stream and 3 status trace meessages + # 0 for http stream, 3 for non http stream and 3 status trace messages assert len(records) == 0 + 3 + 3 assert non_http_stream.read_records.called expected_logs = [ f"Skipped syncing stream '{http_stream.name}' because it was unavailable.", - f"The endpoint to access stream '{http_stream.name}' returned 403: Forbidden.", + f"Unable to read {http_stream.name} stream.", + "This is most likely due to insufficient permissions on the credentials in use.", + f"Please visit https://docs.airbyte.com/integrations/sources/{source.name} to learn more." + ] + for message in expected_logs: + assert message in caplog.text + + +def test_read_default_http_availability_strategy_parent_stream_unavailable(catalog, mocker, caplog): + """Test default availability strategy if error happens during slice extraction (reading of parent stream)""" + mocker.patch.multiple(Stream, __abstractmethods__=set()) + + class MockHttpParentStream(HttpStream): + url_base = "https://test_base_url.com" + primary_key = "" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.resp_counter = 1 + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return None + + def path(self, **kwargs) -> str: + return "" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + stub_response = {"data": self.resp_counter} + self.resp_counter += 1 + yield stub_response + + class MockHttpStream(HttpSubStream): + url_base = "https://test_base_url.com" + primary_key = "" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.resp_counter = 1 + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return None + + def path(self, **kwargs) -> str: + return "" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + stub_response = {"data": self.resp_counter} + self.resp_counter += 1 + yield stub_response + + http_stream = MockHttpStream(parent=MockHttpParentStream()) + streams = [http_stream] + assert isinstance(http_stream, HttpSubStream) + assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) + + # Patch HTTP request to stream endpoint to make it unavailable + req = requests.Response() + req.status_code = 403 + mocker.patch.object(requests.Session, "send", return_value=req) + + source = MockAbstractSource(streams=streams) + logger = logging.getLogger("test_read_default_http_availability_strategy_parent_stream_unavailable") + configured_catalog = { + "streams": [ + { + "stream": { + "name": "mock_http_stream", + "json_schema": {"type": "object", "properties": {"k": "v"}}, + "supported_sync_modes": ["full_refresh"], + }, + "destination_sync_mode": "overwrite", + "sync_mode": "full_refresh", + } + ] + } + catalog = ConfiguredAirbyteCatalog.parse_obj(configured_catalog) + with caplog.at_level(logging.WARNING): + records = [r for r in source.read(logger=logger, config={}, catalog=catalog, state={})] + + # 0 for http stream, 3 for non http stream and 3 status trace messages + assert len(records) == 0 + expected_logs = [ + f"Skipped syncing stream '{http_stream.name}' because it was unavailable.", + f"Unable to get slices for {http_stream.name} stream, because of error in parent stream", "This is most likely due to insufficient permissions on the credentials in use.", f"Please visit https://docs.airbyte.com/integrations/sources/{source.name} to learn more." ]