diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index 6ccaa5fe484..f0578d85d35 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -110,12 +110,12 @@ class AbstractSource(Source, ABC): f"The requested stream {configured_stream.stream.name} was not found in the source." f" Available streams: {stream_instances.keys()}" ) - stream_is_available, error = stream_instance.check_availability(logger, self) - if not stream_is_available: - logger.warning(f"Skipped syncing stream '{stream_instance.name}' because it was unavailable. Error: {error}") - continue try: timer.start_event(f"Syncing stream {configured_stream.stream.name}") + stream_is_available, reason = stream_instance.check_availability(logger, self) + if not stream_is_available: + logger.warning(f"Skipped syncing stream '{stream_instance.name}' because it was unavailable. 
{reason}") + continue logger.info(f"Marking stream {configured_stream.stream.name} as STARTED") yield stream_status_as_airbyte_message(configured_stream, AirbyteStreamStatus.STARTED) yield from self._read_stream( diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index b1baacde083..ac7fd6d9037 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -40,10 +40,9 @@ class CheckStream(ConnectionChecker): availability_strategy = stream.availability_strategy or HttpAvailabilityStrategy() try: stream_is_available, reason = availability_strategy.check_availability(stream, logger, source) - if stream_is_available: - return True, None - else: + if not stream_is_available: return False, reason except Exception as error: logger.error(f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}") return False, f"Unable to connect to stream {stream_name} - {error}" + return True, None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py index 939050f60ce..3f8755070c4 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py @@ -40,6 +40,11 @@ class HttpAvailabilityStrategy(AvailabilityStrategy): # without accounting for the case in which the parent stream is empty. reason = f"Cannot attempt to connect to stream {stream.name} - no stream slices were found, likely because the parent stream is empty." 
return False, reason + except HTTPError as error: + is_available, reason = self.handle_http_error(stream, logger, source, error) + if not is_available: + reason = f"Unable to get slices for {stream.name} stream, because of error in parent stream. {reason}" + return is_available, reason try: get_first_record_for_slice(stream, stream_slice) @@ -48,7 +53,10 @@ class HttpAvailabilityStrategy(AvailabilityStrategy): logger.info(f"Successfully connected to stream {stream.name}, but got 0 records.") return True, None except HTTPError as error: - return self.handle_http_error(stream, logger, source, error) + is_available, reason = self.handle_http_error(stream, logger, source, error) + if not is_available: + reason = f"Unable to read {stream.name} stream. {reason}" + return is_available, reason def handle_http_error( self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError @@ -69,17 +77,20 @@ class HttpAvailabilityStrategy(AvailabilityStrategy): for some reason and the str should describe what went wrong and how to resolve the unavailability, if possible. """ - try: - status_code = error.response.status_code - reason = self.reasons_for_unavailable_status_codes(stream, logger, source, error)[status_code] - response_error_message = stream.parse_response_error_message(error.response) - if response_error_message: - reason += response_error_message - return False, reason - except KeyError: - # If the HTTPError is not in the dictionary of errors we know how to handle, don't except it + status_code = error.response.status_code + known_status_codes = self.reasons_for_unavailable_status_codes(stream, logger, source, error) + known_reason = known_status_codes.get(status_code) + if not known_reason: + # If the HTTPError is not in the dictionary of errors we know how to handle, re-raise it instead of handling it raise error + doc_ref = self._visit_docs_message(logger, source) + reason = f"The endpoint {error.response.url} returned {status_code}: {error.response.reason}. 
{known_reason}. {doc_ref} " + response_error_message = stream.parse_response_error_message(error.response) + if response_error_message: + reason += response_error_message + return False, reason + def reasons_for_unavailable_status_codes( self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError ) -> Dict[int, str]: @@ -95,17 +106,16 @@ class HttpAvailabilityStrategy(AvailabilityStrategy): why 'status code' may have occurred and how the user can resolve that error, if applicable. """ - forbidden_error_message = f"The endpoint to access stream '{stream.name}' returned 403: Forbidden. " - forbidden_error_message += "This is most likely due to insufficient permissions on the credentials in use. " - forbidden_error_message += self._visit_docs_message(logger, source) - - reasons_for_codes: Dict[int, str] = {requests.codes.FORBIDDEN: forbidden_error_message} + reasons_for_codes: Dict[int, str] = { + requests.codes.FORBIDDEN: "This is most likely due to insufficient permissions on the credentials in use. " + "Try to grant required permissions/scopes or re-authenticate" + } return reasons_for_codes @staticmethod def _visit_docs_message(logger: logging.Logger, source: Optional["Source"]) -> str: """ - Creates a message indicicating where to look in the documentation for + Creates a message indicating where to look in the documentation for more information on a given source by checking the spec of that source (if provided) for a 'documentationUrl'. 
diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py index 606ceb730ef..552f810f629 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py @@ -96,7 +96,7 @@ def test_check_stream_with_no_stream_slices_aborts(): 403, False, [ - "The endpoint to access stream 'mock_http_stream' returned 403: Forbidden.", + "Unable to read mock_http_stream stream", "This is most likely due to insufficient permissions on the credentials in use.", ], ), diff --git a/airbyte-cdk/python/unit_tests/sources/test_source.py b/airbyte-cdk/python/unit_tests/sources/test_source.py index 5237c81f87e..1996c56914c 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_source.py @@ -25,7 +25,7 @@ from airbyte_cdk.models import ( from airbyte_cdk.sources import AbstractSource, Source from airbyte_cdk.sources.streams.core import Stream from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy -from airbyte_cdk.sources.streams.http.http import HttpStream +from airbyte_cdk.sources.streams.http.http import HttpStream, HttpSubStream from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from pydantic import ValidationError @@ -584,12 +584,95 @@ def test_read_default_http_availability_strategy_stream_unavailable(catalog, moc with caplog.at_level(logging.WARNING): records = [r for r in source.read(logger=logger, config={}, catalog=catalog, state={})] - # 0 for http stream, 3 for non http stream and 3 status trace meessages + # 0 for http stream, 3 for non http stream and 3 status trace messages assert len(records) == 0 + 3 + 3 assert non_http_stream.read_records.called expected_logs = [ f"Skipped syncing stream '{http_stream.name}' because it was 
unavailable.", - f"The endpoint to access stream '{http_stream.name}' returned 403: Forbidden.", + f"Unable to read {http_stream.name} stream.", + "This is most likely due to insufficient permissions on the credentials in use.", + f"Please visit https://docs.airbyte.com/integrations/sources/{source.name} to learn more." + ] + for message in expected_logs: + assert message in caplog.text + + +def test_read_default_http_availability_strategy_parent_stream_unavailable(catalog, mocker, caplog): + """Test default availability strategy if error happens during slice extraction (reading of parent stream)""" + mocker.patch.multiple(Stream, __abstractmethods__=set()) + + class MockHttpParentStream(HttpStream): + url_base = "https://test_base_url.com" + primary_key = "" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.resp_counter = 1 + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return None + + def path(self, **kwargs) -> str: + return "" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + stub_response = {"data": self.resp_counter} + self.resp_counter += 1 + yield stub_response + + class MockHttpStream(HttpSubStream): + url_base = "https://test_base_url.com" + primary_key = "" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.resp_counter = 1 + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return None + + def path(self, **kwargs) -> str: + return "" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + stub_response = {"data": self.resp_counter} + self.resp_counter += 1 + yield stub_response + + http_stream = MockHttpStream(parent=MockHttpParentStream()) + streams = [http_stream] + assert isinstance(http_stream, HttpSubStream) + assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) + + # Patch HTTP request to stream endpoint to make it 
unavailable + req = requests.Response() + req.status_code = 403 + mocker.patch.object(requests.Session, "send", return_value=req) + + source = MockAbstractSource(streams=streams) + logger = logging.getLogger("test_read_default_http_availability_strategy_parent_stream_unavailable") + configured_catalog = { + "streams": [ + { + "stream": { + "name": "mock_http_stream", + "json_schema": {"type": "object", "properties": {"k": "v"}}, + "supported_sync_modes": ["full_refresh"], + }, + "destination_sync_mode": "overwrite", + "sync_mode": "full_refresh", + } + ] + } + catalog = ConfiguredAirbyteCatalog.parse_obj(configured_catalog) + with caplog.at_level(logging.WARNING): + records = [r for r in source.read(logger=logger, config={}, catalog=catalog, state={})] + + # the only configured stream is unavailable and gets skipped, so no messages are expected at all + assert len(records) == 0 + expected_logs = [ + f"Skipped syncing stream '{http_stream.name}' because it was unavailable.", + f"Unable to get slices for {http_stream.name} stream, because of error in parent stream", "This is most likely due to insufficient permissions on the credentials in use.", f"Please visit https://docs.airbyte.com/integrations/sources/{source.name} to learn more." ]