1
0
mirror of synced 2025-12-25 02:09:19 -05:00

🎉Source Zenloop: migrate to lowcode (#19624)

* Migrate to lowcode

* Updated PR

* Updated configured catalog

* Updated to review

* Updated after review

* Add expected records

* Updated docs

* Updated docs

* Fix to linter check

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
Serhii Lazebnyi
2022-12-09 23:56:16 +01:00
committed by GitHub
parent aa0d171e73
commit 17c767aac1
17 changed files with 545 additions and 572 deletions

View File

@@ -1895,10 +1895,10 @@
- name: Zenloop
sourceDefinitionId: f1e4c7f6-db5c-4035-981f-d35ab4998794
dockerRepository: airbyte/source-zenloop
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
documentationUrl: https://docs.airbyte.com/integrations/sources/zenloop
sourceType: api
releaseStage: alpha
releaseStage: beta
- sourceDefinitionId: cdaf146a-9b75-49fd-9dd2-9d64a0bb4781
name: Sentry
dockerRepository: airbyte/source-sentry

View File

@@ -16433,7 +16433,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-zenloop:0.1.3"
- dockerImage: "airbyte/source-zenloop:0.1.4"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/zenloop"
connectionSpecification:

View File

@@ -34,5 +34,5 @@ COPY source_zenloop ./source_zenloop
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/source-zenloop

View File

@@ -82,7 +82,7 @@ docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integrat
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
First install test dependencies into your virtual environment:
```
pip install .[tests]
pip install .'[tests]'
```
### Unit Tests
To run unit tests locally, from the connector directory run:

View File

@@ -13,31 +13,16 @@ tests:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_answers.json"
empty_streams: []
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_surveys.json"
empty_streams: []
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_answers_survey_group.json"
empty_streams: []
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_survey_groups.json"
empty_streams: []
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_properties.json"
empty_streams: []
configured_catalog_path: "integration_tests/configured_catalog.json"
expect_records:
path: "integration_tests/expected_records.txt"
extra_fields: no
exact_order: no
extra_records: yes
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_answers.json"
future_state_path: "integration_tests/abnormal_state.json"
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_answers_survey_group.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_survey_groups.json"
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_surveys.json"
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_properties.json"
configured_catalog_path: "integration_tests/configured_catalog.json"

View File

@@ -0,0 +1,49 @@
{
"streams": [
{
"stream": {
"name": "answers",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "surveys",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "survey_groups",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "answers_survey_group",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "properties",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

View File

@@ -0,0 +1,10 @@
{"stream": "answers", "data": {"additional_answers": [], "additional_questions": {}, "comment": "Airbyte is amazing", "email": "sajarin@airbyte.io", "id": "TVRrMlptWmtPRGd0TWprMFl5MDBZbUkwTFRrd01XWXROVE5pWldNMU56WXpNbVV4", "identity": "sajarin@airbyte.io", "identity_type": "email", "inserted_at": "2022-09-19T08:46:22.373163Z", "labels": [], "labels_with_keywords": {}, "metatags": {}, "name": "Sajarin Dider", "property_ids": [348043522, 348043523], "recipient_id": "TVRFMU1qTXhaV1F0Wm1Oa01TMDBOMkUzTFRsbE1ETXRObVEzTjJNd1lqWmpZMkpp", "score": 10, "score_type": "promoter", "sentiment": "positive", "sentiment_per_label_name": {}, "translated_comment": null}, "emitted_at": 1670592045869}
{"stream": "answers", "data": {"additional_answers": [], "additional_questions": {}, "comment": "I love airbyte", "email": "integration-test@airbyte.io", "id": "TXpCaU5tRmtNekl0T1dKa09TMDBPRGd5TFdKbFlURXROalF6WkRkbFlqVmhaR0kw", "identity": "integration-test@airbyte.io", "identity_type": "email", "inserted_at": "2022-09-19T08:46:14.348616Z", "labels": [], "labels_with_keywords": {}, "metatags": {}, "name": "Test Account", "property_ids": [348044048, 348043523], "recipient_id": "TkdOaU5HRTVOMlV0WWpReE1TMDBNRGxrTFdJeU9UUXROVGcxTVRCbE5UVXhaakpo", "score": 10, "score_type": "promoter", "sentiment": "positive", "sentiment_per_label_name": {}, "translated_comment": null}, "emitted_at": 1670592045874}
{"stream": "surveys", "data": {"inserted_at": "2022-09-19T08:36:26Z", "public_hash_id": "WlRBek9ESTFNREl0TmpJMk9DMDBOR0V4TFRoaE16UXRZV1UyWW1SbU56WTNPVGRs", "status": "active", "title": "New Survey 2022-09-19 08:36:26.262267"}, "emitted_at": 1670592046346}
{"stream": "survey_groups", "data": {"inserted_at": "2021-11-09T13:06:59Z", "name": "All Surveys & Survey Groups", "public_hash_id": "WmpGa1ltTmlZbVl0TWpGa015MDBOemhsTFdKbE1XSXRaV05sTXpnMk9USmlOalZp", "surveys": [{"inserted_at": "2022-09-19T08:36:26Z", "public_hash_id": "WlRBek9ESTFNREl0TmpJMk9DMDBOR0V4TFRoaE16UXRZV1UyWW1SbU56WTNPVGRs", "status": "active", "title": "New Survey 2022-09-19 08:36:26.262267"}]}, "emitted_at": 1670592046589}
{"stream": "survey_groups", "data": {"inserted_at": "2022-09-19T08:48:21Z", "name": "Test Group", "public_hash_id": "TnpKaE1UVmhObUV0WkdFME15MDBZMkUyTFRsalpXRXROamt5TkRVd05EZzVOelEy", "surveys": [{"inserted_at": "2022-09-19T08:36:26Z", "public_hash_id": "WlRBek9ESTFNREl0TmpJMk9DMDBOR0V4TFRoaE16UXRZV1UyWW1SbU56WTNPVGRs", "status": "active", "title": "New Survey 2022-09-19 08:36:26.262267"}]}, "emitted_at": 1670592046592}
{"stream": "answers_survey_group", "data": {"additional_questions": {}, "comment": "Airbyte is amazing", "email": "sajarin@airbyte.io", "id": "TVRrMlptWmtPRGd0TWprMFl5MDBZbUkwTFRrd01XWXROVE5pWldNMU56WXpNbVV4", "identity": "sajarin@airbyte.io", "identity_type": "email", "inserted_at": "2022-09-19T08:46:22.373163Z", "labels": [], "labels_with_keywords": {}, "metatags": {}, "name": "Sajarin Dider", "property_ids": [348043522, 348043523], "recipient_id": "TVRFMU1qTXhaV1F0Wm1Oa01TMDBOMkUzTFRsbE1ETXRObVEzTjJNd1lqWmpZMkpp", "score": 10, "score_type": "promoter", "sentiment": "positive", "sentiment_per_label_name": {}, "survey_public_hash_id": "WlRBek9ESTFNREl0TmpJMk9DMDBOR0V4TFRoaE16UXRZV1UyWW1SbU56WTNPVGRs", "translated_comment": null}, "emitted_at": 1670592046844}
{"stream": "answers_survey_group", "data": {"additional_questions": {}, "comment": "I love airbyte", "email": "integration-test@airbyte.io", "id": "TXpCaU5tRmtNekl0T1dKa09TMDBPRGd5TFdKbFlURXROalF6WkRkbFlqVmhaR0kw", "identity": "integration-test@airbyte.io", "identity_type": "email", "inserted_at": "2022-09-19T08:46:14.348616Z", "labels": [], "labels_with_keywords": {}, "metatags": {}, "name": "Test Account", "property_ids": [348044048, 348043523], "recipient_id": "TkdOaU5HRTVOMlV0WWpReE1TMDBNRGxrTFdJeU9UUXROVGcxTVRCbE5UVXhaakpo", "score": 10, "score_type": "promoter", "sentiment": "positive", "sentiment_per_label_name": {}, "survey_public_hash_id": "WlRBek9ESTFNREl0TmpJMk9DMDBOR0V4TFRoaE16UXRZV1UyWW1SbU56WTNPVGRs", "translated_comment": null}, "emitted_at": 1670592046851}
{"stream": "properties", "data": {"id": "WldSa1pUTmtaVGN0TWprNU5pMDBPVEUyTFdGbE9XSXRPVEkwTVRjM1lqUXlNMlU0", "name": "gender", "value": "agender"}, "emitted_at": 1670592047292}
{"stream": "properties", "data": {"id": "WWpsa1pUUmlaVFl0Tmpoa055MDBZV0l3TFRsbE9USXRaV1pqWVdFMFpESTFNR1E0", "name": "gender", "value": "male"}, "emitted_at": 1670592047295}
{"stream": "properties", "data": {"id": "TldKak1qQTNaRFF0WWpVeU5DMDBaVEpqTFdJNFkyVXRPVFJqTldFNU9Ea3haRFps", "name": "country", "value": "US"}, "emitted_at": 1670592047297}

View File

@@ -0,0 +1,34 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from dataclasses import dataclass
from typing import Iterable
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.stream_slicers.substream_slicer import SubstreamSlicer
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
@dataclass
class ZenloopSubstreamSlicer(SubstreamSlicer):
def stream_slices(self, sync_mode: SyncMode, stream_state: StreamState) -> Iterable[StreamSlice]:
"""
config_parent_field : parent field name in config
Use parent id's as stream state value if it specified in config or
create stream_slices according SubstreamSlicer logic.
"""
config = self._options.get("config")
parent_field = self._options.get("config_parent_field")
custom_stream_state_value = config.get(parent_field)
if not custom_stream_state_value:
yield from super().stream_slices(sync_mode, stream_state)
else:
for parent_stream_config in self.parent_stream_configs:
stream_state_field = parent_stream_config.stream_slice_field or None
yield {stream_state_field: custom_stream_state_value, "parent_slice": {}}

View File

@@ -2,253 +2,16 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
import math
from abc import ABC
from datetime import datetime, timedelta
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
"""
This file provides the necessary constructs to interpret a provided declarative YAML configuration file into
source connector.
WARNING: Do not modify this file.
"""
class ZenloopStream(HttpStream, ABC):
url_base = "https://api.zenloop.com/v1/"
extra_params = None
has_date_param = False
def __init__(self, api_token: str, date_from: Optional[str], survey_id, survey_group_id: Optional[str], **kwargs):
super().__init__(authenticator=api_token)
self.api_token = api_token
self.date_from = date_from or datetime.today().strftime("%Y-%m-%d")
self.survey_id = survey_id or None
self.survey_group_id = survey_group_id or None
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
decoded_response = response.json()
page = decoded_response["meta"]["page"]
per_page = decoded_response["meta"]["per_page"]
total = decoded_response["meta"]["total"]
if page < math.ceil(total / per_page):
return {"page": page + 1}
else:
return None
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
if self.has_date_param:
params = {"date_from": self.date_from}
else:
params = {}
if self.extra_params:
params.update(self.extra_params)
if next_page_token:
params.update(**next_page_token)
return params
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
yield response_json
class ChildStreamMixin:
parent_stream_class: Optional[ZenloopStream] = None
def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
# determine if parent_stream_class is Surveys or SurveyGroups
if self.parent_stream_class.__name__ == "Surveys":
public_hash_id = self.survey_id
else:
public_hash_id = self.survey_group_id
# loop through all survey_id's if None was provided
# return nothing otherwise
if not public_hash_id:
for item in self.parent_stream_class(
api_token=self.api_token, date_from=self.date_from, survey_id=self.survey_id, survey_group_id=self.survey_group_id
).read_records(sync_mode=sync_mode):
# set date_from to most current cursor_field or date_from if not incremental
if stream_state:
date_from = stream_state[self.cursor_field]
else:
date_from = self.date_from
yield {"survey_slice": item["public_hash_id"], "date_from": date_from}
else:
yield None
class IncrementalZenloopStream(ZenloopStream, ABC):
# checkpoint stream reads after 1000 records.
state_checkpoint_interval = 1000
cursor_field = "inserted_at"
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
# latest_record has objects in answers
if latest_record:
# add 1 second to not pull latest_record again
latest_record_date = (
datetime.strptime(latest_record[self.cursor_field], "%Y-%m-%dT%H:%M:%S.%fZ") + timedelta(seconds=1)
).isoformat() + str("Z")
else:
latest_record_date = ""
max_record = max(latest_record_date, current_stream_state.get(self.cursor_field, ""))
return {self.cursor_field: max_record}
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state, stream_slice, next_page_token)
if stream_state:
# if looped through all slices take its date_from parameter
# else no survey_id or survey_group_id provided -> take cursor_field
if stream_slice:
params["date_from"] = stream_slice["date_from"]
else:
params["date_from"] = stream_state[self.cursor_field]
return params
class Surveys(ZenloopStream):
# API Doc: https://docs.zenloop.com/reference#get-list-of-surveys
primary_key = None
has_date_param = False
extra_params = {"page": "1"}
use_cache = True
def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return "surveys"
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
yield from response_json.get("surveys", [])
class Answers(ChildStreamMixin, IncrementalZenloopStream):
# API Doc: https://docs.zenloop.com/reference#get-answers
primary_key = "id"
has_date_param = True
parent_stream_class = Surveys
extra_params = {
"page": "1",
"order_type": "desc",
"order_by": "inserted_at",
"date_shortcut": "custom",
"date_to": datetime.today().strftime("%Y-%m-%d"),
}
def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
# take optional survey_id if entered
if self.survey_id:
return f"surveys/{self.survey_id}/answers"
# slice all survey_id's if nothing provided
else:
return f"surveys/{stream_slice['survey_slice']}/answers"
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
# select answers and surveys to be able to link answer to a survey
yield from response_json.get("answers", [])
class Properties(ChildStreamMixin, ZenloopStream):
# API Doc: https://docs.zenloop.com/reference/get-list-of-properties
primary_key = "id"
has_date_param = False
extra_params = {"page": "1"}
parent_stream_class = Surveys
def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
# take optional survey_id if entered
if self.survey_id:
return f"surveys/{self.survey_id}/properties"
# slice all survey_id's if nothing provided
else:
return f"surveys/{stream_slice['survey_slice']}/properties"
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
# select properties and surveys to be able to link properties to a survey
yield from response_json.get("properties", [])
class SurveyGroups(ZenloopStream):
# API Doc: https://docs.zenloop.com/reference#get-list-of-survey-groups
primary_key = None
has_date_param = False
extra_params = {"page": "1"}
use_cache = True
def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return "survey_groups"
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
yield from response_json.get("survey_groups", [])
class AnswersSurveyGroup(ChildStreamMixin, IncrementalZenloopStream):
# API Doc: https://docs.zenloop.com/reference#get-answers-for-survey-group
primary_key = "id"
has_date_param = True
parent_stream_class = SurveyGroups
extra_params = {
"page": "1",
"order_type": "desc",
"order_by": "inserted_at",
"date_shortcut": "custom",
"date_to": datetime.today().strftime("%Y-%m-%d"),
}
def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
# take optional survey_group_id if entered
if self.survey_group_id:
return f"survey_groups/{self.survey_group_id}/answers"
# slice all survey_group_id's if nothing provided
else:
return f"survey_groups/{stream_slice['survey_slice']}/answers"
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
# select answers and surveys to be able to link answer to a survey
yield from response_json.get("answers", [])
class SourceZenloop(AbstractSource):
def check_connection(self, logger, config) -> Tuple[bool, any]:
try:
authenticator = TokenAuthenticator(config["api_token"])
url = f"{ZenloopStream.url_base}surveys"
session = requests.get(url, headers=authenticator.get_auth_header())
session.raise_for_status()
return True, None
except Exception as error:
return False, f"Unable to connect to Zenloop API with the provided credentials - {error}"
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
args = {
"api_token": TokenAuthenticator(token=config["api_token"]),
"date_from": config["date_from"],
"survey_id": config.get("survey_id"),
"survey_group_id": config.get("survey_group_id"),
}
return [Surveys(**args), Answers(**args), Properties(**args), SurveyGroups(**args), AnswersSurveyGroup(**args)]
# Declarative Source
class SourceZenloop(YamlDeclarativeSource):
def __init__(self):
super().__init__(path_to_yaml="zenloop.yaml")

View File

@@ -0,0 +1,229 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import math
from abc import ABC
from datetime import datetime, timedelta
from typing import Any, Iterable, Mapping, MutableMapping, Optional
import requests
from airbyte_cdk.sources.streams.http import HttpStream
class ZenloopStream(HttpStream, ABC):
url_base = "https://api.zenloop.com/v1/"
extra_params = None
has_date_param = False
def __init__(self, api_token: str, date_from: Optional[str], survey_id, survey_group_id: Optional[str], **kwargs):
super().__init__(authenticator=api_token)
self.api_token = api_token
self.date_from = date_from or datetime.today().strftime("%Y-%m-%d")
self.survey_id = survey_id or None
self.survey_group_id = survey_group_id or None
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
decoded_response = response.json()
page = decoded_response["meta"]["page"]
per_page = decoded_response["meta"]["per_page"]
total = decoded_response["meta"]["total"]
if page < math.ceil(total / per_page):
return {"page": page + 1}
else:
return None
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
if self.has_date_param:
params = {"date_from": self.date_from}
else:
params = {}
if self.extra_params:
params.update(self.extra_params)
if next_page_token:
params.update(**next_page_token)
return params
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
yield response_json
class ChildStreamMixin:
parent_stream_class: Optional[ZenloopStream] = None
def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
# determine if parent_stream_class is Surveys or SurveyGroups
if self.parent_stream_class.__name__ == "Surveys":
public_hash_id = self.survey_id
else:
public_hash_id = self.survey_group_id
# loop through all survey_id's if None was provided
# return nothing otherwise
if not public_hash_id:
for item in self.parent_stream_class(
api_token=self.api_token, date_from=self.date_from, survey_id=self.survey_id, survey_group_id=self.survey_group_id
).read_records(sync_mode=sync_mode):
# set date_from to most current cursor_field or date_from if not incremental
if stream_state:
date_from = stream_state[self.cursor_field]
else:
date_from = self.date_from
yield {"survey_slice": item["public_hash_id"], "date_from": date_from}
else:
yield None
class IncrementalZenloopStream(ZenloopStream, ABC):
# checkpoint stream reads after 1000 records.
state_checkpoint_interval = 1000
cursor_field = "inserted_at"
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
# latest_record has objects in answers
if latest_record:
# add 1 second to not pull latest_record again
latest_record_date = (
datetime.strptime(latest_record[self.cursor_field], "%Y-%m-%dT%H:%M:%S.%fZ") + timedelta(seconds=1)
).isoformat() + str("Z")
else:
latest_record_date = ""
max_record = max(latest_record_date, current_stream_state.get(self.cursor_field, ""))
return {self.cursor_field: max_record}
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state, stream_slice, next_page_token)
if stream_state:
# if looped through all slices take its date_from parameter
# else no survey_id or survey_group_id provided -> take cursor_field
if stream_slice:
params["date_from"] = stream_slice["date_from"]
else:
params["date_from"] = stream_state[self.cursor_field]
return params
class Surveys(ZenloopStream):
# API Doc: https://docs.zenloop.com/reference#get-list-of-surveys
primary_key = None
has_date_param = False
extra_params = {"page": "1"}
use_cache = True
def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return "surveys"
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
yield from response_json.get("surveys", [])
class Answers(ChildStreamMixin, IncrementalZenloopStream):
# API Doc: https://docs.zenloop.com/reference#get-answers
primary_key = "id"
has_date_param = True
parent_stream_class = Surveys
extra_params = {
"page": "1",
"order_type": "desc",
"order_by": "inserted_at",
"date_shortcut": "custom",
"date_to": datetime.today().strftime("%Y-%m-%d"),
}
def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
# take optional survey_id if entered
if self.survey_id:
return f"surveys/{self.survey_id}/answers"
# slice all survey_id's if nothing provided
else:
return f"surveys/{stream_slice['survey_slice']}/answers"
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
# select answers and surveys to be able to link answer to a survey
yield from response_json.get("answers", [])
class Properties(ChildStreamMixin, ZenloopStream):
# API Doc: https://docs.zenloop.com/reference/get-list-of-properties
primary_key = "id"
has_date_param = False
extra_params = {"page": "1"}
parent_stream_class = Surveys
def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
# take optional survey_id if entered
if self.survey_id:
return f"surveys/{self.survey_id}/properties"
# slice all survey_id's if nothing provided
else:
return f"surveys/{stream_slice['survey_slice']}/properties"
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
# select properties and surveys to be able to link properties to a survey
yield from response_json.get("properties", [])
class SurveyGroups(ZenloopStream):
# API Doc: https://docs.zenloop.com/reference#get-list-of-survey-groups
primary_key = None
has_date_param = False
extra_params = {"page": "1"}
use_cache = True
def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return "survey_groups"
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
yield from response_json.get("survey_groups", [])
class AnswersSurveyGroup(ChildStreamMixin, IncrementalZenloopStream):
# API Doc: https://docs.zenloop.com/reference#get-answers-for-survey-group
primary_key = "id"
has_date_param = True
parent_stream_class = SurveyGroups
extra_params = {
"page": "1",
"order_type": "desc",
"order_by": "inserted_at",
"date_shortcut": "custom",
"date_to": datetime.today().strftime("%Y-%m-%d"),
}
def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
# take optional survey_group_id if entered
if self.survey_group_id:
return f"survey_groups/{self.survey_group_id}/answers"
# slice all survey_group_id's if nothing provided
else:
return f"survey_groups/{stream_slice['survey_slice']}/answers"
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
# select answers and surveys to be able to link answer to a survey
yield from response_json.get("answers", [])

View File

@@ -0,0 +1,149 @@
version: "0.1.0"
definitions:
selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_pointer: ["{{ options['data_field'] }}"]
requester:
type: HttpRequester
name: "{{ options['name'] }}"
http_method: "GET"
authenticator:
type: BearerAuthenticator
api_token: "{{ config['api_token'] }}"
retriever:
type: SimpleRetriever
$options:
url_base: "https://api.zenloop.com/v1/"
name: "{{ options['name'] }}"
record_selector:
$ref: "*ref(definitions.selector)"
paginator:
type: DefaultPaginator
pagination_strategy:
type: PageIncrement
page_size: 50
start_from_page: 1
page_size_option:
field_name: "per_page"
inject_into: "request_parameter"
page_token_option:
inject_into: "path"
base_stream:
retriever:
$ref: "*ref(definitions.retriever)"
requester:
$ref: "*ref(definitions.requester)"
incremental_base_stream:
$ref: "*ref(definitions.base_stream)"
stream_cursor_field: "inserted_at"
retriever:
$ref: "*ref(definitions.retriever)"
requester:
$ref: "*ref(definitions.requester)"
request_options_provider:
request_parameters:
order_type: "desc"
order_by: "inserted_at"
date_shortcut: "custom"
surveys:
$ref: "*ref(definitions.base_stream)"
$options:
name: "surveys"
path: "surveys"
data_field: "surveys"
surveys_slicer:
class_name: source_zenloop.components.ZenloopSubstreamSlicer
$options:
config_parent_field: "survey_id"
parent_stream_configs:
- stream: "*ref(definitions.surveys)"
parent_key: public_hash_id
stream_slice_field: id
survey_groups:
$ref: "*ref(definitions.base_stream)"
$options:
name: "survey_groups"
path: "survey_groups"
data_field: "survey_groups"
survey_groups_slicer:
class_name: source_zenloop.components.ZenloopSubstreamSlicer
$options:
config_parent_field: "survey_group_id"
parent_stream_configs:
- stream: "*ref(definitions.survey_groups)"
parent_key: public_hash_id
stream_slice_field: id
date_slicer:
type: DatetimeStreamSlicer
cursor_field: "inserted_at"
datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ"
start_datetime:
datetime: "{{ config['date_from'] }}"
datetime_format: "%Y-%m-%d"
end_datetime:
datetime: "{{ today_utc() }}"
datetime_format: "%Y-%m-%d"
step: "1m"
end_time_option:
field_name: "date_to"
inject_into: "request_parameter"
start_time_option:
field_name: "date_from"
inject_into: "request_parameter"
properties:
$ref: "*ref(definitions.base_stream)"
$options:
name: "properties"
data_field: "properties"
retriever:
$ref: "*ref(definitions.retriever)"
requester:
$ref: "*ref(definitions.requester)"
path: "{{ 'surveys/' + config['survey_id'] + '/properties' if config['survey_id'] else 'surveys/' + stream_slice.id + '/properties' }}"
stream_slicer:
$ref: "*ref(definitions.surveys_slicer)"
answers:
$ref: "*ref(definitions.incremental_base_stream)"
$options:
name: "answers"
data_field: "answers"
retriever:
$ref: "*ref(definitions.retriever)"
requester:
$ref: "*ref(definitions.incremental_base_stream.retriever.requester)"
path: "{{ 'surveys/' + stream_slice.id + '/answers' }}"
stream_slicer:
type: CartesianProductStreamSlicer
stream_slicers:
- "*ref(definitions.surveys_slicer)"
- "*ref(definitions.date_slicer)"
answers_survey_group:
$ref: "*ref(definitions.incremental_base_stream)"
$options:
name: "answers_survey_group"
data_field: "answers"
retriever:
$ref: "*ref(definitions.retriever)"
requester:
$ref: "*ref(definitions.incremental_base_stream.retriever.requester)"
path: "{{ 'survey_groups/' + stream_slice.id + '/answers' }}"
stream_slicer:
type: CartesianProductStreamSlicer
stream_slicers:
- "*ref(definitions.survey_groups_slicer)"
- "*ref(definitions.date_slicer)"
streams:
- "*ref(definitions.surveys)"
- "*ref(definitions.survey_groups)"
- "*ref(definitions.properties)"
- "*ref(definitions.answers)"
- "*ref(definitions.answers_survey_group)"
check:
type: CheckStream
stream_names: ["surveys"]

View File

@@ -1,3 +0,0 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

View File

@@ -1,10 +0,0 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from pytest import fixture
@fixture
def config():
return {"api_token": "<Your API Key>", "date_from": "2021-07-01", "survey_id": "<survey_id>", "survey_group_id": "<survey_group_id>"}

View File

@@ -1,101 +0,0 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from unittest.mock import MagicMock
from airbyte_cdk.models import SyncMode
from pytest import fixture
from source_zenloop.source import Answers, AnswersSurveyGroup, IncrementalZenloopStream
@fixture
def patch_incremental_base_class(mocker):
# Mock abstract methods to enable instantiating abstract class
mocker.patch.object(IncrementalZenloopStream, "path", "v0/example_endpoint")
mocker.patch.object(IncrementalZenloopStream, "primary_key", "test_primary_key")
mocker.patch.object(IncrementalZenloopStream, "__abstractmethods__", set())
def test_cursor_field(patch_incremental_base_class, config):
    """The incremental stream must expose `inserted_at` as its cursor field."""
    stream = IncrementalZenloopStream(
        config["api_token"], config["date_from"], config["survey_id"], config["survey_group_id"]
    )
    assert stream.cursor_field == "inserted_at"
def test_get_updated_state(patch_incremental_base_class, config):
    """get_updated_state should advance the cursor past the latest record's timestamp."""
    cursor = "inserted_at"
    stream = IncrementalZenloopStream(
        config["api_token"], config["date_from"], config["survey_id"], config["survey_group_id"]
    )
    new_state = stream.get_updated_state(
        current_stream_state={cursor: "2021-07-24T03:30:30.038549Z"},
        latest_record={"inserted_at": "2021-10-20T03:30:30.038549Z"},
    )
    # NOTE(review): expected value is one second past the latest record's
    # timestamp — confirm the +1s semantics are intentional in the stream.
    assert new_state == {cursor: "2021-10-20T03:30:31.038549Z"}
def test_stream_slices(patch_incremental_base_class, config):
    """stream_slices yields a single empty slice no matter which IDs are configured."""
    slice_kwargs = {
        "sync_mode": SyncMode.incremental,
        "cursor_field": "inserted_at",
        "stream_state": {"inserted_at": "2021-10-20T03:30:30Z"},
    }
    # Cover all three ID configurations: both set, group missing, survey missing.
    for survey_id, survey_group_id in (
        (config["survey_id"], config["survey_group_id"]),
        (config["survey_id"], None),
        (None, config["survey_group_id"]),
    ):
        stream = IncrementalZenloopStream(config["api_token"], config["date_from"], survey_id, survey_group_id)
        assert list(stream.stream_slices(**slice_kwargs)) == [None]
def test_supports_incremental(patch_incremental_base_class, mocker, config):
    """A stream with any cursor field reports incremental support."""
    mocker.patch.object(IncrementalZenloopStream, "cursor_field", "dummy_field")
    stream = IncrementalZenloopStream(
        config["api_token"], config["date_from"], config["survey_id"], config["survey_group_id"]
    )
    assert stream.supports_incremental
def test_source_defined_cursor(patch_incremental_base_class, config):
    """The cursor is defined by the source, not chosen by the user."""
    stream = IncrementalZenloopStream(
        config["api_token"], config["date_from"], config["survey_id"], config["survey_group_id"]
    )
    assert stream.source_defined_cursor
def test_stream_checkpoint_interval(patch_incremental_base_class, config):
    """State should be checkpointed every 1000 records."""
    stream = IncrementalZenloopStream(
        config["api_token"], config["date_from"], config["survey_id"], config["survey_group_id"]
    )
    assert stream.state_checkpoint_interval == 1000
def test_parse_response_answers(patch_incremental_base_class, config):
    """Answers.parse_response should unwrap records from the `answers` field."""
    record = {"id": 123, "name": "John Doe"}
    response = MagicMock()
    response.json.return_value = {"answers": [record]}
    assert next(Answers(**config).parse_response(response=response)) == record
def test_parse_response_answers_survey_groups(patch_incremental_base_class, config):
    """AnswersSurveyGroup.parse_response should unwrap records from the `answers` field."""
    record = {"id": 123, "name": "John Doe"}
    response = MagicMock()
    response.json.return_value = {"answers": [record]}
    assert next(AnswersSurveyGroup(**config).parse_response(response=response)) == record
def test_surveys_path(config):
    """The Answers path should embed the configured survey id."""
    assert Answers(**config).path() == "surveys/<survey_id>/answers"
def test_survey_groups_path(config):
    """The AnswersSurveyGroup path should embed the configured survey group id."""
    assert AnswersSurveyGroup(**config).path() == "survey_groups/<survey_group_id>/answers"

View File

@@ -1,38 +0,0 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from unittest.mock import MagicMock
import responses
from source_zenloop.source import SourceZenloop
@responses.activate
def test_check_connection_success(mocker, config):
    """check_connection reports success when the surveys endpoint responds OK."""
    responses.add(
        responses.GET,
        "https://api.zenloop.com/v1/surveys",
    )
    assert SourceZenloop().check_connection(MagicMock(), config) == (True, None)
@responses.activate
def test_check_connection_fail(mocker, config):
    """check_connection surfaces the HTTP error message on a 401 response."""
    responses.add(responses.GET, "https://api.zenloop.com/v1/surveys", json={"error": "Unauthorized"}, status=401)
    expected_message = (
        "Unable to connect to Zenloop API with the provided credentials - "
        "401 Client Error: Unauthorized for url: https://api.zenloop.com/v1/surveys"
    )
    assert SourceZenloop().check_connection(MagicMock(), config) == (False, expected_message)
def test_streams(mocker):
    """The source should expose all five streams."""
    discovered = SourceZenloop().streams(MagicMock())
    assert len(discovered) == 5

View File

@@ -1,113 +0,0 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from http import HTTPStatus
from unittest.mock import MagicMock
import pytest
from source_zenloop.source import Properties, SurveyGroups, Surveys, ZenloopStream
@pytest.fixture
def patch_base_class(mocker):
    """Patch abstract members so ZenloopStream can be instantiated directly."""
    for attribute, replacement in (
        ("path", "v0/example_endpoint"),
        ("primary_key", "test_primary_key"),
        ("__abstractmethods__", set()),
    ):
        mocker.patch.object(ZenloopStream, attribute, replacement)
def test_request_params(patch_base_class, config):
    """The next-page token should be forwarded verbatim as request params."""
    stream = ZenloopStream(**config)
    params = stream.request_params(stream_slice=None, stream_state=None, next_page_token={"page": "1"})
    assert params == {"page": "1"}
def test_next_page_token(patch_base_class, config):
    """No next-page token is produced when all records fit on one page."""
    response = MagicMock()
    response.json.return_value = {"meta": {"page": 1, "per_page": 12, "total": 8}}
    assert ZenloopStream(**config).next_page_token(response=response) is None
def test_parse_response(patch_base_class, config):
    """The base stream yields the raw JSON payload unchanged."""
    payload = {"answers": [{"id": 123, "name": "John Doe"}]}
    response = MagicMock()
    response.json.return_value = payload
    assert next(ZenloopStream(**config).parse_response(response=response)) == payload
def test_parse_response_surveys(patch_base_class, config):
    """Surveys.parse_response should unwrap records from the `surveys` field."""
    record = {"id": 123, "name": "John Doe"}
    response = MagicMock()
    response.json.return_value = {"surveys": [record]}
    assert next(Surveys(**config).parse_response(response=response)) == record
def test_parse_response_survey_groups(patch_base_class, config):
    """SurveyGroups.parse_response should unwrap records from the `survey_groups` field."""
    record = {"id": 123, "name": "John Doe"}
    response = MagicMock()
    response.json.return_value = {"survey_groups": [record]}
    assert next(SurveyGroups(**config).parse_response(response=response)) == record
def test_surveys_path(config):
    """Surveys reads from the top-level surveys endpoint."""
    assert Surveys(**config).path() == "surveys"
def test_survey_groups_path(config):
    """SurveyGroups reads from the top-level survey_groups endpoint."""
    assert SurveyGroups(**config).path() == "survey_groups"
def test_properties_path(config):
    """The Properties path should embed the configured survey id."""
    assert Properties(**config).path() == "surveys/<survey_id>/properties"
def test_request_headers(patch_base_class, config):
    """No extra request headers are required beyond authentication."""
    stream = ZenloopStream(**config)
    assert stream.request_headers(stream_slice=None, stream_state=None, next_page_token=None) == {}
def test_http_method(patch_base_class, config):
    """All Zenloop API requests use GET."""
    assert ZenloopStream(**config).http_method == "GET"
@pytest.mark.parametrize(
    ("http_status", "should_retry"),
    [
        (HTTPStatus.OK, False),
        (HTTPStatus.BAD_REQUEST, False),
        (HTTPStatus.TOO_MANY_REQUESTS, True),
        (HTTPStatus.INTERNAL_SERVER_ERROR, True),
    ],
)
def test_should_retry(patch_base_class, config, http_status, should_retry):
    """Only rate-limit (429) and server-error (5xx) responses should be retried."""
    mocked_response = MagicMock(status_code=http_status)
    assert ZenloopStream(**config).should_retry(mocked_response) == should_retry
def test_backoff_time(patch_base_class, config):
    """A None backoff lets the CDK pick the default retry delay."""
    assert ZenloopStream(**config).backoff_time(MagicMock()) is None

View File

@@ -1,10 +1,47 @@
# Zenloop
## Sync overview
This page contains the setup guide and reference information for the Zenloop source connector.
This source can sync data for the [Zenloop API](https://docs.zenloop.com/reference). It supports both Full Refresh and Incremental syncs for Answer endpoints. You can choose if this connector will copy only the new or updated data, or all rows in the tables and columns you set up for replication, every time a sync is run.
## Prerequisites
<!-- env:cloud -->
**For Airbyte Cloud:**
### Output schema
1. [Log into your Airbyte Cloud](https://cloud.airbyte.io/workspaces).
2. Click **Sources** and then click **+ New source**.
3. On the Set up the source page, select **Zenloop** from the Source type dropdown.
4. Enter the name for the Zenloop connector.
5. Enter your **API token**
6. For **Date from**, enter the date in YYYY-MM-DDTHH:mm:ssZ format. The data added on and after this date will be replicated.
7. Enter your **Survey ID**. Zenloop Survey ID. Can be found <a href="https://app.zenloop.com/settings/api">here</a>. Leave empty to pull answers from all surveys. (Optional)
8. Enter your **Survey Group ID**. Zenloop Survey Group ID. Can be found by pulling All Survey Groups via SurveyGroups stream. Leave empty to pull answers from all survey groups. (Optional)
9. Click **Set up source**.
<!-- /env:cloud -->
<!-- env:oss -->
**For Airbyte Open Source:**
1. Navigate to the Airbyte Open Source dashboard.
2. Click **Sources** and then click **+ New source**.
3. On the Set up the source page, select **Zenloop** from the Source type dropdown.
4. Enter the name for the Zenloop connector.
5. Enter your **API token**
6. For **Date from**, enter the date in YYYY-MM-DDTHH:mm:ssZ format. The data added on and after this date will be replicated.
7. Enter your **Survey ID**. Zenloop Survey ID. Can be found <a href="https://app.zenloop.com/settings/api">here</a>. Leave empty to pull answers from all surveys. (Optional)
8. Enter your **Survey Group ID**. Zenloop Survey Group ID. Can be found by pulling All Survey Groups via SurveyGroups stream. Leave empty to pull answers from all survey groups. (Optional)
9. Click **Set up source**.
<!-- /env:oss -->
## Supported sync modes
The Zenloop source connector supports the following [sync modes](https://docs.airbyte.com/cloud/core-concepts#connection-sync-modes):
| Feature | Supported?\(Yes/No\) |
| :---------------- | :------------------- |
| Full Refresh Sync | Yes |
| Incremental Sync | Yes |
| Namespaces | No |
## Supported Streams
This Source is capable of syncing the following core Streams:
@@ -16,44 +53,26 @@ This Source is capable of syncing the following core Streams:
The `Answers`, `AnswersSurveyGroup`, and `Properties` streams each have an optional survey ID parameter that can be set by filling the `public_hash_id` field of the connector configuration. If it is not provided, answers for all surveys (or survey groups) will be pulled.
### Data type mapping
| Integration Type | Airbyte Type | Notes |
| :--------------- | :----------- | :---- |
| `string` | `string` | |
| `integer` | `integer` | |
| `number` | `number` | |
| `array` | `array` | |
| `object` | `object` | |
### Features
| Feature | Supported?\(Yes/No\) | Notes |
| :---------------- | :------------------- | :---- |
| Full Refresh Sync | Yes | |
| Incremental Sync | Yes | |
| Namespaces | No | |
### Performance considerations
## Performance considerations
The Zenloop connector should not run into Zenloop API limitations under normal usage. Please [create an issue](https://github.com/airbytehq/airbyte/issues) if you see any rate limit issues that are not automatically retried successfully.
## Getting started
## Data type map
### Requirements
* Zenloop account
* Zenloop API token
### Setup guide
Please register on Zenloop and retrieve your API token [here](https://app.zenloop.com/settings/api).
| Integration Type | Airbyte Type |
| :--------------- | :----------- |
| `string` | `string` |
| `integer` | `integer` |
| `number` | `number` |
| `array` | `array` |
| `object` | `object` |
## Changelog
| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :---------------------------- |
| 0.1.4 | 2022-11-18 | [19624](https://github.com/airbytehq/airbyte/pull/19624) | Migrate to low code |
| 0.1.3 | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream states |
| 0.1.2 | 2022-08-22 | [15843](https://github.com/airbytehq/airbyte/pull/15843) | Adds Properties stream |
| 0.1.1 | 2021-10-26 | [8299](https://github.com/airbytehq/airbyte/pull/8299) | Fix missing seed files |
| 0.1.0 | 2021-10-26 | [7380](https://github.com/airbytehq/airbyte/pull/7380) | Initial Release |