# airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams.py
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
import json
import time
from abc import ABC
from datetime import timedelta
from functools import cache
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
import pendulum
import requests
from pendulum import Date
from requests.auth import AuthBase
from airbyte_cdk import BackoffStrategy
from airbyte_cdk.models import FailureType, SyncMode
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler, ErrorResolution, HttpStatusErrorHandler, ResponseAction
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from source_mixpanel.property_transformation import transform_property_names
from .utils import fix_date_time
class MixpanelStreamBackoffStrategy(BackoffStrategy):
def __init__(self, stream: HttpStream, **kwargs): # type: ignore # noqa
self.stream = stream
super().__init__(**kwargs)
def backoff_time(
self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], **kwargs: Any
) -> Optional[float]:
if isinstance(response_or_exception, requests.Response):
retry_after = response_or_exception.headers.get("Retry-After")
if retry_after:
self.stream.logger.debug(f"API responded with `Retry-After` header: {retry_after}")
return float(retry_after)
return None
class MixpanelStream(HttpStream, ABC):
"""
Formatted API Rate Limit (https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-API-Endpoints):
A maximum of 5 concurrent queries,
60 queries per hour.
"""
DEFAULT_REQS_PER_HOUR_LIMIT = 60
@property
def state_checkpoint_interval(self) -> int:
# To meet the requirement of emitting state at least once per 15 minutes,
# we assume there is at least 1 record returned per request. Given that each request is followed by a 60-second sleep,
# we have to emit state at least every 15 records.
return 15
@property
def url_base(self):
prefix = "eu." if self.region == "EU" else ""
return f"https://{prefix}mixpanel.com/api/query/"
@property
def reqs_per_hour_limit(self):
# https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-Export-API-Endpoints#api-export-endpoint-rate-limits
return self._reqs_per_hour_limit
@reqs_per_hour_limit.setter
def reqs_per_hour_limit(self, value):
self._reqs_per_hour_limit = value
def __init__(
self,
authenticator: AuthBase,
region: str,
project_timezone: Optional[str] = "US/Pacific",
start_date: Optional[Date] = None,
end_date: Optional[Date] = None,
date_window_size: int = 30, # in days
attribution_window: int = 0, # in days
export_lookback_window: int = 0, # in seconds
select_properties_by_default: bool = True,
project_id: Optional[int] = None,
reqs_per_hour_limit: int = DEFAULT_REQS_PER_HOUR_LIMIT,
**kwargs,
):
self.start_date = start_date
self.end_date = end_date
self.date_window_size = date_window_size
self.attribution_window = attribution_window
self.export_lookback_window = export_lookback_window
self.additional_properties = select_properties_by_default
self.region = region
self.project_timezone = project_timezone
self.project_id = project_id
self._reqs_per_hour_limit = reqs_per_hour_limit
super().__init__(authenticator=authenticator)
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""Define abstract method"""
return None
def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
return {"Accept": "application/json"}
def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
json_response = response.json()
if self.data_field is not None:
data = json_response.get(self.data_field, [])
elif isinstance(json_response, list):
data = json_response
elif isinstance(json_response, dict):
data = [json_response]
for record in data:
fix_date_time(record)
yield record
def parse_response(
self,
response: requests.Response,
stream_state: Mapping[str, Any],
**kwargs,
) -> Iterable[Mapping]:
# parse the whole response
yield from self.process_response(response, stream_state=stream_state, **kwargs)
if self.reqs_per_hour_limit > 0:
# we skip this block if self.reqs_per_hour_limit == 0;
# in all other cases wait between requests to stay within the API rate limits
self.logger.info(f"Sleep for {3600 / self.reqs_per_hour_limit} seconds to match API limitations after reading from {self.name}")
time.sleep(3600 / self.reqs_per_hour_limit)
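# Worked example: with the default DEFAULT_REQS_PER_HOUR_LIMIT of 60 this sleeps
# 3600 / 60 = 60 seconds after every request; reqs_per_hour_limit=0 disables the sleep above.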
def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
return MixpanelStreamBackoffStrategy(stream=self)
def get_error_handler(self) -> Optional[ErrorHandler]:
return ExportErrorHandler(logger=self.logger, stream=self)
def get_stream_params(self) -> Mapping[str, Any]:
"""
Fetch required parameters in a given stream. Used to create sub-streams
"""
params = {
"authenticator": self._http_client._session.auth,
"region": self.region,
"project_timezone": self.project_timezone,
"reqs_per_hour_limit": self.reqs_per_hour_limit,
}
if self.project_id:
params["project_id"] = self.project_id
return params
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
if self.project_id:
return {"project_id": str(self.project_id)}
return {}
class IncrementalMixpanelStream(MixpanelStream, ABC):
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
updated_state = latest_record.get(self.cursor_field)
if updated_state:
state_value = current_stream_state.get(self.cursor_field)
if state_value:
updated_state = max(updated_state, state_value)
current_stream_state[self.cursor_field] = updated_state
return current_stream_state
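# Illustrative example (hypothetical cursor/state values): with cursor_field="date",
# current_stream_state={"date": "2021-01-10"} and latest_record={"date": "2021-01-05"},
# max() keeps the newer value and the returned state stays {"date": "2021-01-10"}.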
class DateSlicesMixin:
raise_on_http_errors = True
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._timezone_mismatch = False
def parse_response(self, *args, **kwargs):
if self._timezone_mismatch:
return []
yield from super().parse_response(*args, **kwargs)
def stream_slices(
self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
# use the latest date between self.start_date and stream_state
start_date = self.start_date
cursor_value = None
if stream_state and self.cursor_field and self.cursor_field in stream_state:
# Remove the time part from the state value because the API accepts the 'from_date' param in date format only ('YYYY-MM-DD').
# This also means the sync returns duplicated entries for the date taken from the state (the date range is inclusive).
cursor_value = stream_state[self.cursor_field]
# Only the Export stream uses this cursor value, so export_lookback_window is applied here
cursor_value = (pendulum.parse(cursor_value) - timedelta(seconds=self.export_lookback_window)).to_iso8601_string()
stream_state_date = pendulum.parse(stream_state[self.cursor_field])
start_date = max(start_date, stream_state_date.date())
final_lookback_window = max(self.export_lookback_window, self.attribution_window * 24 * 60 * 60)
# move start_date back by the lookback window (the larger of export_lookback_window and the attribution window) to re-sync that period as well
start_date = start_date - timedelta(seconds=final_lookback_window)
# end_date cannot be later than today
end_date = min(self.end_date, pendulum.today(tz=self.project_timezone).date())
while start_date <= end_date:
if self._timezone_mismatch:
return
current_end_date = start_date + timedelta(days=self.date_window_size - 1) # -1 is needed because dates are inclusive
stream_slice = {
"start_date": str(start_date),
"end_date": str(min(current_end_date, end_date)),
}
if cursor_value:
stream_slice[self.cursor_field] = cursor_value
yield stream_slice
# add 1 additional day because date range is inclusive
start_date = current_end_date + timedelta(days=1)
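# Illustrative example (hypothetical dates, no lookback): start_date=2021-01-01,
# end_date=2021-02-10 and date_window_size=30 produce the slices
#   {"start_date": "2021-01-01", "end_date": "2021-01-30"}
#   {"start_date": "2021-01-31", "end_date": "2021-02-10"}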
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state, stream_slice, next_page_token)
return {
**params,
"from_date": stream_slice["start_date"],
"to_date": stream_slice["end_date"],
}
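# Illustrative result (hypothetical slice/project): for project_id=123 and the slice
# {"start_date": "2021-01-01", "end_date": "2021-01-30"} the request params become
# {"project_id": "123", "from_date": "2021-01-01", "to_date": "2021-01-30"}.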
class ExportErrorHandler(HttpStatusErrorHandler):
"""
Custom error handler for handling export errors specific to Mixpanel streams.
This handler addresses:
- 400 status code with "to_date cannot be later than today" message, indicating a potential timezone mismatch.
- ConnectionResetError during response parsing, indicating a need to retry the request.
If the response does not match these specific cases, the handler defers to the parent class's implementation.
Attributes:
stream (HttpStream): The HTTP stream associated with this error handler.
"""
def __init__(self, stream: HttpStream, **kwargs): # type: ignore # noqa
self.stream = stream
super().__init__(**kwargs)
def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
if isinstance(response_or_exception, requests.Response):
if response_or_exception.status_code == requests.codes.bad_request:
if "to_date cannot be later than today" in response_or_exception.text:
self.stream._timezone_mismatch = True
message = (
"Your project timezone must be misconfigured. Please set it to the one defined in your Mixpanel project settings. "
"Stopping current stream sync."
)
return ErrorResolution(
response_action=ResponseAction.IGNORE,
failure_type=FailureType.config_error,
error_message=message,
)
if "Unable to authenticate request" in response_or_exception.text:
message = (
f"Your credentials might have expired. Please update your config with valid credentials."
f" See more details: {response_or_exception.text}"
)
return ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message=message,
)
if response_or_exception.status_code == 402:
message = f"Unable to perform a request. Payment Required: {response_or_exception.json()['error']}"
return ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.transient_error,
error_message=message,
)
try:
# trying to parse response to avoid ConnectionResetError and retry if it occurs
self.stream.iter_dicts(response_or_exception.iter_lines(decode_unicode=True))
except ConnectionResetError:
return ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message=f"Response status code: {response_or_exception.status_code}. Retrying...",
)
return super().interpret_response(response_or_exception)
class ExportSchema(MixpanelStream):
"""
Export helper stream for dynamic schema extraction.
:: reqs_per_hour_limit: int - set to 0 for this stream so that the post-request sleep
is skipped entirely while the dynamic schema is generated
(reqs_per_hour_limit = 0 means the rate-limit sleep is bypassed).
"""
primary_key: str = None
data_field: str = None
reqs_per_hour_limit: int = 0 # see the docstring
def path(self, **kwargs) -> str:
return "events/properties/top"
def process_response(self, response: requests.Response, **kwargs) -> Iterable[str]:
"""
response.json() example:
{
"$browser": {
"count": 6
},
"$browser_version": {
"count": 6
},
"$current_url": {
"count": 6
},
"mp_lib": {
"count": 6
},
"noninteraction": {
"count": 6
},
"$event_name": {
"count": 6
},
"$duration_s": {},
"$event_count": {},
"$origin_end": {},
"$origin_start": {}
}
"""
records = response.json()
for property_name in records:
yield property_name
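# For the response shown in the docstring above, this yields the property names
# "$browser", "$browser_version", "$current_url", "mp_lib", etc.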
def iter_dicts(self, lines):
"""
The incoming stream has to be in JSON Lines format.
Occasionally a single record is split across multiple lines.
We try to combine such split parts into one record, but only if the parts are adjacent.
"""
parts = []
for record_line in lines:
if record_line == "terminated early":
self.logger.warning(f"Couldn't fetch data from Export API. Response: {record_line}")
return
try:
yield json.loads(record_line)
except ValueError:
parts.append(record_line)
else:
parts = []
if len(parts) > 1:
try:
yield json.loads("".join(parts))
except ValueError:
pass
else:
parts = []
class Export(DateSlicesMixin, IncrementalMixpanelStream):
"""Export event data as it is received and stored within Mixpanel, complete with all event properties
(including distinct_id) and the exact timestamp the event was fired.
API Docs: https://developer.mixpanel.com/reference/export
Endpoint: https://data.mixpanel.com/api/2.0/export
Raw Export API Rate Limit (https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-API-Endpoints):
A maximum of 100 concurrent queries,
3 queries per second and 60 queries per hour.
"""
primary_key: str = None
cursor_field: str = "time"
transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
@property
def url_base(self):
prefix = "-eu" if self.region == "EU" else ""
return f"https://data{prefix}.mixpanel.com/api/2.0/"
def path(self, **kwargs) -> str:
return "export"
def get_error_handler(self) -> Optional[ErrorHandler]:
return ExportErrorHandler(logger=self.logger, stream=self)
def iter_dicts(self, lines):
"""
The incoming stream has to be in JSON Lines format.
Occasionally a single record is split across multiple lines.
We try to combine such split parts into one record, but only if the parts are adjacent.
"""
parts = []
for record_line in lines:
if record_line == "terminated early":
self.logger.warning(f"Couldn't fetch data from Export API. Response: {record_line}")
return
try:
yield json.loads(record_line)
except ValueError:
parts.append(record_line)
else:
parts = []
if len(parts) > 1:
try:
yield json.loads("".join(parts))
except ValueError:
pass
else:
parts = []
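# Illustrative example (hypothetical split lines): for the two lines
#   '{"event": "Purchase",'  and  '"properties": {"time": 1623860880}}'
# the first line fails json.loads and is buffered; once the second part arrives,
# the joined string parses into a single record, which is then yielded.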
def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""Export API return response in JSONL format but each line is a valid JSON object
Raw item example:
{
"event": "Viewed E-commerce Page",
"properties": {
"time": 1623860880,
"distinct_id": "1d694fd9-31a5-4b99-9eef-ae63112063ed",
"$browser": "Chrome", -> will be renamed to "browser"
"$browser_version": "91.0.4472.101",
"$current_url": "https://unblockdata.com/solutions/e-commerce/",
"$insert_id": "c5eed127-c747-59c8-a5ed-d766f48e39a4",
"$mp_api_endpoint": "api.mixpanel.com",
"mp_lib": "Segment: analytics-wordpress",
"mp_processing_time_ms": 1623886083321,
"noninteraction": true
}
}
"""
# We prefer response.iter_lines() over response.text.splitlines() as the latter can misparse text properties that embed line breaks
for record in self.iter_dicts(response.iter_lines(decode_unicode=True)):
# transform record into flat dict structure
item = {"event": record["event"]}
properties = record["properties"]
for result in transform_property_names(properties.keys()):
# Convert all values to strings (the default property type)
# because the API does not provide property type information
item[result.transformed_name] = str(properties[result.source_name])
# convert timestamp to datetime string
item["time"] = pendulum.from_timestamp(int(item["time"]), tz="UTC").to_iso8601_string()
yield item
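# For the raw item shown in the docstring above, the yielded record looks roughly like
# {"event": "Viewed E-commerce Page", "time": "2021-06-16T16:28:00Z",
#  "distinct_id": "1d694fd9-31a5-4b99-9eef-ae63112063ed", "browser": "Chrome", ...}
# with every property value converted to a string.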
@cache
def get_json_schema(self) -> Mapping[str, Any]:
"""
:return: A dict of the JSON schema representing this stream.
The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
Override as needed.
"""
schema = super().get_json_schema()
# Set whether to allow additional properties for engage and export endpoints
# Event and Engage properties are dynamic and depend on the properties provided on upload,
# when the Event or Engage (user/person) was created.
schema["additionalProperties"] = self.additional_properties
# read existing Export schema from API
schema_properties = ExportSchema(**self.get_stream_params()).read_records(sync_mode=SyncMode.full_refresh)
for result in transform_property_names(schema_properties):
# The schema endpoint does not provide exact property types;
# event properties are string ONLY (no other datatypes)
# Reference: https://help.mixpanel.com/hc/en-us/articles/360001355266-Event-Properties#field-size-character-limits-for-event-properties
schema["properties"][result.transformed_name] = {"type": ["null", "string"]}
return schema
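# Illustrative result: if ExportSchema yields the property name "$browser", the schema
# returned here gains "browser": {"type": ["null", "string"]} in addition to the
# static properties from the bundled JSON schema file.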
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state, stream_slice, next_page_token)
# additional filter by timestamp, because the required from_date and to_date params only allow filtering at date granularity
cursor_param = stream_slice.get(self.cursor_field)
if cursor_param:
timestamp = int(pendulum.parse(cursor_param).timestamp())
params["where"] = f'properties["$time"]>=datetime({timestamp})'
return params
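# Illustrative result (hypothetical cursor value): for a slice cursor of
# "2021-06-16T17:00:00+00:00" the extra filter becomes
# params["where"] = 'properties["$time"]>=datetime(1623862800)'.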
def request_kwargs(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
return {"stream": True}