1
0
mirror of synced 2025-12-25 02:09:19 -05:00

🐛 Source Intercom: refactored slow streams to use /search endpoint (#14403)

This commit is contained in:
Baz
2022-07-06 15:22:05 +03:00
committed by GitHub
parent 10010a442f
commit cae55cc1a5
13 changed files with 513 additions and 140 deletions

View File

@@ -430,7 +430,7 @@
- name: Intercom
sourceDefinitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a
dockerRepository: airbyte/source-intercom
dockerImageTag: 0.1.20
dockerImageTag: 0.1.21
documentationUrl: https://docs.airbyte.io/integrations/sources/intercom
icon: intercom.svg
sourceType: api

View File

@@ -3918,7 +3918,7 @@
oauthFlowInitParameters: []
oauthFlowOutputParameters:
- - "access_token"
- dockerImage: "airbyte/source-intercom:0.1.20"
- dockerImage: "airbyte/source-intercom:0.1.21"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/intercom"
connectionSpecification:

View File

@@ -35,5 +35,5 @@ COPY source_intercom ./source_intercom
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.1.20
LABEL io.airbyte.version=0.1.21
LABEL io.airbyte.name=airbyte/source-intercom

View File

@@ -16,11 +16,11 @@ tests:
configured_catalog_path: "integration_tests/configured_catalog.json"
expect_records:
path: "integration_tests/expected_records.txt"
exact_order: yes
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
threshold_days: 365
cursor_paths:
companies: ["updated_at"]
company_segments: ["updated_at"]

View File

@@ -3,13 +3,19 @@
"updated_at": 7626086649
},
"company_segments": {
"updated_at": 7626086649
"updated_at": 7626086649,
"companies": {
"updated_at": 7626086649
}
},
"conversations": {
"updated_at": 7626086649
},
"conversation_parts": {
"updated_at": 7626086649
"updated_at": 7626086649,
"conversations": {
"updated_at": 7626086649
}
},
"contacts": {
"updated_at": 7626086649

View File

@@ -2,9 +2,11 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import json
import time
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import Mapping
from unittest.mock import patch
@@ -109,9 +111,10 @@ def test_companies_scroll(stream_attributes):
@patch("source_intercom.source.Companies.can_use_scroll", lambda *args: False)
def test_switch_to_standard_endpoint(stream_attributes):
authenticator = VersionApiAuthenticator(token=stream_attributes["access_token"])
start_date = datetime.strptime(stream_attributes["start_date"], "%Y-%m-%dT%H:%M:%SZ").timestamp()
stream1 = Companies(authenticator=authenticator)
stream2 = Companies(authenticator=authenticator)
stream3 = ConversationParts(authenticator=authenticator)
stream3 = ConversationParts(authenticator=authenticator, start_date=start_date)
# read the first stream and stop
for slice in stream1.stream_slices(sync_mode=SyncMode.full_refresh):

View File

@@ -3,13 +3,19 @@
"updated_at": 1626086649
},
"company_segments": {
"updated_at": 1626086649
"updated_at": 1648469610,
"companies": {
"updated_at": 1625749675
}
},
"conversations": {
"updated_at": 1626086649
"updated_at": 1632835061
},
"conversation_parts": {
"updated_at": 1626086649
"updated_at": 1632835061,
"conversations": {
"updated_at": 1632835061
}
},
"contacts": {
"updated_at": 1626086649
@@ -17,4 +23,4 @@
"segments": {
"updated_at": 1626086649
}
}
}

View File

@@ -9,29 +9,29 @@ from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from urllib.parse import parse_qsl, urljoin, urlparse
import requests
import vcr
import vcr.cassette as Cassette
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from requests.auth import AuthBase
from .utils import EagerlyCachedStreamState as stream_state_cache
from .utils import IntercomRateLimiter as limiter
class IntercomStream(HttpStream, ABC):
url_base = "https://api.intercom.io/"
primary_key = "id"
data_fields = ["data"]
# https://developers.intercom.com/intercom-api-reference/reference/pagination-cursor
page_size = 150 # max available
def __init__(
self,
authenticator: AuthBase,
start_date: str = None,
**kwargs,
):
def __init__(self, authenticator: AuthBase, start_date: str = None, **kwargs):
self.start_date = start_date
super().__init__(authenticator=authenticator)
@property
@@ -44,19 +44,20 @@ class IntercomStream(HttpStream, ABC):
return self._session.auth
return super().authenticator
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
def next_page_token(self, response: requests.Response, **kwargs) -> Optional[Mapping[str, Any]]:
"""
Abstract method of HttpStream - should be overwritten.
Returning None means there are no more pages to read in response.
"""
next_page = response.json().get("pages", {}).get("next")
if next_page:
if isinstance(next_page, dict):
return next_page
return dict(parse_qsl(urlparse(next_page).query))
def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
params = {}
params = {"per_page": self.page_size}
if next_page_token:
params.update(**next_page_token)
return params
@@ -73,8 +74,8 @@ class IntercomStream(HttpStream, ABC):
self.logger.error(f"Stream {self.name}: {e.response.status_code} " f"{e.response.reason} - {error_message}")
raise e
@limiter.balance_rate_limit()
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
data = response.json()
for data_field in self.data_fields:
@@ -103,16 +104,14 @@ class IncrementalIntercomStream(IntercomStream, ABC):
updated_at field in most cases, so we used that as incremental filtering
during the slicing.
"""
if not stream_state or record[self.cursor_field] > stream_state.get(self.cursor_field):
if not stream_state or record[self.cursor_field] >= stream_state.get(self.cursor_field):
yield record
else:
self.has_old_records = True
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
record = super().parse_response(response, stream_state, **kwargs)
for record in record:
records = super().parse_response(response, stream_state, **kwargs)
for record in records:
yield from self.filter_by_state(stream_state=stream_state, record=record)
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, any]:
@@ -122,25 +121,111 @@ class IncrementalIntercomStream(IntercomStream, ABC):
we then return an updated state object. If this is the first time we
run a sync or no state was passed, current_stream_state will be None.
"""
current_stream_state = current_stream_state or {}
current_stream_state_date = current_stream_state.get(self.cursor_field, self.start_date)
latest_record_date = latest_record.get(self.cursor_field, self.start_date)
return {self.cursor_field: max(current_stream_state_date, latest_record_date)}
class ChildStreamMixin:
parent_stream_class: Optional[IntercomStream] = None
class IncrementalIntercomSearchStream(IncrementalIntercomStream):
http_method = "POST"
sort_order = "ascending"
use_cache = True
def stream_slices(self, sync_mode, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
parent_stream = self.parent_stream_class(authenticator=self.authenticator, start_date=self.start_date)
for slice in parent_stream.stream_slices(sync_mode=sync_mode):
for item in self.parent_stream_class(
authenticator=self.authenticator, start_date=self.start_date, stream_slice=slice
).read_records(sync_mode=sync_mode):
yield {"id": item["id"]}
def request_cache(self) -> Cassette:
"""
Override the default `request_cache` method, due to `match_on` is different for POST requests.
We should check additional criteria like ['query', 'body'] instead of default ['uri', 'method']
"""
match_on = ["uri", "query", "method", "body"]
cassette = vcr.use_cassette(self.cache_filename, record_mode="new_episodes", serializer="yaml", match_on=match_on)
return cassette
@stream_state_cache.cache_stream_state
def request_params(self, **kwargs) -> MutableMapping[str, Any]:
"""
Override to return None, since we don't need to pass any params along with query.
But we need to cache the state object to re-use the parrent state for certain streams.
"""
return None
def request_body_json(self, stream_state: Mapping[str, Any], next_page_token: Mapping[str, Any] = None, **kwargs) -> Optional[Mapping]:
"""
https://developers.intercom.com/intercom-api-reference/reference/pagination-search
"""
payload = {
"query": {
"operator": "OR",
"value": [
{
"field": self.cursor_field,
"operator": ">",
"value": stream_state.get(self.cursor_field, self.start_date),
},
{
"field": self.cursor_field,
"operator": "=",
"value": stream_state.get(self.cursor_field, self.start_date),
},
],
},
"sort": {"field": self.cursor_field, "order": self.sort_order},
"pagination": {"per_page": self.page_size},
}
if next_page_token:
next_page_token.update(**{"per_page": self.page_size})
payload.update({"pagination": next_page_token})
return payload
class ChildStreamMixin(IncrementalIntercomStream):
parent_stream_class: Optional[IntercomStream] = None
slice_key: str = "id"
record_key: str = "id"
@property
def parent_stream(self) -> object:
"""
Returns the instance of parent stream, if the child stream has a `parent_stream_class` dependency.
"""
return self.parent_stream_class(authenticator=self.authenticator, start_date=self.start_date) if self.parent_stream_class else None
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""UPDATING THE STATE OBJECT:
Returns:
{
{...},
"child_stream_name": {
"cursor_field": 1632835061,
"parent_stream_name": {
"cursor_field": 1632835061
}
},
{...},
}
"""
updated_state = super().get_updated_state(current_stream_state, latest_record)
# add parent_stream_state to `updated_state`
updated_state[self.parent_stream.name] = stream_state_cache.cached_state.get(self.parent_stream.name)
return updated_state
def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
"""
Returns the stream slices, which correspond to conversation IDs. Uses the `Conversations` stream
to get conversations by `sync_mode` and `state`. Unlike `ChildStreamMixin`, it gets slices based
on the `sync_mode`, so that it does not get all conversations at all times. Since we can't do
`filter_by_state` inside `parse_records`, we need to make sure we get the right conversations only.
Otherwise, this stream would always return all conversation_parts.
"""
# reading parent nested stream_state from child stream state
parent_stream_state = stream_state.get(self.parent_stream.name) if stream_state else {}
for record in self.parent_stream.read_records(stream_state=parent_stream_state, **kwargs):
# updating the `stream_state` with the state of it's parent stream
# to have the child stream sync independently from the parent stream
stream_state_cache.cached_state[self.parent_stream.name] = self.parent_stream.get_updated_state({}, record)
yield {self.slice_key: record[self.record_key]}
class Admins(IntercomStream):
@@ -171,6 +256,8 @@ class Companies(IncrementalIntercomStream):
3) Switch to using of the "standard" endpoint.
"""
page_size = 50 # default is 15
class EndpointType(Enum):
scroll = "companies/scroll"
standard = "companies"
@@ -267,7 +354,7 @@ class Companies(IncrementalIntercomStream):
yield from super().parse_response(response, stream_state=stream_state, **kwargs)
class CompanySegments(ChildStreamMixin, IncrementalIntercomStream):
class CompanySegments(ChildStreamMixin):
"""Return list of all company segments.
API Docs: https://developers.intercom.com/intercom-api-reference/reference#list-attached-segments-1
Endpoint: https://api.intercom.io/companies/<id>/segments
@@ -276,68 +363,37 @@ class CompanySegments(ChildStreamMixin, IncrementalIntercomStream):
parent_stream_class = Companies
def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f"/companies/{stream_slice['id']}/segments"
return f"/companies/{stream_slice[self.slice_key]}/segments"
class Conversations(IncrementalIntercomStream):
"""Return list of all conversations.
API Docs: https://developers.intercom.com/intercom-api-reference/reference#list-conversations
Endpoint: https://api.intercom.io/conversations
class Conversations(IncrementalIntercomSearchStream):
"""Return list of all conversations using search endpoint to provide incremental fetch.
API Docs:
https://developers.intercom.com/intercom-api-reference/reference#list-conversations
https://developers.intercom.com/intercom-api-reference/reference/pagination-search
Endpoint:
https://api.intercom.io/conversations
Search Endpoint:
https://api.intercom.io/conversations/search
"""
use_cache = True
data_fields = ["conversations"]
def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
params = super().request_params(next_page_token, **kwargs)
params.update({"order": "desc", "sort": self.cursor_field})
return params
# We're sorting by desc. Once we hit the first page with an out-of-date result we can stop.
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
if self.has_old_records:
return None
return super().next_page_token(response)
def path(self, **kwargs) -> str:
return "conversations"
return "conversations/search"
class ConversationParts(IncrementalIntercomStream):
class ConversationParts(ChildStreamMixin):
"""Return list of all conversation parts.
API Docs: https://developers.intercom.com/intercom-api-reference/reference#retrieve-a-conversation
Endpoint: https://api.intercom.io/conversations/<id>
"""
parent_stream_class = Conversations
data_fields = ["conversation_parts", "conversation_parts"]
def __init__(self, authenticator: AuthBase, start_date: str = None, **kwargs):
super().__init__(authenticator, start_date, **kwargs)
self.conversations_stream = Conversations(authenticator, start_date, **kwargs)
def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f"/conversations/{stream_slice['id']}"
def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
"""
Returns the stream slices, which correspond to conversation IDs. Uses the `Conversations` stream
to get conversations by `sync_mode` and `state`. Unlike `ChildStreamMixin`, it gets slices based
on the `sync_mode`, so that it does not get all conversations at all times. Since we can't do
`filter_by_state` inside `parse_records`, we need to make sure we get the right conversations only.
Otherwise, this stream would always return all conversation_parts.
"""
parent_stream_slices = self.conversations_stream.stream_slices(
sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state
)
for stream_slice in parent_stream_slices:
conversations = self.conversations_stream.read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
)
for conversation in conversations:
yield {"id": conversation["id"]}
return f"/conversations/{stream_slice[self.slice_key]}"
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
"""
@@ -347,7 +403,7 @@ class ConversationParts(IncrementalIntercomStream):
`filter_by_state` logic could potentially end up in data loss.
"""
records = super().parse_response(response=response, stream_state={}, **kwargs)
conversation_id = response.json().get("id")
conversation_id = response.json().get(self.record_key)
for conversation_part in records:
conversation_part.setdefault("conversation_id", conversation_id)
yield conversation_part
@@ -365,28 +421,17 @@ class Segments(IncrementalIntercomStream):
return "segments"
class Contacts(IncrementalIntercomStream):
class Contacts(IncrementalIntercomSearchStream):
"""Return list of all contacts.
API Docs: https://developers.intercom.com/intercom-api-reference/reference#list-contacts
Endpoint: https://api.intercom.io/contacts
API Docs:
https://developers.intercom.com/intercom-api-reference/reference#list-contacts
https://developers.intercom.com/intercom-api-reference/reference/pagination-search
Endpoint:
https://api.intercom.io/contacts
"""
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
Abstract method of HttpStream - should be overwritten.
Returning None means there are no more pages to read in response.
"""
next_page = response.json().get("pages", {}).get("next")
if isinstance(next_page, dict):
return {"starting_after": next_page["starting_after"]}
if isinstance(next_page, str):
return super().next_page_token(response)
def path(self, **kwargs) -> str:
return "contacts"
return "contacts/search"
class DataAttributes(IntercomStream):
@@ -444,7 +489,7 @@ class Teams(IntercomStream):
class VersionApiAuthenticator(TokenAuthenticator):
"""Intercom API support its dynamic versions' switching.
But this connector should support only one for any resource account and
it is realised by the additional request header 'Intercom-Version'
it is released by the additional request header 'Intercom-Version'
Docs: https://developers.intercom.com/building-apps/docs/update-your-api-version#section-selecting-the-version-via-the-developer-hub
"""

View File

@@ -0,0 +1,156 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from functools import wraps
from time import sleep
from typing import Dict
import requests
class IntercomRateLimiter:
"""
Define timings for RateLimits. Adjust timings if needed.
:: on_unknown_load = 1.0 sec - Intercom recommended time to hold between each API call.
:: on_low_load = 0.2 sec (200 miliseconds) - ideal ratio between hold time and api call, also the standard hold time between each API call.
:: on_mid_load = 1.5 sec - great timing to retrieve another 15% of request capacity while having mid_load.
:: on_high_load = 5.0 sec - ideally we should wait 2.0 sec while having high_load, but we hold 5 sec to retrieve up to 80% of request capacity.
"""
on_unknown_load: float = 1.0
on_low_load: float = 0.1
on_mid_load: float = 1.5
on_high_load: float = 10.0
threshold: float = 0.1
@staticmethod
def get_wait_time(
*args,
threshold: float = threshold,
rate_limit_header: str = "X-RateLimit-Limit",
rate_limit_remain_header: str = "X-RateLimit-Remaining",
):
"""
To avoid reaching Intercom API Rate Limits, use the 'X-RateLimit-Limit','X-RateLimit-Remaining' header values,
to determine the current rate limits and load and handle wait_time based on load %.
Recomended wait_time between each request is 1 sec, we would handle this dynamicaly.
:: threshold - is the % cutoff for the rate_limits % load, if this cutoff is crossed,
the connector waits `sleep_on_high_load` amount of time, default value = 0.1 (10% left from max capacity)
:: wait_time - time between each request = 200 miliseconds
:: rate_limit_header - responce header item, contains information with max rate_limits available (max)
:: rate_limit_remain_header - responce header item, contains information with how many requests are still available (current)
Header example:
{
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 51
X-RateLimit-Reset: 1487332510
},
where: 51 - requests remains and goes down, 100 - max requests capacity.
More information: https://developers.intercom.com/intercom-api-reference/reference/rate-limiting
"""
# find the requests.Response inside args list
for arg in args:
if isinstance(arg, requests.models.Response):
headers = arg.headers or {}
# Get the rate_limits from response
max_rate_limit = int(headers.get(rate_limit_header, 0)) if headers else None
current_rate = int(headers.get(rate_limit_remain_header, 0)) if headers else None
# define current load and mid_load from rate_limits
if current_rate and max_rate_limit:
mid_load = (max_rate_limit / 2) / max_rate_limit
load = current_rate / max_rate_limit
else:
# to guarantee mid_load value is 0.5 if headers are not available
mid_load = threshold * 10
load = None
# define wait_time based on load conditions
if not load:
# when there is no rate_limits from header, use the `sleep_on_unknown_load`
wait_time = IntercomRateLimiter.on_unknown_load
elif load <= threshold:
wait_time = IntercomRateLimiter.on_high_load
elif load <= mid_load:
wait_time = IntercomRateLimiter.on_mid_load
elif load > mid_load:
wait_time = IntercomRateLimiter.on_low_load
return wait_time
@staticmethod
def wait_time(wait_time: float):
return sleep(wait_time)
def balance_rate_limit(
threshold: float = threshold,
rate_limit_header: str = "X-RateLimit-Limit",
rate_limit_remain_header: str = "X-RateLimit-Remaining",
):
"""
The decorator function.
Adjust `threshold`,`rate_limit_header`,`rate_limit_remain_header` if needed.
"""
def decorator(func):
@wraps(func)
def wrapper_balance_rate_limit(*args, **kwargs):
IntercomRateLimiter.wait_time(
IntercomRateLimiter.get_wait_time(
*args, threshold=threshold, rate_limit_header=rate_limit_header, rate_limit_remain_header=rate_limit_remain_header
)
)
return func(*args, **kwargs)
return wrapper_balance_rate_limit
return decorator
class EagerlyCachedStreamState:
"""
This is the placeholder for the tmp stream state for each incremental stream,
It's empty, once the sync has started and is being updated while sync operation takes place,
It holds the `temporary stream state values` before they are updated to have the opportunity to reuse this state.
"""
cached_state: Dict = {}
@staticmethod
def stream_state_to_tmp(*args, state_object: Dict = cached_state, **kwargs) -> Dict:
"""
Method to save the current stream state for future re-use within slicing.
The method requires having the temporary `state_object` as placeholder.
Because of the specific of Intercom entities relations, we have the opportunity to fetch the updates,
for particular stream using the `Incremental Refresh`, inside slicing.
For example:
if `Conversation Parts` stream records were updated, then the `Conversations` is updated as well
"""
# Map the input *args, the sequece should be always keeped up to the input function
# change the mapping if needed
stream: object = args[0] # the self instance of the stream
current_stream_state: Dict = kwargs["stream_state"] or {}
# get the current tmp_state_value
tmp_stream_state_value = state_object.get(stream.name, {}).get(stream.cursor_field, "")
# Save the curent stream value for current sync, if present.
if current_stream_state:
state_object[stream.name] = {stream.cursor_field: current_stream_state.get(stream.cursor_field, "")}
# Check if we have the saved state and keep the minimun value
if tmp_stream_state_value:
state_object[stream.name] = {
stream.cursor_field: min(current_stream_state.get(stream.cursor_field, ""), tmp_stream_state_value)
}
return state_object
def cache_stream_state(func):
@wraps(func)
def decorator(*args, **kwargs):
EagerlyCachedStreamState.stream_state_to_tmp(*args, **kwargs)
return func(*args, **kwargs)
return decorator

View File

@@ -0,0 +1,81 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import pytest
from airbyte_cdk.sources.streams.http.auth import NoAuth
from source_intercom.source import ConversationParts, Conversations
from source_intercom.utils import EagerlyCachedStreamState as stream_state_cache
# Define the Stream instances for the tests
INTERCOM_STREAM = Conversations(authenticator=NoAuth(), start_date=0)
INTERCOM_SUB_STREAM = ConversationParts(authenticator=NoAuth(), start_date=0)
@pytest.mark.parametrize(
"stream, cur_stream_state, state_object, expected_output",
[
# When Full-Refresh: state_object: empty.
(INTERCOM_STREAM, {INTERCOM_STREAM.cursor_field: ""}, {}, {INTERCOM_STREAM.name: {INTERCOM_STREAM.cursor_field: ""}}),
(
INTERCOM_SUB_STREAM,
{INTERCOM_SUB_STREAM.cursor_field: ""},
{},
{INTERCOM_SUB_STREAM.name: {INTERCOM_SUB_STREAM.cursor_field: ""}},
),
],
ids=["Sync Started. Parent.", "Sync Started. Child."],
)
def test_full_refresh(stream, cur_stream_state, state_object, expected_output):
"""
When Sync = Full-Refresh: we don't have any state yet, so we need to keep the state_object at min value, thus empty.
"""
# create the fixure for *args based on input
args = [stream]
# use the external tmp_state_object for this test
actual = stream_state_cache.stream_state_to_tmp(*args, state_object=state_object, stream_state=cur_stream_state)
assert actual == expected_output
@pytest.mark.parametrize(
"stream, cur_stream_state, state_object, expected_output",
[
# When start the incremental refresh, assuming we have the state of STREAM.
(
INTERCOM_STREAM,
{INTERCOM_STREAM.cursor_field: "2021-01-01T01-01-01"},
{},
{INTERCOM_STREAM.name: {INTERCOM_STREAM.cursor_field: "2021-01-01T01-01-01"}},
),
(
INTERCOM_SUB_STREAM,
{INTERCOM_SUB_STREAM.cursor_field: "2021-01-01T01-01-01"},
{},
{INTERCOM_SUB_STREAM.name: {INTERCOM_SUB_STREAM.cursor_field: "2021-01-01T01-01-01"}},
),
# While doing the incremental refresh, we keeping the original state, even if the state is updated during the sync.
(
INTERCOM_STREAM,
{INTERCOM_STREAM.cursor_field: "2021-01-05T02-02-02"},
{},
{INTERCOM_STREAM.name: {INTERCOM_STREAM.cursor_field: "2021-01-05T02-02-02"}},
),
(
INTERCOM_SUB_STREAM,
{INTERCOM_SUB_STREAM.cursor_field: "2021-01-05T02-02-02"},
{},
{INTERCOM_SUB_STREAM.name: {INTERCOM_SUB_STREAM.cursor_field: "2021-01-05T02-02-02"}},
),
],
ids=["Sync Started. Parent", "Sync Started. Child", "Sync in progress. Parent", "Sync in progress. Child"],
)
def test_incremental_sync(stream, cur_stream_state, state_object, expected_output):
"""
When Sync = Incremental Refresh: we already have the saved state from Full-Refresh sync,
we have it passed as input to the Incremental Sync, so we need to back it up and reuse.
"""
# create the fixure for *args based on input
args = [stream]
actual = stream_state_cache.stream_state_to_tmp(*args, state_object=state_object, stream_state=cur_stream_state)
assert actual == expected_output

View File

@@ -0,0 +1,44 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import pytest
import requests
from source_intercom.utils import IntercomRateLimiter as limiter
TEST_DATA_FIELD = "some_data_field"
TEST_RATE_LIMIT_HEADER = "X-RateLimit-Limit"
TEST_RATE_LIMIT_REMAIN_HEADER = "X-RateLimit-Remaining"
TEST_THRESHOLD = 0.1
@pytest.mark.parametrize(
"headers, expected",
[
({"no_rate_limit_header": "no_values"}, limiter.on_unknown_load),
({"X-RateLimit-Limit": "100", "X-RateLimit-Remaining": "90"}, limiter.on_low_load),
({"X-RateLimit-Limit": "167", "X-RateLimit-Remaining": "80"}, limiter.on_mid_load),
({"X-RateLimit-Limit": "200", "X-RateLimit-Remaining": "20"}, limiter.on_high_load),
],
ids=[
"On UNKNOWN load",
"On Low Load",
"On Mid Load",
"On High Load",
],
)
def test_with_unknown_load(requests_mock, headers, expected):
"""
Test simulates the case with unknown load because of missing rate limit header.
In this case we should wait 1.0 sec sleep before next API call.
"""
requests_mock.get("https://api.intercom.io/", headers=headers)
test_response = requests.get("https://api.intercom.io/")
actual_sleep_time = limiter.get_wait_time(
test_response,
threshold=TEST_THRESHOLD,
rate_limit_header=TEST_RATE_LIMIT_HEADER,
rate_limit_remain_header=TEST_RATE_LIMIT_REMAIN_HEADER,
)
assert expected == actual_sleep_time

View File

@@ -134,58 +134,80 @@ def test_streams(config):
@pytest.mark.parametrize(
"stream, endpoint, response, expected",
"stream, http_method, endpoint, response, expected",
[
(Admins, "/admins", {"type": "admin.list", "admins": [{"type": "admin", "id": "id"}]}, [{"id": "id", "type": "admin"}]),
(Admins, "GET", "/admins", {"type": "admin.list", "admins": [{"type": "admin", "id": "id"}]}, [{"id": "id", "type": "admin"}]),
(
Companies,
"GET",
"/companies/scroll",
{"type": "company.list", "data": [{"type": "company", "id": "id"}]},
[{"id": "id", "type": "company"}],
),
(
CompanySegments,
"GET",
"/companies/id/segments",
{"type": "list", "data": [{"type": "segment", "id": "id"}]},
[{"id": "id", "type": "segment"}],
{"type": "list", "data": [{"type": "segment", "id": "id", "updated_at": 123}]},
[{"id": "id", "type": "segment", "updated_at": 123}],
),
(
Contacts,
"POST",
"/contacts/search",
{"type": "list", "data": [{"type": "contact", "id": "id"}]},
[{"id": "id", "type": "contact"}],
),
(Contacts, "/contacts", {"type": "list", "data": [{"type": "contact", "id": "id"}]}, [{"id": "id", "type": "contact"}]),
(
Conversations,
"/conversations",
"POST",
"/conversations/search",
{"type": "conversation.list", "conversations": [{"type": "conversation", "id": "id"}]},
[{"id": "id", "type": "conversation"}],
),
(
ConversationParts,
"GET",
"/conversations/id",
{"id": "id", "conversation_parts": {"conversation_parts": [{"type": "conversation_part", "id": "id"}]}},
[{"conversation_id": "id", "id": "id", "type": "conversation_part"}],
),
(
CompanyAttributes,
"GET",
"/data_attributes",
{"type": "list", "data": [{"type": "data_attribute", "id": "id"}]},
[{"id": "id", "type": "data_attribute"}],
),
(
ContactAttributes,
"GET",
"/data_attributes",
{"type": "list", "data": [{"type": "data_attribute", "id": "id"}]},
[{"id": "id", "type": "data_attribute"}],
),
(Segments, "/segments", {"type": "segment.list", "segments": [{"type": "segment", "id": "id"}]}, [{"id": "id", "type": "segment"}]),
(Tags, "/tags", {"type": "list", "data": [{"type": "tag", "id": "id"}]}, [{"id": "id", "type": "tag"}]),
(Teams, "/teams", {"teams": [{"type": "team", "id": "id"}]}, [{"id": "id", "type": "team"}]),
(
Segments,
"GET",
"/segments",
{"type": "segment.list", "segments": [{"type": "segment", "id": "id"}]},
[{"id": "id", "type": "segment"}],
),
(Tags, "GET", "/tags", {"type": "list", "data": [{"type": "tag", "id": "id"}]}, [{"id": "id", "type": "tag"}]),
(Teams, "GET", "/teams", {"teams": [{"type": "team", "id": "id"}]}, [{"id": "id", "type": "team"}]),
],
)
def test_read(stream, endpoint, response, expected, requests_mock):
requests_mock.get("/conversations", json=response)
requests_mock.get("/companies/scroll", json=response)
requests_mock.get(endpoint, json=response)
stream = stream(authenticator=NoAuth())
def test_read(stream, http_method, endpoint, response, expected, requests_mock):
def get_mock(http_method, endpoint, response):
if http_method == "POST":
requests_mock.post(endpoint, json=response)
elif http_method == "GET":
requests_mock.get(endpoint, json=response)
requests_mock.post("https://api.intercom.io/conversations/search", json=response)
requests_mock.get("/companies/scroll", json=response)
get_mock(http_method, endpoint, response)
stream = stream(authenticator=NoAuth(), start_date=0)
records = []
for slice in stream.stream_slices(sync_mode=SyncMode.full_refresh):
@@ -194,8 +216,8 @@ def test_read(stream, endpoint, response, expected, requests_mock):
assert records == expected
def build_conversations_response_body(conversations, next_url=None):
return {"type": "conversation.list", "pages": {"next": next_url} if next_url else {}, "conversations": conversations}
def build_conversations_response_body(conversations, next_page=None):
return {"type": "conversation.list", "pages": {"next": next_page} if next_page else {}, "conversations": conversations}
def build_conversation_response_body(conversation_id, conversation_parts):
@@ -232,13 +254,17 @@ def single_conversation_response():
def conversation_parts_responses():
return [
(
"https://api.intercom.io/conversations",
"POST",
"https://api.intercom.io/conversations/search",
build_conversations_response_body(
conversations=[{"id": "151272900026677", "updated_at": 1650988600}, {"id": "151272900026666", "updated_at": 1650988500}],
next_url="https://api.intercom.io/conversations?per_page=2&page=2",
conversations=[
{"id": "151272900026677", "updated_at": 1650988600},
{"id": "151272900026666", "updated_at": 1650988500},
],
),
),
(
"GET",
"https://api.intercom.io/conversations?per_page=2&page=2",
build_conversations_response_body(
conversations=[
@@ -248,23 +274,25 @@ def conversation_parts_responses():
),
),
(
"GET",
"https://api.intercom.io/conversations/151272900026677",
build_conversation_response_body(
conversation_id="151272900026677",
conversation_parts=[{"id": "13740311961", "updated_at": 1650988300}, {"id": "13740311962", "updated_at": 1650988450}],
conversation_parts=[
{"id": "1", "updated_at": 1650988300},
{"id": "2", "updated_at": 1650988450},
],
),
),
(
"GET",
"https://api.intercom.io/conversations/151272900026666",
build_conversation_response_body(
conversation_id="151272900026666",
conversation_parts=[{"id": "13740311955", "updated_at": 1650988150}, {"id": "13740312056", "updated_at": 1650988500}],
),
),
(
"https://api.intercom.io/conversations/151272900026466",
build_conversation_response_body(
conversation_id="151272900026466", conversation_parts=[{"id": "13740311970", "updated_at": 1650988600}]
conversation_parts=[
{"id": "3", "updated_at": 1650988150},
{"id": "4", "updated_at": 1650988151},
],
),
),
]
@@ -292,16 +320,19 @@ def test_conversation_part_filtering_based_on_conversation(requests_mock, conver
"""
Test shows that conversation_parts filters conversations (from parent stream) correctly
"""
updated_at = 1650988200
state = {"updated_at": updated_at}
cursor_value = 1650988200
state = {"updated_at": cursor_value}
expected_record_ids = set()
for response_tuple in conversation_parts_responses:
requests_mock.register_uri("GET", response_tuple[0], json=response_tuple[1])
if "conversation_parts" in response_tuple[1]:
expected_record_ids.update([cp["id"] for cp in response_tuple[1]["conversation_parts"]["conversation_parts"]])
http_method = response_tuple[0]
url = response_tuple[1]
response = response_tuple[2]
requests_mock.register_uri(http_method, url, json=response)
if "conversation_parts" in response:
expected_record_ids.update([cp["id"] for cp in response["conversation_parts"]["conversation_parts"]])
records = []
conversation_parts = ConversationParts(authenticator=NoAuth())
conversation_parts = ConversationParts(authenticator=NoAuth(), start_date=0)
for slice in conversation_parts.stream_slices(sync_mode=SyncMode.incremental, stream_state=state):
records.extend(list(conversation_parts.read_records(sync_mode=SyncMode.incremental, stream_slice=slice, stream_state=state)))

View File

@@ -49,6 +49,7 @@ The Intercom connector should not run into Intercom API limitations under normal
| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- | :--- |
| 0.1.21 | 2022-07-05 | [14403](https://github.com/airbytehq/airbyte/pull/14403) | Refactored `Conversations`, `Conversation Parts`, `Company Segments` to increase performance
| 0.1.20 | 2022-06-24 | [14099](https://github.com/airbytehq/airbyte/pull/14099) | Extended `Contacts` stream schema with `sms_consent`,`unsubscribe_from_sms` properties
| 0.1.19 | 2022-05-25 | [13204](https://github.com/airbytehq/airbyte/pull/13204) | Fixed `conversation_parts` stream schema definition |
| 0.1.18 | 2022-05-04 | [12482](https://github.com/airbytehq/airbyte/pull/12482) | Update input configuration copy |