Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Alfredo Garcia <freddy.garcia7.fg@gmail.com>
183 lines
6.1 KiB
Python
183 lines
6.1 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
|
|
import urllib.parse
|
|
from abc import ABC
|
|
from datetime import datetime
|
|
from typing import Any, Iterable, Mapping, MutableMapping, Optional, Union
|
|
|
|
import requests
|
|
|
|
from airbyte_cdk.sources.streams.http import HttpStream
|
|
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator
|
|
|
|
|
|
class CartStream(HttpStream, ABC):
    """
    Base class for Cart.com REST API streams.

    The base URL and the extra request headers are both obtained from the
    authenticator passed to the constructor. Subclasses may set `data_field`
    to name the response key that holds the records; when it is empty the
    stream's `name` is used instead.
    """

    primary_key = "id"

    def __init__(
        self,
        start_date: str,
        end_date: Optional[str] = None,
        authenticator: Optional[AbstractHeaderAuthenticator] = None,
        **kwargs,
    ):
        """
        :param start_date: lower bound used by incremental subclasses when no state exists
        :param end_date: optional upper bound used by incremental subclasses
        :param authenticator: object providing `url_base()` and `extra_params(stream, params)`
        :param kwargs: forwarded unchanged to the parent constructor
        """
        super().__init__(**kwargs)
        self._start_date = start_date
        self._end_date = end_date
        # Assigned after super().__init__ so this value is the one the stream keeps.
        self._authenticator = authenticator

    @property
    def url_base(self) -> str:
        """Base URL of the API, as reported by the authenticator."""
        return self._authenticator.url_base()

    @property
    def data_field(self) -> Optional[str]:
        """
        Field of the response containing data.
        By default the value self.name will be used if this property is empty or None
        """
        return None

    def path(self, **kwargs) -> str:
        """Endpoint path, which is the stream name."""
        return self.name

    @property
    def max_retries(self) -> Union[int, None]:
        """Maximum number of retries for a failed request."""
        return 3

    def backoff_time(self, response: requests.Response) -> Optional[float]:
        """
        Compute how long to wait before retrying, based on the `Retry-After` header.

        We don't need to check response.status_code == 429 since this header exists only in that case.
        Cart.com sometimes returns an HTTP-date instead of a number of seconds. Using the
        remaining time from that date directly causes Server Errors after a few attempts, so
        the computed delay is multiplied by `server_backoff` to give the server time to
        recover from too many requests.

        :param response: the response that triggered the retry
        :return: seconds to wait, or None when the header is absent or cannot be parsed
        """
        # Function-scope import: the module only imports `datetime` from the datetime package.
        from datetime import timezone

        server_backoff = 3
        retry_after = response.headers.get("Retry-After")
        if not retry_after:
            return None

        try:
            # Numeric form: the header is already a number of seconds.
            return float(retry_after)
        except ValueError:
            pass

        try:
            retry_at = datetime.strptime(retry_after, "%a, %d %b %Y %H:%M:%S %Z")
        except ValueError:
            # Neither a number nor the expected date format: give no custom backoff.
            return None

        # strptime with %Z yields a naive datetime; HTTP-dates are expressed in GMT, so
        # compare against the current UTC time rather than the host's local clock.
        retry_at = retry_at.replace(tzinfo=timezone.utc)
        remaining = retry_at - datetime.now(timezone.utc)
        # total_seconds() keeps the full duration (timedelta.seconds drops whole days).
        return float(server_backoff * abs(remaining.total_seconds()))

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Extract the query parameters of the `next_page` URL found in the response body.

        :return: the parsed query parameters, or None when there is no next page
        """
        next_page = response.json().get("next_page")
        if not next_page:
            return None
        next_query_string = urllib.parse.urlsplit(next_page).query
        return dict(urllib.parse.parse_qsl(next_query_string))

    def request_headers(self, **kwargs) -> Mapping[str, Any]:
        """
        Build the request headers: fixed JSON/no-cache headers plus whatever the
        authenticator derives from this stream and its request parameters.
        """
        params = self.request_params(**kwargs)
        extra_params = self._authenticator.extra_params(self, params)
        return dict({"Cache-Control": "no-cache", "Content-Type": "application/json"}, **extra_params)

    def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
        """
        Yield the records stored under `data_field` (or the stream name) in the response body.
        A missing or null field yields nothing.
        """
        response_json = response.json()
        # `or []` also covers an explicit JSON null, which would otherwise break `yield from`.
        result = response_json.get(self.data_field or self.name) or []
        yield from result

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """
        Query parameters for a request: a page size of 1000, overlaid with the
        parameters of the next-page token when one is given.
        """
        params = {"count": 1000}
        if next_page_token:
            params.update(next_page_token)
        return params
|
|
|
|
|
|
class IncrementalCartStream(CartStream, ABC):
    """
    Base class for Cart.com streams that sync incrementally on `updated_at`.

    Requests are sorted by the cursor field and filtered to records newer than
    the saved state (or the configured start date), optionally bounded by the
    configured end date.
    """

    state_checkpoint_interval = 1000
    cursor_field = "updated_at"

    def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        """
        Generates a query for incremental logic

        Docs: https://developers.cart.com/docs/rest-api/docs/query_syntax.md

        :param stream_state: saved state; its cursor value is used when present
        :return: only the `count`, `page`, `sort` and cursor parameters, in that order
        """
        params = super().request_params(stream_state=stream_state, **kwargs)

        # Tolerate a missing state object; fall back to the configured start date.
        cursor_value = (stream_state or {}).get(self.cursor_field) or self._start_date
        # Never query earlier than the configured start date.
        start_date = max(cursor_value, self._start_date)

        query = f"gt:{start_date}"
        # The upper bound is only applied when it lies after the lower bound.
        if self._end_date and self._end_date > start_date:
            query += f" AND lt:{self._end_date}"

        params["sort"] = self.cursor_field
        params[self.cursor_field] = query

        # Emit the parameters in a fixed order; any other key is dropped.
        ord_params = ["count", "page", "sort", self.cursor_field]
        return {key: params[key] for key in ord_params if key in params}

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object
        and returning an updated state object.

        A record without a cursor value leaves the existing state unchanged; when neither
        the state nor the record carries a cursor value an empty state is returned.
        """
        latest_state = latest_record.get(self.cursor_field)
        current_state = current_stream_state.get(self.cursor_field)

        # Only compare values that are actually present, so a record lacking the cursor
        # field cannot make max() compare None against a string.
        known_values = [value for value in (current_state, latest_state) if value]
        if not known_values:
            return {}
        return {self.cursor_field: max(known_values)}
|
|
|
|
|
|
class CustomersCart(IncrementalCartStream):
    """
    Incremental stream of customers.

    Docs: https://developers.cart.com/docs/rest-api/restapi.json/paths/~1customers/get
    """

    # Response key holding the records; also reused as the endpoint path below.
    data_field = "customers"

    def path(self, **kwargs) -> str:
        """Endpoint path, taken from `data_field` instead of the stream name."""
        endpoint = self.data_field
        return endpoint
|
|
|
|
|
|
class Orders(IncrementalCartStream):
    """
    Incremental stream of orders; defines no overrides of its own.

    Docs: https://developers.cart.com/docs/rest-api/restapi.json/paths/~1orders/get
    """
|
|
|
|
|
|
class OrderPayments(IncrementalCartStream):
    """
    Incremental stream of order payments.

    Docs: https://developers.cart.com/docs/rest-api/restapi.json/paths/~1order_payments/get
    """

    # Response key holding the records.
    data_field = "payments"
|
|
|
|
|
|
class OrderItems(IncrementalCartStream):
    """
    Incremental stream of order items.

    Docs: https://developers.cart.com/docs/rest-api/restapi.json/paths/~1order_items/get
    """

    # Response key holding the records.
    data_field = "items"
|
|
|
|
|
|
class OrderStatuses(CartStream):
    """
    Stream of order statuses. Derives from CartStream directly, so it has no
    cursor field or date filtering.

    Docs: https://developers.cart.com/docs/rest-api/ff5ada86bc8a0-get-order-statuses
    """

    # Response key holding the records.
    data_field = "order_statuses"
|
|
|
|
|
|
class Products(IncrementalCartStream):
    """
    Incremental stream of products; defines no overrides of its own.

    Docs: https://developers.cart.com/docs/rest-api/restapi.json/paths/~1products/get
    """
|
|
|
|
|
|
class Addresses(IncrementalCartStream):
    """
    Incremental stream of addresses; defines no overrides of its own.

    Docs: https://developers.cart.com/docs/rest-api/b3A6MjMzMTc3Njc-get-addresses
    """
|