1
0
mirror of synced 2025-12-23 21:03:15 -05:00

🎉 Source Chartmogul: Add CustomerCount stream (#10756)

* 🎉 Source Chartmogul: Add CustomerCount stream

* Update description

* address comments

* update changelog

* format source file

* run seed file

Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
This commit is contained in:
Titas Skrebe
2022-03-03 00:09:54 +02:00
committed by GitHub
parent 2b6926d707
commit c81ceb2e5d
13 changed files with 117 additions and 8 deletions

View File

@@ -119,7 +119,7 @@
- name: Chartmogul
sourceDefinitionId: b6604cbd-1b12-4c08-8767-e140d0fb0877
dockerRepository: airbyte/source-chartmogul
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/sources/chartmogul
icon: chartmogul.svg
sourceType: api

View File

@@ -1000,7 +1000,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-chartmogul:0.1.0"
- dockerImage: "airbyte/source-chartmogul:0.1.1"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/chartmogul"
connectionSpecification:
@@ -1009,6 +1009,8 @@
type: "object"
required:
- "api_key"
- "start_date"
- "interval"
additionalProperties: false
properties:
api_key:
@@ -1024,6 +1026,17 @@
examples:
- "2017-01-25T00:00:00Z"
order: 1
interval:
type: "string"
description: "Some APIs such as <a href=\"https://dev.chartmogul.com/reference/endpoint-overview-metrics-api\"\
>Metrics</a> require intervals to cluster data."
enum:
- "day"
- "week"
- "month"
- "quarter"
default: "month"
order: 2
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []

View File

@@ -34,5 +34,5 @@ COPY source_chartmogul ./source_chartmogul
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-chartmogul

View File

@@ -5,6 +5,7 @@ Chartmogul is an online subscription analytics platform. It retrieves data from
Connector currently implements following full refresh streams:
* [Customers](https://dev.chartmogul.com/reference/list-customers)
* [CustomerCount] (https://dev.chartmogul.com/reference/retrieve-customer-count)
* [Activities](https://dev.chartmogul.com/reference/list-activities)
`start_date` config is used for retrieving `Activies`. `Customers` stream does not use this config. Even if it was possible to filter by `start_date`, it would cause issues when modeling data. That is because activies after `start_date` can be triggered by customers who were created way before that.

View File

@@ -17,6 +17,15 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "customer_count",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

View File

@@ -1,3 +1,5 @@
{
"api_key": "<invalid_key>"
"api_key": "<invalid_key>",
"start_date": "2017-01-25T00:00:00Z",
"interval": "day"
}

View File

@@ -1,4 +1,5 @@
{
"api_key": "<api-key>",
"start_date": "2022-01-05T12:09:00Z"
"start_date": "2022-01-05T12:09:00Z",
"interval": "day"
}

View File

@@ -17,6 +17,15 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "customer_count",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

View File

@@ -0,0 +1,12 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"date": {
"type": ["string"]
},
"customers": {
"type": ["integer"]
}
}
}

View File

@@ -4,13 +4,16 @@
from abc import ABC
from base64 import b64encode
from datetime import datetime
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from urllib.parse import urljoin
import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.streams.http.exceptions import RequestBodyException
# Basic full refresh stream
@@ -70,6 +73,52 @@ class Activities(ChartmogulStream):
return "v1/activities"
class CustomerCount(ChartmogulStream):
primary_key = "date"
def __init__(self, start_date: str, interval: str, **kwargs):
super().__init__(**kwargs)
self.start_date = start_date
self.end_date = datetime.now().strftime("%Y-%m-%d")
self.interval = interval
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
return None
def request_body_data(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Optional[Mapping]:
return {
"start-date": self.start_date,
"end-date": self.end_date,
"interval": self.interval,
}
def _create_prepared_request(
self, path: str, headers: Mapping = None, params: Mapping = None, json: Any = None, data: Any = None
) -> requests.PreparedRequest:
"""
Override to make possible sending http body with GET request.
"""
args = {"method": self.http_method, "url": urljoin(self.url_base, path), "headers": headers, "params": params}
if json and data:
raise RequestBodyException(
"At the same time only one of the 'request_body_data' and 'request_body_json' functions can return data"
)
elif json:
args["json"] = json
elif data:
args["data"] = data
return self._session.prepare_request(requests.Request(**args))
def path(self, **kwargs) -> str:
return "v1/metrics/customer-count"
class HttpBasicAuthenticator(TokenAuthenticator):
def __init__(self, token: str, auth_method: str = "Basic", **kwargs):
auth_string = f"{token}:".encode("utf8")
@@ -91,4 +140,8 @@ class SourceChartmogul(AbstractSource):
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth = HttpBasicAuthenticator(config["api_key"], auth_method="Basic")
return [Customers(authenticator=auth), Activities(authenticator=auth, start_date=config.get("start_date"))]
return [
Customers(authenticator=auth),
CustomerCount(authenticator=auth, start_date=config.get("start_date"), interval=config.get("interval")),
Activities(authenticator=auth, start_date=config.get("start_date")),
]

View File

@@ -4,7 +4,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Chartmogul Spec",
"type": "object",
"required": ["api_key"],
"required": ["api_key", "start_date", "interval"],
"additionalProperties": false,
"properties": {
"api_key": {
@@ -19,6 +19,13 @@
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. When feasible, any data before this date will not be replicated.",
"examples": ["2017-01-25T00:00:00Z"],
"order": 1
},
"interval": {
"type": "string",
"description": "Some APIs such as <a href=\"https://dev.chartmogul.com/reference/endpoint-overview-metrics-api\">Metrics</a> require intervals to cluster data.",
"enum": ["day", "week", "month", "quarter"],
"default": "month",
"order": 2
}
}
}

View File

@@ -26,5 +26,5 @@ def test_streams(mocker):
source = SourceChartmogul()
config_mock = MagicMock()
streams = source.streams(config_mock)
expected_streams_number = 2
expected_streams_number = 3
assert len(streams) == expected_streams_number

View File

@@ -15,6 +15,7 @@ If `start_date` is set, it will only apply to `Activities` stream. `Customers`'
This Source is capable of syncing the following streams:
* [Customers](https://dev.chartmogul.com/reference/list-customers)
* [CustomerCount] (https://dev.chartmogul.com/reference/retrieve-customer-count)
* [Activities](https://dev.chartmogul.com/reference/list-activities)
### Features
@@ -44,4 +45,5 @@ Please read [How to find your API key](https://dev.chartmogul.com/docs/authentic
| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.1 | 2022-03-02 | [10756](https://github.com/airbytehq/airbyte/pull/10756) | Add new stream: customer-count |
| 0.1.0 | 2022-01-10 | [9381](https://github.com/airbytehq/airbyte/pull/9381) | New Source: Chartmogul |