Low-code: fix duplicate stream_slicer update (#24827)
* first tentative fix * cleaner fix * refactor test * format * format * move to utils file * use simpler implementation
This commit is contained in:
@@ -372,12 +372,14 @@ class SimpleRetriever(Retriever, HttpStream):
|
||||
stream_slice,
|
||||
stream_state,
|
||||
)
|
||||
cursor_updated = False
|
||||
for record in records_generator:
|
||||
# Only record messages should be parsed to update the cursor which is indicated by the Mapping type
|
||||
if isinstance(record, Mapping):
|
||||
self.stream_slicer.update_cursor(stream_slice, last_record=record)
|
||||
cursor_updated = True
|
||||
yield record
|
||||
else:
|
||||
if not cursor_updated:
|
||||
last_record = self._last_records[-1] if self._last_records else None
|
||||
if last_record and isinstance(last_record, Mapping):
|
||||
self.stream_slicer.update_cursor(stream_slice, last_record=last_record)
|
||||
|
||||
@@ -32,7 +32,7 @@ request_response_logs = [
|
||||
config = {}
|
||||
|
||||
|
||||
@patch.object(HttpStream, "_read_pages", return_value=[])
|
||||
@patch.object(HttpStream, "_read_pages", return_value=iter([]))
|
||||
def test_simple_retriever_full(mock_http_stream):
|
||||
requester = MagicMock()
|
||||
request_params = {"param": "value"}
|
||||
@@ -117,7 +117,7 @@ def test_simple_retriever_full(mock_http_stream):
|
||||
paginator.reset.assert_called()
|
||||
|
||||
|
||||
@patch.object(HttpStream, "_read_pages", return_value=[*request_response_logs, *records])
|
||||
@patch.object(HttpStream, "_read_pages", return_value=iter([*request_response_logs, *records]))
|
||||
def test_simple_retriever_with_request_response_logs(mock_http_stream):
|
||||
requester = MagicMock()
|
||||
paginator = MagicMock()
|
||||
@@ -153,7 +153,7 @@ def test_simple_retriever_with_request_response_logs(mock_http_stream):
|
||||
assert actual_messages[3] == records[1]
|
||||
|
||||
|
||||
@patch.object(HttpStream, "_read_pages", return_value=[])
|
||||
@patch.object(HttpStream, "_read_pages", return_value=iter([]))
|
||||
def test_simple_retriever_with_request_response_log_last_records(mock_http_stream):
|
||||
requester = MagicMock()
|
||||
paginator = MagicMock()
|
||||
@@ -667,5 +667,37 @@ def test_limit_stream_slices():
|
||||
assert truncated_slices == _generate_slices(maximum_number_of_slices)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"test_name, last_records, records, expected_stream_slicer_update_count",
|
||||
[
|
||||
("test_two_records", [{"id": -1}], records, 2),
|
||||
("test_no_records", [{"id": -1}], [], 1),
|
||||
("test_no_records_no_previous_records", [], [], 0)
|
||||
]
|
||||
)
|
||||
def test_read_records_updates_stream_slicer_once_if_no_records(test_name, last_records, records, expected_stream_slicer_update_count):
|
||||
with patch.object(HttpStream, "_read_pages", return_value=iter(records)):
|
||||
requester = MagicMock()
|
||||
paginator = MagicMock()
|
||||
record_selector = MagicMock()
|
||||
stream_slicer = MagicMock()
|
||||
|
||||
retriever = SimpleRetriever(
|
||||
name="stream_name",
|
||||
primary_key=primary_key,
|
||||
requester=requester,
|
||||
paginator=paginator,
|
||||
record_selector=record_selector,
|
||||
stream_slicer=stream_slicer,
|
||||
parameters={},
|
||||
config={},
|
||||
)
|
||||
retriever._last_records = last_records
|
||||
|
||||
list(retriever.read_records(sync_mode=SyncMode.incremental, stream_slice={"repository": "airbyte"}))
|
||||
|
||||
assert stream_slicer.update_cursor.call_count == expected_stream_slicer_update_count
|
||||
|
||||
|
||||
def _generate_slices(number_of_slices):
|
||||
return [{"date": f"2022-01-0{day + 1}"} for day in range(number_of_slices)]
|
||||
|
||||
Reference in New Issue
Block a user