Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com> Co-authored-by: Christo Grabowski <108154848+ChristoGrab@users.noreply.github.com> Co-authored-by: ChristoGrab <christo.grab@gmail.com>
93 lines
3.9 KiB
Python
93 lines
3.9 KiB
Python
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
|
|
from typing import Any, Iterable, MutableMapping
|
|
|
|
import dpath
|
|
import requests
|
|
|
|
from airbyte_cdk.sources.declarative.extractors import DpathExtractor
|
|
from airbyte_cdk.sources.declarative.transformations.config_transformations.config_transformation import (
|
|
ConfigTransformation,
|
|
)
|
|
from airbyte_cdk.utils import AirbyteTracedException
|
|
from airbyte_cdk.utils.traced_exception import FailureType
|
|
|
|
|
|
class MailChimpRecordExtractorEmailActivity(DpathExtractor):
|
|
def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[str, Any]]:
|
|
records = super().extract_records(response=response)
|
|
yield from ({**record, **activity_item} for record in records for activity_item in record.pop("activity", []))
|
|
|
|
|
|
class ExtractAndSetDataCenterConfigValue(ConfigTransformation):
|
|
def transform(self, config: MutableMapping[str, Any]) -> None:
|
|
"""
|
|
Extract the data center from auth credentials and add it to the config.
|
|
For API key auth, extract from the API key itself.
|
|
For OAuth, make an HTTP request to get the data center.
|
|
"""
|
|
|
|
# Exit early if the data center is already in the config
|
|
if config.get("data_center"):
|
|
return
|
|
|
|
try:
|
|
if config.get("credentials", {}).get("auth_type") == "oauth2.0":
|
|
self._extract_data_center_from_oauth(config)
|
|
else:
|
|
self._extract_data_center_from_apikey(config)
|
|
except AirbyteTracedException:
|
|
# Re-raise AirbyteTracedException as-is
|
|
raise
|
|
except Exception as e:
|
|
# Convert other exceptions to AirbyteTracedException
|
|
raise AirbyteTracedException(
|
|
failure_type=FailureType.config_error,
|
|
internal_message=f"Failed to extract data center: {str(e)}",
|
|
message=("Unable to extract data center from credentials. " "Please check your configuration and try again."),
|
|
) from e
|
|
|
|
@staticmethod
|
|
def _extract_data_center_from_oauth(config: MutableMapping[str, Any]) -> None:
|
|
"""Make a request to oauth2/metadata endpoint to get the data center."""
|
|
access_token = config.get("credentials", {}).get("access_token")
|
|
|
|
response = requests.get(
|
|
"https://login.mailchimp.com/oauth2/metadata", headers={"Authorization": f"OAuth {access_token}"}, timeout=10
|
|
)
|
|
response.raise_for_status()
|
|
|
|
# Mailchimp returns a 200 response with an error key if the token is invalid
|
|
error = response.json().get("error")
|
|
if error == "invalid_token":
|
|
raise AirbyteTracedException(
|
|
failure_type=FailureType.config_error,
|
|
internal_message=error,
|
|
message=("The access token you provided was invalid. " "Please check your credentials and try again."),
|
|
)
|
|
|
|
# Extract data center from the "dc" field
|
|
data_center = response.json().get("dc")
|
|
if data_center:
|
|
dpath.new(config, ["data_center"], data_center)
|
|
|
|
@staticmethod
|
|
def _extract_data_center_from_apikey(config: MutableMapping[str, Any]) -> None:
|
|
"""Extract the data center directly from the API key."""
|
|
|
|
# Backward compatibility - check for API key at top level
|
|
if config.get("apikey"):
|
|
api_key = config["apikey"]
|
|
if api_key and "-" in api_key:
|
|
# API key format: "prefix-datacenter"
|
|
data_center = api_key.split("-")[-1]
|
|
dpath.new(config, ["data_center"], data_center)
|
|
return
|
|
|
|
# API key flow - extract data center from API key
|
|
api_key = config.get("credentials", {}).get("apikey")
|
|
if api_key and "-" in api_key:
|
|
# API key format: "prefix-datacenter"
|
|
data_center = api_key.split("-")[-1]
|
|
dpath.new(config, ["data_center"], data_center)
|