1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Source: Iterable - improve 500 handling for Events stream (#26014)

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>

---------

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
Serhii Chvaliuk
2023-05-15 20:30:17 +03:00
committed by GitHub
parent d1e1f5d090
commit eff127ee20
16 changed files with 138 additions and 167 deletions

View File

@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.1.27
LABEL io.airbyte.version=0.1.28
LABEL io.airbyte.name=airbyte/source-iterable

View File

@@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799
dockerImageTag: 0.1.27
dockerImageTag: 0.1.28
dockerRepository: airbyte/source-iterable
githubIssueLabel: source-iterable
icon: iterable.svg

View File

@@ -12,7 +12,7 @@ MAIN_REQUIREMENTS = [
"requests~=2.25",
]
TEST_REQUIREMENTS = ["pytest~=6.1", "responses==0.13.3", "freezegun==1.1.0"]
TEST_REQUIREMENTS = ["pytest~=6.1", "responses==0.23.1", "freezegun==1.1.0"]
setup(

View File

@@ -9,6 +9,7 @@ from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from source_iterable.utils import read_full_refresh
from .streams import (
AccessCheck,
@@ -79,15 +80,13 @@ class SourceIterable(AbstractSource):
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
def all_streams_accessible():
access_check_stream = AccessCheck(authenticator=authenticator)
slice_ = next(iter(access_check_stream.stream_slices(sync_mode=SyncMode.full_refresh)))
try:
list(access_check_stream.read_records(stream_slice=slice_, sync_mode=SyncMode.full_refresh))
next(read_full_refresh(access_check_stream), None)
except requests.exceptions.RequestException as e:
if e.response.status_code == requests.codes.UNAUTHORIZED:
return False
raise
else:
return True
return True
authenticator = TokenAuthenticator(token=config["api_key"], auth_header="Api-Key", auth_method="")
# end date is provided for integration tests only

View File

@@ -15,34 +15,30 @@ from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.core import package_name_from_class
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException, UserDefinedBackoffException
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from pendulum.datetime import DateTime
from requests import codes
from requests import HTTPError, codes
from requests.exceptions import ChunkedEncodingError
from source_iterable.slice_generators import AdjustableSliceGenerator, RangeSliceGenerator, StreamSlice
from source_iterable.utils import IterableGenericErrorHandler, dateutil_parse
from source_iterable.utils import dateutil_parse
EVENT_ROWS_LIMIT = 200
CAMPAIGNS_PER_REQUEST = 20
class IterableStream(HttpStream, ABC):
raise_on_http_errors = True
# in case we get a 401 error (api token disabled or deleted) on a stream slice, do not make further requests within the current stream
# to prevent 429 error on other streams
ignore_further_slices = False
# to handle the Generic Errors (500 with msg pattern)
generic_error_handler: IterableGenericErrorHandler = IterableGenericErrorHandler()
url_base = "https://api.iterable.com/api/"
primary_key = "id"
def __init__(self, authenticator):
self._cred = authenticator
self._slice_retry = 0
super().__init__(authenticator)
# placeholder for last slice used for API request
# to reuse it later in logs or whatever
self._last_slice: Mapping[str, Any] = {}
@property
def retry_factor(self) -> int:
@@ -64,38 +60,46 @@ class IterableStream(HttpStream, ABC):
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
return None
def check_unauthorized_key(self, response: requests.Response) -> bool:
if response.status_code == codes.UNAUTHORIZED:
self.logger.warn(f"Provided API Key has not sufficient permissions to read from stream: {self.data_field}")
self.ignore_further_slices = True
setattr(self, "raise_on_http_errors", False)
return False
return True
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
Iterable API does not support pagination
"""
return None
def check_unauthorized_key(self, response: requests.Response) -> bool:
if response.status_code == codes.UNAUTHORIZED:
self.logger.warning(f"Provided API Key has not sufficient permissions to read from stream: {self.data_field}")
return True
def check_generic_error(self, response: requests.Response) -> bool:
"""
https://github.com/airbytehq/oncall/issues/1592#issuecomment-1499109251
https://github.com/airbytehq/oncall/issues/1985
"""
codes = ["Generic Error", "GenericError"]
msg_pattern = "Please try again later"
if response.status_code == 500:
# I am not sure that all 500 errors return valid json
try:
response_json = json.loads(response.text)
except ValueError:
return
if response_json.get("code") in codes and msg_pattern in response_json.get("msg", ""):
return True
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
return []
response_json = response.json() or {}
records = response_json.get(self.data_field, [])
for record in records:
yield record
def should_retry(self, response: requests.Response) -> bool:
# check the authentication
if not self.check_unauthorized_key(response):
if self.check_generic_error(response):
self._slice_retry += 1
if self._slice_retry < 3:
return True
return False
# retry on generic error 500
if response.status_code == 500:
# will retry for 2 times, then give up and skip the fetch for slice
return self.generic_error_handler.handle(response, self.name, self._last_slice)
# all other cases
return super().should_retry(response)
def read_records(
@@ -105,11 +109,20 @@ class IterableStream(HttpStream, ABC):
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
self._slice_retry = 0
if self.ignore_further_slices:
return []
# save last slice
self._last_slice = stream_slice
yield from super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)
return
try:
yield from super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)
except (HTTPError, UserDefinedBackoffException, DefaultBackoffException) as e:
response = e.response
if self.check_unauthorized_key(response):
self.ignore_further_slices = True
return
if self.check_generic_error(response):
return
raise e
class IterableExportStream(IterableStream, ABC):
@@ -185,8 +198,6 @@ class IterableExportStream(IterableStream, ABC):
return params
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
return []
for obj in response.iter_lines():
record = json.loads(obj)
record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field])
@@ -329,7 +340,6 @@ class ListUsers(IterableStream):
name = "list_users"
# enable caching, because this stream used by other ones
use_cache = True
raise_on_http_errors = False
def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str:
return f"lists/{self.data_field}?listId={stream_slice['list_id']}"
@@ -340,14 +350,6 @@ class ListUsers(IterableStream):
yield {"list_id": list_record["id"]}
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not response.ok:
if not self.check_unauthorized_key(response):
return []
# Avoid block whole of sync if a slice is broken. Skip current slice on 500 Internal Server Error.
# See on-call: https://github.com/airbytehq/oncall/issues/1592#issuecomment-1499109251
if response.status_code == codes.INTERNAL_SERVER_ERROR:
return []
response.raise_for_status()
list_id = self._get_list_id(response.url)
for user in response.iter_lines():
yield {"email": user.decode(), "listId": list_id}
@@ -406,8 +408,6 @@ class CampaignsMetrics(IterableStream):
yield {"campaign_ids": campaign_ids}
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
return []
content = response.content.decode()
records = self._parse_csv_string_to_dict(content)
@@ -505,8 +505,6 @@ class Events(IterableStream):
Put common event fields at the top level.
Put the rest of the fields in the `data` subobject.
"""
if not self.check_unauthorized_key(response):
return []
jsonl_records = StringIO(response.text)
for record in jsonl_records:
record_dict = json.loads(record)
@@ -668,8 +666,6 @@ class Templates(IterableExportStreamRanged):
yield from super().read_records(stream_slice=stream_slice, **kwargs)
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
return []
response_json = response.json()
records = response_json.get(self.data_field, [])

View File

@@ -2,12 +2,10 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from typing import Any, Mapping
import dateutil.parser
import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
def dateutil_parse(text):
@@ -28,30 +26,8 @@ def dateutil_parse(text):
)
class IterableGenericErrorHandler:
logger = logging.getLogger("airbyte")
error_count = 0
max_retry = 2
def handle(self, response: requests.Response, stream_name: str, last_slice: Mapping[str, Any] = {}) -> bool:
# error pattern to check
code_patterns = ["Generic Error", "GenericError"]
msg_pattern = "Please try again later"
# prepare warning message
warning_msg = f"Generic Server Error occured for stream: `{stream_name}`. "
# For cases when there is a slice to go with, but server returns Generic Error - Please try again
# we reetry 2 times, then skipp the record and move on with warning message.
if response.json().get("code") in code_patterns and msg_pattern in response.json().get("msg"):
self.error_count += 1
setattr(self, "raise_on_http_errors", False)
if self.error_count > self.max_retry:
self.logger.warn(warning_msg + f"Skip fetching for slice {last_slice}.")
return False
else:
self.logger.warn(warning_msg + f"Retrying for slice {last_slice}, attempt {self.error_count}")
return True
else:
# All other cases
return True
def read_full_refresh(stream_instance: Stream):
slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh)
for _slice in slices:
for record in stream_instance.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh):
yield record

View File

@@ -6,6 +6,11 @@ import pytest
from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream
@pytest.fixture(autouse=True)
def disable_cache(mocker):
mocker.patch("source_iterable.streams.ListUsers.use_cache", False)
@pytest.fixture
def catalog(request):
return ConfiguredAirbyteCatalog(
@@ -32,12 +37,11 @@ def mock_lists_resp(mocker):
@pytest.fixture(name="lists_stream")
def lists_stream():
# local imports
from airbyte_cdk.sources.streams.http.auth import NoAuth
from source_iterable.streams import Lists
# return the instance of the stream so we could make global tests on it,
# to cover the different `should_retry` logic
return Lists(authenticator=NoAuth())
return Lists(authenticator=None)
@pytest.fixture(autouse=True)

View File

@@ -114,4 +114,4 @@ def test_email_stream_chunked_encoding(mocker, mock_lists_resp, catalog, days_du
assert sum(ranges) == days_duration
assert len(ranges) == len(records)
# since read is called on source instance, under the hood .streams() is called which triggers one more http call
assert len(responses.calls) == 3 * len(ranges)
assert len(responses.calls) == 3 * len(ranges) + 1

View File

@@ -9,7 +9,6 @@ import pendulum
import pytest
import responses
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http.auth import NoAuth
from source_iterable.slice_generators import StreamSlice
from source_iterable.streams import Users
@@ -26,7 +25,7 @@ def session_mock():
def test_send_email_stream(session_mock):
stream = Users(start_date="2020", authenticator=NoAuth())
stream = Users(start_date="2020", authenticator=None)
stream_slice = StreamSlice(start_date=pendulum.parse("2020"), end_date=pendulum.parse("2021"))
_ = list(stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=stream_slice, stream_state={}))
@@ -42,6 +41,6 @@ def test_stream_correct():
NUMBER_OF_RECORDS = 10**2
resp_body = "\n".join([json.dumps(record_js)] * NUMBER_OF_RECORDS)
responses.add("GET", "https://api.iterable.com/api/export/data.json", body=resp_body)
stream = Users(start_date="2020", authenticator=NoAuth())
stream = Users(start_date="2020", authenticator=None)
records = list(stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=stream_slice, stream_state={}))
assert len(records) == NUMBER_OF_RECORDS

View File

@@ -11,7 +11,7 @@ from source_iterable.streams import Lists
@responses.activate
@pytest.mark.parametrize("body, status, expected_streams", ((b"", 401, 44), (b"", 200, 44), (b"alpha@gmail.com\nbeta@gmail.com", 200, 44)))
@pytest.mark.parametrize("body, status, expected_streams", ((b"", 401, 7), (b"alpha@gmail.com\nbeta@gmail.com", 200, 44)))
def test_source_streams(mock_lists_resp, config, body, status, expected_streams):
responses.add(responses.GET, "https://api.iterable.com/api/lists/getUsers?listId=1", body=body, status=status)
streams = SourceIterable().streams(config=config)

View File

@@ -7,7 +7,6 @@ import json
import pytest
import requests
import responses
from airbyte_cdk.sources.streams.http.auth import NoAuth
from source_iterable.streams import Events
@@ -193,7 +192,7 @@ def test_events_parse_response(response_objects, expected_objects, jsonl_body):
response_body = json.dumps(response_objects)
responses.add(responses.GET, "https://example.com", body=response_body)
response = requests.get("https://example.com")
stream = Events(authenticator=NoAuth())
stream = Events(authenticator=None)
if jsonl_body:
records = [record for record in stream.parse_response(response)]

View File

@@ -2,12 +2,13 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import json
import pendulum
import pytest
import requests
import responses
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http.auth import NoAuth
from source_iterable.streams import (
Campaigns,
CampaignsMetrics,
@@ -20,7 +21,7 @@ from source_iterable.streams import (
Templates,
Users,
)
from source_iterable.utils import dateutil_parse
from source_iterable.utils import dateutil_parse, read_full_refresh
@pytest.mark.parametrize(
@@ -38,7 +39,7 @@ from source_iterable.utils import dateutil_parse
],
)
def test_path(config, stream, date, slice, expected_path):
args = {"authenticator": NoAuth()}
args = {"authenticator": None}
if date:
args["start_date"] = "2019-10-10T00:00:00"
@@ -64,19 +65,19 @@ def test_list_users_get_list_id(url, id):
def test_campaigns_metrics_request_params():
stream = CampaignsMetrics(authenticator=NoAuth(), start_date="2019-10-10T00:00:00")
stream = CampaignsMetrics(authenticator=None, start_date="2019-10-10T00:00:00")
params = stream.request_params(stream_slice={"campaign_ids": "c101"}, stream_state=None)
assert params == {"campaignId": "c101", "startDateTime": "2019-10-10T00:00:00"}
def test_events_request_params():
stream = Events(authenticator=NoAuth())
stream = Events(authenticator=None)
params = stream.request_params(stream_slice={"email": "a@a.a"}, stream_state=None)
assert params == {"email": "a@a.a", "includeCustomEvents": "true"}
def test_templates_parse_response():
stream = Templates(authenticator=NoAuth(), start_date="2019-10-10T00:00:00")
stream = Templates(authenticator=None, start_date="2019-10-10T00:00:00")
with responses.RequestsMock() as rsps:
rsps.add(
responses.GET,
@@ -93,7 +94,7 @@ def test_templates_parse_response():
def test_list_users_parse_response():
stream = ListUsers(authenticator=NoAuth())
stream = ListUsers(authenticator=None)
with responses.RequestsMock() as rsps:
rsps.add(
responses.GET,
@@ -111,7 +112,7 @@ def test_list_users_parse_response():
def test_campaigns_metrics_parse_response():
stream = CampaignsMetrics(authenticator=NoAuth(), start_date="2019-10-10T00:00:00")
stream = CampaignsMetrics(authenticator=None, start_date="2019-10-10T00:00:00")
with responses.RequestsMock() as rsps:
rsps.add(
responses.GET,
@@ -134,7 +135,7 @@ def test_campaigns_metrics_parse_response():
def test_iterable_stream_parse_response():
stream = Lists(authenticator=NoAuth())
stream = Lists(authenticator=None)
with responses.RequestsMock() as rsps:
rsps.add(
responses.GET,
@@ -151,42 +152,15 @@ def test_iterable_stream_parse_response():
def test_iterable_stream_backoff_time():
stream = Lists(authenticator=NoAuth())
stream = Lists(authenticator=None)
assert stream.backoff_time(response=None) is None
def test_iterable_export_stream_backoff_time():
stream = Users(authenticator=NoAuth(), start_date="2019-10-10T00:00:00")
stream = Users(authenticator=None, start_date="2019-10-10T00:00:00")
assert stream.backoff_time(response=None) is None
@pytest.mark.parametrize(
"status, json, expected",
[
(429, {}, True),
# for 500 - Generic error we should make 2 retry attempts
# and give up on third one!
(500, {"msg": "...Please try again later...1", "code": "Generic Error"}, True),
(500, {"msg": "...Please try again later...2", "code": "Generic Error"}, True),
# This one should return False
(500, {"msg": "...Please try again later...3", "code": "Generic Error"}, False)
],
ids=[
"Retry on 429",
"Retry on 500 - Generic (first)",
"Retry on 500 - Generic (second)",
"Retry on 500 - Generic (third)",
]
)
def test_should_retry(status, json, expected, requests_mock, lists_stream):
stream = lists_stream
url = f"{stream.url_base}/{stream.path()}"
requests_mock.get(url, json=json, status_code=status)
test_response = requests.get(url)
result = stream.should_retry(test_response)
assert result is expected
@pytest.mark.parametrize(
"current_state,record_date,expected_state",
[
@@ -196,7 +170,7 @@ def test_should_retry(status, json, expected, requests_mock, lists_stream):
],
)
def test_get_updated_state(current_state, record_date, expected_state):
stream = Users(authenticator=NoAuth(), start_date="2019-10-10T00:00:00")
stream = Users(authenticator=None, start_date="2019-10-10T00:00:00")
state = stream.get_updated_state(
current_stream_state=current_state,
latest_record={"profileUpdatedAt": pendulum.parse(record_date)},
@@ -207,8 +181,8 @@ def test_get_updated_state(current_state, record_date, expected_state):
@responses.activate
def test_stream_stops_on_401(mock_lists_resp):
# no requests should be made after getting 401 error despite the multiple slices
users_stream = ListUsers(authenticator=NoAuth())
responses.add(responses.GET, "https://api.iterable.com/api/lists/getUsers?listId=2", json={}, status=401)
users_stream = ListUsers(authenticator=None)
responses.add(responses.GET, "https://api.iterable.com/api/lists/getUsers?listId=1", json={}, status=401)
slices = 0
for slice_ in users_stream.stream_slices(sync_mode=SyncMode.full_refresh):
slices += 1
@@ -219,36 +193,59 @@ def test_stream_stops_on_401(mock_lists_resp):
@responses.activate
def test_listuser_stream_keep_working_on_500():
users_stream = ListUsers(authenticator=NoAuth())
responses.add(
responses.GET,
"https://api.iterable.com/api/lists",
json={"lists": [{"id": 1000}, {"id": 2000}]},
status=200
)
responses.add(
responses.GET,
"https://api.iterable.com/api/lists/getUsers?listId=1000",
json={
"msg": "An error occurred. Please try again later. If problem persists, please contact your CSM",
"code": "GenericError",
"params": None
},
status=500
)
responses.add(
responses.GET,
"https://api.iterable.com/api/lists/getUsers?listId=2000",
body="one@example.com\ntwo@example.com\nthree@example.com",
status=200
)
users_stream = ListUsers(authenticator=None)
msg_error = "An error occurred. Please try again later. If problem persists, please contact your CSM"
generic_error1 = {"msg": msg_error, "code": "GenericError"}
generic_error2 = {"msg": msg_error, "code": "Generic Error"}
responses.get("https://api.iterable.com/api/lists", json={"lists": [{"id": 1000}, {"id": 2000}, {"id": 3000}]})
responses.get("https://api.iterable.com/api/lists/getUsers?listId=1000", json=generic_error1, status=500)
responses.get("https://api.iterable.com/api/lists/getUsers?listId=2000", body="one@d1.com\ntwo@d1.com\nthree@d1.com")
responses.get("https://api.iterable.com/api/lists/getUsers?listId=3000", json=generic_error2, status=500)
responses.get("https://api.iterable.com/api/lists/getUsers?listId=3000", body="one@d2.com\ntwo@d2.com\nthree@d2.com")
expected_records = [
{'email': 'one@example.com', 'listId': 2000},
{'email': 'two@example.com', 'listId': 2000},
{'email': 'three@example.com', 'listId': 2000}
{'email': 'one@d1.com', 'listId': 2000},
{'email': 'two@d1.com', 'listId': 2000},
{'email': 'three@d1.com', 'listId': 2000},
{'email': 'one@d2.com', 'listId': 3000},
{'email': 'two@d2.com', 'listId': 3000},
{'email': 'three@d2.com', 'listId': 3000},
]
records = []
for stream_slice in users_stream.stream_slices(sync_mode=SyncMode.full_refresh):
records += list(users_stream.read_records(stream_slice=stream_slice, sync_mode=SyncMode.full_refresh))
records = list(read_full_refresh(users_stream))
assert records == expected_records
@responses.activate
def test_events_read_full_refresh():
stream = Events(authenticator=None)
responses.get("https://api.iterable.com/api/lists", json={"lists": [{"id": 1}]})
responses.get("https://api.iterable.com/api/lists/getUsers?listId=1", body='user1\nuser2\nuser3\nuser4\nuser5\nuser6')
def get_body(emails):
return "\n".join([json.dumps({"email": email}) for email in emails]) + "\n"
msg_error = "An error occurred. Please try again later. If problem persists, please contact your CSM"
generic_error1 = {"msg": msg_error, "code": "GenericError"}
generic_error2 = {"msg": msg_error, "code": "Generic Error"}
responses.get("https://api.iterable.com/api/export/userEvents?email=user1&includeCustomEvents=true", body=get_body(["user1"]))
responses.get("https://api.iterable.com/api/export/userEvents?email=user2&includeCustomEvents=true", json=generic_error1, status=500)
responses.get("https://api.iterable.com/api/export/userEvents?email=user2&includeCustomEvents=true", body=get_body(["user2"]))
responses.get("https://api.iterable.com/api/export/userEvents?email=user3&includeCustomEvents=true", body=get_body(["user3"]))
responses.get("https://api.iterable.com/api/export/userEvents?email=user4&includeCustomEvents=true", json=generic_error1, status=500)
responses.get("https://api.iterable.com/api/export/userEvents?email=user5&includeCustomEvents=true", json=generic_error2, status=500)
responses.get("https://api.iterable.com/api/export/userEvents?email=user5&includeCustomEvents=true", json=generic_error2, status=500)
responses.get("https://api.iterable.com/api/export/userEvents?email=user5&includeCustomEvents=true", body=get_body(["user5"]))
m = responses.get("https://api.iterable.com/api/export/userEvents?email=user6&includeCustomEvents=true", json=generic_error2, status=500)
records = list(read_full_refresh(stream))
assert [r["email"] for r in records] == ['user1', 'user2', 'user3', 'user5']
assert m.call_count == 3