1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Source Zendesk Support: Fix cursor pagination (#28487)

* Source Zendesk Support: Remove extra page from params

* Source Zendesk Support: bump version

* Source Zendesk Support: Refactor + Fix pagination issues

* Source Zendesk Support: fix object count

* Source Zendesk Support: update timeout for tests

* Source Zendesk Support: update expected_records

* Source Zendesk Support: update expected_records

* Source Zendesk Support: update ignore pagination
This commit is contained in:
Artem Inzhyyants
2023-07-21 22:06:51 +02:00
committed by GitHub
parent 53da5baa7d
commit a7b5a9c6b7
9 changed files with 181 additions and 496 deletions

View File

@@ -25,5 +25,5 @@ COPY source_zendesk_support ./source_zendesk_support
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.10.1
LABEL io.airbyte.version=0.10.2
LABEL io.airbyte.name=airbyte/source-zendesk-support

View File

@@ -21,6 +21,7 @@ acceptance_tests:
basic_read:
tests:
- config_path: "secrets/config.json"
timeout_seconds: 2400
expect_records:
path: "integration_tests/expected_records.jsonl"
extra_fields: no
@@ -49,3 +50,4 @@ acceptance_tests:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 2400

View File

@@ -29,9 +29,9 @@
{"stream":"ticket_fields","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_fields/360002833096.json","id":360002833096,"type":"description","title":"Description","raw_title":"Description","description":"Please enter the details of your request. A member of our support staff will respond as soon as possible.","raw_description":"Please enter the details of your request. A member of our support staff will respond as soon as possible.","position":2,"active":true,"required":false,"collapsed_for_agents":false,"regexp_for_validation":null,"title_in_portal":"Description","raw_title_in_portal":"Description","visible_in_portal":true,"editable_in_portal":true,"required_in_portal":true,"tag":null,"created_at":"2020-12-11T18:34:05Z","updated_at":"2020-12-11T18:34:05Z","removable":false,"key":null,"agent_description":null},"emitted_at":1687861693520}
{"stream":"ticket_fields","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_fields/360002833116.json","id":360002833116,"type":"status","title":"Status","raw_title":"Status","description":"Request status","raw_description":"Request status","position":3,"active":true,"required":false,"collapsed_for_agents":false,"regexp_for_validation":null,"title_in_portal":"Status","raw_title_in_portal":"Status","visible_in_portal":false,"editable_in_portal":false,"required_in_portal":false,"tag":null,"created_at":"2020-12-11T18:34:05Z","updated_at":"2020-12-11T18:34:05Z","removable":false,"key":null,"agent_description":null,"system_field_options":[{"name":"Open","value":"open"},{"name":"Pending","value":"pending"},{"name":"Solved","value":"solved"}],"sub_type_id":0},"emitted_at":1687861693521}
{"stream":"ticket_forms","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_forms/360000084116.json","name":"Default Ticket Form","display_name":"Default Ticket Form","id":360000084116,"raw_name":"Default Ticket Form","raw_display_name":"Default Ticket Form","end_user_visible":true,"position":1,"ticket_field_ids":[360002833076,360002833096,360002833116,360002833136,360002833156,360002833176,360002833196],"active":true,"default":true,"created_at":"2020-12-11T18:34:37Z","updated_at":"2020-12-11T18:34:37Z","in_all_brands":true,"restricted_brand_ids":[],"end_user_conditions":[],"agent_conditions":[]},"emitted_at":1687861694440}
{"stream":"ticket_metrics","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_metrics/7283000498191.json","id":7283000498191,"ticket_id":153,"created_at":"2023-06-26T11:31:48Z","updated_at":"2023-06-26T12:13:42Z","group_stations":2,"assignee_stations":2,"reopens":0,"replies":0,"assignee_updated_at":"2023-06-26T11:31:48Z","requester_updated_at":"2023-06-26T11:31:48Z","status_updated_at":"2023-06-26T11:31:48Z","initially_assigned_at":"2023-06-26T11:31:48Z","assigned_at":"2023-06-26T12:13:42Z","solved_at":null,"latest_comment_added_at":"2023-06-26T11:31:48Z","reply_time_in_minutes":{"calendar":null,"business":null},"first_resolution_time_in_minutes":{"calendar":null,"business":null},"full_resolution_time_in_minutes":{"calendar":null,"business":null},"agent_wait_time_in_minutes":{"calendar":null,"business":null},"requester_wait_time_in_minutes":{"calendar":null,"business":null},"on_hold_time_in_minutes":{"calendar":0,"business":0},"custom_status_updated_at":"2023-06-26T11:31:48Z"},"emitted_at":1687861695566}
{"stream":"ticket_metrics","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_metrics/7282909551759.json","id":7282909551759,"ticket_id":152,"created_at":"2023-06-26T11:10:33Z","updated_at":"2023-06-26T11:25:43Z","group_stations":1,"assignee_stations":1,"reopens":0,"replies":1,"assignee_updated_at":"2023-06-26T11:25:43Z","requester_updated_at":"2023-06-26T11:10:33Z","status_updated_at":"2023-06-26T11:25:43Z","initially_assigned_at":"2023-06-26T11:10:33Z","assigned_at":"2023-06-26T11:10:33Z","solved_at":"2023-06-26T11:25:43Z","latest_comment_added_at":"2023-06-26T11:21:06Z","reply_time_in_minutes":{"calendar":11,"business":0},"first_resolution_time_in_minutes":{"calendar":15,"business":0},"full_resolution_time_in_minutes":{"calendar":15,"business":0},"agent_wait_time_in_minutes":{"calendar":15,"business":0},"requester_wait_time_in_minutes":{"calendar":0,"business":0},"on_hold_time_in_minutes":{"calendar":0,"business":0},"custom_status_updated_at":"2023-06-26T11:25:43Z"},"emitted_at":1687861695567}
{"stream":"ticket_metrics","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_metrics/7282901696015.json","id":7282901696015,"ticket_id":151,"created_at":"2023-06-26T11:09:33Z","updated_at":"2023-06-26T12:03:38Z","group_stations":1,"assignee_stations":1,"reopens":0,"replies":1,"assignee_updated_at":"2023-06-26T12:03:37Z","requester_updated_at":"2023-06-26T11:09:33Z","status_updated_at":"2023-06-26T11:09:33Z","initially_assigned_at":"2023-06-26T11:09:33Z","assigned_at":"2023-06-26T11:09:33Z","solved_at":null,"latest_comment_added_at":"2023-06-26T12:03:37Z","reply_time_in_minutes":{"calendar":54,"business":0},"first_resolution_time_in_minutes":{"calendar":null,"business":null},"full_resolution_time_in_minutes":{"calendar":null,"business":null},"agent_wait_time_in_minutes":{"calendar":null,"business":null},"requester_wait_time_in_minutes":{"calendar":null,"business":null},"on_hold_time_in_minutes":{"calendar":0,"business":0},"custom_status_updated_at":"2023-06-26T11:09:33Z"},"emitted_at":1687861695567}
{"stream":"ticket_metrics","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_metrics/7283000498191.json","id":7283000498191,"ticket_id":153,"created_at":"2023-06-26T11:31:48Z","updated_at":"2023-06-26T12:13:42Z","group_stations":2,"assignee_stations":2,"reopens":0,"replies":0,"assignee_updated_at":"2023-06-26T11:31:48Z","requester_updated_at":"2023-06-26T11:31:48Z","status_updated_at":"2023-06-26T11:31:48Z","initially_assigned_at":"2023-06-26T11:31:48Z","assigned_at":"2023-06-26T12:13:42Z","solved_at":null,"latest_comment_added_at":"2023-06-26T11:31:48Z","reply_time_in_minutes":{"calendar":null,"business":null},"first_resolution_time_in_minutes":{"calendar":null,"business":null},"full_resolution_time_in_minutes":{"calendar":null,"business":null},"agent_wait_time_in_minutes":{"calendar":null,"business":null},"requester_wait_time_in_minutes":{"calendar":null,"business":null},"on_hold_time_in_minutes":{"calendar":0,"business":0},"custom_status_updated_at":"2023-06-26T11:31:48Z"},"emitted_at":1689884373175}
{"stream":"ticket_metrics","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_metrics/7282909551759.json","id":7282909551759,"ticket_id":152,"created_at":"2023-06-26T11:10:33Z","updated_at":"2023-06-26T11:25:43Z","group_stations":1,"assignee_stations":1,"reopens":0,"replies":1,"assignee_updated_at":"2023-06-26T11:25:43Z","requester_updated_at":"2023-06-26T11:10:33Z","status_updated_at":"2023-07-16T12:01:39Z","initially_assigned_at":"2023-06-26T11:10:33Z","assigned_at":"2023-06-26T11:10:33Z","solved_at":"2023-06-26T11:25:43Z","latest_comment_added_at":"2023-06-26T11:21:06Z","reply_time_in_minutes":{"calendar":11,"business":0},"first_resolution_time_in_minutes":{"calendar":15,"business":0},"full_resolution_time_in_minutes":{"calendar":15,"business":0},"agent_wait_time_in_minutes":{"calendar":15,"business":0},"requester_wait_time_in_minutes":{"calendar":0,"business":0},"on_hold_time_in_minutes":{"calendar":0,"business":0},"custom_status_updated_at":"2023-06-26T11:25:43Z"},"emitted_at":1689884373175}
{"stream":"ticket_metrics","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_metrics/7282901696015.json","id":7282901696015,"ticket_id":151,"created_at":"2023-06-26T11:09:33Z","updated_at":"2023-06-26T12:03:38Z","group_stations":1,"assignee_stations":1,"reopens":0,"replies":1,"assignee_updated_at":"2023-06-26T12:03:37Z","requester_updated_at":"2023-06-26T11:09:33Z","status_updated_at":"2023-06-26T11:09:33Z","initially_assigned_at":"2023-06-26T11:09:33Z","assigned_at":"2023-06-26T11:09:33Z","solved_at":null,"latest_comment_added_at":"2023-06-26T12:03:37Z","reply_time_in_minutes":{"calendar":54,"business":0},"first_resolution_time_in_minutes":{"calendar":null,"business":null},"full_resolution_time_in_minutes":{"calendar":null,"business":null},"agent_wait_time_in_minutes":{"calendar":null,"business":null},"requester_wait_time_in_minutes":{"calendar":null,"business":null},"on_hold_time_in_minutes":{"calendar":0,"business":0},"custom_status_updated_at":"2023-06-26T11:09:33Z"},"emitted_at":1689884373175}
{"stream":"ticket_metric_events","data":{"id":4992797383183,"ticket_id":121,"metric":"agent_work_time","instance_id":0,"type":"measure","time":"2022-06-17T14:49:20Z"},"emitted_at":1687861699258}
{"stream":"ticket_metric_events","data":{"id":4992797383311,"ticket_id":121,"metric":"pausable_update_time","instance_id":0,"type":"measure","time":"2022-06-17T14:49:20Z"},"emitted_at":1687861699259}
{"stream":"ticket_metric_events","data":{"id":4992797383439,"ticket_id":121,"metric":"reply_time","instance_id":0,"type":"measure","time":"2022-06-17T14:49:20Z"},"emitted_at":1687861699260}
@@ -52,4 +52,4 @@
{"stream": "attribute_definitions", "data": {"title": "Form", "subject": "ticket_form_id", "type": "list", "group": "ticket", "nullable": false, "repeatable": false, "operators": [{"value": "is", "title": "Is", "terminal": false}, {"value": "is_not", "title": "Is not", "terminal": false}], "values": [{"value": "360000084116", "title": "Default Ticket Form", "enabled": true}], "condition": "all"}, "emitted_at": 1687777243928}
{"stream":"ticket_skips","data":{"id":7290033348623,"ticket_id":121,"user_id":360786799676,"reason":"I have no idea.","created_at":"2023-06-27T08:24:02Z","updated_at":"2023-06-27T08:24:02Z","ticket":{"url":"https://d3v-airbyte.zendesk.com/api/v2/tickets/121.json","id":121,"external_id":null,"via":{"channel":"voice","source":{"rel":"voicemail","from":{"formatted_phone":"+1 (689) 689-8023","phone":"+16896898023","name":"Caller +1 (689) 689-8023"},"to":{"formatted_phone":"+1 (205) 953-1462","phone":"+12059531462","name":"Airbyte","brand_id":360000358316}}},"created_at":"2022-06-17T14:49:20Z","updated_at":"2022-06-17T16:01:42Z","type":null,"subject":"Voicemail from: Caller +1 (689) 689-8023","raw_subject":"Voicemail from: Caller +1 (689) 689-8023","description":"Call from: +1 (689) 689-8023\\nTime of call: June 17, 2022 at 2:48:27 PM","priority":null,"status":"new","recipient":null,"requester_id":4992781783439,"submitter_id":4992781783439,"assignee_id":null,"organization_id":null,"group_id":null,"collaborator_ids":[],"follower_ids":[],"email_cc_ids":[],"forum_topic_id":null,"problem_id":null,"has_incidents":false,"is_public":false,"due_at":null,"tags":[],"custom_fields":[],"satisfaction_rating":{"score":"offered"},"sharing_agreement_ids":[],"custom_status_id":4044356,"fields":[],"followup_ids":[],"ticket_form_id":360000084116,"deleted_ticket_form_id":null,"brand_id":360000358316,"allow_channelback":false,"allow_attachments":true,"from_messaging_channel":false}},"emitted_at":1687861697932}
{"stream":"ticket_skips","data":{"id":7290088475023,"ticket_id":125,"user_id":360786799676,"reason":"Another test skip.","created_at":"2023-06-27T08:30:01Z","updated_at":"2023-06-27T08:30:01Z","ticket":{"url":"https://d3v-airbyte.zendesk.com/api/v2/tickets/125.json","id":125,"external_id":null,"via":{"channel":"web","source":{"from":{},"to":{},"rel":null}},"created_at":"2022-07-18T10:16:53Z","updated_at":"2022-07-18T10:36:02Z","type":"question","subject":"Ticket Test 2","raw_subject":"Ticket Test 2","description":"238473846","priority":"urgent","status":"open","recipient":null,"requester_id":360786799676,"submitter_id":360786799676,"assignee_id":361089721035,"organization_id":360033549136,"group_id":5059439464079,"collaborator_ids":[360786799676],"follower_ids":[360786799676],"email_cc_ids":[],"forum_topic_id":null,"problem_id":null,"has_incidents":false,"is_public":false,"due_at":null,"tags":[],"custom_fields":[],"satisfaction_rating":{"score":"unoffered"},"sharing_agreement_ids":[],"custom_status_id":4044376,"fields":[],"followup_ids":[],"ticket_form_id":360000084116,"deleted_ticket_form_id":null,"brand_id":360000358316,"allow_channelback":false,"allow_attachments":true,"from_messaging_channel":false}},"emitted_at":1687861697934}
{"stream":"posts","data":{"id":7253375870607,"title":"Which topics should I add to my community?","details":"<p>That depends. If you support several products, you might add a topic for each product. If you have one big product, you might add a topic for each major feature area or task. If you have different types of users (for example, end users and API developers), you might add a topic or topics for each type of user.</p><p>A General Discussion topic is a place for users to discuss issues that don't quite fit in the other topics. You could monitor this topic for emerging issues that might need their own topics.</p>\n\n<p>To create your own topics, see <a href=\"https://support.zendesk.com/hc/en-us/articles/205586568#topic_dqx_znw_3k\" target=\"_blank\" rel=\"nofollow noreferrer\">Adding community discussion topics</a>.</p>","author_id":360786799676,"vote_sum":0,"vote_count":0,"comment_count":0,"follower_count":0,"topic_id":7253351897871,"html_url":"https://d3v-airbyte.zendesk.com/hc/en-us/community/posts/7253375870607-Which-topics-should-I-add-to-my-community-","created_at":"2023-06-22T00:32:21Z","updated_at":"2023-06-22T00:32:21Z","url":"https://d3v-airbyte.zendesk.com/api/v2/help_center/community/posts/7253375870607-Which-topics-should-I-add-to-my-community-.json","featured":false,"pinned":false,"closed":false,"frozen":false,"status":"none","non_author_editor_id":null,"non_author_updated_at":null,"content_tag_ids":[]},"emitted_at":1688581224313}
{"stream":"posts","data":{"id":7253375870607,"title":"Which topics should I add to my community?","details":"<p>That depends. If you support several products, you might add a topic for each product. If you have one big product, you might add a topic for each major feature area or task. If you have different types of users (for example, end users and API developers), you might add a topic or topics for each type of user.</p><p>A General Discussion topic is a place for users to discuss issues that don't quite fit in the other topics. You could monitor this topic for emerging issues that might need their own topics.</p>\n\n<p>To create your own topics, see <a href=\"https://support.zendesk.com/hc/en-us/articles/205586568#topic_dqx_znw_3k\" target=\"_blank\" rel=\"nofollow noreferrer\">Adding community discussion topics</a>.</p>","author_id":360786799676,"vote_sum":0,"vote_count":0,"comment_count":0,"follower_count":0,"topic_id":7253351897871,"html_url":"https://d3v-airbyte.zendesk.com/hc/en-us/community/posts/7253375870607-Which-topics-should-I-add-to-my-community-","created_at":"2023-06-22T00:32:21Z","updated_at":"2023-06-22T00:32:21Z","url":"https://d3v-airbyte.zendesk.com/api/v2/help_center/community/posts/7253375870607-Which-topics-should-I-add-to-my-community-.json","featured":false,"pinned":false,"closed":false,"frozen":false,"status":"none","non_author_editor_id":null,"non_author_updated_at":null},"emitted_at":1689889045524}

View File

@@ -7,7 +7,7 @@ data:
connectorType: source
maxSecondsBetweenMessages: 10800
definitionId: 79c1aa37-dae3-42ae-b333-d1c105477715
dockerImageTag: 0.10.1
dockerImageTag: 0.10.2
dockerRepository: airbyte/source-zendesk-support
githubIssueLabel: source-zendesk-support
icon: zendesk-support.svg

View File

@@ -5,7 +5,7 @@
from setuptools import find_packages, setup
MAIN_REQUIREMENTS = ["airbyte-cdk", "pytz", "requests-futures~=1.0.0", "pendulum~=2.1.2"]
MAIN_REQUIREMENTS = ["airbyte-cdk", "pytz"]
TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6", "connector-acceptance-test", "requests-mock==1.9.3"]

View File

@@ -3,19 +3,12 @@
#
import calendar
import functools
import logging
import re
import time
from abc import ABC
from collections import deque
from concurrent.futures import Future, ProcessPoolExecutor
from datetime import datetime, timedelta
from functools import partial
from math import ceil
from pickle import PickleError, dumps
from datetime import datetime
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from urllib.parse import urljoin
from urllib.parse import parse_qsl, urljoin, urlparse
import pendulum
import pytz
@@ -24,14 +17,9 @@ from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.core import package_name_from_class
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from airbyte_cdk.sources.streams.http.auth.core import HttpAuthenticator
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from airbyte_cdk.sources.streams.http.rate_limiting import TRANSIENT_EXCEPTIONS
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from requests.auth import AuthBase
from requests_futures.sessions import PICKLE_ERROR, FuturesSession
from source_zendesk_support.ZendeskSupportAvailabilityStrategy import ZendeskSupportAvailabilityStrategy
DATETIME_FORMAT: str = "%Y-%m-%dT%H:%M:%SZ"
@@ -40,34 +28,9 @@ END_OF_STREAM_KEY: str = "end_of_stream"
logger = logging.getLogger("airbyte")
# For some streams, multiple http requests are running at the same time for performance reasons.
# However, it may result in hitting the rate limit, therefore subsequent requests have to be made after a pause.
# The idea is to sustain a pause once and continue making multiple requests at a time.
# A single `retry_at` variable is introduced here, which prevents us from duplicate sleeping in the main thread
# before each request is made as it used to be in prior versions.
# It acts like a global counter - increased each time a 429 status is met
# only if it is greater than the current value. On the other hand, no request may be made before this moment.
# Because the requests are made in parallel, time.sleep will be called in parallel as well.
# This is possible because it is a point in time, not timedelta.
retry_at: Optional[datetime] = None
def sleep_before_executing(sleep_time: float):
def wrapper(function):
@functools.wraps(function)
def inner(*args, **kwargs):
logger.info(f"Sleeping {sleep_time} seconds before next request")
time.sleep(int(sleep_time))
result = function(*args, **kwargs)
return result, datetime.utcnow()
return inner
return wrapper
def to_int(s):
"https://github.com/airbytehq/airbyte/issues/13673"
"""https://github.com/airbytehq/airbyte/issues/13673"""
if isinstance(s, str):
res = re.findall(r"[-+]?\d+", s)
if res:
@@ -79,41 +42,7 @@ class SourceZendeskException(Exception):
"""default exception of custom SourceZendesk logic"""
class SourceZendeskSupportFuturesSession(FuturesSession):
"""
Check the docs at https://github.com/ross/requests-futures
Used to async execute a set of requests.
"""
def send_future(self, request: requests.PreparedRequest, **kwargs) -> Future:
"""
Use instead of default `Session.send()` method.
`Session.send()` should not be overridden as it used by `requests-futures` lib.
"""
if self.session:
func = self.session.send
else:
sleep_time = 0
now = datetime.utcnow()
if retry_at and retry_at > now:
sleep_time = (retry_at - datetime.utcnow()).seconds
# avoid calling super to not break pickled method
func = partial(requests.Session.send, self)
func = sleep_before_executing(sleep_time)(func)
if isinstance(self.executor, ProcessPoolExecutor):
logger.warning("ProcessPoolExecutor is used to perform IO related tasks for unknown reason!")
# verify function can be pickled
try:
dumps(func)
except (TypeError, PickleError):
raise RuntimeError(PICKLE_ERROR)
return self.executor.submit(func, request, **kwargs)
class BaseSourceZendeskSupportStream(HttpStream, ABC):
class BaseZendeskSupportStream(HttpStream, ABC):
raise_on_http_errors = True
def __init__(self, subdomain: str, start_date: str, ignore_pagination: bool = False, **kwargs):
@@ -201,7 +130,7 @@ class BaseSourceZendeskSupportStream(HttpStream, ABC):
return super().should_retry(response)
class SourceZendeskSupportStream(BaseSourceZendeskSupportStream):
class SourceZendeskSupportStream(BaseZendeskSupportStream):
"""Basic Zendesk class"""
primary_key = "id"
@@ -210,17 +139,9 @@ class SourceZendeskSupportStream(BaseSourceZendeskSupportStream):
cursor_field = "updated_at"
response_list_name: str = None
future_requests: deque = None
transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
def __init__(self, authenticator: Union[AuthBase, HttpAuthenticator] = None, **kwargs):
super().__init__(**kwargs)
self._session = SourceZendeskSupportFuturesSession()
self._session.auth = authenticator
self.future_requests = deque()
@property
def url_base(self) -> str:
return f"https://{self._subdomain}.zendesk.com/api/v2/"
@@ -256,52 +177,11 @@ class SourceZendeskSupportStream(BaseSourceZendeskSupportStream):
if start_date:
params["start_time"] = self.str2datetime(start_date)
response = self._session.request("get", count_url).result()
response = self._session.request("get", count_url)
records_count = response.json().get("count", {}).get("value", 0)
return records_count
def generate_future_requests(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
):
records_count = self.get_api_records_count(stream_slice=stream_slice, stream_state=stream_state)
self.logger.info(f"Records count is {records_count}")
page_count = ceil(records_count / self.page_size)
for page_number in range(1, page_count + 1):
params = self.request_params(stream_state=stream_state, stream_slice=stream_slice)
params["page"] = page_number
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice)
request = self._create_prepared_request(
path=self.path(stream_state=stream_state, stream_slice=stream_slice),
headers=dict(request_headers, **self.authenticator.get_auth_header()),
params=params,
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice),
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice),
)
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice)
self.future_requests.append(
{
"future": self._send_request(request, request_kwargs),
"request": request,
"request_kwargs": request_kwargs,
"retries": 0,
}
)
self.logger.info(f"Generated {len(self.future_requests)} future requests")
def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> Future:
response: Future = self._session.send_future(request, **request_kwargs)
return response
def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> Future:
return self._send(request, request_kwargs)
def request_params(
self,
stream_state: Mapping[str, Any],
@@ -325,65 +205,8 @@ class SourceZendeskSupportStream(BaseSourceZendeskSupportStream):
return params
def _retry(
self,
request: requests.PreparedRequest,
retries: int,
original_exception: Exception = None,
response: requests.Response = None,
finished_at: Optional[datetime] = None,
**request_kwargs,
):
if retries == self.max_retries:
if original_exception:
raise original_exception
raise DefaultBackoffException(request=request, response=response)
if response is not None:
sleep_time = self.backoff_time(response)
if finished_at and sleep_time:
current_retry_at = finished_at + timedelta(seconds=sleep_time)
global retry_at
if not retry_at or (retry_at < current_retry_at):
retry_at = current_retry_at
self.logger.info(f"Adding a request to be retried in {sleep_time} seconds")
self.future_requests.append(
{
"future": self._send_request(request, request_kwargs),
"request": request,
"request_kwargs": request_kwargs,
"retries": retries + 1,
}
)
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
self.generate_future_requests(sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)
while len(self.future_requests) > 0:
self.logger.info("Starting another while loop iteration")
item = self.future_requests.popleft()
request, retries, future, kwargs = item["request"], item["retries"], item["future"], item["request_kwargs"]
try:
response, finished_at = future.result()
except TRANSIENT_EXCEPTIONS as exc:
self.logger.info("Will retry the request because of a transient exception")
self._retry(request=request, retries=retries, original_exception=exc, **kwargs)
continue
if self.should_retry(response):
self.logger.info("Will retry the request for other reason")
self._retry(request=request, retries=retries, response=response, finished_at=finished_at, **kwargs)
continue
self.logger.info("Request successful, will parse the response now")
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
class SourceZendeskSupportFullRefreshStream(BaseSourceZendeskSupportStream):
class FullRefreshZendeskSupportStream(BaseZendeskSupportStream):
"""
Endpoints don't provide the updated_at/created_at fields
Thus we can't implement an incremental logic for them
@@ -413,14 +236,13 @@ class SourceZendeskSupportFullRefreshStream(BaseSourceZendeskSupportStream):
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
params.update({"page[size]": self.page_size})
params = {"page[size]": self.page_size}
if next_page_token:
params.update(next_page_token)
return params
class SourceZendeskSupportCursorPaginationStream(SourceZendeskSupportFullRefreshStream):
class IncrementalZendeskSupportStream(FullRefreshZendeskSupportStream):
"""
Endpoints provide a cursor pagination and sorting mechanism
"""
@@ -442,6 +264,42 @@ class SourceZendeskSupportCursorPaginationStream(SourceZendeskSupportFullRefresh
state = stream_state.get(self.cursor_field) or self._start_date if stream_state else self._start_date
return calendar.timegm(pendulum.parse(state).utctimetuple())
class CursorPaginationZendeskSupportStream(IncrementalZendeskSupportStream):
"""Zendesk Support Cursor Pagination, see https://developer.zendesk.com/api-reference/introduction/pagination/#using-cursor-pagination"""
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
if self._ignore_pagination:
return None
meta = response.json().get("meta", {})
return {"page[after]": meta.get("after_cursor")} if meta.get("has_more") else None
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
params = {
"start_time": self.check_stream_state(stream_state),
"page[size]": self.page_size,
}
if next_page_token:
params.pop("start_time", None)
params.update(next_page_token)
return params
class TimeBasedPaginationZendeskSupportStream(IncrementalZendeskSupportStream):
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
if self._ignore_pagination:
return None
start_time = dict(parse_qsl(urlparse(response.json().get(self.next_page_field), "").query)).get("start_time")
if start_time != self.prev_start_time:
self.prev_start_time = start_time
return {self.cursor_field: int(start_time)}
def request_params(
self,
stream_state: Mapping[str, Any],
@@ -457,7 +315,7 @@ class SourceZendeskSupportCursorPaginationStream(SourceZendeskSupportFullRefresh
return params
class SourceZendeskIncrementalExportStream(SourceZendeskSupportCursorPaginationStream):
class SourceZendeskIncrementalExportStream(IncrementalZendeskSupportStream):
"""Incremental Export from Tickets stream:
https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-time-based
@@ -487,8 +345,10 @@ class SourceZendeskIncrementalExportStream(SourceZendeskSupportCursorPaginationS
"""
Returns next_page_token based on `end_of_stream` parameter inside of response
"""
next_page_token = super().next_page_token(response)
return None if response.json().get(END_OF_STREAM_KEY, False) else next_page_token
if self._ignore_pagination:
return None
response_json = response.json()
return None if response_json.get(END_OF_STREAM_KEY, False) else {"cursor": response_json.get("after_cursor")}
def request_params(
self,
@@ -501,6 +361,9 @@ class SourceZendeskIncrementalExportStream(SourceZendeskSupportCursorPaginationS
params["start_time"] = self.check_start_time_param(params["start_time"])
if self.sideload_param:
params["include"] = self.sideload_param
if next_page_token:
params.pop("start_time", None)
params.update(next_page_token)
return params
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
@@ -524,6 +387,42 @@ class SourceZendeskSupportTicketEventsExportStream(SourceZendeskIncrementalExpor
list_entities_from_event: List[str] = None
event_type: str = None
def path(
self,
*,
stream_state: Mapping[str, Any] = None,
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> str:
return f"incremental/{self.response_list_name}.json"
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
Returns next_page_token based on `end_of_stream` parameter inside of response
"""
response_json = response.json()
return None if response_json.get(END_OF_STREAM_KEY, False) else {"start_time": response_json.get("end_time")}
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
next_page_token = next_page_token or {}
parsed_state = self.check_stream_state(stream_state)
if self.cursor_field:
params = {"start_time": next_page_token.get(self.cursor_field, parsed_state)}
else:
params = {"start_time": calendar.timegm(pendulum.parse(self._start_date).utctimetuple())}
# check "start_time" is not in the future
params["start_time"] = self.check_start_time_param(params["start_time"])
if self.sideload_param:
params["include"] = self.sideload_param
if next_page_token:
params.update(next_page_token)
return params
@property
def update_event_from_record(self) -> bool:
"""Returns True/False based on list_entities_from_event property"""
@@ -539,29 +438,14 @@ class SourceZendeskSupportTicketEventsExportStream(SourceZendeskIncrementalExpor
yield event
class OrganizationMemberships(SourceZendeskSupportCursorPaginationStream):
class OrganizationMemberships(CursorPaginationZendeskSupportStream):
"""OrganizationMemberships stream: https://developer.zendesk.com/api-reference/ticketing/organizations/organization_memberships/"""
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
params = {
"start_time": self.check_stream_state(stream_state),
"page[size]": self.page_size,
}
if next_page_token:
params.pop("start_time", None)
params.update(next_page_token)
return params
class AuditLogs(SourceZendeskSupportCursorPaginationStream):
class AuditLogs(CursorPaginationZendeskSupportStream):
"""AuditLogs stream: https://developer.zendesk.com/api-reference/ticketing/account-configuration/audit_logs/#list-audit-logs"""
# can request a maximum of 1,00 results
# can request a maximum of 100 results
page_size = 100
# audit_logs doesn't have the 'updated_by' field
cursor_field = "created_at"
@@ -572,12 +456,35 @@ class Users(SourceZendeskIncrementalExportStream):
response_list_name: str = "users"
def path(self, **kwargs) -> str:
return "incremental/users/cursor.json"
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
next_page_token = next_page_token or {}
parsed_state = self.check_stream_state(stream_state)
if self.cursor_field:
params = {"start_time": next_page_token.get(self.cursor_field, parsed_state)}
else:
params = {"start_time": calendar.timegm(pendulum.parse(self._start_date).utctimetuple())}
# check "start_time" is not in the future
params["start_time"] = self.check_start_time_param(params["start_time"])
if self.sideload_param:
params["include"] = self.sideload_param
if next_page_token:
params.update(next_page_token)
return params
class Organizations(SourceZendeskSupportStream):
"""Organizations stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/"""
class Posts(SourceZendeskSupportCursorPaginationStream):
class Posts(CursorPaginationZendeskSupportStream):
"""Posts stream: https://developer.zendesk.com/api-reference/help_center/help-center-api/posts/#list-posts"""
use_cache = True
@@ -594,13 +501,35 @@ class Tickets(SourceZendeskIncrementalExportStream):
response_list_name: str = "tickets"
transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
@staticmethod
def check_start_time_param(requested_start_time: int, value: int = 1):
def path(self, **kwargs) -> str:
return "incremental/tickets/cursor.json"
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
next_page_token = next_page_token or {}
parsed_state = self.check_stream_state(stream_state)
if self.cursor_field:
params = {"start_time": next_page_token.get(self.cursor_field, parsed_state)}
else:
params = {"start_time": calendar.timegm(pendulum.parse(self._start_date).utctimetuple())}
# check "start_time" is not in the future
params["start_time"] = self.check_start_time_param(params["start_time"])
if self.sideload_param:
params["include"] = self.sideload_param
if next_page_token:
params.update(next_page_token)
return params
def check_start_time_param(self, requested_start_time: int, value: int = 1):
"""
The stream returns 400 Bad Request StartTimeTooRecent when requesting tasks 1 second before now.
Figured out during experiments that the most recent time needed for request to be successful is 3 seconds before now.
"""
return SourceZendeskIncrementalExportStream.check_start_time_param(requested_start_time, value=3)
return super().check_start_time_param(requested_start_time, value=3)
class TicketComments(SourceZendeskSupportTicketEventsExportStream):
@@ -624,7 +553,7 @@ class Groups(SourceZendeskSupportStream):
"""Groups stream: https://developer.zendesk.com/api-reference/ticketing/groups/groups/"""
class GroupMemberships(SourceZendeskSupportCursorPaginationStream):
class GroupMemberships(CursorPaginationZendeskSupportStream):
"""GroupMemberships stream: https://developer.zendesk.com/api-reference/ticketing/groups/group_memberships/"""
def request_params(
@@ -635,15 +564,10 @@ class GroupMemberships(SourceZendeskSupportCursorPaginationStream):
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
params.update({"sort_by": "asc"})
start_time = self.str2unixtime((stream_state or {}).get(self.cursor_field))
params["start_time"] = start_time if start_time else self.str2unixtime(self._start_date)
if next_page_token:
params.pop("start_time", None)
params.update(next_page_token)
return params
class SatisfactionRatings(SourceZendeskSupportCursorPaginationStream):
class SatisfactionRatings(CursorPaginationZendeskSupportStream):
"""
SatisfactionRatings stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/satisfaction_ratings/
"""
@@ -656,10 +580,6 @@ class SatisfactionRatings(SourceZendeskSupportCursorPaginationStream):
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
params.update({"sort_by": "asc"})
start_time = self.str2unixtime((stream_state or {}).get(self.cursor_field))
params["start_time"] = start_time if start_time else self.str2unixtime(self._start_date)
if next_page_token:
params["page"] = next_page_token
return params
@@ -667,36 +587,15 @@ class TicketFields(SourceZendeskSupportStream):
"""TicketFields stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_fields/"""
class TicketForms(SourceZendeskSupportCursorPaginationStream):
class TicketForms(TimeBasedPaginationZendeskSupportStream):
"""TicketForms stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_forms"""
class TicketMetrics(SourceZendeskSupportCursorPaginationStream):
class TicketMetrics(CursorPaginationZendeskSupportStream):
"""TicketMetric stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_metrics/"""
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
"""
To make the Cursor Pagination to return `after_cursor` we should follow these instructions:
https://developer.zendesk.com/documentation/api-basics/pagination/paginating-through-lists-using-cursor-pagination/#enabling-cursor-pagination
"""
params = {
"start_time": self.check_stream_state(stream_state),
"page[size]": self.page_size,
}
if next_page_token:
# when cursor pagination is used, we can pass only `after` and `page size` params,
# other params should be omitted.
params.pop("start_time", None)
params.update(next_page_token)
return params
class TicketSkips(SourceZendeskSupportCursorPaginationStream):
class TicketSkips(CursorPaginationZendeskSupportStream):
"""TicketSkips stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_skips/"""
response_list_name = "skips"
@@ -704,23 +603,8 @@ class TicketSkips(SourceZendeskSupportCursorPaginationStream):
def path(self, **kwargs):
return "skips.json"
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
params = {
"start_time": self.check_stream_state(stream_state),
"page[size]": self.page_size,
}
if next_page_token:
params.pop("start_time", None)
params.update(next_page_token)
return params
class TicketMetricEvents(SourceZendeskSupportCursorPaginationStream):
class TicketMetricEvents(CursorPaginationZendeskSupportStream):
"""
TicketMetricEvents stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_metric_events/
"""
@@ -735,7 +619,7 @@ class Macros(SourceZendeskSupportStream):
"""Macros stream: https://developer.zendesk.com/api-reference/ticketing/business-rules/macros/"""
class TicketAudits(SourceZendeskSupportCursorPaginationStream):
class TicketAudits(IncrementalZendeskSupportStream):
"""TicketAudits stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_audits/"""
# can request a maximum of 1,000 results
@@ -768,14 +652,14 @@ class TicketAudits(SourceZendeskSupportCursorPaginationStream):
return {"cursor": response.json().get("before_cursor")} if response_json.get("before_cursor") else None
class Tags(SourceZendeskSupportFullRefreshStream):
class Tags(FullRefreshZendeskSupportStream):
"""Tags stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/tags/"""
# doesn't have the 'id' field
primary_key = "name"
class SlaPolicies(SourceZendeskSupportFullRefreshStream):
class SlaPolicies(FullRefreshZendeskSupportStream):
"""SlaPolicies stream: https://developer.zendesk.com/api-reference/ticketing/business-rules/sla_policies/"""
def path(self, *args, **kwargs) -> str:
@@ -790,11 +674,11 @@ class SlaPolicies(SourceZendeskSupportFullRefreshStream):
return {}
class Brands(SourceZendeskSupportFullRefreshStream):
class Brands(FullRefreshZendeskSupportStream):
"""Brands stream: https://developer.zendesk.com/api-reference/ticketing/account-configuration/brands/#list-brands"""
class CustomRoles(SourceZendeskSupportFullRefreshStream):
class CustomRoles(FullRefreshZendeskSupportStream):
"""CustomRoles stream: https://developer.zendesk.com/api-reference/ticketing/account-configuration/custom_roles/#list-custom-roles"""
def request_params(
@@ -806,14 +690,14 @@ class CustomRoles(SourceZendeskSupportFullRefreshStream):
return {}
class Schedules(SourceZendeskSupportFullRefreshStream):
class Schedules(FullRefreshZendeskSupportStream):
"""Schedules stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/schedules/#list-schedules"""
def path(self, *args, **kwargs) -> str:
return "business_hours/schedules.json"
class AccountAttributes(SourceZendeskSupportFullRefreshStream):
class AccountAttributes(FullRefreshZendeskSupportStream):
"""Account attributes stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/skill_based_routing/#list-account-attributes"""
response_list_name = "attributes"
@@ -830,7 +714,7 @@ class AccountAttributes(SourceZendeskSupportFullRefreshStream):
return {}
class AttributeDefinitions(SourceZendeskSupportFullRefreshStream):
class AttributeDefinitions(FullRefreshZendeskSupportStream):
"""Attribute definitions stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/skill_based_routing/#list-routing-attribute-definitions"""
primary_key = None
@@ -855,7 +739,7 @@ class AttributeDefinitions(SourceZendeskSupportFullRefreshStream):
return {}
class UserSettingsStream(SourceZendeskSupportFullRefreshStream):
class UserSettingsStream(FullRefreshZendeskSupportStream):
"""Stream for checking of a request token and permissions"""
def path(self, *args, **kwargs) -> str:
@@ -885,7 +769,7 @@ class UserSettingsStream(SourceZendeskSupportFullRefreshStream):
return {}
class PostComments(SourceZendeskSupportFullRefreshStream, HttpSubStream):
class PostComments(FullRefreshZendeskSupportStream, HttpSubStream):
response_list_name = "comments"
def __init__(self, **kwargs):
@@ -903,7 +787,7 @@ class PostComments(SourceZendeskSupportFullRefreshStream, HttpSubStream):
return f"community/posts/{post_id}/comments"
class AbstractVotes(SourceZendeskSupportFullRefreshStream, ABC):
class AbstractVotes(FullRefreshZendeskSupportStream, ABC):
response_list_name = "votes"
def get_json_schema(self) -> Mapping[str, Any]:

View File

@@ -1,176 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import datetime
import json
from datetime import timedelta
from urllib.parse import urljoin
import pendulum
import pytest
import requests
import requests_mock
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from requests.exceptions import ConnectionError
from source_zendesk_support.source import BasicApiTokenAuthenticator
from source_zendesk_support.streams import Macros, Organizations
STREAM_ARGS: dict = {
"subdomain": "fake-subdomain",
"start_date": "2021-01-27T00:00:00Z",
"authenticator": BasicApiTokenAuthenticator("test@airbyte.io", "api_token"),
}
@pytest.fixture()
def time_sleep_mock(mocker):
time_mock = mocker.patch("time.sleep", lambda x: None)
yield time_mock
@pytest.mark.parametrize(
"records_count,page_size,expected_futures_deque_len",
[
(1000, 100, 10),
(1000, 10, 100),
(0, 100, 0),
(1, 100, 1),
(101, 100, 2),
],
)
def test_proper_number_of_future_requests_generated(records_count, page_size, expected_futures_deque_len, time_sleep_mock):
stream = Macros(**STREAM_ARGS)
stream.page_size = page_size
with requests_mock.Mocker() as m:
count_url = urljoin(stream.url_base, f"{stream.path()}/count.json")
m.get(count_url, text=json.dumps({"count": {"value": records_count}}))
records_url = urljoin(stream.url_base, stream.path())
m.get(records_url)
stream.generate_future_requests(sync_mode=SyncMode.full_refresh, cursor_field=stream.cursor_field)
assert len(stream.future_requests) == expected_futures_deque_len
@pytest.mark.parametrize(
"records_count,page_size,expected_futures_deque_len",
[
(10, 10, 10),
(10, 100, 10),
(10, 10, 0),
],
)
def test_parse_future_records(records_count, page_size, expected_futures_deque_len, time_sleep_mock):
stream = Macros(**STREAM_ARGS)
stream.page_size = page_size
expected_records = [
{f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()}
for i in range(records_count)
]
with requests_mock.Mocker() as m:
count_url = urljoin(stream.url_base, f"{stream.path()}/count.json")
m.get(
count_url,
text=json.dumps({"count": {"value": records_count}}),
)
records_url = urljoin(stream.url_base, stream.path())
m.get(records_url, text=json.dumps({stream.name: expected_records}))
stream.generate_future_requests(sync_mode=SyncMode.full_refresh, cursor_field=stream.cursor_field)
if not stream.future_requests and not expected_futures_deque_len:
assert len(stream.future_requests) == 0 and not expected_records
else:
response, _ = stream.future_requests[0]["future"].result()
records = list(stream.parse_response(response, stream_state=None, stream_slice=None))
assert records == expected_records
@pytest.mark.parametrize(
"records_count, page_size, expected_futures_deque_len, expected_exception",
[
(1000, 10, 100, DefaultBackoffException),
(0, 100, 0, DefaultBackoffException),
(150, 100, 2, ConnectionError),
(1, 100, 1, None),
(101, 101, 2, None),
],
)
def test_read_records(mocker, records_count, page_size, expected_futures_deque_len, expected_exception, time_sleep_mock):
stream = Macros(**STREAM_ARGS)
stream.page_size = page_size
should_retry = bool(expected_exception)
expected_records_count = min(page_size, records_count) if should_retry else records_count
def record_gen(start=0, end=page_size):
for i in range(start, end):
yield {f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()}
with requests_mock.Mocker() as m:
count_url = urljoin(stream.url_base, f"{stream.path()}/count.json")
m.get(count_url, text=json.dumps({"count": {"value": records_count}}))
records_url = urljoin(stream.url_base, stream.path())
responses = [
{
"status_code": 429 if should_retry else 200,
"headers": {"X-Rate-Limit": "700"},
"text": "{}"
if should_retry
else json.dumps({"macros": list(record_gen(page * page_size, min(records_count, (page + 1) * page_size)))}),
}
for page in range(expected_futures_deque_len)
]
m.get(records_url, responses)
if expected_exception is ConnectionError:
mocker.patch.object(requests.Session, "send", side_effect=ConnectionError())
if should_retry and expected_futures_deque_len:
with pytest.raises(expected_exception):
list(stream.read_records(sync_mode=SyncMode.full_refresh))
else:
assert list(stream.read_records(sync_mode=SyncMode.full_refresh)) == list(record_gen(end=expected_records_count))
def test_sleep_time():
page_size = 100
x_rate_limit = 10
records_count = 350
pages = 4
start = datetime.datetime.now()
stream = Organizations(**STREAM_ARGS)
stream.page_size = page_size
def record_gen(start=0, end=100):
for i in range(start, end):
yield {f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()}
with requests_mock.Mocker() as m:
count_url = urljoin(stream.url_base, f"{stream.path()}/count.json")
m.get(count_url, text=json.dumps({"count": {"value": records_count}}))
records_url = urljoin(stream.url_base, stream.path())
responses = [
{
"status_code": 429,
"headers": {"X-Rate-Limit": str(x_rate_limit)},
"text": "{}"
}
for _ in range(pages)
] + [
{
"status_code": 200,
"headers": {},
"text": json.dumps({"organizations": list(record_gen(page * page_size, min(records_count, (page + 1) * page_size)))})
}
for page in range(pages)
]
m.get(records_url, responses)
records = list(stream.read_records(sync_mode=SyncMode.full_refresh))
assert len(records) == records_count
end = datetime.datetime.now()
sleep_time = int(60 / x_rate_limit)
assert sleep_time - 1 <= (end - start).seconds <= sleep_time + 1

View File

@@ -23,7 +23,7 @@ from source_zendesk_support.streams import (
AccountAttributes,
AttributeDefinitions,
AuditLogs,
BaseSourceZendeskSupportStream,
BaseZendeskSupportStream,
Brands,
CustomRoles,
GroupMemberships,
@@ -37,7 +37,6 @@ from source_zendesk_support.streams import (
Schedules,
SlaPolicies,
SourceZendeskIncrementalExportStream,
SourceZendeskSupportStream,
Tags,
TicketAudits,
TicketComments,
@@ -170,19 +169,19 @@ def time_sleep_mock(mocker):
def test_str2datetime():
expected = datetime.strptime(DATETIME_STR, DATETIME_FORMAT)
output = BaseSourceZendeskSupportStream.str2datetime(DATETIME_STR)
output = BaseZendeskSupportStream.str2datetime(DATETIME_STR)
assert output == expected
def test_datetime2str():
expected = datetime.strftime(DATETIME_FROM_STR.replace(tzinfo=pytz.UTC), DATETIME_FORMAT)
output = BaseSourceZendeskSupportStream.datetime2str(DATETIME_FROM_STR)
output = BaseZendeskSupportStream.datetime2str(DATETIME_FROM_STR)
assert output == expected
def test_str2unixtime():
expected = calendar.timegm(DATETIME_FROM_STR.utctimetuple())
output = BaseSourceZendeskSupportStream.str2unixtime(DATETIME_STR)
output = BaseZendeskSupportStream.str2unixtime(DATETIME_STR)
assert output == expected
@@ -228,14 +227,6 @@ def test_parse_response(requests_mock):
assert True if entity in parsed_output else False
def test_retry(mocker):
backoff_time_mock = mocker.Mock()
with mocker.patch.object(SourceZendeskSupportStream, "backoff_time", return_value=backoff_time_mock):
stream = SourceZendeskSupportStream(**STREAM_ARGS)
stream._retry(request=mocker.Mock(), retries=0)
assert not backoff_time_mock.called, "backoff_time should not have been called"
class TestAllStreams:
@pytest.mark.parametrize(
"expected_stream_cls",
@@ -320,8 +311,8 @@ class TestAllStreams:
(TicketMetrics, "ticket_metrics"),
(TicketSkips, "skips.json"),
(TicketMetricEvents, "incremental/ticket_metric_events"),
(Tickets, "incremental/tickets.json"),
(Users, "incremental/users.json"),
(Tickets, "incremental/tickets/cursor.json"),
(Users, "incremental/users/cursor.json"),
(Brands, "brands"),
(CustomRoles, "custom_roles"),
(Schedules, "business_hours/schedules.json"),
@@ -477,14 +468,12 @@ class TestSourceZendeskSupportStream:
"stream_cls, expected",
[
(Macros, {"start_time": 1622505600}),
(Posts, {"start_time": 1622505600}),
(Organizations, {"start_time": 1622505600}),
(Groups, {"start_time": 1622505600}),
(TicketFields, {"start_time": 1622505600}),
],
ids=[
"Macros",
"Posts",
"Organizations",
"Groups",
"TicketFields",
@@ -699,11 +688,11 @@ class TestSourceZendeskSupportCursorPaginationStream:
@pytest.mark.parametrize(
"stream_cls, expected",
[
(GroupMemberships, {"sort_by": "asc", "start_time": 1622505600}),
(GroupMemberships, {'page[size]': 100, 'sort_by': 'asc', 'start_time': 1622505600}),
(TicketForms, {"start_time": 1622505600}),
(TicketMetricEvents, {"start_time": 1622505600}),
(TicketMetricEvents, {'page[size]': 100, 'start_time': 1622505600}),
(TicketAudits, {"sort_by": "created_at", "sort_order": "desc", "limit": 1000}),
(SatisfactionRatings, {"sort_by": "asc", "start_time": 1622505600}),
(SatisfactionRatings, {'page[size]': 100, 'sort_by': 'asc', 'start_time': 1622505600}),
(TicketMetrics, {"page[size]": 100, "start_time": 1622505600}),
(OrganizationMemberships, {"page[size]": 100, "start_time": 1622505600}),
(TicketSkips, {"page[size]": 100, "start_time": 1622505600}),
@@ -743,22 +732,6 @@ class TestSourceZendeskIncrementalExportStream:
result = stream.check_start_time_param(expected)
assert result == expected
@pytest.mark.parametrize(
"stream_cls, expected",
[
(Users, "incremental/users.json"),
(Tickets, "incremental/tickets.json"),
],
ids=[
"Users",
"Tickets",
],
)
def test_path(self, stream_cls, expected):
stream = stream_cls(**STREAM_ARGS)
result = stream.path()
assert result == expected
@pytest.mark.parametrize(
"stream_cls",
[
@@ -776,7 +749,7 @@ class TestSourceZendeskIncrementalExportStream:
requests_mock.get(STREAM_URL, json={stream_name: {}})
test_response = requests.get(STREAM_URL)
output = stream.next_page_token(test_response)
assert output is None
assert output == {'cursor': None}
@pytest.mark.parametrize(
"stream_cls, expected",
@@ -920,7 +893,7 @@ class TestSourceZendeskSupportTicketEventsExportStream:
def test_read_tickets_stream(requests_mock):
requests_mock.get(
"https://subdomain.zendesk.com/api/v2/incremental/tickets.json",
"https://subdomain.zendesk.com/api/v2/incremental/tickets/cursor.json",
json={
"tickets": [
{"custom_fields": []},
@@ -933,7 +906,8 @@ def test_read_tickets_stream(requests_mock):
{"id": 360023712840, "value": False},
]
},
]
],
"end_of_stream": True
},
)

View File

@@ -79,6 +79,7 @@ The Zendesk connector ideally should not run into Zendesk API limitations under
| Version | Date | Pull Request | Subject |
|:---------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `0.10.2` | 2023-07-19 | [28487](https://github.com/airbytehq/airbyte/pull/28487) | Remove extra page from params |
| `0.10.1` | 2023-07-10 | [28096](https://github.com/airbytehq/airbyte/pull/28096) | Replace `offset` pagination with `cursor` pagination |
| `0.10.0` | 2023-07-06 | [27991](https://github.com/airbytehq/airbyte/pull/27991) | add streams: `PostVotes`, `PostCommentVotes` |
| `0.9.0` | 2023-07-05 | [27961](https://github.com/airbytehq/airbyte/pull/27961) | Add stream: `Post Comments` |