CDK: AbstractSource.read() skips syncing stream if its unavailable (add AvailabilityStrategy concept) (#19977)
* Rough first implememtation of AvailabilityStrategy s * Basic unit tests for AvailabilityStrategy and ScopedAvailabilityStrategy * Make availability_strategy a property, separate out tests * Remove from DeclarativeSource, remove Source parameter from methods, make default no AvailabilityStrategy * Add skip stream if not available to read() * Changes to CDK to get source-github working using AvailabilityStrategy, flakecheck * reorganize cdk class, add HTTPAvailabilityStrategy test * cleanup, docstrings * pull out error handling into separate method * Pass source and logger to check_connection method * Add documentation links, handle 403 specifically * Fix circular import * Add AvailabilityStrategy to Stream and HTTPStream classes * Remove AS from abstract_source, add to Stream, HTTPStream, AvailabilityStrategy unit tests passing for per-stream strategies * Modify MockHttpStream to set no AvailabilityStrategy since source test mocking doesn't support this * Move AvailabilityStrategy class to sources.streams * Move HTTPAvailabilityStrategy to http module * Use pascal case for HttpAvailabilityStrategy * Remove docs message method :( and default to True availability on unhandled HTTPErrors * add check_availability method to stream class * Add optional source parameter * Add test for connector-specific documentation, small tests refactor * Add test that performs the read() function for stream with default availability strategy * Add test for read function behavior when stream is unavailable * Add 403 info in logger message * Don't return error for other HTTPErrors * Split up error handling into methods 'unavailable_error_codes' and 'get_reason_for_error' * rework overrideable list of status codes to be a dict with reasons, to enforce that users provide reasons for all listed errors * Fix incorrect typing * Move HttpAvailability to its own module, fix flake errors * Fix ScopedAvailabilityStrategy, docstrings and types for streams/availability_strategy.py * Docstrings and types for core.py and http/availability_strategy.py * Move _get_stream_slices to a StreamHelper class * Docstrings + types for stream_helpers.py, cleanup test_availability.py * Clean up test_source.py * Move logic of getting the initial record from a stream to StreamHelper class * Add changelog and bump minor version * change 'is True' and 'is False' behavior * use mocker.MagicMock * Remove ScopedAvailabilityStrategy * Don't except non-403 errors, check_stream uses availability_strategy if possible * CDK: pass error to reasons_for_error_codes * make get_stream_slice public * Add tests for raising unhandled errors and retries are handled * Add tests for CheckStream via AvailabilityStrategy * Add documentation for stream availability of http streams * Move availability unit tests to correct modules, report error message if possible * Add test for reporting specific error if available
This commit is contained in:
@@ -1,5 +1,8 @@
|
||||
# Changelog
|
||||
|
||||
## 0.13.0
|
||||
Add `Stream.check_availability` and `Stream.AvailabilityStrategy`. Make `HttpAvailabilityStrategy` the default `HttpStream.AvailabilityStrategy`.
|
||||
|
||||
## 0.12.4
|
||||
Lookback window should applied when a state is supplied as well
|
||||
|
||||
|
||||
@@ -106,6 +106,10 @@ class AbstractSource(Source, ABC):
|
||||
f"The requested stream {configured_stream.stream.name} was not found in the source."
|
||||
f" Available streams: {stream_instances.keys()}"
|
||||
)
|
||||
stream_is_available, error = stream_instance.check_availability(logger, self)
|
||||
if not stream_is_available:
|
||||
logger.warning(f"Skipped syncing stream '{stream_instance.name}' because it was unavailable. Error: {error}")
|
||||
continue
|
||||
try:
|
||||
timer.start_event(f"Syncing stream {configured_stream.stream.name}")
|
||||
yield from self._read_stream(
|
||||
@@ -187,7 +191,7 @@ class AbstractSource(Source, ABC):
|
||||
@staticmethod
|
||||
def _limit_reached(internal_config: InternalConfig, records_counter: int) -> bool:
|
||||
"""
|
||||
Check if record count reached liimt set by internal config.
|
||||
Check if record count reached limit set by internal config.
|
||||
:param internal_config - internal CDK configuration separated from user defined config
|
||||
:records_counter - number of records already red
|
||||
:return True if limit reached, False otherwise
|
||||
|
||||
@@ -6,9 +6,9 @@ import logging
|
||||
from dataclasses import InitVar, dataclass
|
||||
from typing import Any, List, Mapping, Tuple
|
||||
|
||||
from airbyte_cdk.models.airbyte_protocol import SyncMode
|
||||
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
|
||||
from airbyte_cdk.sources.source import Source
|
||||
from airbyte_cdk.sources.utils.stream_helpers import StreamHelper
|
||||
from dataclasses_jsonschema import JsonSchemaMixin
|
||||
|
||||
|
||||
@@ -33,29 +33,19 @@ class CheckStream(ConnectionChecker, JsonSchemaMixin):
|
||||
if len(streams) == 0:
|
||||
return False, f"No streams to connect to from source {source}"
|
||||
for stream_name in self.stream_names:
|
||||
if stream_name in stream_name_to_stream.keys():
|
||||
stream = stream_name_to_stream[stream_name]
|
||||
try:
|
||||
# Some streams need a stream slice to read records (eg if they have a SubstreamSlicer)
|
||||
stream_slice = self._get_stream_slice(stream)
|
||||
records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)
|
||||
next(records)
|
||||
except Exception as error:
|
||||
return False, f"Unable to connect to stream {stream_name} - {error}"
|
||||
else:
|
||||
if stream_name not in stream_name_to_stream.keys():
|
||||
raise ValueError(f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}")
|
||||
return True, None
|
||||
|
||||
def _get_stream_slice(self, stream):
|
||||
# We wrap the return output of stream_slices() because some implementations return types that are iterable,
|
||||
# but not iterators such as lists or tuples
|
||||
slices = iter(
|
||||
stream.stream_slices(
|
||||
cursor_field=stream.cursor_field,
|
||||
sync_mode=SyncMode.full_refresh,
|
||||
)
|
||||
)
|
||||
try:
|
||||
return next(slices)
|
||||
except StopIteration:
|
||||
return {}
|
||||
stream = stream_name_to_stream[stream_name]
|
||||
try:
|
||||
if stream.availability_strategy is not None:
|
||||
stream_is_available, reason = stream.check_availability(logger, source)
|
||||
if not stream_is_available:
|
||||
return False, reason
|
||||
else:
|
||||
stream_helper = StreamHelper()
|
||||
stream_helper.get_first_record(stream)
|
||||
except Exception as error:
|
||||
return False, f"Unable to connect to stream {stream_name} - {error}"
|
||||
|
||||
return True, None
|
||||
|
||||
@@ -17,7 +17,7 @@ class DeclarativeSource(AbstractSource):
|
||||
@property
|
||||
@abstractmethod
|
||||
def connection_checker(self) -> ConnectionChecker:
|
||||
"""Returns the ConnectioChecker to use for the `check` operation"""
|
||||
"""Returns the ConnectionChecker to use for the `check` operation"""
|
||||
|
||||
def check_connection(self, logger, config) -> Tuple[bool, any]:
|
||||
"""
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
#
|
||||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import logging
|
||||
import typing
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from airbyte_cdk.sources.streams import Stream
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from airbyte_cdk.sources import Source
|
||||
|
||||
|
||||
class AvailabilityStrategy(ABC):
|
||||
"""
|
||||
Abstract base class for checking stream availability.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"]) -> Tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Checks stream availability.
|
||||
|
||||
:param stream: stream
|
||||
:param logger: source logger
|
||||
:param source: (optional) source
|
||||
:return: A tuple of (boolean, str). If boolean is true, then the stream
|
||||
is available, and no str is required. Otherwise, the stream is unavailable
|
||||
for some reason and the str should describe what went wrong and how to
|
||||
resolve the unavailability, if possible.
|
||||
"""
|
||||
@@ -5,9 +5,10 @@
|
||||
|
||||
import inspect
|
||||
import logging
|
||||
import typing
|
||||
from abc import ABC, abstractmethod
|
||||
from functools import lru_cache
|
||||
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
|
||||
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
|
||||
|
||||
import airbyte_cdk.sources.utils.casing as casing
|
||||
from airbyte_cdk.models import AirbyteLogMessage, AirbyteStream, AirbyteTraceMessage, SyncMode
|
||||
@@ -17,6 +18,10 @@ from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
|
||||
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
|
||||
from deprecated.classic import deprecated
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from airbyte_cdk.sources import Source
|
||||
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
|
||||
|
||||
# A stream's read method can return one of the following types:
|
||||
# Mapping[str, Any]: The content of an AirbyteRecordMessage
|
||||
# AirbyteRecordMessage: An AirbyteRecordMessage
|
||||
@@ -170,6 +175,28 @@ class Stream(ABC):
|
||||
"""
|
||||
return True
|
||||
|
||||
def check_availability(self, logger: logging.Logger, source: Optional["Source"] = None) -> Tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Checks whether this stream is available.
|
||||
|
||||
:param logger: source logger
|
||||
:param source: (optional) source
|
||||
:return: A tuple of (boolean, str). If boolean is true, then this stream
|
||||
is available, and no str is required. Otherwise, this stream is unavailable
|
||||
for some reason and the str should describe what went wrong and how to
|
||||
resolve the unavailability, if possible.
|
||||
"""
|
||||
if self.availability_strategy:
|
||||
return self.availability_strategy.check_availability(self, logger, source)
|
||||
return True, None
|
||||
|
||||
@property
|
||||
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
|
||||
"""
|
||||
:return: The AvailabilityStrategy used to check whether this stream is available.
|
||||
"""
|
||||
return None
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
|
||||
|
||||
@@ -0,0 +1,120 @@
|
||||
#
|
||||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import logging
|
||||
import typing
|
||||
from typing import Dict, Optional, Tuple
|
||||
|
||||
import requests
|
||||
from airbyte_cdk.sources.streams import Stream
|
||||
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
|
||||
from airbyte_cdk.sources.utils.stream_helpers import StreamHelper
|
||||
from requests import HTTPError
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from airbyte_cdk.sources import Source
|
||||
|
||||
|
||||
class HttpAvailabilityStrategy(AvailabilityStrategy):
|
||||
def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"]) -> Tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Check stream availability by attempting to read the first record of the
|
||||
stream.
|
||||
|
||||
:param stream: stream
|
||||
:param logger: source logger
|
||||
:param source: (optional) source
|
||||
:return: A tuple of (boolean, str). If boolean is true, then the stream
|
||||
is available, and no str is required. Otherwise, the stream is unavailable
|
||||
for some reason and the str should describe what went wrong and how to
|
||||
resolve the unavailability, if possible.
|
||||
"""
|
||||
try:
|
||||
stream_helper = StreamHelper()
|
||||
stream_helper.get_first_record(stream)
|
||||
except HTTPError as error:
|
||||
return self.handle_http_error(stream, logger, source, error)
|
||||
return True, None
|
||||
|
||||
def handle_http_error(
|
||||
self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError
|
||||
) -> Tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Override this method to define error handling for various `HTTPError`s
|
||||
that are raised while attempting to check a stream's availability.
|
||||
|
||||
Checks whether an error's status_code is in a list of unavailable_error_codes,
|
||||
and gets the associated reason for that error.
|
||||
|
||||
:param stream: stream
|
||||
:param logger: source logger
|
||||
:param source: optional (source)
|
||||
:param error: HTTPError raised while checking stream's availability.
|
||||
:return: A tuple of (boolean, str). If boolean is true, then the stream
|
||||
is available, and no str is required. Otherwise, the stream is unavailable
|
||||
for some reason and the str should describe what went wrong and how to
|
||||
resolve the unavailability, if possible.
|
||||
"""
|
||||
try:
|
||||
status_code = error.response.status_code
|
||||
reason = self.reasons_for_unavailable_status_codes(stream, logger, source, error)[status_code]
|
||||
response_error_message = stream.parse_response_error_message(error.response)
|
||||
if response_error_message:
|
||||
reason += response_error_message
|
||||
return False, reason
|
||||
except KeyError:
|
||||
# If the HTTPError is not in the dictionary of errors we know how to handle, don't except it
|
||||
raise error
|
||||
|
||||
def reasons_for_unavailable_status_codes(
|
||||
self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError
|
||||
) -> Dict[int, str]:
|
||||
"""
|
||||
Returns a dictionary of HTTP status codes that indicate stream
|
||||
unavailability and reasons explaining why a given status code may
|
||||
have occurred and how the user can resolve that error, if applicable.
|
||||
|
||||
:param stream: stream
|
||||
:param logger: source logger
|
||||
:param source: optional (source)
|
||||
:return: A dictionary of (status code, reason) where the 'reason' explains
|
||||
why 'status code' may have occurred and how the user can resolve that
|
||||
error, if applicable.
|
||||
"""
|
||||
forbidden_error_message = f"The endpoint to access stream '{stream.name}' returned 403: Forbidden. "
|
||||
forbidden_error_message += "This is most likely due to insufficient permissions on the credentials in use. "
|
||||
forbidden_error_message += self._visit_docs_message(logger, source)
|
||||
|
||||
reasons_for_codes: Dict[int, str] = {requests.codes.FORBIDDEN: forbidden_error_message}
|
||||
return reasons_for_codes
|
||||
|
||||
@staticmethod
|
||||
def _visit_docs_message(logger: logging.Logger, source: Optional["Source"]) -> str:
|
||||
"""
|
||||
Creates a message indicicating where to look in the documentation for
|
||||
more information on a given source by checking the spec of that source
|
||||
(if provided) for a 'documentationUrl'.
|
||||
|
||||
:param logger: source logger
|
||||
:param source: optional (source)
|
||||
:return: A message telling the user where to go to learn more about the source.
|
||||
"""
|
||||
if not source:
|
||||
return "Please visit the connector's documentation to learn more. "
|
||||
|
||||
try:
|
||||
connector_spec = source.spec(logger)
|
||||
docs_url = connector_spec.documentationUrl
|
||||
if docs_url:
|
||||
return f"Please visit {docs_url} to learn more. "
|
||||
else:
|
||||
return "Please visit the connector's documentation to learn more. "
|
||||
|
||||
except FileNotFoundError: # If we are unit testing without implementing spec() method in source
|
||||
if source:
|
||||
docs_url = f"https://docs.airbyte.com/integrations/sources/{source.name}"
|
||||
else:
|
||||
docs_url = "https://docs.airbyte.com/integrations/sources/test"
|
||||
|
||||
return f"Please visit {docs_url} to learn more."
|
||||
@@ -13,7 +13,9 @@ from urllib.parse import urljoin
|
||||
import requests
|
||||
import requests_cache
|
||||
from airbyte_cdk.models import SyncMode
|
||||
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
|
||||
from airbyte_cdk.sources.streams.core import Stream, StreamData
|
||||
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
|
||||
from requests.auth import AuthBase
|
||||
from requests_cache.session import CachedSession
|
||||
|
||||
@@ -113,6 +115,10 @@ class HttpStream(Stream, ABC):
|
||||
def authenticator(self) -> HttpAuthenticator:
|
||||
return self._authenticator
|
||||
|
||||
@property
|
||||
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
|
||||
return HttpAvailabilityStrategy()
|
||||
|
||||
@abstractmethod
|
||||
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
|
||||
"""
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
#
|
||||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
from typing import Any, Mapping, Optional
|
||||
|
||||
from airbyte_cdk.models import SyncMode
|
||||
from airbyte_cdk.sources.streams import Stream
|
||||
from airbyte_cdk.sources.streams.core import StreamData
|
||||
|
||||
|
||||
class StreamHelper:
|
||||
def get_first_record(self, stream: Stream) -> StreamData:
|
||||
"""
|
||||
Gets the first record for a stream.
|
||||
|
||||
:param stream: stream
|
||||
:return: StreamData containing the first record in the stream
|
||||
"""
|
||||
# Some streams need a stream slice to read records (e.g. if they have a SubstreamSlicer)
|
||||
stream_slice = self.get_stream_slice(stream)
|
||||
records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)
|
||||
next(records)
|
||||
|
||||
@staticmethod
|
||||
def get_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]:
|
||||
"""
|
||||
Gets the first stream_slice from a given stream's stream_slices.
|
||||
|
||||
:param stream: stream
|
||||
:return: First stream slice from 'stream_slices' generator
|
||||
"""
|
||||
# We wrap the return output of stream_slices() because some implementations return types that are iterable,
|
||||
# but not iterators such as lists or tuples
|
||||
slices = iter(
|
||||
stream.stream_slices(
|
||||
cursor_field=stream.cursor_field,
|
||||
sync_mode=SyncMode.full_refresh,
|
||||
)
|
||||
)
|
||||
try:
|
||||
return next(slices)
|
||||
except StopIteration:
|
||||
return {}
|
||||
@@ -81,3 +81,14 @@ When we are dealing with streams that depend on the results of another stream, w
|
||||
If you need to set any network-adapter keyword args on the outgoing HTTP requests such as `allow_redirects`, `stream`, `verify`, `cert`, etc..
|
||||
override the `request_kwargs` method. Any option listed in [BaseAdapter.send](https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send) can
|
||||
be returned as a keyword argument.
|
||||
|
||||
## Stream Availability
|
||||
|
||||
The CDK defines an `AvailabilityStrategy` for a stream, which is used to perform the `check_availability` method. This method checks whether
|
||||
the stream is available before performing `read_records`.
|
||||
|
||||
For HTTP streams, a default `HttpAvailabilityStrategy` is defined, which attempts to read the first record of the stream, and excepts
|
||||
a dictionary of known error codes and associated reasons, `reasons_for_unavailable_status_codes`. By default, this list contains only
|
||||
`requests.status_codes.FORBIDDEN` (403), with an associated error message that tells the user that they are likely missing permissions associated with that stream.
|
||||
|
||||
You can override these known errors to except more error codes and inform the user how to resolve errors.
|
||||
|
||||
@@ -15,7 +15,7 @@ README = (HERE / "README.md").read_text()
|
||||
|
||||
setup(
|
||||
name="airbyte-cdk",
|
||||
version="0.12.4",
|
||||
version="0.13.0",
|
||||
description="A framework for writing Airbyte Connectors.",
|
||||
long_description=README,
|
||||
long_description_content_type="text/markdown",
|
||||
|
||||
@@ -2,10 +2,15 @@
|
||||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import Any, Iterable, Mapping, Optional
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream
|
||||
from airbyte_cdk.sources.streams.http import HttpStream
|
||||
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
|
||||
|
||||
logger = None
|
||||
config = dict()
|
||||
@@ -27,6 +32,7 @@ record = MagicMock()
|
||||
def test_check_stream_with_slices_as_list(test_name, record, streams_to_check, stream_slice, expectation, slices_as_list):
|
||||
stream = MagicMock()
|
||||
stream.name = "s1"
|
||||
stream.availability_strategy = None
|
||||
if slices_as_list:
|
||||
stream.stream_slices.return_value = [stream_slice]
|
||||
else:
|
||||
@@ -49,3 +55,56 @@ def test_check_stream_with_slices_as_list(test_name, record, streams_to_check, s
|
||||
|
||||
def mock_read_records(responses, default_response=None, **kwargs):
|
||||
return lambda stream_slice, sync_mode: responses[frozenset(stream_slice)] if frozenset(stream_slice) in responses else default_response
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"test_name, response_code, available_expectation, expected_messages",
|
||||
[
|
||||
("test_stream_unavailable_unhandled_error", 404, False, ["Unable to connect to stream mock_http_stream", "404 Client Error"]),
|
||||
("test_stream_unavailable_handled_error", 403, False, [
|
||||
"The endpoint to access stream 'mock_http_stream' returned 403: Forbidden.",
|
||||
"This is most likely due to insufficient permissions on the credentials in use.",
|
||||
]),
|
||||
("test_stream_available", 200, True, []),
|
||||
],
|
||||
)
|
||||
def test_check_http_stream_via_availability_strategy(mocker, test_name, response_code, available_expectation, expected_messages):
|
||||
class MockHttpStream(HttpStream):
|
||||
url_base = "https://test_base_url.com"
|
||||
primary_key = ""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.resp_counter = 1
|
||||
|
||||
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
|
||||
return None
|
||||
|
||||
def path(self, **kwargs) -> str:
|
||||
return ""
|
||||
|
||||
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
|
||||
stub_resp = {"data": self.resp_counter}
|
||||
self.resp_counter += 1
|
||||
yield stub_resp
|
||||
pass
|
||||
|
||||
http_stream = MockHttpStream()
|
||||
assert isinstance(http_stream, HttpStream)
|
||||
assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy)
|
||||
|
||||
source = MagicMock()
|
||||
source.streams.return_value = [http_stream]
|
||||
|
||||
check_stream = CheckStream(stream_names=["mock_http_stream"], options={})
|
||||
|
||||
req = requests.Response()
|
||||
req.status_code = response_code
|
||||
mocker.patch.object(requests.Session, "send", return_value=req)
|
||||
|
||||
logger = logging.getLogger(f"airbyte.{getattr(source, 'name', '')}")
|
||||
stream_is_available, reason = check_stream.check_connection(source, logger, config)
|
||||
|
||||
assert stream_is_available == available_expectation
|
||||
for message in expected_messages:
|
||||
assert message in reason
|
||||
|
||||
@@ -0,0 +1,141 @@
|
||||
#
|
||||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import Any, Iterable, List, Mapping, Optional, Tuple
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from airbyte_cdk.sources import AbstractSource
|
||||
from airbyte_cdk.sources.streams import Stream
|
||||
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
|
||||
from airbyte_cdk.sources.streams.http.http import HttpStream
|
||||
from requests import HTTPError
|
||||
|
||||
logger = logging.getLogger("airbyte")
|
||||
|
||||
|
||||
class MockHttpStream(HttpStream):
|
||||
url_base = "https://test_base_url.com"
|
||||
primary_key = ""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.resp_counter = 1
|
||||
|
||||
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
|
||||
return None
|
||||
|
||||
def path(self, **kwargs) -> str:
|
||||
return ""
|
||||
|
||||
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
|
||||
stub_resp = {"data": self.resp_counter}
|
||||
self.resp_counter += 1
|
||||
yield stub_resp
|
||||
pass
|
||||
|
||||
def retry_factor(self) -> float:
|
||||
return 0.01
|
||||
|
||||
|
||||
def test_default_http_availability_strategy(mocker):
|
||||
http_stream = MockHttpStream()
|
||||
assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy)
|
||||
|
||||
class MockResponse(requests.Response, mocker.MagicMock):
|
||||
def __init__(self, *args, **kvargs):
|
||||
mocker.MagicMock.__init__(self)
|
||||
requests.Response.__init__(self, **kvargs)
|
||||
self.json = mocker.MagicMock()
|
||||
|
||||
response = MockResponse()
|
||||
response.status_code = 403
|
||||
response.json.return_value = {"error": "Oh no!"}
|
||||
mocker.patch.object(requests.Session, "send", return_value=response)
|
||||
|
||||
stream_is_available, reason = http_stream.check_availability(logger)
|
||||
assert not stream_is_available
|
||||
|
||||
expected_messages = [
|
||||
"This is most likely due to insufficient permissions on the credentials in use.",
|
||||
"Please visit the connector's documentation to learn more.",
|
||||
"Oh no!",
|
||||
]
|
||||
for message in expected_messages:
|
||||
assert message in reason
|
||||
|
||||
req = requests.Response()
|
||||
req.status_code = 200
|
||||
mocker.patch.object(requests.Session, "send", return_value=req)
|
||||
|
||||
stream_is_available, _ = http_stream.check_availability(logger)
|
||||
assert stream_is_available
|
||||
|
||||
|
||||
def test_http_availability_connector_specific_docs(mocker):
|
||||
class MockSource(AbstractSource):
|
||||
def __init__(self, streams: List[Stream] = None):
|
||||
self._streams = streams
|
||||
|
||||
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
|
||||
return True, ""
|
||||
|
||||
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
|
||||
if not self._streams:
|
||||
raise Exception("Stream is not set")
|
||||
return self._streams
|
||||
|
||||
http_stream = MockHttpStream()
|
||||
source = MockSource(streams=[http_stream])
|
||||
assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy)
|
||||
|
||||
req = requests.Response()
|
||||
req.status_code = 403
|
||||
mocker.patch.object(requests.Session, "send", return_value=req, json={"error": "Oh no!"})
|
||||
|
||||
stream_is_available, reason = http_stream.check_availability(logger, source)
|
||||
assert not stream_is_available
|
||||
|
||||
expected_messages = [
|
||||
f"The endpoint to access stream '{http_stream.name}' returned 403: Forbidden.",
|
||||
"This is most likely due to insufficient permissions on the credentials in use.",
|
||||
f"Please visit https://docs.airbyte.com/integrations/sources/{source.name} to learn more.",
|
||||
# "Oh no!",
|
||||
]
|
||||
for message in expected_messages:
|
||||
assert message in reason
|
||||
|
||||
|
||||
def test_http_availability_raises_unhandled_error(mocker):
|
||||
http_stream = MockHttpStream()
|
||||
assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy)
|
||||
|
||||
req = requests.Response()
|
||||
req.status_code = 404
|
||||
mocker.patch.object(requests.Session, "send", return_value=req)
|
||||
|
||||
with pytest.raises(HTTPError):
|
||||
http_stream.check_availability(logger)
|
||||
|
||||
|
||||
def test_send_handles_retries_when_checking_availability(mocker, caplog):
|
||||
http_stream = MockHttpStream()
|
||||
assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy)
|
||||
|
||||
req_1 = requests.Response()
|
||||
req_1.status_code = 429
|
||||
req_2 = requests.Response()
|
||||
req_2.status_code = 503
|
||||
req_3 = requests.Response()
|
||||
req_3.status_code = 200
|
||||
mock_send = mocker.patch.object(requests.Session, "send", side_effect=[req_1, req_2, req_3])
|
||||
|
||||
with caplog.at_level(logging.INFO):
|
||||
stream_is_available, _ = http_stream.check_availability(logger)
|
||||
|
||||
assert stream_is_available
|
||||
assert mock_send.call_count == 3
|
||||
for message in ["Caught retryable error", "Response Code: 429", "Response Code: 503"]:
|
||||
assert message in caplog.text
|
||||
@@ -0,0 +1,70 @@
|
||||
#
|
||||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import Any, Iterable, List, Mapping, Optional, Tuple, Union
|
||||
|
||||
from airbyte_cdk.models import SyncMode
|
||||
from airbyte_cdk.sources import Source
|
||||
from airbyte_cdk.sources.streams import Stream
|
||||
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
|
||||
from airbyte_cdk.sources.streams.core import StreamData
|
||||
|
||||
logger = logging.getLogger("airbyte")
|
||||
|
||||
|
||||
class MockStream(Stream):
|
||||
def __init__(self, name: str) -> Stream:
|
||||
self._name = name
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
|
||||
pass
|
||||
|
||||
def read_records(
|
||||
self,
|
||||
sync_mode: SyncMode,
|
||||
cursor_field: List[str] = None,
|
||||
stream_slice: Mapping[str, Any] = None,
|
||||
stream_state: Mapping[str, Any] = None,
|
||||
) -> Iterable[StreamData]:
|
||||
pass
|
||||
|
||||
|
||||
def test_no_availability_strategy():
|
||||
stream_1 = MockStream("stream")
|
||||
assert stream_1.availability_strategy is None
|
||||
|
||||
stream_1_is_available, _ = stream_1.check_availability(logger)
|
||||
assert stream_1_is_available
|
||||
|
||||
|
||||
def test_availability_strategy():
|
||||
class MockAvailabilityStrategy(AvailabilityStrategy):
|
||||
def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional[Source]) -> Tuple[bool, any]:
|
||||
if stream.name == "available_stream":
|
||||
return True, None
|
||||
return False, f"Could not reach stream '{stream.name}'."
|
||||
|
||||
class MockStreamWithAvailabilityStrategy(MockStream):
|
||||
@property
|
||||
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
|
||||
return MockAvailabilityStrategy()
|
||||
|
||||
stream_1 = MockStreamWithAvailabilityStrategy("available_stream")
|
||||
stream_2 = MockStreamWithAvailabilityStrategy("unavailable_stream")
|
||||
|
||||
for stream in [stream_1, stream_2]:
|
||||
assert isinstance(stream.availability_strategy, MockAvailabilityStrategy)
|
||||
|
||||
stream_1_is_available, _ = stream_1.check_availability(logger)
|
||||
assert stream_1_is_available
|
||||
|
||||
stream_2_is_available, reason = stream_2.check_availability(logger)
|
||||
assert not stream_2_is_available
|
||||
assert "Could not reach stream 'unavailable_stream'" in reason
|
||||
@@ -7,10 +7,10 @@ import logging
|
||||
import tempfile
|
||||
from collections import defaultdict
|
||||
from contextlib import nullcontext as does_not_raise
|
||||
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple
|
||||
from unittest.mock import MagicMock
|
||||
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from airbyte_cdk.models import (
|
||||
AirbyteGlobalState,
|
||||
AirbyteStateBlob,
|
||||
@@ -24,6 +24,7 @@ from airbyte_cdk.models import (
|
||||
)
|
||||
from airbyte_cdk.sources import AbstractSource, Source
|
||||
from airbyte_cdk.sources.streams.core import Stream
|
||||
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
|
||||
from airbyte_cdk.sources.streams.http.http import HttpStream
|
||||
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
|
||||
from pydantic import ValidationError
|
||||
@@ -43,10 +44,15 @@ class MockSource(Source):
|
||||
|
||||
|
||||
class MockAbstractSource(AbstractSource):
|
||||
def __init__(self, streams: Optional[List[Stream]] = None):
|
||||
self._streams = streams
|
||||
|
||||
def check_connection(self, *args, **kwargs) -> Tuple[bool, Optional[Any]]:
|
||||
return True, ""
|
||||
|
||||
def streams(self, *args, **kwargs) -> List[Stream]:
|
||||
if self._streams:
|
||||
return self._streams
|
||||
return []
|
||||
|
||||
|
||||
@@ -79,26 +85,30 @@ def abstract_source(mocker):
|
||||
mocker.patch.multiple(HttpStream, __abstractmethods__=set())
|
||||
mocker.patch.multiple(Stream, __abstractmethods__=set())
|
||||
|
||||
class MockHttpStream(MagicMock, HttpStream):
|
||||
class MockHttpStream(mocker.MagicMock, HttpStream):
|
||||
url_base = "http://example.com"
|
||||
path = "/dummy/path"
|
||||
get_json_schema = MagicMock()
|
||||
get_json_schema = mocker.MagicMock()
|
||||
|
||||
def supports_incremental(self):
|
||||
return True
|
||||
|
||||
def __init__(self, *args, **kvargs):
|
||||
MagicMock.__init__(self)
|
||||
mocker.MagicMock.__init__(self)
|
||||
HttpStream.__init__(self, *args, kvargs)
|
||||
self.read_records = MagicMock()
|
||||
self.read_records = mocker.MagicMock()
|
||||
|
||||
class MockStream(MagicMock, Stream):
|
||||
@property
|
||||
def availability_strategy(self):
|
||||
return None
|
||||
|
||||
class MockStream(mocker.MagicMock, Stream):
|
||||
page_size = None
|
||||
get_json_schema = MagicMock()
|
||||
get_json_schema = mocker.MagicMock()
|
||||
|
||||
def __init__(self, *args, **kvargs):
|
||||
MagicMock.__init__(self)
|
||||
self.read_records = MagicMock()
|
||||
def __init__(self, **kwargs):
|
||||
mocker.MagicMock.__init__(self)
|
||||
self.read_records = mocker.MagicMock()
|
||||
|
||||
streams = [MockHttpStream(), MockStream()]
|
||||
|
||||
@@ -385,8 +395,8 @@ def test_internal_config(abstract_source, catalog):
|
||||
assert not non_http_stream.page_size
|
||||
|
||||
|
||||
def test_internal_config_limit(abstract_source, catalog):
|
||||
logger_mock = MagicMock()
|
||||
def test_internal_config_limit(mocker, abstract_source, catalog):
|
||||
logger_mock = mocker.MagicMock()
|
||||
logger_mock.level = logging.DEBUG
|
||||
del catalog.streams[1]
|
||||
STREAM_LIMIT = 2
|
||||
@@ -423,8 +433,8 @@ def test_internal_config_limit(abstract_source, catalog):
|
||||
SCHEMA = {"type": "object", "properties": {"value": {"type": "string"}}}
|
||||
|
||||
|
||||
def test_source_config_no_transform(abstract_source, catalog):
|
||||
logger_mock = MagicMock()
|
||||
def test_source_config_no_transform(mocker, abstract_source, catalog):
|
||||
logger_mock = mocker.MagicMock()
|
||||
logger_mock.level = logging.DEBUG
|
||||
streams = abstract_source.streams(None)
|
||||
http_stream, non_http_stream = streams
|
||||
@@ -437,8 +447,8 @@ def test_source_config_no_transform(abstract_source, catalog):
|
||||
assert non_http_stream.get_json_schema.call_count == 5
|
||||
|
||||
|
||||
def test_source_config_transform(abstract_source, catalog):
|
||||
logger_mock = MagicMock()
|
||||
def test_source_config_transform(mocker, abstract_source, catalog):
|
||||
logger_mock = mocker.MagicMock()
|
||||
logger_mock.level = logging.DEBUG
|
||||
streams = abstract_source.streams(None)
|
||||
http_stream, non_http_stream = streams
|
||||
@@ -451,8 +461,8 @@ def test_source_config_transform(abstract_source, catalog):
|
||||
assert [r.record.data for r in records] == [{"value": "23"}] * 2
|
||||
|
||||
|
||||
def test_source_config_transform_and_no_transform(abstract_source, catalog):
|
||||
logger_mock = MagicMock()
|
||||
def test_source_config_transform_and_no_transform(mocker, abstract_source, catalog):
|
||||
logger_mock = mocker.MagicMock()
|
||||
logger_mock.level = logging.DEBUG
|
||||
streams = abstract_source.streams(None)
|
||||
http_stream, non_http_stream = streams
|
||||
@@ -462,3 +472,116 @@ def test_source_config_transform_and_no_transform(abstract_source, catalog):
|
||||
records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})]
|
||||
assert len(records) == 2
|
||||
assert [r.record.data for r in records] == [{"value": "23"}, {"value": 23}]
|
||||
|
||||
|
||||
def test_read_default_http_availability_strategy_stream_available(catalog, mocker):
|
||||
mocker.patch.multiple(HttpStream, __abstractmethods__=set())
|
||||
mocker.patch.multiple(Stream, __abstractmethods__=set())
|
||||
|
||||
class MockHttpStream(mocker.MagicMock, HttpStream):
|
||||
url_base = "http://example.com"
|
||||
path = "/dummy/path"
|
||||
get_json_schema = mocker.MagicMock()
|
||||
|
||||
def supports_incremental(self):
|
||||
return True
|
||||
|
||||
def __init__(self, *args, **kvargs):
|
||||
mocker.MagicMock.__init__(self)
|
||||
HttpStream.__init__(self, *args, kvargs)
|
||||
self.read_records = mocker.MagicMock()
|
||||
|
||||
class MockStream(mocker.MagicMock, Stream):
|
||||
page_size = None
|
||||
get_json_schema = mocker.MagicMock()
|
||||
|
||||
def __init__(self, *args, **kvargs):
|
||||
mocker.MagicMock.__init__(self)
|
||||
self.read_records = mocker.MagicMock()
|
||||
|
||||
streams = [MockHttpStream(), MockStream()]
|
||||
http_stream, non_http_stream = streams
|
||||
assert isinstance(http_stream, HttpStream)
|
||||
assert not isinstance(non_http_stream, HttpStream)
|
||||
|
||||
assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy)
|
||||
assert non_http_stream.availability_strategy is None
|
||||
|
||||
# Add an extra record for the default HttpAvailabilityStrategy to pull from
|
||||
# during the try: next(records) check, since we are mocking the return value
|
||||
# and not re-creating the generator like we would during actual reading
|
||||
http_stream.read_records.return_value = iter([{"value": "test"}] + [{}] * 3)
|
||||
non_http_stream.read_records.return_value = iter([{}] * 3)
|
||||
|
||||
source = MockAbstractSource(streams=streams)
|
||||
logger = logging.getLogger(f"airbyte.{getattr(abstract_source, 'name', '')}")
|
||||
records = [r for r in source.read(logger=logger, config={}, catalog=catalog, state={})]
|
||||
# 3 for http stream and 3 for non http stream
|
||||
assert len(records) == 3 + 3
|
||||
assert http_stream.read_records.called
|
||||
assert non_http_stream.read_records.called
|
||||
|
||||
|
||||
def test_read_default_http_availability_strategy_stream_unavailable(catalog, mocker, caplog):
|
||||
mocker.patch.multiple(Stream, __abstractmethods__=set())
|
||||
|
||||
class MockHttpStream(HttpStream):
|
||||
url_base = "https://test_base_url.com"
|
||||
primary_key = ""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.resp_counter = 1
|
||||
|
||||
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
|
||||
return None
|
||||
|
||||
def path(self, **kwargs) -> str:
|
||||
return ""
|
||||
|
||||
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
|
||||
stub_response = {"data": self.resp_counter}
|
||||
self.resp_counter += 1
|
||||
yield stub_response
|
||||
|
||||
class MockStream(mocker.MagicMock, Stream):
|
||||
page_size = None
|
||||
get_json_schema = mocker.MagicMock()
|
||||
|
||||
def __init__(self, *args, **kvargs):
|
||||
mocker.MagicMock.__init__(self)
|
||||
self.read_records = mocker.MagicMock()
|
||||
|
||||
streams = [MockHttpStream(), MockStream()]
|
||||
http_stream, non_http_stream = streams
|
||||
assert isinstance(http_stream, HttpStream)
|
||||
assert not isinstance(non_http_stream, HttpStream)
|
||||
|
||||
assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy)
|
||||
assert non_http_stream.availability_strategy is None
|
||||
|
||||
# Don't set anything for read_records return value for HttpStream, since
|
||||
# it should be skipped due to the stream being unavailable
|
||||
non_http_stream.read_records.return_value = iter([{}] * 3)
|
||||
|
||||
# Patch HTTP request to stream endpoint to make it unavailable
|
||||
req = requests.Response()
|
||||
req.status_code = 403
|
||||
mocker.patch.object(requests.Session, "send", return_value=req)
|
||||
|
||||
source = MockAbstractSource(streams=streams)
|
||||
logger = logging.getLogger("test_read_default_http_availability_strategy_stream_unavailable")
|
||||
with caplog.at_level(logging.WARNING):
|
||||
records = [r for r in source.read(logger=logger, config={}, catalog=catalog, state={})]
|
||||
|
||||
# 0 for http stream and 3 for non http stream
|
||||
assert len(records) == 0 + 3
|
||||
assert non_http_stream.read_records.called
|
||||
expected_logs = [
|
||||
f"Skipped syncing stream '{http_stream.name}' because it was unavailable.",
|
||||
f"The endpoint to access stream '{http_stream.name}' returned 403: Forbidden.",
|
||||
"This is most likely due to insufficient permissions on the credentials in use.",
|
||||
f"Please visit https://docs.airbyte.com/integrations/sources/{source.name} to learn more."
|
||||
]
|
||||
for message in expected_logs:
|
||||
assert message in caplog.text
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
@@ -12,14 +13,13 @@ from collections.abc import Mapping
|
||||
from pathlib import Path
|
||||
|
||||
import jsonref
|
||||
from airbyte_cdk.logger import AirbyteLogger
|
||||
from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification, FailureType
|
||||
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader, check_config_against_spec_or_exit
|
||||
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
|
||||
from pytest import fixture
|
||||
from pytest import raises as pytest_raises
|
||||
|
||||
logger = AirbyteLogger()
|
||||
logger = logging.getLogger("airbyte")
|
||||
|
||||
|
||||
MODULE = sys.modules[__name__]
|
||||
|
||||
Reference in New Issue
Block a user