1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Source-Iterable: ignore 500 per slice for ListUsers stream (#24962)

* Source-Iterable: ignore 500 per slice for ListUsers stream

* Source-Iterable: add unittest

* Source-Iterable: fix unittest

* auto-bump connector version

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
Roman Yermilov [GL]
2023-04-07 13:52:22 +04:00
committed by GitHub
parent b28efbf9f7
commit 56038d0368
10 changed files with 63 additions and 9 deletions

View File

@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.1.26
LABEL io.airbyte.version=0.1.27
LABEL io.airbyte.name=airbyte/source-iterable

View File

@@ -329,6 +329,7 @@ class ListUsers(IterableStream):
name = "list_users"
# enable caching, because this stream used by other ones
use_cache = True
raise_on_http_errors = False
def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str:
return f"lists/{self.data_field}?listId={stream_slice['list_id']}"
@@ -339,8 +340,14 @@ class ListUsers(IterableStream):
yield {"list_id": list_record["id"]}
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
return []
if not response.ok:
if not self.check_unauthorized_key(response):
return []
# Avoid block whole of sync if a slice is broken. Skip current slice on 500 Internal Server Error.
# See on-call: https://github.com/airbytehq/oncall/issues/1592#issuecomment-1499109251
if response.status_code == codes.INTERNAL_SERVER_ERROR:
return []
response.raise_for_status()
list_id = self._get_list_id(response.url)
for user in response.iter_lines():
yield {"email": user.decode(), "listId": list_id}

View File

@@ -37,13 +37,13 @@ class IterableGenericErrorHandler:
def handle(self, response: requests.Response, stream_name: str, last_slice: Mapping[str, Any] = {}) -> bool:
# error pattern to check
code_pattern = "Generic Error"
code_patterns = ["Generic Error", "GenericError"]
msg_pattern = "Please try again later"
# prepare warning message
warning_msg = f"Generic Server Error occured for stream: `{stream_name}`. "
# For cases when there is a slice to go with, but server returns Generic Error - Please try again
# we reetry 2 times, then skipp the record and move on with warning message.
if response.json().get("code") == code_pattern and msg_pattern in response.json().get("msg"):
if response.json().get("code") in code_patterns and msg_pattern in response.json().get("msg"):
self.error_count += 1
setattr(self, "raise_on_http_errors", False)
if self.error_count > self.max_retry:

View File

@@ -38,3 +38,8 @@ def lists_stream():
# return the instance of the stream so we could make global tests on it,
# to cover the different `should_retry` logic
return Lists(authenticator=NoAuth())
@pytest.fixture(autouse=True)
def mock_sleep(mocker):
mocker.patch("time.sleep")

View File

@@ -215,3 +215,40 @@ def test_stream_stops_on_401(mock_lists_resp):
_ = list(users_stream.read_records(stream_slice=slice_, sync_mode=SyncMode.full_refresh))
assert len(responses.calls) == 1
assert slices > 1
@responses.activate
def test_listuser_stream_keep_working_on_500():
users_stream = ListUsers(authenticator=NoAuth())
responses.add(
responses.GET,
"https://api.iterable.com/api/lists",
json={"lists": [{"id": 1000}, {"id": 2000}]},
status=200
)
responses.add(
responses.GET,
"https://api.iterable.com/api/lists/getUsers?listId=1000",
json={
"msg": "An error occurred. Please try again later. If problem persists, please contact your CSM",
"code": "GenericError",
"params": None
},
status=500
)
responses.add(
responses.GET,
"https://api.iterable.com/api/lists/getUsers?listId=2000",
body="one@example.com\ntwo@example.com\nthree@example.com",
status=200
)
expected_records = [
{'email': 'one@example.com', 'listId': 2000},
{'email': 'two@example.com', 'listId': 2000},
{'email': 'three@example.com', 'listId': 2000}
]
records = []
for stream_slice in users_stream.stream_slices(sync_mode=SyncMode.full_refresh):
records += list(users_stream.read_records(stream_slice=stream_slice, sync_mode=SyncMode.full_refresh))
assert records == expected_records