1
0
mirror of synced 2025-12-21 02:51:29 -05:00
Files
airbyte/airbyte-integrations/connectors/source-cart/source_cart/streams.py
devin-ai-integration[bot] 9129cbee72 chore(source-cart): Upgrade to Python 3.13, base image 4.1.0, and CDK 7.x (#69783)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Alfredo Garcia <freddy.garcia7.fg@gmail.com>
2025-11-24 12:02:01 -06:00

183 lines
6.1 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import urllib.parse
from abc import ABC
from datetime import datetime, timezone
from typing import Any, Iterable, Mapping, MutableMapping, Optional, Union

import requests
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator
class CartStream(HttpStream, ABC):
    """
    Base stream for the Cart.com REST API.

    The injected ``authenticator`` supplies both the base URL (``url_base()``) and extra
    request headers (``extra_params(stream, params)``). Pages are requested with
    ``count=1000``, and pagination follows the ``next_page`` URL returned in the response body.
    """

    primary_key = "id"

    def __init__(
        self,
        start_date: str,
        end_date: Optional[str] = None,
        authenticator: Optional[AbstractHeaderAuthenticator] = None,
        **kwargs,
    ):
        # NOTE: the authenticator is stored on the instance and used directly by
        # ``url_base`` / ``request_headers``; it is not forwarded to ``HttpStream.__init__``.
        super().__init__(**kwargs)
        self._start_date = start_date
        self._end_date = end_date
        self._authenticator = authenticator

    @property
    def url_base(self) -> str:
        return self._authenticator.url_base()

    @property
    def data_field(self) -> Optional[str]:
        """
        Key of the response body that holds the records.

        Returning ``None`` (the default here) makes ``parse_response`` fall back to
        ``self.name``. Subclasses override this with a plain class attribute.
        """
        return None

    def path(self, **kwargs) -> str:
        return self.name

    @property
    def max_retries(self) -> Union[int, None]:
        return 3

    def backoff_time(self, response: requests.Response) -> Optional[float]:
        """
        Compute the retry delay from the ``Retry-After`` header.

        We don't check ``response.status_code == 429`` because this header is only sent in
        that case. Some endpoints (or sometimes the Cart.com API in general) send an HTTP date
        instead of a number of seconds. Waiting exactly until that date was observed to still
        cause server errors after a few attempts. So the remaining time is multiplied by
        ``server_backoff``, giving the server time to recover from too many requests.

        :param response: the failed response. Attribute access is guarded with ``getattr``
            in case the caller passes something without ``headers``. TODO(review): confirm
            how the CDK invokes this.
        :return: seconds to wait, or ``None`` to defer to the default backoff when the header
            is missing or in an unrecognized format.
        """
        server_backoff = 3
        headers = getattr(response, "headers", None) or {}
        retry_after = headers.get("Retry-After")
        if not retry_after:
            return None

        try:
            return float(retry_after)
        except ValueError:
            pass  # Not a number of seconds; try the HTTP-date form below.

        try:
            retry_after_datetime = datetime.strptime(retry_after, "%a, %d %b %Y %H:%M:%S %Z")
        except ValueError:
            # Unrecognized format: let the default backoff apply instead of crashing the sync.
            return None

        # HTTP dates are expressed in GMT, so compare against the current UTC time rather than
        # the host's local time. Use total_seconds() so waits of a day or more are not truncated.
        retry_after_datetime = retry_after_datetime.replace(tzinfo=timezone.utc)
        remaining = abs(retry_after_datetime - datetime.now(timezone.utc)).total_seconds()
        return float(server_backoff * remaining)

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Return the query parameters of the ``next_page`` URL, or ``None`` when there is none.
        """
        next_page = response.json().get("next_page")
        if not next_page:
            return None
        next_query_string = urllib.parse.urlsplit(next_page).query
        return dict(urllib.parse.parse_qsl(next_query_string))

    def request_headers(self, **kwargs) -> Mapping[str, Any]:
        """
        Build the request headers.

        Fixed no-cache/JSON headers are merged with whatever the authenticator derives from
        this stream and the current request params. Authenticator values win on key conflicts.
        """
        params = self.request_params(**kwargs)
        extra_params = self._authenticator.extra_params(self, params)
        return {"Cache-Control": "no-cache", "Content-Type": "application/json", **extra_params}

    def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
        """
        Yield the records stored under ``data_field`` (or ``self.name``) in the response body.

        A missing key or an explicit ``null`` value yields nothing instead of raising.
        """
        response_json = response.json()
        yield from response_json.get(self.data_field or self.name) or []

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Return the base query parameters: a page size of 1000, overlaid with the next-page
        token's parameters, if any.
        """
        params: MutableMapping[str, Any] = {"count": 1000}
        if next_page_token:
            params.update(next_page_token)
        return params
class IncrementalCartStream(CartStream, ABC):
    """
    Base class for Cart.com streams read incrementally on ``updated_at``.

    The cursor is sent to the API as a query filter (``gt:<date>``, optionally followed by
    ``AND lt:<end_date>``), and results are sorted by the cursor field.
    """

    state_checkpoint_interval = 1000
    cursor_field = "updated_at"

    def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        """
        Build the query parameters for an incremental request.

        The lower bound is the later of the saved cursor and the configured start date. An
        upper bound is added only when an end date is configured and lies after the lower bound.

        Docs: https://developers.cart.com/docs/rest-api/docs/query_syntax.md
        """
        params = super().request_params(stream_state=stream_state, **kwargs)
        cursor_value = (stream_state or {}).get(self.cursor_field) or self._start_date
        # Plain string comparison; assumes cursor and config dates share one sortable format
        # (e.g. ISO-8601) — TODO confirm.
        start_date = max(cursor_value, self._start_date)

        query = f"gt:{start_date}"
        if self._end_date and self._end_date > start_date:
            query += f" AND lt:{self._end_date}"

        params["sort"] = self.cursor_field
        params[self.cursor_field] = query

        # Emit params in this fixed order. Any other keys (e.g. extra ones taken from the
        # next_page URL) are dropped.
        ord_params = ["count", "page", "sort", self.cursor_field]
        return {k: params[k] for k in ord_params if k in params}

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Return the new state: the greater of the latest record's cursor value and the
        current state's cursor value.

        Missing or empty values are ignored, so a record without the cursor field keeps the
        existing state instead of raising. If neither side has a value, ``{}`` is returned.
        """
        candidates = [
            value
            for value in (
                latest_record.get(self.cursor_field),
                (current_stream_state or {}).get(self.cursor_field),
            )
            if value
        ]
        if not candidates:
            return {}
        return {self.cursor_field: max(candidates)}
class CustomersCart(IncrementalCartStream):
    """Incremental Cart.com customers stream.

    Docs: https://developers.cart.com/docs/rest-api/restapi.json/paths/~1customers/get
    """

    # Response key that holds the records; reused below as the request path.
    data_field = "customers"

    def path(self, **kwargs) -> str:
        # The endpoint path is the response key, not the inherited default of self.name.
        endpoint = self.data_field
        return endpoint
class Orders(IncrementalCartStream):
    """Incremental Cart.com orders stream.

    Docs: https://developers.cart.com/docs/rest-api/restapi.json/paths/~1orders/get
    """
    # No overrides: both the request path and the response key fall back to self.name.
class OrderPayments(IncrementalCartStream):
    """Incremental Cart.com order payments stream.

    Docs: https://developers.cart.com/docs/rest-api/restapi.json/paths/~1order_payments/get
    """

    # Records are nested under "payments"; the request path is still the inherited self.name.
    data_field = "payments"
class OrderItems(IncrementalCartStream):
    """Incremental Cart.com order items stream.

    Docs: https://developers.cart.com/docs/rest-api/restapi.json/paths/~1order_items/get
    """

    # Records are nested under "items"; the request path is still the inherited self.name.
    data_field = "items"
class OrderStatuses(CartStream):
    """Cart.com order statuses stream.

    Derives from ``CartStream`` directly, so no cursor or ``updated_at`` filter is applied.

    Docs: https://developers.cart.com/docs/rest-api/ff5ada86bc8a0-get-order-statuses
    """

    # Response key that holds the records.
    data_field = "order_statuses"
class Products(IncrementalCartStream):
    """Incremental Cart.com products stream.

    Docs: https://developers.cart.com/docs/rest-api/restapi.json/paths/~1products/get
    """
    # No overrides: both the request path and the response key fall back to self.name.
class Addresses(IncrementalCartStream):
    """Incremental Cart.com addresses stream.

    Docs: https://developers.cart.com/docs/rest-api/b3A6MjMzMTc3Njc-get-addresses
    """
    # No overrides: both the request path and the response key fall back to self.name.