🐛 Source Stripe: add availability strategy (#28911)
* Connector health: source hubspot, gitlab, snapchat-marketing: fix builds * source stripe: add availaility strategy * upd changelog * update error message map * update availability strategy
This commit is contained in:
@@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
|
||||
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
|
||||
|
||||
|
||||
LABEL io.airbyte.version=3.17.1
|
||||
LABEL io.airbyte.version=3.17.2
|
||||
LABEL io.airbyte.name=airbyte/source-stripe
|
||||
|
||||
@@ -5,7 +5,7 @@ data:
|
||||
connectorSubtype: api
|
||||
connectorType: source
|
||||
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
|
||||
dockerImageTag: 3.17.1
|
||||
dockerImageTag: 3.17.2
|
||||
dockerRepository: airbyte/source-stripe
|
||||
githubIssueLabel: source-stripe
|
||||
icon: stripe.svg
|
||||
|
||||
@@ -8,6 +8,35 @@ from typing import Optional, Tuple
|
||||
from airbyte_cdk.sources import Source
|
||||
from airbyte_cdk.sources.streams import Stream
|
||||
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
|
||||
from requests import HTTPError
|
||||
|
||||
STRIPE_ERROR_CODES = {
|
||||
"more_permissions_required": "This is most likely due to insufficient permissions on the credentials in use. "
|
||||
"Try to grant required permissions/scopes or re-authenticate",
|
||||
"account_invalid": "The card, or account the card is connected to, is invalid. You need to contact your card issuer "
|
||||
"to check that the card is working correctly.",
|
||||
"oauth_not_supported": "Please use a different authentication method.",
|
||||
}
|
||||
|
||||
|
||||
class StripeAvailabilityStrategy(HttpAvailabilityStrategy):
|
||||
def handle_http_error(
|
||||
self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError
|
||||
) -> Tuple[bool, Optional[str]]:
|
||||
status_code = error.response.status_code
|
||||
if status_code not in [400, 403]:
|
||||
raise error
|
||||
parsed_error = error.response.json()
|
||||
error_code = parsed_error.get("error", {}).get("code")
|
||||
error_message = STRIPE_ERROR_CODES.get(error_code, parsed_error.get("error", {}).get("message"))
|
||||
if not error_message:
|
||||
raise error
|
||||
doc_ref = self._visit_docs_message(logger, source)
|
||||
reason = f"The endpoint {error.response.url} returned {status_code}: {error.response.reason}. {error_message}. {doc_ref} "
|
||||
response_error_message = stream.parse_response_error_message(error.response)
|
||||
if response_error_message:
|
||||
reason += response_error_message
|
||||
return False, reason
|
||||
|
||||
|
||||
class StripeSubStreamAvailabilityStrategy(HttpAvailabilityStrategy):
|
||||
|
||||
@@ -666,27 +666,7 @@
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"shipping_address": {
|
||||
"type": ["null", "object"],
|
||||
"properties": {
|
||||
"city": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"country": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"line1": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"line2": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"postal_code": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"state": {
|
||||
"type": ["null", "string"]
|
||||
}
|
||||
}
|
||||
"$ref": "address.json"
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -701,27 +681,7 @@
|
||||
"type": ["null", "object"],
|
||||
"properties": {
|
||||
"billing_address": {
|
||||
"type": ["null", "object"],
|
||||
"properties": {
|
||||
"city": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"country": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"line1": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"line2": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"postal_code": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"state": {
|
||||
"type": ["null", "string"]
|
||||
}
|
||||
}
|
||||
"$ref": "address.json"
|
||||
},
|
||||
"email": {
|
||||
"type": ["null", "string"]
|
||||
@@ -730,27 +690,7 @@
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"shipping_address": {
|
||||
"type": ["null", "object"],
|
||||
"properties": {
|
||||
"city": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"country": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"line1": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"line2": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"postal_code": {
|
||||
"type": ["null", "string"]
|
||||
},
|
||||
"state": {
|
||||
"type": ["null", "string"]
|
||||
}
|
||||
}
|
||||
"$ref": "address.json"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,9 @@
|
||||
"amount_details": {
|
||||
"type": ["null", "object"],
|
||||
"properties": {
|
||||
"atm_fee": ["null", "integer"]
|
||||
"atm_fee": {
|
||||
"type": ["null", "integer"]
|
||||
}
|
||||
}
|
||||
},
|
||||
"authorization": {
|
||||
|
||||
@@ -13,14 +13,8 @@ from airbyte_cdk.models import SyncMode
|
||||
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
|
||||
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
|
||||
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
|
||||
from source_stripe.availability_strategy import StripeSubStreamAvailabilityStrategy
|
||||
from source_stripe.availability_strategy import StripeAvailabilityStrategy, StripeSubStreamAvailabilityStrategy
|
||||
|
||||
STRIPE_ERROR_CODES: List = [
|
||||
# stream requires additional permissions
|
||||
"more_permissions_required",
|
||||
# account_id doesn't have the access to the stream
|
||||
"account_invalid",
|
||||
]
|
||||
STRIPE_API_VERSION = "2022-11-15"
|
||||
|
||||
|
||||
@@ -30,6 +24,10 @@ class StripeStream(HttpStream, ABC):
|
||||
DEFAULT_SLICE_RANGE = 365
|
||||
transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
|
||||
|
||||
@property
|
||||
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
|
||||
return StripeAvailabilityStrategy()
|
||||
|
||||
def __init__(self, start_date: int, account_id: str, slice_range: int = DEFAULT_SLICE_RANGE, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.account_id = account_id
|
||||
@@ -66,27 +64,6 @@ class StripeStream(HttpStream, ABC):
|
||||
response_json = response.json()
|
||||
yield from response_json.get("data", []) # Stripe puts records in a container array "data"
|
||||
|
||||
def read_records(
|
||||
self,
|
||||
sync_mode: SyncMode,
|
||||
cursor_field: List[str] = None,
|
||||
stream_slice: Mapping[str, Any] = None,
|
||||
stream_state: Mapping[str, Any] = None,
|
||||
) -> Iterable[Mapping[str, Any]]:
|
||||
try:
|
||||
yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state)
|
||||
except requests.exceptions.HTTPError as e:
|
||||
status_code = e.response.status_code
|
||||
parsed_error = e.response.json()
|
||||
error_code = parsed_error.get("error", {}).get("code")
|
||||
error_message = parsed_error.get("message")
|
||||
# if the API Key doesn't have required permissions to particular stream, this stream will be skipped
|
||||
if status_code == 403 and error_code in STRIPE_ERROR_CODES:
|
||||
self.logger.warn(f"Stream {self.name} is skipped, due to {error_code}. Full message: {error_message}")
|
||||
pass
|
||||
else:
|
||||
self.logger.error(f"Syncing stream {self.name} is failed, due to {error_code}. Full message: {error_message}")
|
||||
|
||||
|
||||
class BasePaginationStripeStream(StripeStream, ABC):
|
||||
def request_params(
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
#
|
||||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import pytest
|
||||
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def disable_cache(mocker):
|
||||
mocker.patch(
|
||||
"source_stripe.streams.Customers.use_cache",
|
||||
new_callable=mocker.PropertyMock,
|
||||
return_value=False
|
||||
)
|
||||
mocker.patch(
|
||||
"source_stripe.streams.Transfers.use_cache",
|
||||
new_callable=mocker.PropertyMock,
|
||||
return_value=False
|
||||
)
|
||||
mocker.patch(
|
||||
"source_stripe.streams.Subscriptions.use_cache",
|
||||
new_callable=mocker.PropertyMock,
|
||||
return_value=False
|
||||
)
|
||||
mocker.patch(
|
||||
"source_stripe.streams.SubscriptionItems.use_cache",
|
||||
new_callable=mocker.PropertyMock,
|
||||
return_value=False
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(name="config")
|
||||
def config_fixture():
|
||||
config = {"client_secret": "sk_test(live)_<secret>",
|
||||
"account_id": "<account_id>", "start_date": "2020-05-01T00:00:00Z"}
|
||||
return config
|
||||
|
||||
|
||||
@pytest.fixture(name="stream_args")
|
||||
def stream_args_fixture():
|
||||
authenticator = TokenAuthenticator("sk_test(live)_<secret>")
|
||||
args = {
|
||||
"authenticator": authenticator,
|
||||
"account_id": "<account_id>",
|
||||
"start_date": 1588315041,
|
||||
"slice_range": 365,
|
||||
}
|
||||
return args
|
||||
@@ -45,30 +45,16 @@ def test_source_streams():
|
||||
assert len(streams) == 46
|
||||
|
||||
|
||||
@pytest.fixture(name="config")
|
||||
def config_fixture():
|
||||
config = {"client_secret": "sk_test(live)_<secret>",
|
||||
"account_id": "<account_id>", "start_date": "2020-05-01T00:00:00Z"}
|
||||
return config
|
||||
|
||||
|
||||
@pytest.fixture(name="logger_mock")
|
||||
def logger_mock_fixture():
|
||||
return patch("source_tiktok_marketing.source.logger")
|
||||
@patch.object(source_stripe.source, "stripe")
|
||||
def test_source_check_connection_ok(mocked_client, config):
|
||||
assert SourceStripe().check_connection(None, config=config) == (True, None)
|
||||
|
||||
|
||||
@patch.object(source_stripe.source, "stripe")
|
||||
def test_source_check_connection_ok(mocked_client, config, logger_mock):
|
||||
assert SourceStripe().check_connection(
|
||||
logger_mock, config=config) == (True, None)
|
||||
|
||||
|
||||
@patch.object(source_stripe.source, "stripe")
|
||||
def test_source_check_connection_failure(mocked_client, config, logger_mock):
|
||||
def test_source_check_connection_failure(mocked_client, config):
|
||||
exception = Exception("Test")
|
||||
mocked_client.Account.retrieve = Mock(side_effect=exception)
|
||||
assert SourceStripe().check_connection(
|
||||
logger_mock, config=config) == (False, exception)
|
||||
assert SourceStripe().check_connection(None, config=config) == (False, exception)
|
||||
|
||||
|
||||
@patch.object(source_stripe.source, "stripe")
|
||||
|
||||
@@ -2,9 +2,12 @@
|
||||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import logging
|
||||
|
||||
import pendulum
|
||||
import pytest
|
||||
from airbyte_cdk.models import SyncMode
|
||||
from source_stripe.availability_strategy import STRIPE_ERROR_CODES
|
||||
from source_stripe.streams import (
|
||||
ApplicationFees,
|
||||
ApplicationFeesRefunds,
|
||||
@@ -152,12 +155,6 @@ def test_sub_stream(requests_mock):
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(name="config")
|
||||
def config_fixture():
|
||||
config = {"authenticator": "authenticator", "account_id": "<account_id>", "start_date": 1596466368}
|
||||
return config
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"stream_cls, kwargs, expected",
|
||||
[
|
||||
@@ -198,9 +195,9 @@ def test_path_and_headers(
|
||||
stream_cls,
|
||||
kwargs,
|
||||
expected,
|
||||
config,
|
||||
stream_args,
|
||||
):
|
||||
stream = stream_cls(**config)
|
||||
stream = stream_cls(**stream_args)
|
||||
assert stream.path(**kwargs) == expected
|
||||
headers = stream.request_headers(**kwargs)
|
||||
assert headers["Stripe-Version"] == "2022-11-15"
|
||||
@@ -241,6 +238,44 @@ def test_request_params(
|
||||
stream,
|
||||
kwargs,
|
||||
expected,
|
||||
config,
|
||||
stream_args,
|
||||
):
|
||||
assert stream(**config).request_params(**kwargs) == expected
|
||||
assert stream(**stream_args).request_params(**kwargs) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"stream_cls",
|
||||
(
|
||||
ApplicationFees,
|
||||
Customers,
|
||||
BalanceTransactions,
|
||||
Charges,
|
||||
Coupons,
|
||||
Disputes,
|
||||
Events,
|
||||
Invoices,
|
||||
InvoiceItems,
|
||||
Payouts,
|
||||
Plans,
|
||||
Prices,
|
||||
Products,
|
||||
Subscriptions,
|
||||
SubscriptionSchedule,
|
||||
Transfers,
|
||||
Refunds,
|
||||
PaymentIntents,
|
||||
CheckoutSessions,
|
||||
PromotionCodes,
|
||||
ExternalAccount,
|
||||
SetupIntents,
|
||||
ShippingRates
|
||||
)
|
||||
)
|
||||
def test_403_error_handling(stream_args, stream_cls, requests_mock):
|
||||
stream = stream_cls(**stream_args)
|
||||
logger = logging.getLogger("airbyte")
|
||||
for error_code in STRIPE_ERROR_CODES:
|
||||
requests_mock.get(f"{stream.url_base}{stream.path()}", status_code=403, json={"error": {"code": f"{error_code}"}})
|
||||
available, message = stream.check_availability(logger)
|
||||
assert not available
|
||||
assert STRIPE_ERROR_CODES[error_code] in message
|
||||
|
||||
@@ -104,6 +104,7 @@ The Stripe connector should not run into Stripe API limitations under normal usa
|
||||
|
||||
| Version | Date | Pull Request | Subject |
|
||||
|:--------|:-----------| :------------------------------------------------------- |:-----------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| 3.17.2 | 2023-08-01 | [28911](https://github.com/airbytehq/airbyte/pull/28911) | Fix stream schemas, remove custom 403 error handling |
|
||||
| 3.17.1 | 2023-08-01 | [28887](https://github.com/airbytehq/airbyte/pull/28887) | Fix `Invoices` schema |
|
||||
| 3.17.0 | 2023-07-28 | [26127](https://github.com/airbytehq/airbyte/pull/26127) | Add `Prices` stream |
|
||||
| 3.16.0 | 2023-07-27 | [28776](https://github.com/airbytehq/airbyte/pull/28776) | Add new fields to stream schemas |
|
||||
|
||||
Reference in New Issue
Block a user