Co-authored-by: Danylo Jablonski <150933663+DanyloGL@users.noreply.github.com> Co-authored-by: Natik Gadzhi <natik@respawn.io>
174 lines
7.4 KiB
Python
174 lines
7.4 KiB
Python
#
|
|
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
|
|
import logging
|
|
from collections import Counter
|
|
from json import JSONDecodeError
|
|
from typing import Any, List, Mapping, Tuple, Union
|
|
|
|
import requests
|
|
from requests_oauthlib import OAuth1
|
|
|
|
from airbyte_cdk.sources import AbstractSource
|
|
from airbyte_cdk.sources.streams import Stream
|
|
from source_netsuite.constraints import CUSTOM_INCREMENTAL_CURSOR, INCREMENTAL_CURSOR, META_PATH, RECORD_PATH, SCHEMA_HEADERS
|
|
from source_netsuite.streams import CustomIncrementalNetsuiteStream, IncrementalNetsuiteStream, NetsuiteStream
|
|
|
|
|
|
class SourceNetsuite(AbstractSource):
    """Airbyte source for the NetSuite SuiteTalk REST API using OAuth1 (HMAC-SHA256) token auth."""

    logger: logging.Logger = logging.getLogger("airbyte")

    def auth(self, config: Mapping[str, Any]) -> OAuth1:
        """Build the OAuth1 signer from the consumer/token credentials in `config`."""
        # the `realm` param should be in format of: 12345_SB1
        realm = config["realm"].replace("-", "_").upper()
        return OAuth1(
            client_key=config["consumer_key"],
            client_secret=config["consumer_secret"],
            resource_owner_key=config["token_key"],
            resource_owner_secret=config["token_secret"],
            realm=realm,
            signature_method="HMAC-SHA256",
        )

    def base_url(self, config: Mapping[str, Any]) -> str:
        """Return the account-specific SuiteTalk host derived from `config["realm"]`."""
        # the subdomain should be in format of: 12345-sb1
        subdomain = config["realm"].replace("_", "-").lower()
        return f"https://{subdomain}.suitetalk.api.netsuite.com"

    def get_session(self, auth: OAuth1) -> requests.Session:
        """Return a new `requests.Session` whose requests are signed with `auth`."""
        session = requests.Session()
        session.auth = auth
        return session

    def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        """
        Verify that the credentials can read every configured record type.

        :param logger: unused by this implementation.
        :param config: connector config; reads `object_types` plus the auth/realm fields.
        :return: `(True, None)` when every probe request succeeds, otherwise `(False, reason)`
            where `reason` is a duplicate-record-type message or the failing request exception.
        """
        auth = self.auth(config)
        object_types = config.get("object_types")
        base_url = self.base_url(config)
        session = self.get_session(auth)

        if object_types:
            # ensure there are no duplicate record types as this will break Airbyte.
            # Compare case-insensitively: `streams()` and `fetch_schema()` lower-case the
            # names, so e.g. "Contact" and "contact" would produce colliding streams.
            counts = Counter(name.lower() for name in object_types)
            duplicates = [name for name, count in counts.items() if count > 1]
            if duplicates:
                return False, f'Duplicate record type: {", ".join(duplicates)}'
            objects_to_check = object_types
        else:
            # if `object_types` are not provided, use `Contact` object
            # there should be at least 1 contact available in every NetSuite account by default.
            objects_to_check = ["contact"]

        # check connectivity to ALL objects; success is reported only after every probe passed
        for object_name in objects_to_check:
            try:
                response = session.get(url=base_url + RECORD_PATH + object_name.lower(), params={"limit": 1})
                response.raise_for_status()
            except requests.exceptions.RequestException as e:
                # RequestException also covers connection errors / timeouts, not just HTTP status errors
                return False, e
        return True, None

    def get_schemas(self, object_names: Union[List[str], str], session: requests.Session, metadata_url: str) -> Mapping[str, Any]:
        """
        Fetch the schema for one record type or for each record type in a list.

        :param object_names: a single record type name, or a list of names.
        :param session: authenticated session used for the metadata requests.
        :param metadata_url: metadata base URL; each name is appended to it.
        :return: mapping of lower-cased record type name -> decoded schema. A record type whose
            response cannot be decoded as JSON is logged and left out, so the result is always a
            mapping (possibly empty), never None.
        :raises NotImplementedError: if `object_names` is neither a `list` nor a `str`.
        """
        if isinstance(object_names, str):
            object_names = [object_names]
        elif not isinstance(object_names, list):
            raise NotImplementedError(
                f"Object Types has unknown structure, should be either `list` or `str`, actual input: {object_names}"
            )

        schemas = {}
        for object_name in object_names:
            try:
                schemas.update(self.fetch_schema(object_name, session, metadata_url))
            except ValueError as e:
                # json.JSONDecodeError, simplejson.JSONDecodeError and
                # requests.exceptions.JSONDecodeError are all ValueError subclasses.
                self.logger.error(f"Unexpected output while fetching the object schema. Full error: {e.__repr__()}")
        return schemas

    def fetch_schema(self, object_name: str, session: requests.Session, metadata_url: str) -> Mapping[str, Any]:
        """
        Request the schema of a single record type.

        :return: one-entry dict `{object_name.lower(): <decoded JSON body>}`.
        :raises ValueError: (a JSON decode error) when the response body is not valid JSON.
        """
        return {object_name.lower(): session.get(metadata_url + object_name, headers=SCHEMA_HEADERS).json()}

    def generate_stream(
        self,
        session: requests.Session,
        metadata_url: str,
        schemas: dict,
        object_name: str,
        auth: OAuth1,
        base_url: str,
        start_datetime: str,
        window_in_days: int,
        max_retry: int = 3,
    ) -> Union[NetsuiteStream, IncrementalNetsuiteStream, CustomIncrementalNetsuiteStream, None]:
        """
        Build the stream for `object_name`, choosing the stream class from its schema properties.

        - `INCREMENTAL_CURSOR` in the schema properties -> `IncrementalNetsuiteStream`
        - `CUSTOM_INCREMENTAL_CURSOR` in the schema properties -> `CustomIncrementalNetsuiteStream`
        - otherwise -> `NetsuiteStream` (full refresh)

        If the schema is missing or has no (or empty) `properties`, it is re-fetched up to
        `max_retry` times. Returns None, after logging a warning, if it is still unavailable.
        """
        input_args = {
            "auth": auth,
            "object_name": object_name,
            "base_url": base_url,
            "start_datetime": start_datetime,
            "window_in_days": window_in_days,
        }

        schema_props = ((schemas or {}).get(object_name) or {}).get("properties")

        retry_attempt = 1
        while not schema_props and retry_attempt <= max_retry:
            self.logger.warning(f"Object `{object_name}` schema has missing `properties` key. Retry attempt: {retry_attempt}/{max_retry}")
            # sometimes object metadata returns data with missing `properties` key,
            # we should try to fetch metadata again for that object
            refetched = self.get_schemas(object_name, session, metadata_url) or {}
            schema_props = (refetched.get(object_name) or {}).get("properties")
            retry_attempt += 1

        if not schema_props:
            self.logger.warning(f"Object `{object_name}` schema is not available. Skipping this stream.")
            return None

        if INCREMENTAL_CURSOR in schema_props:
            return IncrementalNetsuiteStream(**input_args)
        if CUSTOM_INCREMENTAL_CURSOR in schema_props:
            return CustomIncrementalNetsuiteStream(**input_args)
        # all other streams are full_refresh
        return NetsuiteStream(**input_args)

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        Build one stream per configured record type.

        If `object_types` is not set, every record type from the metadata catalog is used. Record
        types whose schema cannot be retrieved are skipped.
        """
        auth = self.auth(config)
        session = self.get_session(auth)
        base_url = self.base_url(config)
        metadata_url = base_url + META_PATH
        object_names = config.get("object_types")

        # retrieve all record types if `object_types` config field is not specified
        if not object_names:
            catalog_response = session.get(metadata_url)
            # fail with a clear HTTP error instead of a confusing TypeError on an error body
            catalog_response.raise_for_status()
            objects_metadata = catalog_response.json().get("items") or []
            object_names = [item["name"] for item in objects_metadata]

        input_args = {"session": session, "metadata_url": metadata_url}
        schemas = self.get_schemas(object_names, **input_args)
        input_args.update(
            auth=auth,
            base_url=base_url,
            start_datetime=config["start_datetime"],
            window_in_days=config["window_in_days"],
            schemas=schemas,
        )

        # build streams; `generate_stream` returns None for record types without a usable schema
        streams: List[Stream] = []
        for name in object_names:
            stream = self.generate_stream(object_name=name.lower(), **input_args)
            if stream is not None:
                streams.append(stream)
        return streams
|