1
0
mirror of synced 2025-12-23 21:03:15 -05:00

🐛 Source Zendesk chat: fix chats stream is only pulling for first page (#7210)

* fix: chats stream is only pulling for first page

* fix: integration tests

* fix: remove cred

* bump version in source definition

* Fixed: integration tests
This commit is contained in:
Harshith Mullapudi
2021-11-01 18:18:54 +05:30
committed by GitHub
parent f3fa71b1fb
commit a80d388e50
15 changed files with 125 additions and 74 deletions

View File

@@ -2,7 +2,7 @@
"sourceDefinitionId": "40d24d0f-b8f9-4fe0-9e6c-b06c0f3f45e4",
"name": "Zendesk Chat",
"dockerRepository": "airbyte/source-zendesk-chat",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/zendesk-chat",
"icon": "zendesk.svg"
}

View File

@@ -554,7 +554,7 @@
- name: Zendesk Chat
sourceDefinitionId: 40d24d0f-b8f9-4fe0-9e6c-b06c0f3f45e4
dockerRepository: airbyte/source-zendesk-chat
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/zendesk-chat
icon: zendesk.svg
sourceType: api

View File

@@ -1,6 +1,7 @@
*
!Dockerfile
!Dockerfile.test
!main_dev.py
!source_zendesk_chat
!setup.py
!secrets

View File

@@ -1,4 +1,4 @@
FROM airbyte/integration-base-python:0.1.6
FROM python:3.7-slim
# Bash is installed for more convenient debugging.
RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*
@@ -6,13 +6,15 @@ RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*
ENV CODE_PATH="source_zendesk_chat"
ENV AIRBYTE_IMPL_MODULE="source_zendesk_chat"
ENV AIRBYTE_IMPL_PATH="SourceZendeskChat"
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main_dev.py"
WORKDIR /airbyte/integration_code
COPY $CODE_PATH ./$CODE_PATH
COPY main_dev.py ./
COPY setup.py ./
RUN pip install .
ENV AIRBYTE_ENTRYPOINT "/airbyte/base.sh"
ENTRYPOINT ["python", "/airbyte/integration_code/main_dev.py"]
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-zendesk-chat

View File

@@ -14,7 +14,7 @@ tests:
configured_catalog_path: "sample_files/configured_catalog.json"
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "sample_files/configured_catalog.json"
configured_catalog_path: "integration_tests/configured_catalog_incremental.json"
# Unable to use 'state_path' because Zendesk Chat API returns an error when specifying a date in the future.
# future_state_path: "integration_tests/abnormal_state.json"
cursor_paths:

View File

@@ -0,0 +1,13 @@
{
"streams": [
{
"stream": {
"name": "chats",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

View File

@@ -0,0 +1,40 @@
{
"streams": [
{
"stream": {
"name": "agents",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["id"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["id"]
},
{
"stream": {
"name": "agent_timeline",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["start_time"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["start_time"]
},
{
"stream": {
"name": "bans",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["id"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["id"]
}
]
}

View File

@@ -1,6 +1,4 @@
{
"credentials": {
"access_token": "wrongkey-access-token",
"start_date": "2020-12-12T00:00:00Z"
}
"access_token": "wrongkey-access-token",
"start_date": "2020-12-12T00:00:00Z"
}

View File

@@ -5,7 +5,7 @@
import sys
from base_python.entrypoint import launch
from airbyte_cdk.entrypoint import launch
from source_zendesk_chat import SourceZendeskChat
if __name__ == "__main__":

View File

@@ -1,29 +1,5 @@
{
"streams": [
{
"stream": {
"name": "agents",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["id"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["id"]
},
{
"stream": {
"name": "agent_timeline",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["start_time"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["start_time"]
},
{
"stream": {
"name": "accounts",
@@ -60,18 +36,6 @@
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "bans",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["id"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["id"]
},
{
"stream": {
"name": "departments",
@@ -107,15 +71,6 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "routing_settings",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

View File

@@ -1,4 +1,4 @@
{
"access_token": "<your_access_token>",
"start_date": "2020-11-01T00:00:00"
"start_date": "2020-11-01T00:00:00Z"
}

View File

@@ -6,13 +6,12 @@
from setuptools import find_packages, setup
MAIN_REQUIREMENTS = [
"airbyte-protocol",
"base-python",
"airbyte-cdk~=0.1",
"pendulum==1.2.0",
"requests==2.25.1",
]
TEST_REQUIREMENTS = ["pytest==6.1.2", "source-acceptance-test"]
TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock"]
setup(
name="source_zendesk_chat",

View File

@@ -5,23 +5,26 @@
from typing import Any, List, Mapping, Tuple
from airbyte_protocol import SyncMode
from base_python import AbstractSource, Stream, TokenAuthenticator
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from .api import Accounts, Agents, AgentTimelines, Bans, Chats, Departments, Goals, Roles, RoutingSettings, Shortcuts, Skills, Triggers
from .streams import Accounts, Agents, AgentTimelines, Bans, Chats, Departments, Goals, Roles, RoutingSettings, Shortcuts, Skills, Triggers
class SourceZendeskChat(AbstractSource):
def check_connection(self, logger, config) -> Tuple[bool, any]:
try:
authenticator = TokenAuthenticator(token=config["access_token"])
list(RoutingSettings(authenticator=authenticator).read_records(SyncMode.full_refresh))
authenticator = TokenAuthenticator(config["access_token"])
records = RoutingSettings(authenticator=authenticator).read_records(sync_mode=SyncMode.full_refresh)
next(records)
return True, None
except Exception as error:
return False, f"Unable to connect to Zendesk Chat API with the provided credentials - {error}"
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = TokenAuthenticator(token=config["access_token"])
authenticator = TokenAuthenticator(config["access_token"])
return [
Agents(authenticator=authenticator),
AgentTimelines(authenticator=authenticator, start_date=config["start_date"]),

View File

@@ -5,16 +5,19 @@
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from urllib.parse import parse_qs, urlparse
import pendulum
import requests
from base_python import HttpStream
from airbyte_cdk.sources.streams.http import HttpStream
class Stream(HttpStream):
class Stream(HttpStream, ABC):
url_base = "https://www.zopim.com/api/v2/"
primary_key = "id"
data_field = None
limit = 100
def backoff_time(self, response: requests.Response) -> Optional[float]:
@@ -26,7 +29,12 @@ class Stream(HttpStream):
return self.name
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
return {}
response_data = response.json()
if "next_url" in response_data:
next_url = response_data["next_url"]
cursor = parse_qs(urlparse(next_url).query)["cursor"]
return {"cursor": cursor}
def request_params(
self, stream_state: Mapping[str, Any], next_page_token: Mapping[str, Any] = None, **kwargs
@@ -48,12 +56,15 @@ class Stream(HttpStream):
response_data = response_data.get(self.data_field, [])
if isinstance(response_data, list):
return response_data
return list(map(self.parse_response_obj, response_data))
elif isinstance(response_data, dict):
return [response_data]
return [self.parse_response_obj(response_data)]
else:
raise Exception(f"Unsupported type of response data for stream {self.name}")
def parse_response_obj(self, response_obj: dict) -> dict:
return response_obj
class BaseIncrementalStream(Stream, ABC):
@property
@@ -95,8 +106,9 @@ class TimeIncrementalStream(BaseIncrementalStream, ABC):
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
latest_benchmark = self._field_to_datetime(latest_record[self.cursor_field])
if current_stream_state.get(self.cursor_field):
return {self.cursor_field: str(max(latest_benchmark, self._field_to_datetime(current_stream_state[self.cursor_field])))}
return {self.cursor_field: str(latest_benchmark)}
state = max(latest_benchmark, self._field_to_datetime(current_stream_state[self.cursor_field]))
return {self.cursor_field: state.strftime("%Y-%m-%dT%H:%M:%SZ")}
return {self.cursor_field: latest_benchmark.strftime("%Y-%m-%dT%H:%M:%SZ")}
def request_params(
self, stream_state: Mapping[str, Any], next_page_token: Mapping[str, Any] = None, **kwargs
@@ -117,6 +129,10 @@ class TimeIncrementalStream(BaseIncrementalStream, ABC):
def path(self, **kwargs) -> str:
return f"incremental/{self.name}"
def parse_response_obj(self, response_obj: dict) -> dict:
response_obj[self.cursor_field] = pendulum.parse(response_obj[self.cursor_field]).strftime("%Y-%m-%dT%H:%M:%SZ")
return response_obj
class IdIncrementalStream(BaseIncrementalStream):
cursor_field = "id"
@@ -167,12 +183,28 @@ class AgentTimelines(TimeIncrementalStream):
params["start_time"] = params["start_time"] * 1000000
return params
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_data = response.json()
stream_data = self.get_stream_data(response_data)
def generate_key(record):
record.update({"id": "|".join((str(record.get("agent_id", "")), str(record.get("start_time", ""))))})
return record
# associate the surrogate key
yield from map(
generate_key,
stream_data,
)
class Accounts(Stream):
"""
Accounts Stream: https://developer.zendesk.com/rest_api/docs/chat/accounts#show-account
"""
primary_key = "account_key"
def path(self, **kwargs) -> str:
return "account"
@@ -237,8 +269,15 @@ class RoutingSettings(Stream):
Routing Settings Stream: https://developer.zendesk.com/rest_api/docs/chat/routing_settings#show-account-routing-settings
"""
primary_key = ""
name = "routing_settings"
data_field = "data"
def path(self, **kwargs) -> str:
def path(
self,
stream_state: Mapping[str, Any] = None,
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> str:
return "routing_settings/account"

View File

@@ -62,6 +62,7 @@ We recommend creating a restricted, read-only key specifically for Airbyte acces
| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.3 | 2021-10-21 | [7210](https://github.com/airbytehq/airbyte/pull/7210) | Chats stream is only getting data from first page |
| 0.1.2 | 2021-08-17 | [5476](https://github.com/airbytehq/airbyte/pull/5476) | Correct field unread to boolean type |
| 0.1.1 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add `AIRBYTE_ENTRYPOINT` for Kubernetes support |
| 0.1.0 | 2021-05-03 | [3088](https://github.com/airbytehq/airbyte/pull/3088) | Initial release |