Source Iterable: distinguish 401 from empty stream (#18537)
* #829 source iterable: distinguish 401 from empty stream * #829 source iterable: upd changelog * auto-bump connector version Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
@@ -547,7 +547,7 @@
|
||||
- name: Iterable
|
||||
sourceDefinitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799
|
||||
dockerRepository: airbyte/source-iterable
|
||||
dockerImageTag: 0.1.20
|
||||
dockerImageTag: 0.1.21
|
||||
documentationUrl: https://docs.airbyte.com/integrations/sources/iterable
|
||||
icon: iterable.svg
|
||||
sourceType: api
|
||||
|
||||
@@ -5303,7 +5303,7 @@
|
||||
oauthFlowInitParameters: []
|
||||
oauthFlowOutputParameters:
|
||||
- - "access_token"
|
||||
- dockerImage: "airbyte/source-iterable:0.1.20"
|
||||
- dockerImage: "airbyte/source-iterable:0.1.21"
|
||||
spec:
|
||||
documentationUrl: "https://docs.airbyte.com/integrations/sources/iterable"
|
||||
connectionSpecification:
|
||||
|
||||
@@ -12,5 +12,5 @@ RUN pip install .
|
||||
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
|
||||
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
|
||||
|
||||
LABEL io.airbyte.version=0.1.20
|
||||
LABEL io.airbyte.version=0.1.21
|
||||
LABEL io.airbyte.name=airbyte/source-iterable
|
||||
|
||||
@@ -4,12 +4,14 @@
|
||||
|
||||
from typing import Any, List, Mapping, Tuple
|
||||
|
||||
import requests.exceptions
|
||||
from airbyte_cdk.models import SyncMode
|
||||
from airbyte_cdk.sources import AbstractSource
|
||||
from airbyte_cdk.sources.streams import Stream
|
||||
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
|
||||
|
||||
from .streams import (
|
||||
AccessCheck,
|
||||
Campaigns,
|
||||
CampaignsMetrics,
|
||||
Channels,
|
||||
@@ -75,6 +77,18 @@ class SourceIterable(AbstractSource):
|
||||
return False, f"Unable to connect to Iterable API with the provided credentials - {e}"
|
||||
|
||||
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
|
||||
def all_streams_accessible():
|
||||
access_check_stream = AccessCheck(authenticator=authenticator)
|
||||
slice_ = next(iter(access_check_stream.stream_slices(sync_mode=SyncMode.full_refresh)))
|
||||
try:
|
||||
list(access_check_stream.read_records(stream_slice=slice_, sync_mode=SyncMode.full_refresh))
|
||||
except requests.exceptions.RequestException as e:
|
||||
if e.response.status_code == requests.codes.UNAUTHORIZED:
|
||||
return False
|
||||
raise
|
||||
else:
|
||||
return True
|
||||
|
||||
authenticator = TokenAuthenticator(token=config["api_key"], auth_header="Api-Key", auth_method="")
|
||||
# end date is provided for integration tests only
|
||||
start_date, end_date = config["start_date"], config.get("end_date")
|
||||
@@ -95,13 +109,7 @@ class SourceIterable(AbstractSource):
|
||||
# A simple check is done - a read operation on a stream that can be accessed only via a Server side API key.
|
||||
# If read is successful - other streams should be supported as well.
|
||||
# More on this - https://support.iterable.com/hc/en-us/articles/360043464871-API-Keys-
|
||||
users_stream = ListUsers(authenticator=authenticator)
|
||||
for slice_ in users_stream.stream_slices(sync_mode=SyncMode.full_refresh):
|
||||
users = users_stream.read_records(stream_slice=slice_, sync_mode=SyncMode.full_refresh)
|
||||
# first slice is enough
|
||||
break
|
||||
|
||||
if next(users, None):
|
||||
if all_streams_accessible():
|
||||
streams.extend(
|
||||
[
|
||||
Users(authenticator=authenticator, **date_range),
|
||||
|
||||
@@ -668,3 +668,11 @@ class Templates(IterableExportStreamRanged):
|
||||
class Users(IterableExportStreamRanged):
|
||||
data_field = "user"
|
||||
cursor_field = "profileUpdatedAt"
|
||||
|
||||
|
||||
class AccessCheck(ListUsers):
|
||||
# since 401 error is failed silently in all the streams,
|
||||
# we need another class to distinguish an empty stream from 401 response
|
||||
def check_unauthorized_key(self, response: requests.Response) -> bool:
|
||||
# this allows not retrying 401 and raising the error upstream
|
||||
return response.status_code != codes.UNAUTHORIZED
|
||||
|
||||
@@ -11,9 +11,9 @@ from source_iterable.streams import Lists
|
||||
|
||||
|
||||
@responses.activate
|
||||
@pytest.mark.parametrize("body, status, expected_streams", (({}, 401, 7), ({"lists": [{"id": 1}]}, 200, 44)))
|
||||
@pytest.mark.parametrize("body, status, expected_streams", ((b"", 401, 7), (b"", 200, 44), (b"alpha@gmail.com\nbeta@gmail.com", 200, 44)))
|
||||
def test_source_streams(mock_lists_resp, config, body, status, expected_streams):
|
||||
responses.add(responses.GET, "https://api.iterable.com/api/lists/getUsers?listId=1", json=body, status=status)
|
||||
responses.add(responses.GET, "https://api.iterable.com/api/lists/getUsers?listId=1", body=body, status=status)
|
||||
streams = SourceIterable().streams(config=config)
|
||||
assert len(streams) == expected_streams
|
||||
|
||||
|
||||
@@ -92,6 +92,7 @@ The Iterable source connector supports the following [sync modes](https://docs.a
|
||||
|
||||
| Version | Date | Pull Request | Subject |
|
||||
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------|
|
||||
| 0.1.21 | 2022-10-27 | [18537](https://github.com/airbytehq/airbyte/pull/18537) | Improve streams discovery |
|
||||
| 0.1.20 | 2022-10-21 | [18292](https://github.com/airbytehq/airbyte/pull/18292) | Better processing of 401 and 429 errors |
|
||||
| 0.1.19 | 2022-10-05 | [17602](https://github.com/airbytehq/airbyte/pull/17602) | Add check for stream permissions |
|
||||
| 0.1.18 | 2022-10-04 | [17573](https://github.com/airbytehq/airbyte/pull/17573) | Limit time range for SATs |
|
||||
|
||||
Reference in New Issue
Block a user