1
0
mirror of synced 2025-12-21 11:01:41 -05:00
Files
airbyte/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py

157 lines
6.8 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import base64
import copy
import logging
import os
from functools import wraps
from typing import Any, List, Mapping, MutableMapping, Optional
import pendulum
from airbyte_cdk.models import ConfiguredAirbyteCatalog, FailureType
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.source import TState
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth import BasicHttpAuthenticator, TokenAuthenticator
from airbyte_cdk.utils import AirbyteTracedException
from source_mixpanel.streams import Export
def adapt_validate_if_testing(func):
    """
    Decorator for config validators that shortens the date range while running acceptance tests.

    Due to API limitations (60 requests per hour) it is impossible to run acceptance tests in normal mode,
    so the number of requests is reduced by moving the start date closer to the end date whenever the
    `AVAILABLE_TESTING_RANGE_DAYS` environment variable is set to a non-zero integer.

    The wrapped function must return a mapping whose "start_date" and "end_date" values support
    subtraction (yielding an object with `.days`) and a `.subtract(days=...)` method.
    """

    @wraps(func)
    def wrapper(self, config):
        validated = func(self, config)

        max_range_days = int(os.environ.get("AVAILABLE_TESTING_RANGE_DAYS", 0))
        if not max_range_days:
            # Normal (production) mode: hand back the validated config untouched.
            return validated

        logging.getLogger("airbyte").info("SOURCE IN TESTING MODE, DO NOT USE IN PRODUCTION!")

        end_date = validated["end_date"]
        if (end_date - validated["start_date"]).days > max_range_days:
            # Clamp the window so that it spans at most `max_range_days` days.
            validated["start_date"] = end_date.subtract(days=max_range_days)
        return validated

    return wrapper
def raise_config_error(message: str, original_error: Optional[Exception] = None):
    """
    Raise an `AirbyteTracedException` flagged as a configuration error.

    :param message: text used both as the user-facing message and as the internal message
    :param original_error: optional underlying exception; when truthy it is chained as the cause
    :raises AirbyteTracedException: always
    """
    error = AirbyteTracedException(message=message, internal_message=message, failure_type=FailureType.config_error)
    if not original_error:
        raise error
    # Chain explicitly so the traceback shows what triggered the config error.
    raise error from original_error
class TokenAuthenticatorBase64(TokenAuthenticator):
    """Token authenticator that base64-encodes the given token and uses the "Basic" auth method."""

    def __init__(self, token: str):
        """
        :param token: plain-text token; it is UTF-8 encoded, base64-encoded and passed to the parent class
        """
        encoded_bytes = base64.b64encode(token.encode("utf8"))
        super().__init__(token=encoded_bytes.decode("utf8"), auth_method="Basic")
class SourceMixpanel(YamlDeclarativeSource):
    """
    Mixpanel source: the streams declared in `manifest.yaml` plus the Python `Export` stream,
    which is appended in `streams()`.
    """

    def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs):
        # Extra keyword arguments are accepted but not forwarded; the manifest path is fixed.
        manifest_kwargs = {"path_to_yaml": "manifest.yaml"}
        super().__init__(catalog=catalog, config=config, state=state, **manifest_kwargs)

    @staticmethod
    def validate_date(name: str, date_str: str, default: pendulum.date) -> pendulum.date:
        """
        Turn a date string from the config into a date object.

        :param name: human-readable option name, used only in the error message
        :param date_str: raw config value; any falsy value selects `default`
        :param default: date returned when `date_str` is falsy
        :return: the parsed date, or `default`
        :raises AirbyteTracedException: config error when pendulum cannot parse `date_str`
        """
        if not date_str:
            return default
        try:
            parsed = pendulum.parse(date_str).date()
        except pendulum.parsing.exceptions.ParserError as e:
            raise_config_error(f"Could not parse {name}: {date_str}. Please enter a valid {name}.", e)
        else:
            return parsed

    @adapt_validate_if_testing
    def _validate_and_transform(self, config: MutableMapping[str, Any]):
        """
        Validate the connector config, fill in defaults and convert the timezone and dates.

        The mapping is modified in place and also returned.

        :param config: mutable connector config; must contain a "credentials" mapping
        :return: the same mapping with normalized values
        :raises AirbyteTracedException: config error on the first invalid option found
        """
        # Read every option up front, applying the defaults for absent keys.
        project_timezone = config.get("project_timezone", "US/Pacific")
        start_date = config.get("start_date")
        end_date = config.get("end_date")
        attribution_window = config.get("attribution_window", 5)
        select_properties_by_default = config.get("select_properties_by_default", True)
        region = config.get("region", "US")
        date_window_size = config.get("date_window_size", 30)
        project_id = config.get("credentials", dict()).get("project_id")
        page_size = config.get("page_size", 1000)
        export_lookback_window = config.get("export_lookback_window", 0)

        try:
            project_timezone = pendulum.timezone(project_timezone)
        except pendulum.tz.zoneinfo.exceptions.InvalidTimezone as e:
            # The assignment above failed, so `project_timezone` still holds the raw string here.
            raise_config_error(f"Could not parse time zone: {project_timezone}, please enter a valid timezone.", e)

        if region not in ("US", "EU"):
            raise_config_error("Region must be either EU or US.")

        if select_properties_by_default not in (True, False, "", None):
            raise_config_error("Please provide a valid True/False value for the `Select properties by default` parameter.")

        # (value, smallest accepted value, error message), checked in this order.
        integer_checks = (
            (attribution_window, 0, "Please provide a valid integer for the `Attribution window` parameter."),
            (date_window_size, 1, "Please provide a valid integer for the `Date slicing window` parameter."),
            (export_lookback_window, 0, "Please provide a valid integer for the `Export Lookback Window` parameter."),
        )
        for value, minimum, message in integer_checks:
            if not isinstance(value, int) or value < minimum:
                raise_config_error(message)

        authenticator = self.get_authenticator(config)
        if isinstance(authenticator, TokenAuthenticatorBase64) and project_id:
            # With the base64 token authenticator the project id is dropped from the credentials.
            config.get("credentials").pop("project_id")
        if isinstance(authenticator, BasicHttpAuthenticator) and not isinstance(project_id, int):
            raise_config_error("Required parameter 'project_id' missing or malformed. Please provide a valid project ID.")

        # Default window: from 365 days ago until yesterday, in the project's timezone.
        today = pendulum.today(tz=project_timezone).date()
        default_start = today.subtract(days=365)
        default_end = today.subtract(days=1)

        config["project_timezone"] = project_timezone
        config["start_date"] = self.validate_date("start date", start_date, default_start)
        config["end_date"] = self.validate_date("end date", end_date, default_end)
        config.update(
            {
                "attribution_window": attribution_window,
                "select_properties_by_default": select_properties_by_default,
                "region": region,
                "date_window_size": date_window_size,
                "project_id": project_id,
                "page_size": page_size,
                "export_lookback_window": export_lookback_window,
            }
        )
        return config

    @staticmethod
    def get_authenticator(config: Mapping[str, Any]) -> TokenAuthenticator:
        """
        Build the authenticator matching the supplied credentials.

        A non-empty "username" together with a non-empty "secret" selects HTTP basic auth;
        otherwise the "api_secret" value is used with the base64 token authenticator.

        :param config: connector config containing a "credentials" mapping
        :return: `BasicHttpAuthenticator` or `TokenAuthenticatorBase64`
        """
        credentials = config["credentials"]
        username = credentials.get("username")
        secret = credentials.get("secret")
        if not (username and secret):
            return TokenAuthenticatorBase64(token=credentials["api_secret"])
        return BasicHttpAuthenticator(username=username, password=secret)

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        Return the manifest-declared streams followed by the `Export` stream.

        When the credentials carry no "option_title", one is written into them before the
        declarative streams are built: "Project Secret" if an "api_secret" is present,
        "Service Account" otherwise.

        :param config: connector config containing a "credentials" mapping
        :return: list of streams with `Export` appended last
        """
        credentials = config.get("credentials")
        if not credentials.get("option_title"):
            credentials["option_title"] = "Project Secret" if credentials.get("api_secret") else "Service Account"

        all_streams = super().streams(config=config)

        # Validate a deep copy so the caller's config keeps its raw values.
        export_config = self._validate_and_transform(copy.deepcopy(config))
        authenticator = self.get_authenticator(config)
        all_streams.append(Export(authenticator=authenticator, **export_config))
        return all_streams