1
0
mirror of synced 2025-12-21 11:01:41 -05:00
Files
airbyte/airbyte-integrations/connectors/source-recharge/source_recharge/streams.py
2024-12-18 14:05:43 -08:00

137 lines
5.4 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from abc import ABC
from enum import Enum
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
import pendulum
import requests
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from source_recharge.components.recharge_error_handler import RechargeErrorHandler
class ApiVersion(Enum):
DEPRECATED = "2021-01"
MODERN = "2021-11"
class RechargeTokenAuthenticator(TokenAuthenticator):
def get_auth_header(self) -> Mapping[str, Any]:
return {"X-Recharge-Access-Token": self._token}
class Orders(HttpStream, ABC):
"""
Orders Stream: https://developer.rechargepayments.com/v1-shopify?python#list-orders
Notes:
Using `2021-01` the: `email`, `first_name`, `last_name` columns are not available,
because these are not present in `2021-11` as DEPRECATED fields.
"""
primary_key: str = "id"
url_base: str = "https://api.rechargeapps.com/"
cursor_field: str = "updated_at"
page_size: int = 250
page_num: int = 1
period_in_days: int = 30 # Slice data request for 1 month
raise_on_http_errors: bool = True
state_checkpoint_interval: int = 250
# registering the default schema transformation
transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
def __init__(self, config: Mapping[str, Any], **kwargs) -> None:
super().__init__(**kwargs)
self._start_date = config["start_date"]
self.api_version = ApiVersion.DEPRECATED if config.get("use_orders_deprecated_api") else ApiVersion.MODERN
@property
def data_path(self) -> str:
return self.name
def request_headers(self, **kwargs) -> Mapping[str, Any]:
return {"x-recharge-version": self.api_version.value}
def path(self, **kwargs) -> str:
return self.name
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
next_page_token = None
if self.api_version == ApiVersion.MODERN:
cursor = response.json().get("next_cursor")
if cursor:
next_page_token = {"cursor": cursor}
else:
stream_data = self.get_stream_data(response.json())
if len(stream_data) == self.page_size:
self.page_num += 1
next_page_token = {"page": self.page_num}
return next_page_token
def _update_params_with_min_max_date_range(
self,
params: MutableMapping[str, Any],
stream_slice: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
params.update(
{
"sort_by": "updated_at-asc",
"updated_at_min": (stream_slice or {}).get("start_date"),
"updated_at_max": (stream_slice or {}).get("end_date"),
}
)
return params
def request_params(
self, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
) -> MutableMapping[str, Any]:
params = {"limit": self.page_size}
if self.api_version == ApiVersion.MODERN:
# if a cursor value is passed, only limit can be passed with it!
if next_page_token:
params.update(next_page_token)
else:
params = self._update_params_with_min_max_date_range(params, stream_slice)
return params
else:
params = self._update_params_with_min_max_date_range(params, stream_slice)
if next_page_token:
params.update(next_page_token)
return params
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_data = response.json()
stream_data = self.get_stream_data(response_data)
yield from stream_data
def get_stream_data(self, response_data: Any) -> List[dict]:
if self.data_path:
return response_data.get(self.data_path, [])
else:
return [response_data]
def get_error_handler(self) -> ErrorHandler:
return RechargeErrorHandler(logger=self.logger)
def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
start_date_value = (stream_state or {}).get(self.cursor_field, self._start_date) if self.cursor_field else self._start_date
now = pendulum.now()
# dates are inclusive, so we add 1 second so that time periods do not overlap
start_date = pendulum.parse(start_date_value).add(seconds=1)
while start_date <= now:
end_date = start_date.add(days=self.period_in_days)
yield {"start_date": start_date.strftime("%Y-%m-%d %H:%M:%S"), "end_date": end_date.strftime("%Y-%m-%d %H:%M:%S")}
start_date = end_date.add(seconds=1)
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
latest_benchmark = latest_record[self.cursor_field]
if current_stream_state.get(self.cursor_field):
return {self.cursor_field: max(latest_benchmark, current_stream_state[self.cursor_field])}
return {self.cursor_field: latest_benchmark}