
🐛 Source Intercom: Fixed filtering of conversation_parts (#12374)

Authored by Luis Gomez on 2022-04-29 08:55:26 -05:00, committed by GitHub
parent d3dd403c9a
commit 1642630461
6 changed files with 144 additions and 16 deletions

View File

@@ -362,7 +362,7 @@
 - name: Intercom
   sourceDefinitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a
   dockerRepository: airbyte/source-intercom
-  dockerImageTag: 0.1.16
+  dockerImageTag: 0.1.17
   documentationUrl: https://docs.airbyte.io/integrations/sources/intercom
   icon: intercom.svg
   sourceType: api

View File

@@ -3709,7 +3709,7 @@
         oauthFlowInitParameters: []
         oauthFlowOutputParameters:
           - - "access_token"
-- dockerImage: "airbyte/source-intercom:0.1.16"
+- dockerImage: "airbyte/source-intercom:0.1.17"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/sources/intercom"
     connectionSpecification:

View File

@@ -35,5 +35,5 @@ COPY source_intercom ./source_intercom
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
-LABEL io.airbyte.version=0.1.16
+LABEL io.airbyte.version=0.1.17
 LABEL io.airbyte.name=airbyte/source-intercom

View File

@@ -10,6 +10,7 @@ from urllib.parse import parse_qsl, urljoin, urlparse
 import requests
 from airbyte_cdk.logger import AirbyteLogger
+from airbyte_cdk.models import SyncMode
 from airbyte_cdk.sources import AbstractSource
 from airbyte_cdk.sources.streams import Stream
 from airbyte_cdk.sources.streams.http import HttpStream
@@ -284,6 +285,7 @@ class Conversations(IncrementalIntercomStream):
     Endpoint: https://api.intercom.io/conversations
     """

+    use_cache = True
     data_fields = ["conversations"]

     def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
@@ -302,20 +304,49 @@ class Conversations(IncrementalIntercomStream):
return "conversations"
class ConversationParts(ChildStreamMixin, IncrementalIntercomStream):
class ConversationParts(IncrementalIntercomStream):
"""Return list of all conversation parts.
API Docs: https://developers.intercom.com/intercom-api-reference/reference#retrieve-a-conversation
Endpoint: https://api.intercom.io/conversations/<id>
"""
data_fields = ["conversation_parts", "conversation_parts"]
parent_stream_class = Conversations
def __init__(self, authenticator: AuthBase, start_date: str = None, **kwargs):
super().__init__(authenticator, start_date, **kwargs)
self.conversations_stream = Conversations(authenticator, start_date, **kwargs)
def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f"/conversations/{stream_slice['id']}"
def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
"""
Returns the stream slices, which correspond to conversation IDs. Uses the `Conversations` stream
to get conversations by `sync_mode` and `state`. Unlike `ChildStreamMixin`, it gets slices based
on the `sync_mode`, so that it does not get all conversations at all times. Since we can't do
`filter_by_state` inside `parse_records`, we need to make sure we get the right conversations only.
Otherwise, this stream would always return all conversation_parts.
"""
parent_stream_slices = self.conversations_stream.stream_slices(
sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state
)
for stream_slice in parent_stream_slices:
conversations = self.conversations_stream.read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
)
for conversation in conversations:
yield {"id": conversation["id"]}
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
records = super().parse_response(response, stream_state, **kwargs)
"""
Adds `conversation_id` to every `conversation_part` record before yielding it. Records are not
filtered by state here, because the aggregate list of `conversation_parts` is not sorted by
`updated_at`, because it gets `conversation_parts` for each `conversation`. Hence, using parent's
`filter_by_state` logic could potentially end up in data loss.
"""
records = super().parse_response(response=response, stream_state={}, **kwargs)
conversation_id = response.json().get("id")
for conversation_part in records:
conversation_part.setdefault("conversation_id", conversation_id)
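
For illustration only, a minimal usage sketch of the reworked stream is shown below; it is not part of the commit. It assumes the CDK's TokenAuthenticator import path, a placeholder access token, and the `{"updated_at": <unix timestamp>}` state shape used by `IncrementalIntercomStream`; the stream and record field names come from the diff above.

# Hedged sketch: incremental read of ConversationParts after this change.
# Slices are produced from the Conversations stream filtered by state, and each
# slice maps to GET /conversations/<id>. Token and timestamp are placeholders.
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator

from source_intercom.source import ConversationParts

stream = ConversationParts(authenticator=TokenAuthenticator(token="<access_token>"))
state = {"updated_at": 1650988200}  # only conversations updated after this cursor are sliced

for stream_slice in stream.stream_slices(sync_mode=SyncMode.incremental, stream_state=state):
    # each slice is {"id": <conversation id>}
    for record in stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice, stream_state=state):
        print(record["conversation_id"], record["id"])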

View File

@@ -179,7 +179,7 @@ def test_streams(config):
         (Teams, "/teams", {"teams": [{"type": "team", "id": "id"}]}, [{"id": "id", "type": "team"}]),
     ],
 )
-def test_read(stream, endpoint, response, expected, config, requests_mock):
+def test_read(stream, endpoint, response, expected, requests_mock):
     requests_mock.get("/conversations", json=response)
     requests_mock.get("/companies/scroll", json=response)
     requests_mock.get(endpoint, json=response)
@@ -194,12 +194,29 @@ def test_read(stream, endpoint, response, expected, config, requests_mock):
     assert records == expected


-def test_conversation_part_has_conversation_id(requests_mock):
-    """
-    Test shows that conversation_part records include the `conversation_id` field.
-    """
-    response_body = {
+def build_conversations_response_body(conversations, next_url=None):
+    return {
+        "type": "conversation.list",
+        "pages": {"next": next_url} if next_url else {},
+        "conversations": conversations
+    }
+
+
+def build_conversation_response_body(conversation_id, conversation_parts):
+    return {
+        "type": "conversation",
+        "id": conversation_id,
+        "conversation_parts": {
+            "type": "conversation_part.list",
+            "conversation_parts": conversation_parts,
+            "total_count": len(conversation_parts),
+        },
+    }
+
+
+@pytest.fixture
+def single_conversation_response():
+    return {
         "type": "conversation",
         "id": "151272900024304",
         "created_at": 1647365706,
@@ -213,14 +230,93 @@ def test_conversation_part_has_conversation_id(requests_mock):
"total_count": 2,
},
}
url = "https://api.intercom.io/conversations/151272900024304"
requests_mock.get(url, json=response_body)
stream1 = ConversationParts(authenticator=NoAuth())
@pytest.fixture
def conversation_parts_responses():
return [
(
"https://api.intercom.io/conversations",
build_conversations_response_body(
conversations=[
{"id":"151272900026677","updated_at":1650988600},
{"id":"151272900026666","updated_at":1650988500}
],
next_url="https://api.intercom.io/conversations?per_page=2&page=2"
)
),
(
"https://api.intercom.io/conversations?per_page=2&page=2",
build_conversations_response_body(
conversations=[
{"id":"151272900026466","updated_at":1650988450},
{"id":"151272900026680","updated_at":1650988100}, # Older than state, won't be processed
]
)
),
(
"https://api.intercom.io/conversations/151272900026677",
build_conversation_response_body(
conversation_id="151272900026677",
conversation_parts=[
{"id": "13740311961","updated_at":1650988300},
{"id": "13740311962","updated_at":1650988450}
]
)
),
(
"https://api.intercom.io/conversations/151272900026666",
build_conversation_response_body(
conversation_id="151272900026666",
conversation_parts=[
{"id": "13740311955","updated_at":1650988150},
{"id": "13740312056","updated_at":1650988500}
]
)
),
(
"https://api.intercom.io/conversations/151272900026466",
build_conversation_response_body(
conversation_id="151272900026466",
conversation_parts=[{"id": "13740311970","updated_at":1650988600}]
)
)
]
def test_conversation_part_has_conversation_id(requests_mock, single_conversation_response):
"""
Test shows that conversation_part records include the `conversation_id` field.
"""
conversation_id = single_conversation_response["id"]
url = f"https://api.intercom.io/conversations/{conversation_id}"
requests_mock.get(url, json=single_conversation_response)
conversation_parts = ConversationParts(authenticator=NoAuth())
record_count = 0
for record in stream1.read_records(sync_mode=SyncMode.incremental, stream_slice={"id": "151272900024304"}):
for record in conversation_parts.read_records(sync_mode=SyncMode.incremental, stream_slice={"id": conversation_id}):
assert record["conversation_id"] == "151272900024304"
record_count += 1
assert record_count == 2
def test_conversation_part_filtering_based_on_conversation(requests_mock, conversation_parts_responses):
"""
Test shows that conversation_parts filters conversations (from parent stream) correctly
"""
updated_at = 1650988200
state = {"updated_at": updated_at}
expected_record_ids = set()
for response_tuple in conversation_parts_responses:
requests_mock.register_uri('GET', response_tuple[0], json=response_tuple[1])
if "conversation_parts" in response_tuple[1]:
expected_record_ids.update([cp["id"] for cp in response_tuple[1]["conversation_parts"]["conversation_parts"]])
records = []
conversation_parts = ConversationParts(authenticator=NoAuth())
for slice in conversation_parts.stream_slices(sync_mode=SyncMode.incremental, stream_state=state):
records.extend(list(conversation_parts.read_records(sync_mode=SyncMode.incremental, stream_slice=slice, stream_state=state)))
assert expected_record_ids == {r["id"] for r in records}

View File

@@ -60,6 +60,7 @@ Please read [How to get your Access Token](https://developers.intercom.com/build
 | Version | Date | Pull Request | Subject |
 | :--- | :--- | :--- | :--- |
+| 0.1.17 | 2022-04-29 | [12374](https://github.com/airbytehq/airbyte/pull/12374) | Fixed filtering of conversation_parts |
 | 0.1.16 | 2022-03-23 | [11206](https://github.com/airbytehq/airbyte/pull/11206) | Added conversation_id field to conversation_part records |
 | 0.1.15 | 2022-03-22 | [11176](https://github.com/airbytehq/airbyte/pull/11176) | Correct `check_connection` URL |
 | 0.1.14 | 2022-03-16 | [11208](https://github.com/airbytehq/airbyte/pull/11208) | Improve 'conversations' incremental sync speed |