1
0
mirror of synced 2025-12-23 11:57:55 -05:00
Files
airbyte/airbyte-integrations/connectors/source-tplcentral/source_tplcentral/source.py
Juozas V 22dcb0f674 🎉 New Source: 3PL Central (#7322)
* Add source-tplcentral

* Add source definition

* Add missing file

* Update catalog

* Update sample config

* Update formatting

* Update invalid config

* Fix primary key identifier

* Update naming

* Rename variable

* Parametrize page size

* Implement Items stream

* Update schemas

* Update response normalization

* Add arrow

* Implement items stream cursor

* Fix types

* Cleanup

* Add customers stream

* Fix items sort

* Add stock_details stream

* Add inventory stream

* Add orders stream

* Refactor streams to a separate file

* Update cursor handling

* Refactor request_params

* Simplify

* Update params generation

* Update response parse

* Refactor stream+slices

* Refactor parse_response

* Cleanup

* New order

* Fix style

* Refactor parse_response

* Fix cursor

* Don't fetch next page after the last

* Fix schema errors

* Flatten shared schemas

* Inline refs

* Fix handling None stream_state

* Update schemas

* Implement primary_key and cursor fields

* Fix style

* Fix style

* Fix schemas

* Fix deep_map

* Fix items schema

* Fix cursor formatting

* Update integration tests assets

* Update README.md

* Remove TODO.md

* Update spec.json

* Cleanup

* Add bootstrap.md

* Increase page sizes

* Increase state checkpoint interval

* Add documentation

* Update links to the documentation

* Add build status badge

* Implement test_source.py

* Improve code testability

* Add stream tests

* Implement test_incremental_streams

* Add requests-mock dependency

* Fix formatting

* Update author

* Run ./gradlew format

* Cleanup failed merge artifacts

* Update sample_state.json

* Add stream documentation urls

* Cleanup

* Preserve upstream naming

* Fix primary key

* Fix configured catalog

* Update schemas

* Update catalog.json

* Fix tests

* Add schema source files

* Split configured catalogs

* Cleanup

* Update documentationUrl

* Add new properties

* Run gradlew format

* Remove additionalProperties: false from the schemas

* Revert "Remove additionalProperties: false from the schemas"

This reverts commit d4e8fea2fd.
2021-12-28 13:35:36 +05:30

93 lines
2.9 KiB
Python

#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
from typing import Any, List, Mapping, MutableMapping, Tuple
import requests
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator
from requests.auth import HTTPBasicAuth
from source_tplcentral.streams import Customers, Inventory, Items, Orders, StockDetails, StockSummaries
class TplcentralAuthenticator(Oauth2Authenticator):
    """OAuth2 authenticator that obtains tokens with the client-credentials grant.

    The token request is sent as a JSON body and authenticated with HTTP Basic
    auth built from the client id and client secret. No refresh token is used
    (``refresh_token=None`` is passed to the base class).
    """

    def __init__(
        self,
        token_refresh_endpoint: str,
        client_id: str,
        client_secret: str,
        user_login_id: int = None,
        user_login: str = None,
    ):
        """Store the token endpoint, client credentials and optional user identity.

        Args:
            token_refresh_endpoint: URL the token request is POSTed to.
            client_id: Used as the HTTP Basic auth username.
            client_secret: Used as the HTTP Basic auth password.
            user_login_id: Optional value sent as ``user_login_id`` in the token request.
            user_login: Optional value sent as ``user_login`` in the token request.
        """
        super().__init__(
            token_refresh_endpoint=token_refresh_endpoint,
            client_id=client_id,
            client_secret=client_secret,
            refresh_token=None,
        )
        self.user_login_id = user_login_id
        self.user_login = user_login

    def get_refresh_request_body(self) -> Mapping[str, Any]:
        """Build the JSON payload for the token request.

        ``grant_type`` is always present; ``scopes``, ``user_login_id`` and
        ``user_login`` are included only when the corresponding attribute is truthy.
        """
        body: MutableMapping[str, Any] = {"grant_type": "client_credentials"}
        # Optional fields share their name between the attribute and the payload key.
        for field_name in ("scopes", "user_login_id", "user_login"):
            field_value = getattr(self, field_name)
            if field_value:
                body[field_name] = field_value
        return body

    def refresh_access_token(self) -> Tuple[str, int]:
        """Request a new access token from the token endpoint.

        Returns:
            The values stored in the JSON response under ``self.access_token_name``
            and ``self.expires_in_name`` (presumably the token and its lifetime;
            both key names come from the base class).

        Raises:
            Exception: Wraps any error raised while sending the request, checking
                the HTTP status, or reading the response.
        """
        try:
            endpoint = self.token_refresh_endpoint
            basic_credentials = HTTPBasicAuth(self.client_id, self.client_secret)
            token_response = requests.post(
                endpoint,
                auth=basic_credentials,
                json=self.get_refresh_request_body(),
            )
            token_response.raise_for_status()
            token_data = token_response.json()
            access_token = token_data[self.access_token_name]
            expires_in = token_data[self.expires_in_name]
        except Exception as error:
            raise Exception(f"Error while refreshing access token: {error}") from error
        return access_token, expires_in
class SourceTplcentral(AbstractSource):
    """Airbyte source exposing the 3PL Central streams defined in ``source_tplcentral.streams``."""

    def _auth(self, config):
        """Create a ``TplcentralAuthenticator`` from the connector configuration.

        Reads the required keys ``url_base``, ``client_id`` and ``client_secret``
        and the optional keys ``user_login_id`` and ``user_login``.
        """
        return TplcentralAuthenticator(
            # No separator is inserted here, so `url_base` presumably ends with "/".
            token_refresh_endpoint=f"{config['url_base']}AuthServer/api/Token",
            client_id=config["client_id"],
            client_secret=config["client_secret"],
            user_login_id=config.get("user_login_id"),
            user_login=config.get("user_login"),
        )

    def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        """Verify the configuration by requesting an auth header.

        Args:
            logger: Unused; part of the ``AbstractSource`` interface.
            config: Connector configuration (see ``_auth`` for the keys read).

        Returns:
            ``(True, None)`` when the auth header could be obtained, otherwise
            ``(False, error)`` where ``error`` is the exception that was raised.
        """
        try:
            self._auth(config).get_auth_header()
        except Exception as e:
            # Report failure with an explicit False: the declared contract is a bool, not None.
            return False, e
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Instantiate every stream of this source.

        NOTE: the authenticator is stored in ``config`` under the key
        ``"authenticator"`` (``config`` is modified in place) before being handed
        to each stream constructor.
        """
        config["authenticator"] = self._auth(config)
        return [
            StockSummaries(config),
            Customers(config),
            Items(config),
            StockDetails(config),
            Inventory(config),
            Orders(config),
        ]