1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Source Freshdesk: support incremental sync and add conversations and satisfaction ratings streams (#2052)

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
This commit is contained in:
Eugene K
2021-02-16 15:55:03 -03:00
committed by GitHub
parent 6a78cc327d
commit a45dbd3097
19 changed files with 1084 additions and 308 deletions

View File

@@ -2,6 +2,6 @@
"sourceDefinitionId": "ec4b9503-13cb-48ab-a4ab-6ade4be46567",
"name": "Freshdesk",
"dockerRepository": "airbyte/source-freshdesk",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-freshdesk"
}

View File

@@ -106,7 +106,7 @@
- sourceDefinitionId: ec4b9503-13cb-48ab-a4ab-6ade4be46567
name: Freshdesk
dockerRepository: airbyte/source-freshdesk
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://hub.docker.com/r/airbyte/source-freshdesk
- sourceDefinitionId: 396e4ca3-8a97-4b85-aa4e-c9d8c2d5f992
name: Braintree

View File

@@ -30,7 +30,7 @@ from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Generator, List, Mapping, Tuple
import pkg_resources
from airbyte_protocol import AirbyteStream, SyncMode
from airbyte_protocol import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, SyncMode
from jsonschema import RefResolver
@@ -186,3 +186,10 @@ class BaseClient(StreamStateMixin, ABC):
@abstractmethod
def health_check(self) -> Tuple[bool, str]:
"""Check if service is up and running"""
def configured_catalog_from_client(client: BaseClient) -> ConfiguredAirbyteCatalog:
"""Helper to generate configured catalog for testing"""
catalog = ConfiguredAirbyteCatalog(streams=[ConfiguredAirbyteStream(stream=stream) for stream in client.streams])
return catalog

View File

@@ -12,5 +12,5 @@ COPY $CODE_PATH ./$CODE_PATH
COPY setup.py ./
RUN pip install .
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-freshdesk

View File

@@ -1,22 +0,0 @@
FROM airbyte/base-python-test:dev
RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*
ENV CODE_PATH="integration_tests"
ENV AIRBYTE_TEST_MODULE="integration_tests"
ENV AIRBYTE_TEST_PATH="SourceFreshdeskStandardTest"
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-freshdesk-standard-test
WORKDIR /airbyte/integration_code
COPY source_freshdesk source_freshdesk
COPY $CODE_PATH $CODE_PATH
COPY sample_files/*.json $CODE_PATH/
COPY secrets/* $CODE_PATH/
COPY source_freshdesk/*.json $CODE_PATH/
COPY setup.py ./
RUN pip install ".[tests]"
WORKDIR /airbyte

View File

@@ -1,14 +1,21 @@
plugins {
id 'airbyte-python'
id 'airbyte-docker'
id 'airbyte-source-test'
id 'airbyte-integration-test-java'
id 'airbyte-standard-source-test-file'
}
airbytePython {
moduleDirectory 'source_freshdesk'
}
airbyteStandardSourceTestFile {
// For more information on standard source tests, see https://docs.airbyte.io/contributing-to-airbyte/building-new-connector/testing-connectors
specPath = "source_freshdesk/spec.json"
configPath = "secrets/config.json"
configuredCatalogPath = "sample_files/configured_catalog.json"
}
dependencies {
implementation files(project(':airbyte-integrations:bases:base-python').airbyteDocker.outputs)
}

View File

@@ -1,27 +0,0 @@
"""
MIT License
Copyright (c) 2020 Airbyte
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from .standard_source_test import SourceFreshdeskStandardTest
__all__ = ["SourceFreshdeskStandardTest"]

View File

@@ -1,139 +0,0 @@
{
"streams": [
{
"stream": {
"stream": {
"name": "agents",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"available": {
"type": "boolean"
},
"occasional": {
"type": "boolean"
},
"id": {
"type": "integer"
},
"signature": {
"type": ["string", "null"]
},
"ticket_scope": {
"type": "integer"
},
"created_at": {
"type": "string"
},
"updated_at": {
"type": "string"
},
"last_active_at": {
"type": ["string", "null"]
},
"available_since": {
"type": ["string", "null"]
},
"type": {
"type": "string"
},
"contact": {
"type": "object",
"properties": {
"active": {
"type": "boolean"
},
"email": {
"type": "string"
},
"job_title": {
"type": ["string", "null"]
},
"language": {
"type": "string"
},
"last_login_at": {
"type": ["string", "null"]
},
"mobile": {
"type": ["string", "integer", "null"]
},
"name": {
"type": "string"
},
"phone": {
"type": ["string", "integer", "null"]
},
"time_zone": {
"type": "string"
},
"created_at": {
"type": "string"
},
"updated_at": {
"type": "string"
}
}
}
}
},
"supported_sync_modes": ["full_refresh"]
}
}
},
{
"stream": {
"stream": {
"name": "companies",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"name": {
"type": "string"
},
"description": {
"type": ["string", "null"]
},
"note": {
"type": ["string", "null"]
},
"domains": {
"type": "array",
"items": {
"type": "string"
}
},
"created_at": {
"type": "string"
},
"updated_at": {
"type": "string"
},
"custom_fields": {
"type": "object"
},
"health_score": {
"type": ["number", "null"]
},
"account_tier": {
"type": "string"
},
"renewal_date": {
"type": ["string", "null"]
},
"industry": {
"type": ["string", "null"]
}
}
},
"supported_sync_modes": ["full_refresh"]
}
}
}
]
}

View File

@@ -1,48 +0,0 @@
"""
MIT License
Copyright (c) 2020 Airbyte
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import json
import pkgutil
from airbyte_protocol import ConfiguredAirbyteCatalog, ConnectorSpecification
from base_python_test import StandardSourceTestIface
class SourceFreshdeskStandardTest(StandardSourceTestIface):
def get_spec(self) -> ConnectorSpecification:
raw_spec = pkgutil.get_data(self.__class__.__module__.split(".")[0], "spec.json")
return ConnectorSpecification.parse_obj(json.loads(raw_spec))
def get_config(self) -> object:
return json.loads(pkgutil.get_data(self.__class__.__module__.split(".")[0], "config.json"))
def get_catalog(self) -> ConfiguredAirbyteCatalog:
raw_catalog = pkgutil.get_data(self.__class__.__module__.split(".")[0], "configured_catalog.json")
return ConfiguredAirbyteCatalog.parse_obj(json.loads(raw_catalog))
def setup(self) -> None:
pass
def teardown(self) -> None:
pass

View File

@@ -77,8 +77,12 @@
}
}
},
"supported_sync_modes": ["full_refresh"]
}
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": null
},
"sync_mode": "incremental",
"cursor_field": null
},
{
"stream": {
@@ -128,8 +132,573 @@
}
}
},
"supported_sync_modes": ["full_refresh"]
}
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": false,
"default_cursor_field": null
},
"sync_mode": "full_refresh",
"cursor_field": null
},
{
"stream": {
"name": "contacts",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"active": {
"type": "boolean"
},
"address": {
"type": ["string", "null"]
},
"company_id": {
"type": ["integer", "null"]
},
"description": {
"type": ["string", "null"]
},
"email": {
"type": "string"
},
"id": {
"type": "integer"
},
"job_title": {
"type": ["string", "null"]
},
"language": {
"type": "string"
},
"mobile": {
"type": ["string", "integer", "null"]
},
"name": {
"type": "string"
},
"phone": {
"type": ["string", "integer", "null"]
},
"time_zone": {
"type": "string"
},
"twitter_id": {
"type": ["integer", "null"]
},
"custom_fields": {
"type": "object"
},
"facebook_id": {
"type": ["integer", "null"]
},
"created_at": {
"type": "string"
},
"updated_at": {
"type": "string"
},
"csat_rating": {
"type": ["integer", "null"]
},
"preferred_source": {
"type": "string"
},
"unique_external_id": {
"type": ["integer", "null"]
}
}
},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": null
},
"sync_mode": "incremental",
"cursor_field": null
},
{
"stream": {
"name": "conversations",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"body": {
"type": "string"
},
"body_text": {
"type": "string"
},
"id": {
"type": "integer"
},
"incoming": {
"type": "boolean"
},
"private": {
"type": "boolean"
},
"user_id": {
"type": "integer"
},
"support_email": {
"type": ["string", "null"]
},
"source": {
"type": "integer"
},
"category": {
"type": "integer"
},
"to_emails": {
"type": ["array", "null"]
},
"from_email": {
"type": ["string", "null"]
},
"cc_emails": {
"type": ["array", "null"]
},
"bcc_emails": {
"type": ["array", "null"]
},
"created_at": {
"type": "string"
},
"updated_at": {
"type": "string"
},
"attachments": {
"type": ["array", "null"]
},
"ticket_id": {
"type": "integer"
},
"source_additional_info": {
"type": ["object", "null"]
}
}
},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": null
},
"sync_mode": "incremental",
"cursor_field": null
},
{
"stream": {
"name": "groups",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"name": {
"type": "string"
},
"description": {
"type": "string"
},
"escalate_to": {
"type": ["integer", "null"]
},
"unassigned_for": {
"type": ["string", "null"]
},
"business_hour_id": {
"type": ["integer", "null"]
},
"group_type": {
"type": "string"
},
"created_at": {
"type": "string"
},
"updated_at": {
"type": "string"
},
"auto_ticket_assign": {
"type": "boolean"
}
}
},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": null
},
"sync_mode": "incremental",
"cursor_field": null
},
{
"stream": {
"name": "roles",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"name": {
"type": "string"
},
"description": {
"type": "string"
},
"default": {
"type": "boolean"
},
"created_at": {
"type": "string"
},
"updated_at": {
"type": "string"
}
}
},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": null
},
"sync_mode": "incremental",
"cursor_field": null
},
{
"stream": {
"name": "satisfaction_ratings",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"survey_id": {
"type": "integer"
},
"user_id": {
"type": "integer"
},
"agent_id": {
"type": "integer"
},
"feedback": {
"type": "string"
},
"group_id": {
"type": "integer"
},
"ticket_id": {
"type": "integer"
},
"created_at": {
"type": "string"
},
"updated_at": {
"type": "string"
},
"ratings": {
"type": "object"
}
}
},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": null
},
"sync_mode": "incremental",
"cursor_field": null
},
{
"stream": {
"name": "skills",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"name": {
"type": "string"
},
"rank": {
"type": "integer"
},
"created_at": {
"type": "string"
},
"updated_at": {
"type": "string"
},
"agents": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {
"type": "integer"
}
}
}
},
"match_type": {
"type": "string"
},
"conditions": {
"type": "array",
"items": {
"type": "object",
"properties": {
"resource_type": {
"type": "string"
},
"field_name": {
"type": "string"
},
"operator": {
"type": "string"
},
"value": {
"type": "array",
"items": {
"type": "string"
}
}
}
}
}
}
},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": null
},
"sync_mode": "incremental",
"cursor_field": null
},
{
"stream": {
"name": "surveys",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"title": {
"type": "string"
},
"active": {
"type": "boolean"
},
"created_at": {
"type": "string"
},
"updated_at": {
"type": "string"
},
"questions": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"label": {
"type": "string"
},
"accepted_ratings": {
"type": "array",
"items": {
"type": "integer"
}
},
"default": {
"type": "boolean"
}
}
}
}
}
},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": null
},
"sync_mode": "incremental",
"cursor_field": null
},
{
"stream": {
"name": "tickets",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"cc_emails": {
"type": "array"
},
"fwd_emails": {
"type": "array"
},
"reply_cc_emails": {
"type": "array"
},
"ticket_cc_emails": {
"type": "array"
},
"fr_escalated": {
"type": "boolean"
},
"spam": {
"type": "boolean"
},
"email_config_id": {
"type": ["integer", "null"]
},
"group_id": {
"type": ["integer", "null"]
},
"priority": {
"type": "integer"
},
"requester_id": {
"type": "integer"
},
"responder_id": {
"type": "integer"
},
"source": {
"type": "integer"
},
"company_id": {
"type": ["integer", "null"]
},
"status": {
"type": "integer"
},
"subject": {
"type": "string"
},
"association_type": {
"type": ["integer", "null"]
},
"to_emails": {
"type": ["array", "null"],
"items": {
"type": "string"
}
},
"product_id": {
"type": ["integer", "null"]
},
"id": {
"type": "integer"
},
"type": {
"type": "string"
},
"due_by": {
"type": "string"
},
"fr_due_by": {
"type": "string"
},
"is_escalated": {
"type": "boolean"
},
"custom_fields": {
"type": "object"
},
"created_at": {
"type": "string"
},
"updated_at": {
"type": "string"
},
"associated_tickets_count": {
"type": ["integer", "null"]
},
"tags": {
"type": "array"
},
"nr_due_by": {
"type": ["string", "null"]
},
"nr_escalated": {
"type": "boolean"
},
"description": {
"type": "string"
},
"description_text": {
"type": "string"
}
}
},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": null
},
"sync_mode": "incremental",
"cursor_field": null
},
{
"stream": {
"name": "time_entries",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"billable": {
"type": "boolean"
},
"note": {
"type": "string"
},
"id": {
"type": "integer"
},
"timer_running": {
"type": "boolean"
},
"agent_id": {
"type": "integer"
},
"ticket_id": {
"type": "integer"
},
"company_id": {
"type": ["integer", "null"]
},
"time_spent": {
"type": "string"
},
"executed_at": {
"type": "string"
},
"start_time": {
"type": "string"
},
"created_at": {
"type": "string"
},
"updated_at": {
"type": "string"
}
}
},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": null
},
"sync_mode": "incremental",
"cursor_field": null
}
]
}

View File

@@ -0,0 +1,35 @@
{
"agents": {
"updated_at": "2020-04-21T13:34:26-0700"
},
"companies": {
"updated_at": "2020-04-21T13:34:26-0700"
},
"contacts": {
"updated_at": "2020-04-21T13:34:26-0700"
},
"conversations": {
"updated_at": "2020-04-21T13:34:26-0700"
},
"groups": {
"updated_at": "2020-04-21T13:34:26-0700"
},
"roles": {
"updated_at": "2020-04-21T13:34:26-0700"
},
"satisfaction_ratings": {
"updated_at": "2020-04-21T13:34:26-0700"
},
"skills": {
"updated_at": "2020-04-21T13:34:26-0700"
},
"surveys": {
"updated_at": "2020-04-21T13:34:26-0700"
},
"tickets": {
"updated_at": "2020-04-21T13:34:26-0700"
},
"time_entries": {
"updated_at": "2020-04-21T13:34:26-0700"
}
}

View File

@@ -33,11 +33,10 @@ setup(
install_requires=[
"airbyte-protocol",
"base-python",
"python_freshdesk==1.3.7",
"requests==2.25.1",
"pendulum==1.2.0",
],
package_data={"": ["*.json", "schemas/*.json"]},
setup_requires=["pytest-runner"],
tests_require=["pytest"],
extras_require={
"tests": ["airbyte_python_test", "pytest"],
},

View File

@@ -0,0 +1,298 @@
"""
MIT License
Copyright (c) 2020 Airbyte
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import time
from abc import ABC, abstractmethod
from functools import partial
from typing import Any, Callable, Iterator, Mapping, MutableMapping, Sequence, Optional
import pendulum
import requests
from base_python.entrypoint import logger # FIXME (Eugene K): use standard logger
from requests import HTTPError
class FreshdeskError(HTTPError):
"""
Base error class.
Subclassing HTTPError to avoid breaking existing code that expects only HTTPErrors.
"""
class FreshdeskBadRequest(FreshdeskError):
"""Most 40X and 501 status codes"""
class FreshdeskUnauthorized(FreshdeskError):
"""401 Unauthorized"""
class FreshdeskAccessDenied(FreshdeskError):
"""403 Forbidden"""
class FreshdeskNotFound(FreshdeskError):
"""404"""
class FreshdeskRateLimited(FreshdeskError):
"""429 Rate Limit Reached"""
class FreshdeskServerError(FreshdeskError):
"""50X errors"""
class API:
def __init__(self, domain: str, api_key: str, verify: bool = True, proxies: MutableMapping[str, str] = None):
"""Basic HTTP interface to read from endpoints"""
self._api_prefix = f"https://{domain.rstrip('/')}/api/v2/"
self._session = requests.Session()
self._session.auth = (api_key, "unused_with_api_key")
self._session.verify = verify
self._session.proxies = proxies
self._session.headers = {"Content-Type": "application/json"}
if domain.find("freshdesk.com") < 0:
raise AttributeError("Freshdesk v2 API works only via Freshdesk domains and not via custom CNAMEs")
@staticmethod
def _parse_and_handle_errors(req):
try:
j = req.json()
except ValueError:
j = {}
error_message = "Freshdesk Request Failed"
if "errors" in j:
error_message = "{}: {}".format(j.get("description"), j.get("errors"))
# API docs don't mention this clearly, but in the case of bad credentials the returned JSON will have a
# "message" field at the top level
elif "message" in j:
error_message = j["message"]
if req.status_code == 400:
raise FreshdeskBadRequest(error_message)
elif req.status_code == 401:
raise FreshdeskUnauthorized(error_message)
elif req.status_code == 403:
raise FreshdeskAccessDenied(error_message)
elif req.status_code == 404:
raise FreshdeskNotFound(error_message)
elif req.status_code == 429:
raise FreshdeskRateLimited(
"429 Rate Limit Exceeded: API rate-limit has been reached until {} seconds. See "
"http://freshdesk.com/api#ratelimit".format(req.headers.get("Retry-After"))
)
elif 500 <= req.status_code < 600:
raise FreshdeskServerError("{}: Server Error".format(req.status_code))
# Catch any other errors
try:
req.raise_for_status()
except HTTPError as e:
raise FreshdeskError("{}: {}".format(e, j))
return j
def get(self, url: str, params: Mapping = None):
"""Wrapper around request.get() to use the API prefix. Returns a JSON response."""
for _ in range(10):
params = params or {}
response = self._session.get(self._api_prefix + url, params=params)
try:
return self._parse_and_handle_errors(response)
except FreshdeskRateLimited:
retry_after = int(response.headers["Retry-After"])
logger.info(f"Rate limit reached. Sleeping for {retry_after} seconds")
time.sleep(retry_after + 1) # extra second to cover any fractions of second
raise Exception("Max retry limit reached")
class StreamAPI(ABC):
"""Basic stream API that allows to iterate over entities"""
result_return_limit = 100 # maximum value
maximum_page = 500 # see https://developers.freshdesk.com/api/#best_practices
def __init__(self, api: API, *args, **kwargs):
super().__init__(*args, **kwargs)
self._api = api
@abstractmethod
def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
"""Iterate over entities"""
def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator:
"""Read using getter"""
params = params or {}
for page in range(1, self.maximum_page):
batch = list(getter(params={**params, "per_page": self.result_return_limit, "page": page}))
yield from batch
if len(batch) < self.result_return_limit:
break
class IncrementalStreamAPI(StreamAPI, ABC):
state_pk = "updated_at" # Name of the field associated with the state
state_filter = "updated_since" # Name of filter that corresponds to the state
@property
def state(self) -> Optional[Mapping[str, Any]]:
"""Current state, if wasn't set return None"""
if self._state:
return {self.state_pk: str(self._state)}
return None
@state.setter
def state(self, value: Mapping[str, Any]):
self._state = pendulum.parse(value[self.state_pk])
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._state: Optional[Mapping[str, Any]] = None
def _state_params(self) -> Mapping[str, Any]:
"""Build query parameters responsible for current state"""
if self._state:
return {self.state_filter: self._state}
return {}
@property
def name(self):
"""Name of the stream"""
stream_name = self.__class__.__name__
if stream_name.endswith("API"):
stream_name = stream_name[:-3]
return stream_name
def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator:
"""Read using getter, patched to respect current state"""
params = params or {}
params = {**params, **self._state_params()}
latest_cursor = None
for record in super().read(getter, params):
cursor = pendulum.parse(record[self.state_pk])
# filter out records older then state
if self._state and self._state >= cursor:
continue
latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
yield record
if latest_cursor:
logger.info(f"Advancing bookmark for {self.name} stream from {self._state} to {latest_cursor}")
self._state = max(latest_cursor, self._state) if self._state else latest_cursor
class ClientIncrementalStreamAPI(IncrementalStreamAPI, ABC):
"""Incremental stream that don't have native API support, i.e we filter on the client side only"""
def _state_params(self) -> Mapping[str, Any]:
"""Build query parameters responsible for current state, override because API doesn't support this"""
return {}
class AgentsAPI(ClientIncrementalStreamAPI):
def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
"""Iterate over entities"""
yield from self.read(partial(self._api.get, url="agents"))
class CompaniesAPI(ClientIncrementalStreamAPI):
def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
"""Iterate over entities"""
yield from self.read(partial(self._api.get, url="companies"))
class ContactsAPI(IncrementalStreamAPI):
state_filter = "_updated_since"
def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
"""Iterate over entities"""
yield from self.read(partial(self._api.get, url="contacts"))
class GroupsAPI(ClientIncrementalStreamAPI):
"""Only users with admin privileges can access the following APIs."""
def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
"""Iterate over entities"""
yield from self.read(partial(self._api.get, url="groups"))
class RolesAPI(ClientIncrementalStreamAPI):
"""Only users with admin privileges can access the following APIs."""
def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
"""Iterate over entities"""
yield from self.read(partial(self._api.get, url="roles"))
class SkillsAPI(ClientIncrementalStreamAPI):
"""Only users with admin privileges can access the following APIs."""
def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
"""Iterate over entities"""
yield from self.read(partial(self._api.get, url="skills"))
class SurveysAPI(ClientIncrementalStreamAPI):
def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
"""Iterate over entities"""
yield from self.read(partial(self._api.get, url="surveys"))
class TicketsAPI(IncrementalStreamAPI):
def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
"""Iterate over entities"""
params = {"include": "description"}
yield from self.read(partial(self._api.get, url="tickets"), params=params)
class TimeEntriesAPI(ClientIncrementalStreamAPI):
def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
"""Iterate over entities"""
yield from self.read(partial(self._api.get, url="time_entries"))
class ConversationsAPI(ClientIncrementalStreamAPI):
"""Notes and Replies"""
def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
"""Iterate over entities"""
tickets = TicketsAPI(self._api)
if self.state:
tickets.state = self.state
for ticket in tickets.list():
url = f"tickets/{ticket['id']}/conversations"
yield from self.read(partial(self._api.get, url=url))
class SatisfactionRatingsAPI(ClientIncrementalStreamAPI):
"""Surveys satisfaction replies"""
def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
"""Iterate over entities"""
yield from self.read(partial(self._api.get, url="surveys/satisfaction_ratings"))

View File

@@ -22,49 +22,63 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from functools import partial
from typing import Callable, Dict, Mapping, Tuple
from typing import Any, Mapping, Tuple
from base_python import BaseClient
from freshdesk.api import API
from freshdesk.v2.errors import FreshdeskError
DEFAULT_ITEMS_PER_PAGE = 100
def paginator(request: Callable, params: Dict, page: int = None, per_page: int = DEFAULT_ITEMS_PER_PAGE, **kwargs):
"""Split requests in multiple batches and return records as generator"""
read_all = page is None
page = page or 1
while True:
rows = request(params={**params, "page": page, "per_page": per_page})
yield from rows
if len(rows) < per_page or not read_all:
break
page += 1
from .api import (
API,
AgentsAPI,
CompaniesAPI,
ContactsAPI,
ConversationsAPI,
FreshdeskError,
GroupsAPI,
RolesAPI,
SatisfactionRatingsAPI,
SkillsAPI,
SurveysAPI,
TicketsAPI,
TimeEntriesAPI,
)
class Client(BaseClient):
ENTITIES = ["agents", "contacts", "companies", "groups", "roles", "skills", "surveys", "tickets", "time_entries"]
def __init__(self, domain, api_key):
self._client = API(domain=domain, api_key=api_key, version=2)
self._api = API(domain=domain, api_key=api_key)
self._apis = {
"agents": AgentsAPI(self._api),
"companies": CompaniesAPI(self._api),
"contacts": ContactsAPI(self._api),
"conversations": ConversationsAPI(self._api),
"groups": GroupsAPI(self._api),
"roles": RolesAPI(self._api),
"skills": SkillsAPI(self._api),
"surveys": SurveysAPI(self._api),
"tickets": TicketsAPI(self._api),
"time_entries": TimeEntriesAPI(self._api),
"satisfaction_ratings": SatisfactionRatingsAPI(self._api),
}
super().__init__()
def list(self, name, **kwargs):
# for now exact matching
url = name
request = partial(self._client._get, url=url)
yield from paginator(request, params={}, **kwargs)
def settings(self, **kwargs):
def settings(self):
url = "settings/helpdesk"
request = partial(self._client._get, url=url)
return list(paginator(request, params={}, **kwargs))
return self._api.get(url)
def stream_has_state(self, name: str) -> bool:
"""Tell if stream supports incremental sync"""
return hasattr(self._apis[name], "state")
def get_stream_state(self, name: str) -> Any:
"""Get state of stream with corresponding name"""
return self._apis[name].state
def set_stream_state(self, name: str, state: Any):
"""Set state of stream with corresponding name"""
self._apis[name].state = state
def _enumerate_methods(self) -> Mapping[str, callable]:
return {entity: partial(self.list, name=entity) for entity in self.ENTITIES}
return {name: api.list for name, api in self._apis.items()}
def health_check(self) -> Tuple[bool, str]:
alive = True

View File

@@ -62,27 +62,5 @@
"unique_external_id": {
"type": ["integer", "null"]
}
},
"required": [
"active",
"address",
"company_id",
"created_at",
"csat_rating",
"custom_fields",
"description",
"email",
"facebook_id",
"id",
"job_title",
"language",
"mobile",
"name",
"phone",
"preferred_source",
"time_zone",
"twitter_id",
"unique_external_id",
"updated_at"
]
}
}

View File

@@ -0,0 +1,60 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"body": {
"type": "string"
},
"body_text": {
"type": "string"
},
"id": {
"type": "integer"
},
"incoming": {
"type": "boolean"
},
"private": {
"type": "boolean"
},
"user_id": {
"type": "integer"
},
"support_email": {
"type": ["string", "null"]
},
"source": {
"type": "integer"
},
"category": {
"type": "integer"
},
"to_emails": {
"type": ["array", "null"]
},
"from_email": {
"type": ["string", "null"]
},
"cc_emails": {
"type": ["array", "null"]
},
"bcc_emails": {
"type": ["array", "null"]
},
"created_at": {
"type": "string"
},
"updated_at": {
"type": "string"
},
"attachments": {
"type": ["array", "null"]
},
"ticket_id": {
"type": "integer"
},
"source_additional_info": {
"type": ["object", "null"]
}
}
}

View File

@@ -0,0 +1,36 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"survey_id": {
"type": "integer"
},
"user_id": {
"type": "integer"
},
"agent_id": {
"type": "integer"
},
"feedback": {
"type": "string"
},
"group_id": {
"type": "integer"
},
"ticket_id": {
"type": "integer"
},
"created_at": {
"type": "string"
},
"updated_at": {
"type": "string"
},
"ratings": {
"type": "object"
}
}
}

View File

@@ -94,6 +94,12 @@
},
"nr_escalated": {
"type": "boolean"
},
"description": {
"type": "string"
},
"description_text": {
"type": "string"
}
}
}

View File

@@ -2,7 +2,10 @@
## Overview
The Freshdesk source supports Full Refresh syncs. That is, every time a sync is run, Airbyte will copy all rows in the tables and columns you set up for replication into the destination in a new table.
The Freshdesk supports full refresh and incremental sync. You can choose if this connector will copy only the new or updated data, or all rows in the tables and columns you set up for replication, every time a sync is run.
There are two types of incremental sync:
* server level (native) - when API supports filter on specific columns that Airbyte use to track changes (`updated_at`, `created_at`, etc)
* client level - when API doesn't support filter and Airbyte performs filtration on its side.
### Output schema
@@ -10,12 +13,14 @@ Several output streams are available from this source:
* [Agents](https://developers.freshdesk.com/api/#agents)
* [Companies](https://developers.freshdesk.com/api/#companies)
* [Contacts](https://developers.freshdesk.com/api/#contacts)
* [Contacts](https://developers.freshdesk.com/api/#contacts) (Native Incremental Sync)
* [Conversations](https://developers.freshdesk.com/api/#conversations)
* [Groups](https://developers.freshdesk.com/api/#groups)
* [Roles](https://developers.freshdesk.com/api/#roles)
* [Satisfaction Ratings](https://developers.freshdesk.com/api/#satisfaction-ratings)
* [Skills](https://developers.freshdesk.com/api/#skills)
* [Surveys](https://developers.freshdesk.com/api/#surveys)
* [Tickets](https://developers.freshdesk.com/api/#tickets)
* [Tickets](https://developers.freshdesk.com/api/#tickets) (Native Incremental Sync)
* [Time Entries](https://developers.freshdesk.com/api/#time-entries)
If there are more endpoints you'd like Airbyte to support, please [create an issue.](https://github.com/airbytehq/airbyte/issues/new/choose)
@@ -25,8 +30,7 @@ If there are more endpoints you'd like Airbyte to support, please [create an iss
| Feature | Supported? |
| :--- | :--- |
| Full Refresh Sync | Yes |
| Incremental Sync | Coming soon |
| Replicate Incremental Deletes | Coming soon |
| Incremental Sync | Yes |
| SSL connection | Yes |
### Performance considerations
@@ -43,4 +47,3 @@ The Freshdesk connector should not run into Freshdesk API limitations under norm
### Setup guide
Please read [How to find your API key](https://support.freshdesk.com/support/solutions/articles/215517).