1
0
mirror of synced 2025-12-21 11:01:41 -05:00
Files
airbyte/airbyte-integrations/connectors/source-netsuite/source_netsuite/streams.py
Dhroov Makwana 7f01a90ba7 🚨Source Netsuite: Migrate to poetry (#42857)
Co-authored-by: Danylo Jablonski <150933663+DanyloGL@users.noreply.github.com>
Co-authored-by: Natik Gadzhi <natik@respawn.io>
2025-02-05 10:37:22 -08:00

290 lines
12 KiB
Python

#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from abc import ABC
from datetime import date, datetime, timedelta
from json import JSONDecodeError
from typing import Any, Iterable, Mapping, MutableMapping, Optional, Union
import requests
from requests_oauthlib import OAuth1
from airbyte_cdk.sources.streams.http import HttpStream
from source_netsuite.constraints import (
CUSTOM_INCREMENTAL_CURSOR,
INCREMENTAL_CURSOR,
META_PATH,
NETSUITE_INPUT_DATE_FORMATS,
NETSUITE_OUTPUT_DATETIME_FORMAT,
RECORD_PATH,
REFERAL_SCHEMA,
REFERAL_SCHEMA_URL,
SCHEMA_HEADERS,
USLESS_SCHEMA_ELEMENTS,
)
from source_netsuite.errors import NETSUITE_ERRORS_MAPPING, DateFormatExeption
class NetsuiteStream(HttpStream, ABC):
    """Base stream that reads one NetSuite record type.

    The list endpoint (`RECORD_PATH + object_name`) is paged through and every
    listed record is then requested individually via its first `links` entry.
    JSON schemas are downloaded from the API and cached per instance.
    """

    primary_key = "id"
    # index into NETSUITE_INPUT_DATE_FORMATS of the input date format in use;
    # `should_retry` advances it when the API rejects the current format
    index_datetime_format = 0
    raise_on_http_errors = True

    def __init__(
        self,
        auth: OAuth1,
        object_name: str,
        base_url: str,
        start_datetime: str,
        window_in_days: int,
    ):
        self.object_name = object_name
        self.base_url = base_url
        self.start_datetime = start_datetime
        self.window_in_days = window_in_days
        self.schemas = {}  # store subschemas to reduce API calls
        super().__init__(authenticator=auth)

    @property
    def default_datetime_format(self) -> str:
        """Input date format currently selected by `index_datetime_format`."""
        return NETSUITE_INPUT_DATE_FORMATS[self.index_datetime_format]

    @property
    def name(self) -> str:
        """Stream name; identical to the record type name."""
        return self.object_name

    @property
    def url_base(self) -> str:
        """Base URL every request path is appended to."""
        return self.base_url

    def path(self, **kwargs) -> str:
        """Return the list endpoint path for this record type."""
        return RECORD_PATH + self.object_name

    @property
    def ref_schema(self) -> Mapping[str, Any]:
        """Return the reference schema with the `links` item schema filled in.

        A new mapping is assembled on every access so that the module-level
        `REFERAL_SCHEMA` constant is never mutated.
        """
        properties = REFERAL_SCHEMA["properties"]
        links = {**properties["links"], "items": self.get_schema(REFERAL_SCHEMA_URL)}
        return {**REFERAL_SCHEMA, "properties": {**properties, "links": links}}

    def get_schema(self, ref: str) -> Union[Mapping[str, Any], str]:
        """Return the schema located at `ref`, downloading it on a cache miss.

        :param ref: schema path, appended to `url_base`
        :return: the parsed schema, or a plain string-typed placeholder when the
            API answers 404 for `ref`
        :raises requests.HTTPError: on any other unsuccessful status code
        :raises JSONDecodeError: when the response body is not valid JSON
        """

        def get_json_response(response: requests.Response) -> dict:
            try:
                return response.json()
            except JSONDecodeError:
                # the decode error carries no usable response object,
                # so log the body of the response we were asked to parse
                self.logger.error(f"Cannot get schema for {self.name}, actual response: {response.text}")
                raise

        # try to retrieve the schema from the cache
        schema = self.schemas.get(ref)
        if schema:
            return schema

        resp = self._session.get(url=self.url_base + ref, headers=SCHEMA_HEADERS)
        if resp.status_code == 404:
            # some schemas, like transaction, do not exist because they refer to multiple
            # record types, e.g. sales order/invoice ... in this case we can't retrieve
            # the correct schema, so we just put the json in a string
            schema = {"title": ref, "type": "string"}
        else:
            # fail on any other error status instead of caching an error payload as a schema
            resp.raise_for_status()
            schema = get_json_response(resp)
        self.schemas[ref] = schema
        return schema

    def build_schema(self, record: Any) -> Mapping[str, Any]:
        """Recursively build a schema, resolving `$ref` entries into subschemas.

        Dict nodes are modified in place (nullable type, removed elements);
        any non-dict value is returned unchanged.
        """
        if not isinstance(record, dict):
            return record

        # Netsuite schemas do not specify if fields can be null, or not
        # as Airbyte expects, so we have to allow every field to be null
        property_type = record.get("type")
        property_type_list = property_type if isinstance(property_type, list) else [property_type]
        # ensure there is a type, type is the json schema type and not a property
        # and null has not already been added
        if property_type and not isinstance(property_type, dict) and "null" not in property_type_list:
            record["type"] = ["null"] + property_type_list

        # removing non-functional elements from schema, whatever their value
        for element in USLESS_SCHEMA_ELEMENTS:
            record.pop(element, None)

        ref = record.get("$ref")
        if ref:
            return self.get_schema(ref) if ref == REFERAL_SCHEMA_URL else self.ref_schema
        return {k: self.build_schema(v) for k, v in record.items()}

    def get_json_schema(self, **kwargs) -> dict:
        """Return the stream schema built from the metadata of this record type."""
        schema = self.get_schema(META_PATH + self.name)
        return self.build_schema(schema)

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return the offset of the next page, or None when `hasMore` is falsy."""
        resp = response.json()
        if resp.get("hasMore"):
            return {"offset": resp["offset"] + resp["count"]}
        return None

    def format_date(self, last_modified_date: str) -> str:
        """Convert a date from the API output format into the current input format."""
        # the date format returned is different than what we need to use in the query
        lmd_datetime = datetime.strptime(last_modified_date, NETSUITE_OUTPUT_DATETIME_FORMAT)
        return lmd_datetime.strftime(self.default_datetime_format)

    def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
        """Return the query parameters: the page token entries, if any."""
        params = {}
        if next_page_token:
            params.update(**next_page_token)
        return params

    def fetch_record(self, record: Mapping[str, Any], request_kwargs: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
        """Request a single record through its first link and yield its JSON body.

        :param record: list item containing a non-empty `links` list
        :param request_kwargs: extra keyword arguments handed to `_send_request`
        """
        url = record["links"][0]["href"]
        args = {"method": "GET", "url": url, "params": {"expandSubResources": True}}
        prep_req = self._session.prepare_request(requests.Request(**args))
        response = self._send_request(prep_req, request_kwargs)
        # sometimes response.status_code == 400,
        # but contains json elements with error description,
        # to avoid passing it as {TYPE: RECORD}, we filter response by status
        if response.status_code == requests.codes.ok:
            yield response.json()

    def parse_response(
        self,
        response: requests.Response,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
        **kwargs,
    ) -> Iterable[Mapping]:
        """Yield the full record for every entry of the response's `items` list."""
        records = response.json().get("items") or []
        # pass by keyword: the first positional parameter of `request_kwargs` is `stream_state`
        request_kwargs = self.request_kwargs(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
        for record in records:
            # make sub-requests for each record fetched
            yield from self.fetch_record(record, request_kwargs)

    def should_retry(self, response: requests.Response) -> bool:
        """Decide whether the request should be retried.

        Errors listed in `NETSUITE_ERRORS_MAPPING` are logged and not retried;
        everything else is delegated to the parent implementation.

        :raises DateFormatExeption: when the API rejected the current input date
            format and another format is still available to try
        """
        if response.status_code not in NETSUITE_ERRORS_MAPPING:
            return super().should_retry(response)

        try:
            payload = response.json()
        except ValueError:
            # error body is not JSON, nothing to inspect
            return super().should_retry(response)

        message = payload.get("o:errorDetails") if isinstance(payload, dict) else None
        if not isinstance(message, list) or not message:
            return super().should_retry(response)

        error_code = message[0].get("o:errorCode")
        detail_message = message[0].get("detail")
        known_error = NETSUITE_ERRORS_MAPPING.get(response.status_code)
        if error_code not in known_error:
            return super().should_retry(response)

        # known error: it is reported through the log below instead of being raised
        self.raise_on_http_errors = False

        # handle data-format error
        if "INVALID_PARAMETER" in error_code and "failed with date format" in (detail_message or ""):
            self.logger.warning(f"Stream `{self.name}`: cannot read using date format `{self.default_datetime_format}")
            self.index_datetime_format += 1
            if self.index_datetime_format < len(NETSUITE_INPUT_DATE_FORMATS):
                self.logger.warning(f"Stream `{self.name}`: retry using next date format `{self.default_datetime_format}")
                raise DateFormatExeption
            # NOTE(review): the index now points past the last known format, so a later
            # read of `default_datetime_format` would raise IndexError — confirm intended
            self.logger.error(f"DATE FORMAT exception. Cannot read using known formats {NETSUITE_INPUT_DATE_FORMATS}")

        # handle other known errors
        self.logger.error(f"Stream `{self.name}`: {error_code} error occured, full error message: {detail_message}")
        return False

    def read_records(
        self, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, **kwargs
    ) -> Iterable[Mapping[str, Any]]:
        """Yield the records of one slice, stopping quietly on a date format error."""
        try:
            yield from super().read_records(stream_slice=stream_slice, stream_state=stream_state, **kwargs)
        except DateFormatExeption:
            # `should_retry` has already switched to the next date format, which
            # later reads will pick up; reading of this slice simply ends here.
            # NOTE(review): the slice that failed is not requested again — confirm intended
            return
class IncrementalNetsuiteStream(NetsuiteStream):
    """Incremental variant that reads records in date windows on `cursor_field`."""

    @property
    def cursor_field(self) -> str:
        """Name of the record field used as the incremental cursor."""
        return INCREMENTAL_CURSOR

    def filter_records_newer_than_state(
        self,
        stream_state: Mapping[str, Any] = None,
        records: Iterable[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """Parse the records with respect to `stream_state` for `incremental` sync.

        Without state every record is yielded. Otherwise only records whose cursor
        value is greater than or equal to the state value are yielded; a missing or
        empty cursor on either side falls back to `start_datetime`, so the string
        comparison never involves `None`.
        """
        if not stream_state:
            yield from records or ()
            return

        threshold = stream_state.get(self.cursor_field) or self.start_datetime
        for record in records or ():
            record_cursor = record.get(self.cursor_field) or self.start_datetime
            if record_cursor >= threshold:
                yield record

    def get_state_value(self, stream_state: Mapping[str, Any] = None) -> str:
        """
        Sometimes the object has no `cursor_field` value assigned, and the ` "" ` is emitted as state value,
        this causes conflicts with datetime lib to parse the `time component`,
        to avoid the errors we fall back to the default start_date from input config.
        """
        state = stream_state.get(self.cursor_field) if stream_state else self.start_datetime
        if not state:
            self.logger.info(f"Stream state for `{self.name}` was not emmited, falling back to default value: {self.start_datetime}")
            return self.start_datetime
        return state

    def parse_response(
        self,
        response: requests.Response,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
        **kwargs,
    ) -> Iterable[Mapping]:
        """Yield the parent's parsed records, filtered against the stream state."""
        records = super().parse_response(response, stream_state, stream_slice, next_page_token)
        yield from self.filter_records_newer_than_state(stream_state, records)

    def get_updated_state(
        self,
        current_stream_state: MutableMapping[str, Any],
        latest_record: Mapping[str, Any],
    ) -> Mapping[str, Any]:
        """Return the state holding the greater of the current and latest cursor values.

        A missing or empty cursor on either side is replaced by `start_datetime`
        so that `max` always compares two strings.
        """
        latest_cursor = latest_record.get(self.cursor_field) or self.start_datetime
        current_cursor = (current_stream_state or {}).get(self.cursor_field) or self.start_datetime
        return {self.cursor_field: max(latest_cursor, current_cursor)}

    def request_params(
        self, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
    ) -> MutableMapping[str, Any]:
        """Return the page token entries plus a `q` filter limited to the slice window."""
        params = {**(next_page_token or {})}
        if stream_slice:
            params.update(
                **{"q": f'{self.cursor_field} AFTER "{stream_slice["start"]}" AND {self.cursor_field} BEFORE "{stream_slice["end"]}"'}
            )
        return params

    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        """Yield consecutive `{"start", "end"}` date windows from the state date up to today.

        Nothing is yielded when the state date lies in the future.
        """
        # Netsuite cannot order records returned by the API, so we need stream slices
        # to maintain state properly https://docs.airbyte.com/connector-development/cdk-python/incremental-stream/#streamstream_slices
        state = self.get_state_value(stream_state)
        start = datetime.strptime(state, NETSUITE_OUTPUT_DATETIME_FORMAT).date()
        # a window shorter than one day would never advance `start` and loop forever
        window = timedelta(days=max(self.window_in_days, 1))

        # abnormal (future) state values simply produce no slices
        while start <= date.today():
            next_day = start + window
            # the format is read for every slice on purpose: `should_retry` may have
            # switched to another input date format since the previous slice
            slice_start = start.strftime(self.default_datetime_format)
            slice_end = next_day.strftime(self.default_datetime_format)
            yield {"start": slice_start, "end": slice_end}
            start = next_day
class CustomIncrementalNetsuiteStream(IncrementalNetsuiteStream):
    """Incremental stream whose cursor is `CUSTOM_INCREMENTAL_CURSOR`."""

    @property
    def cursor_field(self) -> str:
        """Name of the record field used as the incremental cursor."""
        cursor_name = CUSTOM_INCREMENTAL_CURSOR
        return cursor_name