🎉 Source Iterable: add new streams (#5915)
* Add new streams * Upd requirements versions * Upd docs * Remove tests for the templates stream * Upd csv field parsing * Fix file permissions * Set dependency version * Refactor * Merge * Upd licence * Add bulk metrics retrieving * Actualize schema
This commit is contained in:
@@ -12,5 +12,5 @@ RUN pip install .
|
||||
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
|
||||
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
|
||||
|
||||
LABEL io.airbyte.version=0.1.7
|
||||
LABEL io.airbyte.version=0.1.8
|
||||
LABEL io.airbyte.name=airbyte/source-iterable
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
# See [Source Acceptance Tests](https://docs.airbyte.io/contributing-to-airbyte/building-new-connector/source-acceptance-tests.md)
|
||||
# for more information about how to configure these tests
|
||||
connector_image: airbyte/source-iterable:dev
|
||||
tests:
|
||||
spec:
|
||||
- spec_path: "source_iterable/spec.json"
|
||||
connection:
|
||||
- config_path: "secrets/config.json"
|
||||
status: "succeed"
|
||||
- config_path: "integration_tests/invalid_config.json"
|
||||
status: "failed"
|
||||
discovery:
|
||||
- config_path: "secrets/config.json"
|
||||
basic_read:
|
||||
- config_path: "secrets/config.json"
|
||||
configured_catalog_path: "integration_tests/catalog.json"
|
||||
empty_streams: ['email_send_skip', 'email_complaint']
|
||||
full_refresh:
|
||||
- config_path: "secrets/config.json"
|
||||
configured_catalog_path: "integration_tests/catalog.json"
|
||||
incremental:
|
||||
- config_path: "secrets/config.json"
|
||||
configured_catalog_path: "integration_tests/configured_catalog.json"
|
||||
future_state_path: "integration_tests/abnormal_state.json"
|
||||
15
airbyte-integrations/connectors/source-iterable/acceptance-test-docker.sh
Executable file
15
airbyte-integrations/connectors/source-iterable/acceptance-test-docker.sh
Executable file
@@ -0,0 +1,15 @@
|
||||
#!/usr/bin/env sh
|
||||
|
||||
# Build latest connector image
|
||||
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2)
|
||||
|
||||
# Pull latest acctest image
|
||||
docker pull airbyte/source-acceptance-test:latest
|
||||
|
||||
# Run
|
||||
docker run --rm -it \
|
||||
-v /var/run/docker.sock:/var/run/docker.sock \
|
||||
-v /tmp:/tmp \
|
||||
-v $(pwd):/test_input \
|
||||
airbyte/source-acceptance-test \
|
||||
--acceptance-test-config /test_input
|
||||
@@ -1,20 +1,13 @@
|
||||
plugins {
|
||||
id 'airbyte-python'
|
||||
id 'airbyte-docker'
|
||||
id 'airbyte-standard-source-test-file'
|
||||
id 'airbyte-source-acceptance-test'
|
||||
}
|
||||
|
||||
airbytePython {
|
||||
moduleDirectory 'source_iterable'
|
||||
}
|
||||
|
||||
airbyteStandardSourceTestFile {
|
||||
specPath = "source_iterable/spec.json"
|
||||
configPath = "secrets/config.json"
|
||||
configuredCatalogPath = "sample_files/configured_catalog.json"
|
||||
}
|
||||
|
||||
|
||||
dependencies {
|
||||
implementation files(project(':airbyte-integrations:bases:base-standard-source-test-file').airbyteDocker.outputs)
|
||||
implementation files(project(':airbyte-integrations:bases:source-acceptance-test').airbyteDocker.outputs)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
{
|
||||
"users": {
|
||||
"profileUpdatedAt": "2121-04-14T17:00:41+00:00"
|
||||
},
|
||||
"email_unsubscribe": {
|
||||
"createdAt": "2121-04-14T17:00:44+00:00"
|
||||
},
|
||||
"email_subscribe": {
|
||||
"createdAt": "2121-04-14T16:52:45+00:00"
|
||||
},
|
||||
"email_send": {
|
||||
"createdAt": "2121-04-14T16:25:56+00:00"
|
||||
},
|
||||
"email_open": {
|
||||
"createdAt": "2121-04-14T17:00:11+00:00"
|
||||
},
|
||||
"email_click": {
|
||||
"createdAt": "2121-04-14T16:55:14+00:00"
|
||||
},
|
||||
"email_bounce": {
|
||||
"createdAt": "2121-04-14T16:29:39+00:00"
|
||||
},
|
||||
"templates": {
|
||||
"createdAt": "2121-04-14T16:23:30.700000+00:00"
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
#
|
||||
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import pytest
|
||||
|
||||
pytest_plugins = ("source_acceptance_test.plugin",)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def connector_setup():
|
||||
"""This fixture is a placeholder for external resources that acceptance test might require."""
|
||||
yield
|
||||
@@ -0,0 +1,186 @@
|
||||
{
|
||||
"streams": [
|
||||
{
|
||||
"stream": {
|
||||
"name": "campaigns",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "campaigns_metrics",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "channels",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "email_bounce",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "email_click",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "email_complaint",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "email_open",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "email_send",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "email_send_skip",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "email_subscribe",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "email_unsubscribe",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "events",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "lists",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "list_users",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "message_types",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "metadata",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "templates",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "users",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["profileUpdatedAt"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "append"
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -0,0 +1,175 @@
|
||||
{
|
||||
"streams": [
|
||||
{
|
||||
"stream": {
|
||||
"name": "campaigns",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "campaigns_metrics",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "channels",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "email_bounce",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "incremental",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "email_click",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "incremental",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "email_complaint",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "incremental",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "email_open",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "incremental",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "email_send",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "incremental",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "email_send_skip",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "incremental",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "email_subscribe",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "incremental",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "email_unsubscribe",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["createdAt"]
|
||||
},
|
||||
"sync_mode": "incremental",
|
||||
"destination_sync_mode": "append"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "events",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "lists",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "list_users",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "message_types",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "metadata",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh"]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"destination_sync_mode": "overwrite"
|
||||
},
|
||||
{
|
||||
"stream": {
|
||||
"name": "users",
|
||||
"json_schema": {},
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_cursor": true,
|
||||
"default_cursor_field": ["profileUpdatedAt"]
|
||||
},
|
||||
"sync_mode": "incremental",
|
||||
"destination_sync_mode": "append"
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
{
|
||||
"api_key": "test-api-key",
|
||||
"start_date": "2020-12-12T00:00:00Z"
|
||||
}
|
||||
@@ -1 +1,3 @@
|
||||
# This file is autogenerated -- only edit if you know what you are doing. Use setup.py for declaring dependencies.
|
||||
-e ../../bases/source-acceptance-test
|
||||
-e .
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,4 +1,4 @@
|
||||
{
|
||||
"api_key": "<your api_key>>",
|
||||
"api_key": "<your api_key>",
|
||||
"start_date": "2021-04-01T00:00:00Z"
|
||||
}
|
||||
|
||||
@@ -1,26 +0,0 @@
|
||||
{
|
||||
"users": {
|
||||
"profileUpdatedAt": "2021-04-14T17:00:41+00:00"
|
||||
},
|
||||
"email_unsubscribe": {
|
||||
"createdAt": "2021-04-14T17:00:44+00:00"
|
||||
},
|
||||
"email_subscribe": {
|
||||
"createdAt": "2021-04-14T16:52:45+00:00"
|
||||
},
|
||||
"email_send": {
|
||||
"createdAt": "2021-04-14T16:25:56+00:00"
|
||||
},
|
||||
"email_open": {
|
||||
"createdAt": "2021-04-14T17:00:11+00:00"
|
||||
},
|
||||
"email_click": {
|
||||
"createdAt": "2021-04-14T16:55:14+00:00"
|
||||
},
|
||||
"email_bounce": {
|
||||
"createdAt": "2021-04-14T16:29:39+00:00"
|
||||
},
|
||||
"templates": {
|
||||
"createdAt": "2021-04-14T16:23:30.700000+00:00"
|
||||
}
|
||||
}
|
||||
@@ -6,12 +6,12 @@
|
||||
from setuptools import find_packages, setup
|
||||
|
||||
MAIN_REQUIREMENTS = [
|
||||
"airbyte-cdk==0.1.3",
|
||||
"pendulum==1.2.0",
|
||||
"requests==2.25.1",
|
||||
"airbyte-cdk~=0.1",
|
||||
"pendulum~=1.2",
|
||||
"requests~=2.25",
|
||||
]
|
||||
|
||||
TEST_REQUIREMENTS = ["pytest==6.1.2"]
|
||||
TEST_REQUIREMENTS = ["pytest~=6.1"]
|
||||
|
||||
|
||||
setup(
|
||||
|
||||
161
airbyte-integrations/connectors/source-iterable/source_iterable/api.py
Normal file → Executable file
161
airbyte-integrations/connectors/source-iterable/source_iterable/api.py
Normal file → Executable file
@@ -2,23 +2,29 @@
|
||||
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
|
||||
import csv
|
||||
import json
|
||||
import urllib.parse as urlparse
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Iterable, Mapping, MutableMapping, Optional, Union
|
||||
from io import StringIO
|
||||
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union
|
||||
|
||||
import pendulum
|
||||
import requests
|
||||
from airbyte_cdk.models import SyncMode
|
||||
from airbyte_cdk.sources.streams.http import HttpStream
|
||||
|
||||
EVENT_ROWS_LIMIT = 200
|
||||
CAMPAIGNS_PER_REQUEST = 20
|
||||
|
||||
|
||||
class IterableStream(HttpStream, ABC):
|
||||
url_base = "https://api.iterable.com/api/"
|
||||
|
||||
# Hardcode the value because it is not returned from the API
|
||||
BACKOFF_TIME_CONSTANT = 10.0
|
||||
# define date-time fields with potential wrong format
|
||||
|
||||
url_base = "https://api.iterable.com/api/"
|
||||
primary_key = "id"
|
||||
|
||||
def __init__(self, api_key, **kwargs):
|
||||
@@ -46,16 +52,24 @@ class IterableStream(HttpStream, ABC):
|
||||
|
||||
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
|
||||
response_json = response.json()
|
||||
yield from response_json.get(self.data_field, [])
|
||||
records = response_json.get(self.data_field, [])
|
||||
|
||||
for record in records:
|
||||
yield record
|
||||
|
||||
|
||||
class IterableExportStream(IterableStream, ABC):
|
||||
|
||||
cursor_field = "createdAt"
|
||||
primary_key = None
|
||||
|
||||
def __init__(self, start_date, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._start_date = pendulum.parse(start_date)
|
||||
self.stream_params = {"dataTypeName": self.data_field}
|
||||
|
||||
cursor_field = "createdAt"
|
||||
def path(self, **kwargs) -> str:
|
||||
return "/export/data.json"
|
||||
|
||||
@staticmethod
|
||||
def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime:
|
||||
@@ -72,10 +86,14 @@ class IterableExportStream(IterableStream, ABC):
|
||||
Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object
|
||||
and returning an updated state object.
|
||||
"""
|
||||
latest_benchmark = self._field_to_datetime(latest_record[self.cursor_field])
|
||||
latest_benchmark = latest_record[self.cursor_field]
|
||||
if current_stream_state.get(self.cursor_field):
|
||||
return {self.cursor_field: str(max(latest_benchmark, self._field_to_datetime(current_stream_state[self.cursor_field])))}
|
||||
return {self.cursor_field: str(latest_benchmark)}
|
||||
return {
|
||||
self.cursor_field: max(
|
||||
latest_benchmark, self._field_to_datetime(current_stream_state[self.cursor_field])
|
||||
).to_datetime_string()
|
||||
}
|
||||
return {self.cursor_field: latest_benchmark.to_datetime_string()}
|
||||
|
||||
def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
|
||||
|
||||
@@ -90,12 +108,11 @@ class IterableExportStream(IterableStream, ABC):
|
||||
)
|
||||
return params
|
||||
|
||||
def path(self, **kwargs) -> str:
|
||||
return "/export/data.json"
|
||||
|
||||
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
|
||||
for obj in response.iter_lines():
|
||||
yield json.loads(obj)
|
||||
record = json.loads(obj)
|
||||
record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field])
|
||||
yield record
|
||||
|
||||
|
||||
class Lists(IterableStream):
|
||||
@@ -106,17 +123,18 @@ class Lists(IterableStream):
|
||||
|
||||
|
||||
class ListUsers(IterableStream):
|
||||
primary_key = "listId"
|
||||
data_field = "getUsers"
|
||||
name = "list_users"
|
||||
|
||||
def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str:
|
||||
return f"lists/{self.data_field}?listId={stream_slice['list_id']}"
|
||||
|
||||
def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
|
||||
lists = Lists(api_key=self._api_key)
|
||||
for list_record in lists.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh)):
|
||||
yield {"list_id": list_record["id"]}
|
||||
|
||||
def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str:
|
||||
return f"lists/{self.data_field}?listId={stream_slice['list_id']}"
|
||||
|
||||
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
|
||||
list_id = self._get_list_id(response.url)
|
||||
for user in response.iter_lines():
|
||||
@@ -138,6 +156,83 @@ class Campaigns(IterableStream):
|
||||
return "campaigns"
|
||||
|
||||
|
||||
class CampaignsMetrics(IterableStream):
|
||||
primary_key = None
|
||||
data_field = None
|
||||
|
||||
def __init__(self, api_key: str, start_date: str):
|
||||
"""
|
||||
https://api.iterable.com/api/docs#campaigns_metrics
|
||||
"""
|
||||
super().__init__(api_key)
|
||||
self.start_date = start_date
|
||||
|
||||
def path(self, **kwargs) -> str:
|
||||
return "campaigns/metrics"
|
||||
|
||||
def request_params(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> MutableMapping[str, Any]:
|
||||
params = super().request_params(**kwargs)
|
||||
params["campaignId"] = stream_slice.get("campaign_ids")
|
||||
params["startDateTime"] = self.start_date
|
||||
|
||||
return params
|
||||
|
||||
def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
|
||||
lists = Campaigns(api_key=self._api_key)
|
||||
campaign_ids = []
|
||||
for list_record in lists.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh)):
|
||||
campaign_ids.append(list_record["id"])
|
||||
|
||||
if len(campaign_ids) == CAMPAIGNS_PER_REQUEST:
|
||||
yield {"campaign_ids": campaign_ids}
|
||||
campaign_ids = []
|
||||
|
||||
if campaign_ids:
|
||||
yield {"campaign_ids": campaign_ids}
|
||||
|
||||
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
|
||||
content = response.content.decode()
|
||||
records = self._parse_csv_string_to_dict(content)
|
||||
|
||||
for record in records:
|
||||
yield {"data": record}
|
||||
|
||||
@staticmethod
|
||||
def _parse_csv_string_to_dict(csv_string: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Parse a response with a csv type to dict object
|
||||
Example:
|
||||
csv_string = "a,b,c,d
|
||||
1,2,,3
|
||||
6,,1,2"
|
||||
|
||||
output = [{"a": 1, "b": 2, "d": 3},
|
||||
{"a": 6, "c": 1, "d": 2}]
|
||||
|
||||
|
||||
:param csv_string: API endpoint response with csv format
|
||||
:return: parsed API response
|
||||
|
||||
"""
|
||||
|
||||
reader = csv.DictReader(StringIO(csv_string), delimiter=",")
|
||||
result = []
|
||||
|
||||
for row in reader:
|
||||
for key, value in row.items():
|
||||
if value == "":
|
||||
continue
|
||||
try:
|
||||
row[key] = int(value)
|
||||
except ValueError:
|
||||
row[key] = float(value)
|
||||
row = {k: v for k, v in row.items() if v != ""}
|
||||
|
||||
result.append(row)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
class Channels(IterableStream):
|
||||
data_field = "channels"
|
||||
|
||||
@@ -185,6 +280,36 @@ class EmailUnsubscribe(IterableExportStream):
|
||||
data_field = "emailUnsubscribe"
|
||||
|
||||
|
||||
class Events(IterableStream):
|
||||
"""
|
||||
https://api.iterable.com/api/docs#events_User_events
|
||||
"""
|
||||
primary_key = None
|
||||
data_field = "events"
|
||||
page_size = EVENT_ROWS_LIMIT
|
||||
|
||||
def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str:
|
||||
return f"events/{stream_slice['email']}"
|
||||
|
||||
def request_params(self, **kwargs) -> MutableMapping[str, Any]:
|
||||
params = super().request_params(**kwargs)
|
||||
params["limit"] = self.page_size
|
||||
|
||||
return params
|
||||
|
||||
def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
|
||||
lists = ListUsers(api_key=self._api_key)
|
||||
stream_slices = lists.stream_slices()
|
||||
|
||||
for stream_slice in stream_slices:
|
||||
for list_record in lists.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh), stream_slice=stream_slice):
|
||||
yield {"email": list_record["email"]}
|
||||
|
||||
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
|
||||
for record in super().parse_response(response, **kwargs):
|
||||
yield {"data": record}
|
||||
|
||||
|
||||
class MessageTypes(IterableStream):
|
||||
data_field = "messageTypes"
|
||||
name = "message_types"
|
||||
@@ -194,6 +319,7 @@ class MessageTypes(IterableStream):
|
||||
|
||||
|
||||
class Metadata(IterableStream):
|
||||
primary_key = None
|
||||
data_field = "results"
|
||||
|
||||
def path(self, **kwargs) -> str:
|
||||
@@ -216,7 +342,10 @@ class Templates(IterableExportStream):
|
||||
|
||||
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
|
||||
response_json = response.json()
|
||||
yield from response_json.get(self.data_field, [])
|
||||
records = response_json.get(self.data_field, [])
|
||||
|
||||
for record in records:
|
||||
yield record
|
||||
|
||||
|
||||
class Users(IterableExportStream):
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
{
|
||||
"properties": {
|
||||
"data": {
|
||||
"type": ["null", "object"]
|
||||
}
|
||||
},
|
||||
"type": ["null", "object"]
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
{
|
||||
"properties": {
|
||||
"data": {
|
||||
"type": ["null", "object"]
|
||||
}
|
||||
},
|
||||
"type": ["null", "object"]
|
||||
}
|
||||
@@ -1,67 +1,7 @@
|
||||
{
|
||||
"properties": {
|
||||
"table": {
|
||||
"name": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"key": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"size": {
|
||||
"type": ["null", "integer"]
|
||||
},
|
||||
"lastModified": {
|
||||
"type": ["null", "integer"]
|
||||
},
|
||||
"value": {
|
||||
"type": ["null", "object"],
|
||||
"properties": {
|
||||
"inventory": {
|
||||
"type": ["null", "integer"]
|
||||
},
|
||||
"name": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"sku": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"url": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"description": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"price": {
|
||||
"type": ["null", "integer"]
|
||||
},
|
||||
"product_type": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"compare_at_price": {
|
||||
"type": ["null", "number"]
|
||||
},
|
||||
"id": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"product_id": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"categories": {
|
||||
"type": ["null", "array"],
|
||||
"items": {}
|
||||
},
|
||||
"vendor": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"discount": {
|
||||
"type": ["null", "integer"]
|
||||
},
|
||||
"imageUrl": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"handle": {
|
||||
"type": ["null", "string"]
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"type": ["null", "object"]
|
||||
|
||||
@@ -11,6 +11,7 @@ from airbyte_cdk.sources.streams import Stream
|
||||
|
||||
from .api import (
|
||||
Campaigns,
|
||||
CampaignsMetrics,
|
||||
Channels,
|
||||
EmailBounce,
|
||||
EmailClick,
|
||||
@@ -20,6 +21,7 @@ from .api import (
|
||||
EmailSendSkip,
|
||||
EmailSubscribe,
|
||||
EmailUnsubscribe,
|
||||
Events,
|
||||
Lists,
|
||||
ListUsers,
|
||||
MessageTypes,
|
||||
@@ -41,6 +43,7 @@ class SourceIterable(AbstractSource):
|
||||
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
|
||||
return [
|
||||
Campaigns(api_key=config["api_key"]),
|
||||
CampaignsMetrics(api_key=config["api_key"], start_date=config["start_date"]),
|
||||
Channels(api_key=config["api_key"]),
|
||||
EmailBounce(api_key=config["api_key"], start_date=config["start_date"]),
|
||||
EmailClick(api_key=config["api_key"], start_date=config["start_date"]),
|
||||
@@ -50,6 +53,7 @@ class SourceIterable(AbstractSource):
|
||||
EmailSendSkip(api_key=config["api_key"], start_date=config["start_date"]),
|
||||
EmailSubscribe(api_key=config["api_key"], start_date=config["start_date"]),
|
||||
EmailUnsubscribe(api_key=config["api_key"], start_date=config["start_date"]),
|
||||
Events(api_key=config["api_key"]),
|
||||
Lists(api_key=config["api_key"]),
|
||||
ListUsers(api_key=config["api_key"]),
|
||||
MessageTypes(api_key=config["api_key"]),
|
||||
|
||||
Reference in New Issue
Block a user