Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Alfredo Garcia <freddy.garcia7.fg@gmail.com>
158 lines
6.2 KiB
Python
158 lines
6.2 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
import base64
|
|
import codecs
|
|
import hashlib
|
|
import hmac
|
|
import logging
|
|
import urllib.parse
|
|
from enum import Enum
|
|
from functools import wraps
|
|
from typing import Any, List, Mapping, Tuple
|
|
|
|
import pendulum
|
|
import requests
|
|
from pendulum.parsing.exceptions import ParserError
|
|
|
|
from airbyte_cdk.models import SyncMode
|
|
from airbyte_cdk.sources import AbstractSource
|
|
from airbyte_cdk.sources.streams import Stream
|
|
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator
|
|
|
|
from .streams import Addresses, CustomersCart, OrderItems, OrderPayments, Orders, OrderStatuses, Products
|
|
|
|
|
|
class AuthMethod(Enum):
|
|
CENTRAL_API_ROUTER = 1
|
|
SINGLE_STORE_ACCESS_TOKEN = 2
|
|
|
|
|
|
class CustomHeaderAuthenticator(AbstractHeaderAuthenticator):
|
|
def __init__(self, access_token, store_name):
|
|
self.auth_method = AuthMethod.SINGLE_STORE_ACCESS_TOKEN
|
|
self._store_name = store_name
|
|
self._access_token = access_token
|
|
|
|
def get_auth_header(self) -> Mapping[str, Any]:
|
|
return {"X-AC-Auth-Token": self._access_token}
|
|
|
|
def url_base(self) -> str:
|
|
return f"https://{self._store_name}/api/v1/"
|
|
|
|
def extra_params(self, stream, params):
|
|
return {}
|
|
|
|
|
|
class CentralAPIHeaderAuthenticator(AbstractHeaderAuthenticator):
|
|
def __init__(self, user_name, user_secret, site_id):
|
|
self.auth_method = AuthMethod.CENTRAL_API_ROUTER
|
|
self.user_name = user_name
|
|
self.user_secret = user_secret
|
|
self.site_id = site_id
|
|
|
|
def get_auth_header(self) -> Mapping[str, Any]:
|
|
"""
|
|
This method is not implemented here because for the Central API Router
|
|
needs to build the header for each request based
|
|
on path + parameters (next token, pagination, page size)
|
|
To solve this the logic was moved to `request_headers` in CartStream class.
|
|
"""
|
|
return {}
|
|
|
|
def url_base(self) -> str:
|
|
return "https://public.americommerce.com/api/v1/"
|
|
|
|
def extra_params(self, stream, params):
|
|
return self.generate_auth_signature(stream, params)
|
|
|
|
def generate_auth_signature(self, stream, params) -> Mapping[str, Any]:
|
|
"""
|
|
How to build signature:
|
|
1. build a string concatenated with:
|
|
request method (uppercase) & request path and query & provisioning user name
|
|
example: GET&/api/v1/customers&myUser
|
|
2. Generate HMACSHA256 hash using this string as the input, and the provisioning user secret as the key
|
|
3. Base64 this hash to be used as the final value in the header
|
|
"""
|
|
path_with_params = f"/api/v1/{stream.path()}?{urllib.parse.urlencode(params)}"
|
|
msg = codecs.encode(f"GET&{path_with_params}&{self.user_name}")
|
|
key = codecs.encode(self.user_secret)
|
|
dig = hmac.new(key=key, msg=msg, digestmod=hashlib.sha256).digest()
|
|
auth_signature = base64.b64encode(dig).decode()
|
|
return {"X-AC-PUB-Site-ID": self.site_id, "X-AC-PUB-User": self.user_name, "X-AC-PUB-Auth-Signature": auth_signature}
|
|
|
|
|
|
class SourceCart(AbstractSource):
|
|
def validate_config_values(func):
|
|
"""Check input config values for check_connection and stream functions. It will raise an exception if there is an parsing error"""
|
|
|
|
@wraps(func)
|
|
def decorator(self_, *args, **kwargs):
|
|
for arg in args:
|
|
if isinstance(arg, Mapping):
|
|
try:
|
|
# parse date strings by the pendulum library. It will raise the exception ParserError if it is some format mistakes.
|
|
pendulum.parse(arg["start_date"])
|
|
# try to check an end_date value. It can be ussed for different CI tests
|
|
end_date = arg.get("end_date")
|
|
if end_date:
|
|
pendulum.parse(end_date)
|
|
except ParserError as e:
|
|
raise Exception(f"{str(e)}. Example: 2021-01-01T00:00:00Z")
|
|
break
|
|
|
|
return func(self_, *args, **kwargs)
|
|
|
|
return decorator
|
|
|
|
def get_auth(self, config):
|
|
credentials = config.get("credentials", {})
|
|
auth_method = credentials.get("auth_type")
|
|
|
|
if auth_method == AuthMethod.CENTRAL_API_ROUTER.name:
|
|
authenticator = CentralAPIHeaderAuthenticator(
|
|
user_name=credentials["user_name"], user_secret=credentials["user_secret"], site_id=credentials["site_id"]
|
|
)
|
|
elif auth_method == AuthMethod.SINGLE_STORE_ACCESS_TOKEN.name:
|
|
authenticator = CustomHeaderAuthenticator(access_token=credentials["access_token"], store_name=credentials["store_name"])
|
|
else:
|
|
raise NotImplementedError(f"Authentication method: {auth_method} not implemented.")
|
|
|
|
return authenticator
|
|
|
|
@validate_config_values
|
|
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
|
|
try:
|
|
authenticator = self.get_auth(config)
|
|
stream = Products(authenticator=authenticator, start_date=config["start_date"])
|
|
records = stream.read_records(sync_mode=SyncMode.full_refresh)
|
|
next(records)
|
|
return True, None
|
|
except Exception as e:
|
|
if isinstance(e, requests.exceptions.HTTPError) and e.response.status_code == 401:
|
|
return False, f"Please check your access token. Error: {repr(e)}"
|
|
if isinstance(e, requests.exceptions.ConnectionError):
|
|
err_message = f"Please check your `store_name` or internet connection. Error: {repr(e)}"
|
|
return False, err_message
|
|
return False, repr(e)
|
|
|
|
@validate_config_values
|
|
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
|
|
authenticator = self.get_auth(config)
|
|
args = {
|
|
"authenticator": authenticator,
|
|
"start_date": config["start_date"],
|
|
"end_date": config.get("end_date"),
|
|
}
|
|
return [
|
|
CustomersCart(**args),
|
|
Orders(**args),
|
|
OrderPayments(**args),
|
|
OrderStatuses(**args),
|
|
OrderItems(**args),
|
|
Products(**args),
|
|
Addresses(**args),
|
|
]
|