295 lines
9.4 KiB
Python
295 lines
9.4 KiB
Python
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
|
|
|
|
from abc import ABC, abstractmethod
|
|
from typing import Any, Iterable, Mapping, MutableMapping, Optional
|
|
from urllib.parse import parse_qsl, urlparse
|
|
|
|
import arrow
|
|
import requests
|
|
|
|
from airbyte_cdk.sources.streams.http import HttpStream
|
|
from source_tplcentral.util import deep_get, normalize
|
|
|
|
|
|
class TplcentralStream(HttpStream, ABC):
    """Base HTTP stream shared by the Tplcentral streams.

    Concrete streams must provide ``collection_field`` (the key of the response
    body that holds the list of records) and ``path``. They may override
    ``page_size``, ``upstream_primary_key`` and ``upstream_cursor_field`` with
    plain class attributes.
    """

    url_base = None

    # Field written onto every record by ``parse_response`` when
    # ``upstream_primary_key`` is set.
    primary_key = "_id"

    def __init__(self, config) -> None:
        """Store connection settings taken from ``config``.

        ``authenticator`` and ``url_base`` are required keys; ``customer_id``,
        ``facility_id`` and ``start_date`` are optional and default to ``None``.
        """
        super().__init__(authenticator=config["authenticator"])

        self.url_base = config["url_base"]
        self.customer_id = config.get("customer_id")
        self.facility_id = config.get("facility_id")
        self.start_date = config.get("start_date")

        # Key of the response body holding the total number of results.
        self.total_results_field = "TotalResults"

    @property
    def page_size(self) -> Optional[int]:
        """Page size used for pagination maths; ``None`` means "not configured"."""
        return None

    @property
    def upstream_primary_key(self) -> Optional[str]:
        """Path of the API field copied into ``primary_key``; ``None`` disables the copy."""
        return None

    @property
    def upstream_cursor_field(self) -> Optional[str]:
        """Path of the API field copied into ``cursor_field``; ``None`` disables the copy."""
        return None

    @property
    @abstractmethod
    def collection_field(self) -> str:
        """Key of the response body that contains the list of records."""

    def next_page_token(self, response: requests.Response, **kwargs) -> Optional[Mapping[str, Any]]:
        """Return the ``pgsiz``/``pgnum`` parameters of the next page, or ``None`` when done.

        The current page number and size are read back from the query string of
        the request that produced ``response``. When the request carried no
        ``pgsiz``, the stream's ``page_size`` is used, falling back to the number
        of records in this response. A missing ``pgnum`` is treated as page 1.
        """
        data = response.json()
        total = data[self.total_results_field]

        default_page_size = self.page_size or len(data[self.collection_field])

        query = dict(parse_qsl(urlparse(response.request.url).query))
        pgsiz = int(query.get("pgsiz", default_page_size))
        pgnum = int(query.get("pgnum", 1))

        # A zero page size (empty first page, no configured size) must stop
        # pagination, otherwise the same page would be requested forever.
        if pgsiz > 0 and pgsiz * pgnum < total:
            return {
                "pgsiz": pgsiz,
                "pgnum": pgnum + 1,
            }

        return None

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Return the query parameters for a request: the pagination token, if any.

        A fresh dict is always returned (empty for the first page) so that
        subclasses can add their own parameters without mutating the
        ``next_page_token`` mapping they were given.
        """
        return dict(next_page_token) if next_page_token else {}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield the records found under ``collection_field`` of the response body.

        The collection is passed through ``normalize`` first. Each record then
        gets ``primary_key`` and ``cursor_field`` entries populated via
        ``deep_get`` when the corresponding ``upstream_*`` attribute is set.
        """
        records = normalize(response.json()[self.collection_field])

        for record in records:
            if self.upstream_primary_key:
                record[self.primary_key] = deep_get(record, self.upstream_primary_key)
            if self.upstream_cursor_field:
                record[self.cursor_field] = deep_get(record, self.upstream_cursor_field)
            yield record
|
|
|
|
|
|
class StockSummaries(TplcentralStream):
    """Stream of stock summaries, read from the ``Summaries`` collection.

    Records are keyed by ``FacilityId`` together with a synthetic
    ``_item_identifier_id`` field added in ``parse_response``.
    """

    # https://api.3plcentral.com/rels/inventory/stocksummaries
    page_size = 500
    primary_key = ["FacilityId", "_item_identifier_id"]
    collection_field = "Summaries"

    def path(
        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> str:
        """Return the endpoint path, which is the same for every request."""
        return "inventory/stocksummaries"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield the parent's records, each extended with ``_item_identifier_id``.

        The value is looked up at ``ItemIdentifier.Id`` in the record via ``deep_get``.
        """
        for summary in super().parse_response(response, **kwargs):
            summary["_item_identifier_id"] = deep_get(summary, "ItemIdentifier.Id")
            yield summary
|
|
|
|
|
|
class Customers(TplcentralStream):
    """Stream of customers, read from the ``ResourceList`` collection.

    The record primary key is taken from the ``ReadOnly.CustomerId`` field.
    """

    # https://api.3plcentral.com/rels/customers/customers
    page_size = 100
    collection_field = "ResourceList"
    upstream_primary_key = "ReadOnly.CustomerId"

    def path(
        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> str:
        """Return the endpoint path, which is the same for every request."""
        endpoint = "customers"
        return endpoint
|
|
|
|
|
|
class IncrementalTplcentralStream(TplcentralStream, ABC):
    """Base class for streams that sync incrementally on the ``_cursor`` field."""

    state_checkpoint_interval = 100

    # Name of the cursor entry in both records and stream state.
    cursor_field = "_cursor"

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """Return the new stream state: the later of the stored and the record cursor.

        When both cursors are present they are compared as timestamps and the
        winner is stored as a timezone-less ISO-8601 string. When at most one is
        present, that value is stored unchanged (or ``""`` if neither is).
        """
        # A record may carry an explicit ``None`` cursor; treat it like a
        # missing one instead of letting ``max(None, "")`` raise TypeError.
        current = current_stream_state.get(self.cursor_field) or ""
        latest = latest_record.get(self.cursor_field) or ""

        if current and latest:
            newest = max(arrow.get(latest), arrow.get(current))
            return {self.cursor_field: newest.datetime.replace(tzinfo=None).isoformat()}

        # At most one side is non-empty here, so pick whichever is set.
        return {self.cursor_field: latest or current}

    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        """Return a single slice carrying the cursor to resume from.

        The cursor is taken from ``stream_state`` when present, otherwise from
        the configured ``start_date``.
        """
        if stream_state is None:
            stream_state = {}

        return [{self.cursor_field: stream_state.get(self.cursor_field, self.start_date)}]

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Return the parent's query parameters as a dict that is safe to mutate.

        The result is never ``None`` and never aliases ``next_page_token``, so
        subclasses can call ``update`` on it freely.
        """
        params = super().request_params(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

        return dict(params) if params else {}
|
|
|
|
|
|
class Items(IncrementalTplcentralStream):
    """Incremental stream of a customer's items, cursored on ``ReadOnly.LastModifiedDate``."""

    # https://api.3plcentral.com/rels/customers/items
    upstream_primary_key = "ItemId"
    upstream_cursor_field = "ReadOnly.LastModifiedDate"
    collection_field = "ResourceList"
    page_size = 100

    def path(self, **kwargs) -> str:
        """Return the items endpoint of the configured customer."""
        return f"customers/{self.customer_id}/items"

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Return pagination parameters plus sorting and the incremental filter.

        Results are sorted by the upstream cursor field. When the slice carries
        a cursor, an ``rql`` filter restricts results to records at or after it.
        """
        # Copy so that the pagination token handed in by the caller is never mutated.
        params = dict(
            super().request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )
            or {}
        )

        params["sort"] = self.upstream_cursor_field

        # ``stream_slice`` defaults to None; treat that as "no cursor".
        cursor = (stream_slice or {}).get(self.cursor_field)
        if cursor:
            params["rql"] = f"{self.upstream_cursor_field}=ge={cursor}"

        return params
|
|
|
|
|
|
class StockDetails(IncrementalTplcentralStream):
    """Incremental stream of stock details, cursored on ``ReceivedDate``."""

    # https://api.3plcentral.com/rels/inventory/stockdetails
    upstream_primary_key = "ReceiveItemId"
    upstream_cursor_field = "ReceivedDate"
    collection_field = "ResourceList"
    page_size = 500

    def path(self, **kwargs) -> str:
        """Return the endpoint path, which is the same for every request."""
        return "inventory/stockdetails"

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Return pagination parameters plus customer/facility scoping and sorting.

        The configured customer and facility ids are sent as ``customerid`` and
        ``facilityid``. When the slice carries a cursor, an ``rql`` filter
        restricts results to records at or after it.
        """
        # Copy so that the pagination token handed in by the caller is never mutated.
        params = dict(
            super().request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )
            or {}
        )

        params.update(
            {
                "customerid": self.customer_id,
                "facilityid": self.facility_id,
                "sort": self.upstream_cursor_field,
            }
        )

        # ``stream_slice`` defaults to None; treat that as "no cursor".
        cursor = (stream_slice or {}).get(self.cursor_field)
        if cursor:
            params["rql"] = f"{self.upstream_cursor_field}=ge={cursor}"

        return params
|
|
|
|
|
|
class Inventory(IncrementalTplcentralStream):
    """Incremental stream of inventory records, cursored on ``ReceivedDate``."""

    # https://api.3plcentral.com/rels/inventory/inventory
    upstream_primary_key = "ReceiveItemId"
    upstream_cursor_field = "ReceivedDate"
    collection_field = "ResourceList"
    page_size = 1000

    def path(self, **kwargs) -> str:
        """Return the endpoint path, which is the same for every request."""
        return "inventory"

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Return pagination parameters plus sorting and an ``rql`` filter.

        The filter always scopes results to the configured customer and
        facility. When the slice carries a cursor, a further clause restricts
        results to records at or after it. Clauses are joined with ``;``.
        """
        # Copy so that the pagination token handed in by the caller is never mutated.
        params = dict(
            super().request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )
            or {}
        )

        rql_clauses = [
            f"CustomerIdentifier.Id=={self.customer_id}",
            f"FacilityIdentifier.Id=={self.facility_id}",
        ]

        # ``stream_slice`` defaults to None; treat that as "no cursor".
        cursor = (stream_slice or {}).get(self.cursor_field)
        if cursor:
            rql_clauses.append(f"{self.upstream_cursor_field}=ge={cursor}")

        params.update(
            {
                "sort": self.upstream_cursor_field,
                "rql": ";".join(rql_clauses),
            }
        )

        return params
|
|
|
|
|
|
class Orders(IncrementalTplcentralStream):
    """Incremental stream of orders, cursored on ``ReadOnly.LastModifiedDate``."""

    # https://api.3plcentral.com/rels/orders/orders
    upstream_primary_key = "ReadOnly.OrderId"
    upstream_cursor_field = "ReadOnly.LastModifiedDate"
    collection_field = "ResourceList"
    page_size = 1000

    def path(self, **kwargs) -> str:
        """Return the endpoint path, which is the same for every request."""
        return "orders"

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Return pagination parameters plus sorting and an ``rql`` filter.

        The filter always scopes results to the configured customer and
        facility. When the slice carries a cursor, a further clause restricts
        results to records at or after it. Clauses are joined with ``;``.
        """
        # Copy so that the pagination token handed in by the caller is never mutated.
        params = dict(
            super().request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )
            or {}
        )

        rql_clauses = [
            f"ReadOnly.CustomerIdentifier.Id=={self.customer_id}",
            f"ReadOnly.FacilityIdentifier.Id=={self.facility_id}",
        ]

        # ``stream_slice`` defaults to None; treat that as "no cursor".
        cursor = (stream_slice or {}).get(self.cursor_field)
        if cursor:
            rql_clauses.append(f"{self.upstream_cursor_field}=ge={cursor}")

        params.update(
            {
                "sort": self.upstream_cursor_field,
                "rql": ";".join(rql_clauses),
            }
        )

        return params
|