bugfix infinite pagination in CDK (#3366)
This commit is contained in:
7
airbyte-cdk/python/CHANGELOG.md
Normal file
7
airbyte-cdk/python/CHANGELOG.md
Normal file
@@ -0,0 +1,7 @@
|
||||
# Changelog
|
||||
|
||||
## 0.1.2
|
||||
Fix an issue that caused infinite pagination: https://github.com/airbytehq/airbyte/pull/3366
|
||||
|
||||
## 0.1.1
|
||||
Initial Release
|
||||
@@ -213,8 +213,8 @@ class HttpStream(Stream, ABC):
|
||||
stream_state = stream_state or {}
|
||||
pagination_complete = False
|
||||
|
||||
next_page_token = None
|
||||
while not pagination_complete:
|
||||
next_page_token = None
|
||||
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
|
||||
request = self._create_prepared_request(
|
||||
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
|
||||
|
||||
@@ -33,7 +33,7 @@ README = (HERE / "README.md").read_text()
|
||||
|
||||
setup(
|
||||
name="airbyte-cdk",
|
||||
version="0.1.1",
|
||||
version="0.1.2",
|
||||
description="A framework for writing Airbyte Connectors.",
|
||||
long_description=README,
|
||||
long_description_content_type="text/markdown",
|
||||
|
||||
@@ -31,7 +31,6 @@ from airbyte_cdk.sources.streams.http.exceptions import UserDefinedBackoffExcept
|
||||
|
||||
|
||||
class StubBasicReadHttpStream(HttpStream):
|
||||
|
||||
url_base = "https://test_base_url.com"
|
||||
primary_key = ""
|
||||
|
||||
@@ -60,7 +59,6 @@ class StubBasicReadHttpStream(HttpStream):
|
||||
|
||||
|
||||
def test_stub_basic_read_http_stream_read_records(mocker):
|
||||
|
||||
stream = StubBasicReadHttpStream()
|
||||
blank_response = {} # Sending a blank response is fine as we ignore the response in `parse_response` anyway.
|
||||
mocker.patch.object(StubBasicReadHttpStream, "_send_request", return_value=blank_response)
|
||||
@@ -71,35 +69,53 @@ def test_stub_basic_read_http_stream_read_records(mocker):
|
||||
|
||||
|
||||
class StubNextPageTokenHttpStream(StubBasicReadHttpStream):
|
||||
current_page = 0
|
||||
|
||||
current_token = 0
|
||||
max_token_number = 6
|
||||
def __init__(self, pages: int = 5):
|
||||
super().__init__()
|
||||
self._pages = pages
|
||||
|
||||
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
|
||||
while self.current_token < 5:
|
||||
self.current_token += 1
|
||||
return {"token": "token"}
|
||||
while self.current_page < self._pages:
|
||||
page_token = {"page": self.current_page}
|
||||
self.current_page += 1
|
||||
return page_token
|
||||
return None
|
||||
|
||||
|
||||
def test_stub_next_page_token_http_stream_read_records(mocker):
|
||||
|
||||
stream = StubNextPageTokenHttpStream()
|
||||
def test_next_page_token_is_input_to_other_methods(mocker):
|
||||
""" Validates that the return value from next_page_token is passed into other methods that need it, like request_params, headers, body, etc."""
|
||||
pages = 5
|
||||
stream = StubNextPageTokenHttpStream(pages=pages)
|
||||
blank_response = {} # Sending a blank response is fine as we ignore the response in `parse_response` anyway.
|
||||
mocker.patch.object(StubNextPageTokenHttpStream, "_send_request", return_value=blank_response)
|
||||
|
||||
methods = ["request_params", "request_headers", "request_body_json"]
|
||||
for method in methods:
|
||||
# Wrap all methods we're interested in testing with mocked objects so we can later spy on their input args and verify they were what we expect
|
||||
mocker.patch.object(stream, method, wraps=getattr(stream, method))
|
||||
|
||||
records = list(stream.read_records(SyncMode.full_refresh))
|
||||
|
||||
assert [{"data": 1}, {"data": 2}, {"data": 3}, {"data": 4}, {"data": 5}, {"data": 6}] == records
|
||||
# Since we have 5 pages, we expect 5 tokens, which are {"page": 0}, {"page": 1}, ..., {"page": 4}.
|
||||
expected_next_page_tokens = [{"page": i} for i in range(pages)]
|
||||
for method in methods:
|
||||
# First assert that they were called with no next_page_token. This is the first call in the pagination loop.
|
||||
getattr(stream, method).assert_any_call(next_page_token=None, stream_slice=None, stream_state={})
|
||||
for token in expected_next_page_tokens:
|
||||
# Then verify that each method was also called with each expected next_page_token.
|
||||
getattr(stream, method).assert_any_call(next_page_token=token, stream_slice=None, stream_state={})
|
||||
|
||||
expected = [{"data": 1}, {"data": 2}, {"data": 3}, {"data": 4}, {"data": 5}, {"data": 6}]
|
||||
|
||||
assert expected == records
|
||||
|
||||
|
||||
class StubBadUrlHttpStream(StubBasicReadHttpStream):
|
||||
|
||||
url_base = "bad_url"
|
||||
|
||||
|
||||
def test_stub_bad_url_http_stream_read_records(mocker):
|
||||
|
||||
stream = StubBadUrlHttpStream()
|
||||
|
||||
with pytest.raises(requests.exceptions.RequestException):
|
||||
@@ -112,7 +128,6 @@ class StubCustomBackoffHttpStream(StubBasicReadHttpStream):
|
||||
|
||||
|
||||
def test_stub_custom_backoff_http_stream(mocker):
|
||||
|
||||
stream = StubCustomBackoffHttpStream()
|
||||
req = requests.Response()
|
||||
req.status_code = 429
|
||||
|
||||
Reference in New Issue
Block a user