1
0
mirror of synced 2025-12-30 12:04:43 -05:00
Files
airbyte/airbyte-integrations/connectors/source-paystack/source_paystack/streams.py
folusoogunlana 0e70491e0f 🎉 New Source: Paystack 🌟 (#7214)
* feat(67): add support for 'spec' using Python HTTP API source template and stripe as an example

* chore(67): add sample state and config

* feat(67): add check functionality for paystack source by fetching first customer

* feat(67): add support for discover and read customer stream

* feat(67): add paystack source connector to UI

* feat(67): update source definitions to use 0.1.0

* Hacktoberfest 67 paystack source (#1)

* feat(67): add support for 'spec' using Python HTTP API source template and stripe as an example

* chore(67): add sample state and config

* feat(67): add check functionality for paystack source by fetching first customer

* feat(67): add support for discover and read customer stream

* feat(67): add paystack source connector to UI

* feat(67): update source definitions to use 0.1.0

Co-authored-by: Foluso Ogunlana <foluso_ogunlana@stearsng.com>

* feat(67): update stream state cursor field to be integer and to match API record field name

* chore(67): add unit tests for source and streams

* chore(67): store formatted date time in state to match type of catalog

* chore(67): add configuration for acceptance integration tests

* docs(67): update docs and summary with paystack

* chore(67): add essential schemas to be catalogued for new streams

* feat(67): add support for critical streams - transactions subscriptions transfers refunds settlements

* docs(67): update image and bootstrap

* chore(67): update builds.md to include paystack badge

* docs(67): add changelog and source definition JSON file

* docs(67): add paystack to integrations readme

* chore(67): update check_connection to airbyte standard

* refactor to simplify streams and remove constants file

* fix(67): correct "null, null" values in schemas

* chore(67): update file formatting with gradle format

Co-authored-by: Foluso <5675998+foogunlana@users.noreply.github.com>
2021-11-01 23:37:00 +05:30

180 lines
5.3 KiB
Python

#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
import math
from abc import ABC, abstractmethod
from typing import Any, Iterable, Mapping, MutableMapping, Optional
import pendulum
import requests
from airbyte_cdk.sources.streams.http import HttpStream
class PaystackStream(HttpStream, ABC):
    """
    Base stream shared by the Paystack streams in this module.

    Fixes the API base URL and the primary key, requests 200 records per page,
    follows the ``meta.page`` / ``meta.pageCount`` counters found in each response
    and yields the records stored under the response's ``data`` key.
    """

    url_base = "https://api.paystack.co/"
    primary_key = "id"

    def __init__(self, start_date: str, **kwargs):
        """
        Store the configured start date as an integer Unix timestamp.

        :param start_date: date/time string accepted by ``pendulum.parse``.
        :param kwargs: forwarded unchanged to the parent constructor.
        """
        super().__init__(**kwargs)
        self.start_date = pendulum.parse(start_date).int_timestamp

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Return ``{"page": <next page number>}`` while the response reports further pages, else ``None``.

        ``meta``, ``page`` and ``pageCount`` are indexed strictly on purpose: a response without them
        raises ``KeyError`` instead of silently ending pagination early.
        """
        meta = response.json()["meta"]
        current_page = meta["page"]
        if current_page < meta["pageCount"]:
            return {"page": current_page + 1}
        return None

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Build the query parameters for one request.

        Always asks for 200 records per page; when a page token is supplied its entries
        (the ``page`` number produced by ``next_page_token``) are merged in.
        """
        params = {"perPage": 200}
        if next_page_token:
            params.update(next_page_token)
        return params

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        Yield each record of the response.

        Paystack puts records in a container array "data"; a missing or null ``data`` yields nothing.
        """
        yield from (response.json().get("data") or [])
class IncrementalPaystackStream(PaystackStream, ABC):
    """
    Base class for Paystack streams that sync incrementally on a cursor field.

    Adds a ``from`` query parameter derived from the configured start date, the saved
    stream state and an optional lookback window.
    """

    # Paystack (like Stripe) returns most recently created objects first, so state is not
    # persisted until the entire stream has been read.
    state_checkpoint_interval = math.inf

    def __init__(self, lookback_window_days: int = 0, **kwargs):
        """
        :param lookback_window_days: number of days subtracted from the computed start point (0 disables it).
        :param kwargs: forwarded unchanged to the parent constructor.
        """
        super().__init__(**kwargs)
        self.lookback_window_days = lookback_window_days

    @property
    @abstractmethod
    def cursor_field(self) -> str:
        """
        Name of the record field used as the incremental cursor.

        Every incremental stream must extend this class and define it.
        """

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Return a new state mapping holding whichever cursor value is later: the one on
        ``latest_record`` or the one already stored in ``current_stream_state``.
        """

        def as_timestamp(value):
            # Absent/empty cursor values are ranked as timestamp 0.
            return pendulum.parse(value).int_timestamp if value else 0

        candidates = (
            latest_record.get(self.cursor_field),
            current_stream_state.get(self.cursor_field, None),
        )
        return {self.cursor_field: max(candidates, key=as_timestamp)}

    def request_params(self, stream_state: Mapping[str, Any] = None, **kwargs):
        """
        Extend the parent's query parameters with the ``from`` start date for this sync.
        """
        state = stream_state or {}
        query = super().request_params(stream_state=state, **kwargs)
        query["from"] = self._get_start_date(state)
        return query

    def _get_start_date(self, stream_state) -> str:
        """
        Compute the start date string sent as ``from``.

        Starts from the configured start date, moves forward to the saved cursor when that is
        later, then moves back by the lookback window when one is set. The result is an ISO
        formatted string whose ``+00:00`` offset is rendered as ``Z``.
        """
        timestamp = self.start_date

        has_saved_cursor = bool(stream_state) and self.cursor_field in stream_state
        if has_saved_cursor:
            saved_timestamp = pendulum.parse(stream_state[self.cursor_field]).int_timestamp
            timestamp = max(timestamp, saved_timestamp)

        if timestamp and self.lookback_window_days:
            self.logger.info(f"Applying lookback window of {self.lookback_window_days} days to stream {self.name}")
            window_days = abs(self.lookback_window_days)
            timestamp = pendulum.from_timestamp(timestamp).subtract(days=window_days).int_timestamp

        iso_string = pendulum.from_timestamp(timestamp).isoformat()
        return iso_string.replace("+00:00", "Z")
class Customers(IncrementalPaystackStream):
    """
    Incremental stream of customers, cursored on ``createdAt``.

    API docs: https://paystack.com/docs/api/#customer-list
    """

    cursor_field = "createdAt"

    def path(self, **kwargs) -> str:
        """Return the resource path requested for this stream."""
        return "customer"
class Disputes(IncrementalPaystackStream):
    """
    Incremental stream of disputes, cursored on ``createdAt``.

    API docs: https://paystack.com/docs/api/#dispute-list
    """

    cursor_field = "createdAt"

    def path(self, **kwargs) -> str:
        """Return the resource path requested for this stream."""
        return "dispute"
class Invoices(IncrementalPaystackStream):
    """
    Incremental stream of invoices, cursored on ``created_at`` and read from the
    ``paymentrequest`` resource.

    API docs: https://paystack.com/docs/api/#invoice-list
    """

    cursor_field = "created_at"

    def path(self, **kwargs) -> str:
        """Return the resource path requested for this stream."""
        return "paymentrequest"
class Refunds(IncrementalPaystackStream):
    """
    Incremental stream of refunds, cursored on ``createdAt``.

    API docs: https://paystack.com/docs/api/#refund-list
    """

    cursor_field = "createdAt"

    def path(self, **kwargs) -> str:
        """Return the resource path requested for this stream."""
        return "refund"
class Settlements(IncrementalPaystackStream):
    """
    Incremental stream of settlements, cursored on ``createdAt``.

    API docs: https://paystack.com/docs/api/#settlement
    """

    cursor_field = "createdAt"

    def path(self, **kwargs) -> str:
        """Return the resource path requested for this stream."""
        return "settlement"
class Subscriptions(IncrementalPaystackStream):
    """
    Incremental stream of subscriptions, cursored on ``createdAt``.

    API docs: https://paystack.com/docs/api/#subscription-list
    """

    cursor_field = "createdAt"

    def path(self, **kwargs) -> str:
        """Return the resource path requested for this stream."""
        return "subscription"
class Transactions(IncrementalPaystackStream):
    """
    Incremental stream of transactions, cursored on ``createdAt``.

    API docs: https://paystack.com/docs/api/#transaction-list
    """

    cursor_field = "createdAt"

    def path(self, **kwargs) -> str:
        """Return the resource path requested for this stream."""
        return "transaction"
class Transfers(IncrementalPaystackStream):
    """
    Incremental stream of transfers, cursored on ``createdAt``.

    API docs: https://paystack.com/docs/api/#transfer-list
    """

    cursor_field = "createdAt"

    def path(self, **kwargs) -> str:
        """Return the resource path requested for this stream."""
        return "transfer"