low-code connectors: reset pagination between stream slices (#15330)
* reset pagination between stream slices
* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
  Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
  Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
* patch
Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
This commit is contained in:
@@ -147,6 +147,9 @@ class LimitPaginator(Paginator, JsonSchemaMixin):
|
||||
) -> Mapping[str, Any]:
|
||||
return self._get_request_options(RequestOptionType.body_json)
|
||||
|
||||
def reset(self):
|
||||
self.pagination_strategy.reset()
|
||||
|
||||
def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
|
||||
options = {}
|
||||
if self.page_token_option.inject_into == option_type:
|
||||
|
||||
@@ -59,3 +59,7 @@ class NoPagination(Paginator):
|
||||
|
||||
def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Mapping[str, Any]:
|
||||
return {}
|
||||
|
||||
def reset(self):
|
||||
# No state to reset
|
||||
pass
|
||||
|
||||
@@ -19,6 +19,12 @@ class Paginator(RequestOptionsProvider):
|
||||
If the next_page_token is the path to the next page of records, then it should be accessed through the `path` method
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def reset(self):
|
||||
"""
|
||||
Reset the pagination's inner state
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
|
||||
"""
|
||||
|
||||
@@ -46,3 +46,7 @@ class CursorPaginationStrategy(PaginationStrategy, JsonSchemaMixin):
|
||||
return None
|
||||
token = self.cursor_value.eval(config=self.config, last_records=last_records, response=decoded_response)
|
||||
return token if token else None
|
||||
|
||||
def reset(self):
|
||||
# No state to reset
|
||||
pass
|
||||
|
||||
@@ -31,3 +31,6 @@ class OffsetIncrement(PaginationStrategy, JsonSchemaMixin):
|
||||
else:
|
||||
self._offset += len(last_records)
|
||||
return self._offset
|
||||
|
||||
def reset(self):
|
||||
self._offset = 0
|
||||
|
||||
@@ -23,11 +23,14 @@ class PageIncrement(PaginationStrategy, JsonSchemaMixin):
|
||||
options: InitVar[Mapping[str, Any]]
|
||||
|
||||
def __post_init__(self, options: Mapping[str, Any]):
|
||||
self._offset = 0
|
||||
self._page = 0
|
||||
|
||||
def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]:
|
||||
if len(last_records) < self.page_size:
|
||||
return None
|
||||
else:
|
||||
self._offset += 1
|
||||
return self._offset
|
||||
self._page += 1
|
||||
return self._page
|
||||
|
||||
def reset(self):
|
||||
self._page = 0
|
||||
|
||||
@@ -24,3 +24,9 @@ class PaginationStrategy(JsonSchemaMixin):
|
||||
:return: next page token. Returns None if there are no more pages to fetch
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def reset(self):
|
||||
"""
|
||||
Reset the pagination's inner state
|
||||
"""
|
||||
|
||||
@@ -342,6 +342,7 @@ class SimpleRetriever(Retriever, HttpStream, JsonSchemaMixin):
|
||||
) -> Iterable[Mapping[str, Any]]:
|
||||
# Warning: use self.state instead of the stream_state passed as argument!
|
||||
stream_slice = stream_slice or {} # None-check
|
||||
self.paginator.reset()
|
||||
records_generator = HttpStream.read_records(self, sync_mode, cursor_field, stream_slice, self.state)
|
||||
for r in records_generator:
|
||||
self.stream_slicer.update_cursor(stream_slice, last_record=r)
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
#
|
||||
|
||||
import json
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
@@ -159,3 +160,13 @@ def test_limit_cannot_be_set_in_path():
|
||||
assert False
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
def test_reset():
|
||||
limit_request_option = RequestOption(inject_into=RequestOptionType.request_parameter, field_name="limit", options={})
|
||||
page_token_request_option = RequestOption(inject_into=RequestOptionType.request_parameter, field_name="offset", options={})
|
||||
url_base = "https://airbyte.io"
|
||||
config = {}
|
||||
strategy = MagicMock()
|
||||
LimitPaginator(2, limit_request_option, page_token_request_option, strategy, config, url_base, options={}).reset()
|
||||
assert strategy.reset.called
|
||||
|
||||
@@ -30,3 +30,6 @@ def test_offset_increment_paginator_strategy(test_name, page_size, expected_next
|
||||
next_page_token = paginator_strategy.next_page_token(response, last_records)
|
||||
assert expected_next_page_token == next_page_token
|
||||
assert expected_offset == paginator_strategy._offset
|
||||
|
||||
paginator_strategy.reset()
|
||||
assert 0 == paginator_strategy._offset
|
||||
|
||||
@@ -18,7 +18,7 @@ from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_incre
|
||||
)
|
||||
def test_page_increment_paginator_strategy(test_name, page_size, expected_next_page_token, expected_offset):
|
||||
paginator_strategy = PageIncrement(page_size, options={})
|
||||
assert paginator_strategy._offset == 0
|
||||
assert paginator_strategy._page == 0
|
||||
|
||||
response = requests.Response()
|
||||
|
||||
@@ -29,4 +29,7 @@ def test_page_increment_paginator_strategy(test_name, page_size, expected_next_p
|
||||
|
||||
next_page_token = paginator_strategy.next_page_token(response, last_records)
|
||||
assert expected_next_page_token == next_page_token
|
||||
assert expected_offset == paginator_strategy._offset
|
||||
assert expected_offset == paginator_strategy._page
|
||||
|
||||
paginator_strategy.reset()
|
||||
assert 0 == paginator_strategy._page
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status
|
||||
import pytest
|
||||
@@ -15,13 +15,15 @@ from airbyte_cdk.sources.declarative.requesters.request_option import RequestOpt
|
||||
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
|
||||
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
|
||||
from airbyte_cdk.sources.streams.http.auth import NoAuth
|
||||
from airbyte_cdk.sources.streams.http.http import HttpStream
|
||||
|
||||
primary_key = "pk"
|
||||
records = [{"id": 1}, {"id": 2}]
|
||||
config = {}
|
||||
|
||||
|
||||
def test_simple_retriever_full():
|
||||
@patch.object(HttpStream, "read_records", return_value=[])
|
||||
def test_simple_retriever_full(mock_http_stream):
|
||||
requester = MagicMock()
|
||||
request_params = {"param": "value"}
|
||||
requester.get_request_params.return_value = request_params
|
||||
@@ -53,6 +55,9 @@ def test_simple_retriever_full():
|
||||
backoff_time = 60
|
||||
should_retry = ResponseStatus.retry(backoff_time)
|
||||
requester.should_retry.return_value = should_retry
|
||||
request_body_json = {"body": "json"}
|
||||
requester.request_body_json.return_value = request_body_json
|
||||
|
||||
request_body_data = {"body": "data"}
|
||||
requester.get_request_body_data.return_value = request_body_data
|
||||
request_body_json = {"body": "json"}
|
||||
@@ -92,12 +97,14 @@ def test_simple_retriever_full():
|
||||
assert not retriever.raise_on_http_errors
|
||||
assert retriever.should_retry(requests.Response())
|
||||
assert retriever.backoff_time(requests.Response()) == backoff_time
|
||||
assert retriever.request_body_data(None, None, None) == request_body_data
|
||||
assert retriever.request_body_json(None, None, None) == request_body_json
|
||||
assert retriever.request_kwargs(None, None, None) == request_kwargs
|
||||
assert retriever.cache_filename == cache_filename
|
||||
assert retriever.use_cache == use_cache
|
||||
|
||||
[r for r in retriever.read_records(SyncMode.full_refresh)]
|
||||
paginator.reset.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"test_name, requester_response, expected_should_retry, expected_backoff_time",
|
||||
|
||||
Reference in New Issue
Block a user