1
0
mirror of synced 2025-12-21 11:01:41 -05:00
Files
airbyte/airbyte-integrations/connectors/source-tplcentral/source_tplcentral/streams.py
2024-12-18 14:05:43 -08:00

295 lines
9.4 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from abc import ABC, abstractmethod
from typing import Any, Iterable, Mapping, MutableMapping, Optional
from urllib.parse import parse_qsl, urlparse
import arrow
import requests
from airbyte_cdk.sources.streams.http import HttpStream
from source_tplcentral.util import deep_get, normalize
class TplcentralStream(HttpStream, ABC):
    """Base class for the Tplcentral streams defined in this module.

    Subclasses supply ``collection_field`` (the key in the JSON response that
    holds the list of records) and may override ``page_size``,
    ``upstream_primary_key`` and ``upstream_cursor_field`` with plain class
    attributes.
    """

    # Placeholder; replaced per instance in ``__init__`` from the config.
    url_base = None

    # Key under which ``parse_response`` stores the value found at
    # ``upstream_primary_key``.
    primary_key = "_id"

    def __init__(self, config) -> None:
        """Build the stream from a connector config mapping.

        Args:
            config: Mapping that must contain ``"authenticator"`` and
                ``"url_base"``; ``"customer_id"``, ``"facility_id"`` and
                ``"start_date"`` are optional and default to ``None``.
        """
        super().__init__(authenticator=config["authenticator"])

        self.url_base = config["url_base"]
        self.customer_id = config.get("customer_id")
        self.facility_id = config.get("facility_id")
        self.start_date = config.get("start_date")

        # Key in the response body that holds the total number of results.
        self.total_results_field = "TotalResults"

    @property
    def page_size(self):
        """Page size assumed when the request URL has no ``pgsiz``; ``None`` by default."""
        return None

    @property
    def upstream_primary_key(self):
        """Path of the primary key inside an upstream record; ``None`` by default."""
        return None

    @property
    def upstream_cursor_field(self):
        """Path of the cursor value inside an upstream record; ``None`` by default."""
        return None

    @property
    @abstractmethod
    def collection_field(self) -> str:
        """Key in the JSON response whose value is the list of records."""

    def next_page_token(self, response: requests.Response, **kwargs) -> Optional[Mapping[str, Any]]:
        """Return the paging parameters for the next request, or ``None`` when done.

        The current page number and size are read back from the query string
        of the request that produced ``response``. When the query string has
        no ``pgsiz``, ``page_size`` is used, falling back to the number of
        records in this response. When it has no ``pgnum``, page 1 is assumed.

        Args:
            response: The HTTP response of the page that was just read.

        Returns:
            ``{"pgsiz": ..., "pgnum": ...}`` for the next page while
            ``pgsiz * pgnum`` is below the reported total, otherwise ``None``.
        """
        data = response.json()
        total = data[self.total_results_field]

        # NOTE(review): the first request is sent without ``pgsiz`` (see
        # ``request_params``), so this arithmetic assumes the server's default
        # page size equals ``page_size`` - confirm against the API docs.
        fallback_size = self.page_size or len(data[self.collection_field])

        query = dict(parse_qsl(urlparse(response.request.url).query))
        pgsiz = int(query.get("pgsiz", fallback_size))
        pgnum = int(query.get("pgnum", 1))

        # ``pgsiz > 0`` guards against looping forever on an empty page.
        if pgsiz > 0 and pgsiz * pgnum < total:
            return {
                "pgsiz": pgsiz,
                "pgnum": pgnum + 1,
            }
        return None

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Return the query parameters for a request.

        The parameters are exactly the paging token, if any. A fresh dict is
        always returned, so callers (including subclasses that add their own
        parameters) can mutate it without touching ``next_page_token`` and
        never have to deal with ``None``.
        """
        return dict(next_page_token) if next_page_token else {}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield the records of one response page.

        The list found under ``collection_field`` is passed through
        ``normalize``. Each record then gets ``primary_key`` and
        ``cursor_field`` entries filled from the configured upstream paths,
        when those paths are set.
        """
        records = normalize(response.json()[self.collection_field])
        upstream_pk = self.upstream_primary_key
        upstream_cursor = self.upstream_cursor_field

        for record in records:
            if upstream_pk:
                record[self.primary_key] = deep_get(record, upstream_pk)
            if upstream_cursor:
                record[self.cursor_field] = deep_get(record, upstream_cursor)
            yield record
class StockSummaries(TplcentralStream):
    """Stream over the ``inventory/stocksummaries`` endpoint.

    Reference: https://api.3plcentral.com/rels/inventory/stocksummaries
    """

    page_size = 500
    collection_field = "Summaries"
    # Composite key; ``_item_identifier_id`` is added in ``parse_response``.
    primary_key = ["FacilityId", "_item_identifier_id"]

    def path(
        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> str:
        """Return the endpoint path, which is the same for every request."""
        return "inventory/stocksummaries"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield the parent's records, each extended with ``_item_identifier_id``.

        The extra field is read from the record at ``"ItemIdentifier.Id"`` via
        ``deep_get`` so it can serve as part of the composite primary key.
        """
        for summary in super().parse_response(response, **kwargs):
            item_id = deep_get(summary, "ItemIdentifier.Id")
            summary["_item_identifier_id"] = item_id
            yield summary
class Customers(TplcentralStream):
    """Stream over the ``customers`` endpoint.

    Reference: https://api.3plcentral.com/rels/customers/customers
    """

    page_size = 100
    collection_field = "ResourceList"
    # Location of the customer id inside an upstream record.
    upstream_primary_key = "ReadOnly.CustomerId"

    def path(
        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> str:
        """Return the endpoint path; none of the arguments influence it."""
        endpoint = "customers"
        return endpoint
class IncrementalTplcentralStream(TplcentralStream, ABC):
    """Base class for the streams in this module that sync incrementally.

    State is a single-entry mapping ``{cursor_field: <value>}``. Each sync is
    one slice that starts at the stored cursor, or at ``start_date`` when no
    cursor has been stored yet.
    """

    state_checkpoint_interval = 100
    cursor_field = "_cursor"

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """Return the state after seeing ``latest_record``.

        When both the stored cursor and the record's cursor are present, they
        are parsed with ``arrow`` and the later one is stored as an ISO-8601
        string with the timezone info dropped. Otherwise whichever value is
        present is stored, or ``""`` when neither is.

        Args:
            current_stream_state: The state so far; may be empty or ``None``.
            latest_record: The record that was just emitted.

        Returns:
            A new ``{cursor_field: value}`` mapping.
        """
        # ``or ""`` also covers a key that is present but holds ``None``,
        # which would otherwise make ``max(None, "")`` raise ``TypeError``.
        current = (current_stream_state or {}).get(self.cursor_field) or ""
        latest = (latest_record or {}).get(self.cursor_field) or ""

        if current and latest:
            newest = max(arrow.get(latest), arrow.get(current))
            return {self.cursor_field: newest.datetime.replace(tzinfo=None).isoformat()}

        # At most one side is non-empty here, so simply keep that one.
        return {self.cursor_field: latest or current}

    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        """Return a single slice carrying the cursor value to resume from.

        The slice value is the stored cursor when the state has one, else
        ``start_date``.
        """
        if stream_state is None:
            stream_state = {}
        return [{self.cursor_field: stream_state.get(self.cursor_field, self.start_date)}]

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Return the parent's query parameters as a fresh, never-``None`` dict.

        Subclasses add their own entries to the result, so it must not alias
        the caller's ``next_page_token``.
        """
        params = super().request_params(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
        return dict(params) if params else {}
class Items(IncrementalTplcentralStream):
    """Incremental stream over ``customers/{customer_id}/items``.

    Reference: https://api.3plcentral.com/rels/customers/items
    """

    upstream_primary_key = "ItemId"
    upstream_cursor_field = "ReadOnly.LastModifiedDate"
    collection_field = "ResourceList"
    page_size = 100

    def path(self, **kwargs) -> str:
        """Return the items path for the configured ``customer_id``."""
        return f"customers/{self.customer_id}/items"

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Return the paging parameters plus sorting and the cursor filter.

        Results are sorted by the upstream cursor field. When the slice
        carries a cursor value, an ``rql`` filter ``<field>=ge=<cursor>`` is
        added as well.

        Args:
            stream_state: Forwarded to the parent implementation.
            stream_slice: Slice produced by ``stream_slices``; ``None`` is
                treated as an empty slice.
            next_page_token: Paging token, forwarded to the parent.
        """
        inherited = super().request_params(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
        # Copy so the additions below never leak into the caller's token.
        params = dict(inherited or {})
        params["sort"] = self.upstream_cursor_field

        # ``stream_slice`` defaults to None, so guard before reading from it.
        cursor = (stream_slice or {}).get(self.cursor_field)
        if cursor:
            params["rql"] = f"{self.upstream_cursor_field}=ge={cursor}"

        return params
class StockDetails(IncrementalTplcentralStream):
    """Incremental stream over ``inventory/stockdetails``.

    Reference: https://api.3plcentral.com/rels/inventory/stockdetails
    """

    upstream_primary_key = "ReceiveItemId"
    upstream_cursor_field = "ReceivedDate"
    collection_field = "ResourceList"
    page_size = 500

    def path(self, **kwargs) -> str:
        """Return the endpoint path, which is the same for every request."""
        return "inventory/stockdetails"

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Return the paging parameters plus customer/facility, sorting and cursor filter.

        ``customerid`` and ``facilityid`` come from the stream config and the
        results are sorted by the upstream cursor field. When the slice
        carries a cursor value, an ``rql`` filter ``<field>=ge=<cursor>`` is
        added as well.

        Args:
            stream_state: Forwarded to the parent implementation.
            stream_slice: Slice produced by ``stream_slices``; ``None`` is
                treated as an empty slice.
            next_page_token: Paging token, forwarded to the parent.
        """
        inherited = super().request_params(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
        # Copy so the additions below never leak into the caller's token.
        params = dict(inherited or {})
        params.update(
            {
                "customerid": self.customer_id,
                "facilityid": self.facility_id,
                "sort": self.upstream_cursor_field,
            }
        )

        # ``stream_slice`` defaults to None, so guard before reading from it.
        cursor = (stream_slice or {}).get(self.cursor_field)
        if cursor:
            params["rql"] = f"{self.upstream_cursor_field}=ge={cursor}"

        return params
class Inventory(IncrementalTplcentralStream):
    """Incremental stream over the ``inventory`` endpoint.

    Reference: https://api.3plcentral.com/rels/inventory/inventory
    """

    upstream_primary_key = "ReceiveItemId"
    upstream_cursor_field = "ReceivedDate"
    collection_field = "ResourceList"
    page_size = 1000

    def path(self, **kwargs) -> str:
        """Return the endpoint path, which is the same for every request."""
        return "inventory"

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Return the paging parameters plus sorting and the ``rql`` filter.

        The ``rql`` value is a ``;``-joined list of clauses: the customer and
        facility filters always, followed by ``<cursor field>=ge=<cursor>``
        when the slice carries a cursor value. Results are sorted by the
        upstream cursor field.

        Args:
            stream_state: Forwarded to the parent implementation.
            stream_slice: Slice produced by ``stream_slices``; ``None`` is
                treated as an empty slice.
            next_page_token: Paging token, forwarded to the parent.
        """
        inherited = super().request_params(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
        # Copy so the additions below never leak into the caller's token.
        params = dict(inherited or {})

        clauses = [
            f"CustomerIdentifier.Id=={self.customer_id}",
            f"FacilityIdentifier.Id=={self.facility_id}",
        ]

        # ``stream_slice`` defaults to None, so guard before reading from it.
        cursor = (stream_slice or {}).get(self.cursor_field)
        if cursor:
            clauses.append(f"{self.upstream_cursor_field}=ge={cursor}")

        params.update(
            {
                "sort": self.upstream_cursor_field,
                "rql": ";".join(clauses),
            }
        )
        return params
class Orders(IncrementalTplcentralStream):
    """Incremental stream over the ``orders`` endpoint.

    Reference: https://api.3plcentral.com/rels/orders/orders
    """

    upstream_primary_key = "ReadOnly.OrderId"
    upstream_cursor_field = "ReadOnly.LastModifiedDate"
    collection_field = "ResourceList"
    page_size = 1000

    def path(self, **kwargs) -> str:
        """Return the endpoint path, which is the same for every request."""
        return "orders"

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Return the paging parameters plus sorting and the ``rql`` filter.

        The ``rql`` value is a ``;``-joined list of clauses: the customer and
        facility filters always, followed by ``<cursor field>=ge=<cursor>``
        when the slice carries a cursor value. Results are sorted by the
        upstream cursor field.

        Args:
            stream_state: Forwarded to the parent implementation.
            stream_slice: Slice produced by ``stream_slices``; ``None`` is
                treated as an empty slice.
            next_page_token: Paging token, forwarded to the parent.
        """
        inherited = super().request_params(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
        # Copy so the additions below never leak into the caller's token.
        params = dict(inherited or {})

        clauses = [
            f"ReadOnly.CustomerIdentifier.Id=={self.customer_id}",
            f"ReadOnly.FacilityIdentifier.Id=={self.facility_id}",
        ]

        # ``stream_slice`` defaults to None, so guard before reading from it.
        cursor = (stream_slice or {}).get(self.cursor_field)
        if cursor:
            clauses.append(f"{self.upstream_cursor_field}=ge={cursor}")

        params.update(
            {
                "sort": self.upstream_cursor_field,
                "rql": ";".join(clauses),
            }
        )
        return params