# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

import json
import time
from abc import ABC
from datetime import timedelta
from functools import cache
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import pendulum
import requests
from pendulum import Date
from requests.auth import AuthBase

from airbyte_cdk import BackoffStrategy
from airbyte_cdk.models import FailureType, SyncMode
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler, ErrorResolution, HttpStatusErrorHandler, ResponseAction
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from source_mixpanel.property_transformation import transform_property_names

from .utils import fix_date_time


class MixpanelStreamBackoffStrategy(BackoffStrategy):
    def __init__(self, stream: HttpStream, **kwargs):  # type: ignore # noqa
        self.stream = stream
        super().__init__(**kwargs)

    def backoff_time(
        self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], **kwargs: Any
    ) -> Optional[float]:
        # Honor the server-provided `Retry-After` header when the API throttles us.
        if isinstance(response_or_exception, requests.Response):
            retry_after = response_or_exception.headers.get("Retry-After")
            if retry_after:
                self._logger.debug(f"API responded with `Retry-After` header: {retry_after}")
                return float(retry_after)
        return None
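
# Hypothetical usage sketch (names below are illustrative, not part of this module):
# the CDK's HTTP client calls backoff_time() with the throttled response, so a 429
# carrying `Retry-After: 30` yields a 30-second wait:
#
#     strategy = MixpanelStreamBackoffStrategy(stream=events_stream)
#     delay = strategy.backoff_time(throttled_response)  # -> 30.0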


class MixpanelStream(HttpStream, ABC):
    """
    Formatted API Rate Limit (https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-API-Endpoints):
    A maximum of 5 concurrent queries,
    60 queries per hour.
    """

    DEFAULT_REQS_PER_HOUR_LIMIT = 60

    @property
    def state_checkpoint_interval(self) -> int:
        # To meet the requirement of emitting state at least once per 15 minutes,
        # we assume at least one record is returned per request. Since each request
        # is followed by a 60-second sleep, we emit state every 15 records.
        return 15
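
    # Worked numbers behind the interval above: at the default limit of 60 requests/hour,
    # parse_response() sleeps 3600 / 60 = 60 seconds per request, so checkpointing every
    # 15 records emits state roughly every 15 minutes.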

    @property
    def url_base(self):
        prefix = "eu." if self.region == "EU" else ""
        return f"https://{prefix}mixpanel.com/api/query/"

    @property
    def reqs_per_hour_limit(self):
        # https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-Export-API-Endpoints#api-export-endpoint-rate-limits
        return self._reqs_per_hour_limit

    @reqs_per_hour_limit.setter
    def reqs_per_hour_limit(self, value):
        self._reqs_per_hour_limit = value

    def __init__(
        self,
        authenticator: AuthBase,
        region: str,
        project_timezone: Optional[str] = "US/Pacific",
        start_date: Optional[Date] = None,
        end_date: Optional[Date] = None,
        date_window_size: int = 30,  # in days
        attribution_window: int = 0,  # in days
        export_lookback_window: int = 0,  # in seconds
        select_properties_by_default: bool = True,
        project_id: Optional[int] = None,
        reqs_per_hour_limit: int = DEFAULT_REQS_PER_HOUR_LIMIT,
        **kwargs,
    ):
        self.start_date = start_date
        self.end_date = end_date
        self.date_window_size = date_window_size
        self.attribution_window = attribution_window
        self.export_lookback_window = export_lookback_window
        self.additional_properties = select_properties_by_default
        self.region = region
        self.project_timezone = project_timezone
        self.project_id = project_id
        self._reqs_per_hour_limit = reqs_per_hour_limit
        super().__init__(authenticator=authenticator)

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Implement the abstract method; these endpoints are not paginated."""
        return None

    def request_headers(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> Mapping[str, Any]:
        return {"Accept": "application/json"}

    def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        json_response = response.json()
        if self.data_field is not None:
            data = json_response.get(self.data_field, [])
        elif isinstance(json_response, list):
            data = json_response
        elif isinstance(json_response, dict):
            data = [json_response]
        else:
            data = []  # guard against unexpected payload shapes; previously `data` could be unbound here

        for record in data:
            fix_date_time(record)
            yield record
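
    # Illustrative payload shapes (hypothetical values): with data_field = "results",
    # {"results": [{"a": 1}]} yields {"a": 1}; with data_field = None, a bare list is
    # yielded item by item and a bare dict is yielded as a single record.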

    def parse_response(
        self,
        response: requests.Response,
        stream_state: Mapping[str, Any],
        **kwargs,
    ) -> Iterable[Mapping]:
        # parse the whole response
        yield from self.process_response(response, stream_state=stream_state, **kwargs)

        if self.reqs_per_hour_limit > 0:
            # This block is skipped when self.reqs_per_hour_limit == 0;
            # in all other cases we wait to stay within the API rate limits.
            self.logger.info(f"Sleep for {3600 / self.reqs_per_hour_limit} seconds to match API limitations after reading from {self.name}")
            time.sleep(3600 / self.reqs_per_hour_limit)

    def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
        return MixpanelStreamBackoffStrategy(stream=self)

    def get_error_handler(self) -> Optional[ErrorHandler]:
        return ExportErrorHandler(logger=self.logger, stream=self)

    def get_stream_params(self) -> Mapping[str, Any]:
        """
        Fetch the parameters required by a given stream. Used to create sub-streams.
        """
        params = {
            "authenticator": self._http_client._session.auth,
            "region": self.region,
            "project_timezone": self.project_timezone,
            "reqs_per_hour_limit": self.reqs_per_hour_limit,
        }
        if self.project_id:
            params["project_id"] = self.project_id
        return params

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        if self.project_id:
            return {"project_id": str(self.project_id)}
        return {}


class IncrementalMixpanelStream(MixpanelStream, ABC):
    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        updated_state = latest_record.get(self.cursor_field)
        if updated_state:
            state_value = current_stream_state.get(self.cursor_field)
            if state_value:
                # Keep the greater of the stored cursor and the latest record's cursor.
                updated_state = max(updated_state, state_value)
            current_stream_state[self.cursor_field] = updated_state
        return current_stream_state
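
# Illustrative state merge (hypothetical cursor values): with cursor_field = "time",
# a stored state {"time": "2024-05-02T00:00:00"} and a late-arriving record
# {"time": "2024-05-01T00:00:00"} leave the state at "2024-05-02T00:00:00",
# since max() never moves the cursor backwards.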


class DateSlicesMixin:
    raise_on_http_errors = True

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._timezone_mismatch = False

    def parse_response(self, *args, **kwargs):
        if self._timezone_mismatch:
            return []
        yield from super().parse_response(*args, **kwargs)

    def stream_slices(
        self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        # use the latest date between self.start_date and the stream state
        start_date = self.start_date
        cursor_value = None

        if stream_state and self.cursor_field and self.cursor_field in stream_state:
            # Remove the time part from the state because the API accepts the 'from_date' param
            # in date format only ('YYYY-MM-DD'). This also means the sync returns duplicated
            # entries for the date taken from the state (the date range is inclusive).
            cursor_value = stream_state[self.cursor_field]
            # This mixin is only used by the Export stream, so we apply export_lookback_window here
            cursor_value = (pendulum.parse(cursor_value) - timedelta(seconds=self.export_lookback_window)).to_iso8601_string()
            stream_state_date = pendulum.parse(stream_state[self.cursor_field])
            start_date = max(start_date, stream_state_date.date())

        final_lookback_window = max(self.export_lookback_window, self.attribution_window * 24 * 60 * 60)
        # move start_date back by the lookback window to re-sync data since that time as well
        start_date = start_date - timedelta(seconds=final_lookback_window)

        # end_date cannot be later than today
        end_date = min(self.end_date, pendulum.today(tz=self.project_timezone).date())

        while start_date <= end_date:
            if self._timezone_mismatch:
                return
            current_end_date = start_date + timedelta(days=self.date_window_size - 1)  # -1 is needed because dates are inclusive
            stream_slice = {
                "start_date": str(start_date),
                "end_date": str(min(current_end_date, end_date)),
            }
            if cursor_value:
                stream_slice[self.cursor_field] = cursor_value
            yield stream_slice
            # add 1 additional day because the date range is inclusive
            start_date = current_end_date + timedelta(days=1)
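
    # Worked example (hypothetical dates): with date_window_size=30, start_date 2024-01-01
    # and end_date 2024-02-10, the loop yields the inclusive windows
    #     {"start_date": "2024-01-01", "end_date": "2024-01-30"}
    #     {"start_date": "2024-01-31", "end_date": "2024-02-10"}
    # and stops because the next start date (2024-03-01) is past end_date.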

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        params = super().request_params(stream_state, stream_slice, next_page_token)
        return {
            **params,
            "from_date": stream_slice["start_date"],
            "to_date": stream_slice["end_date"],
        }


class ExportErrorHandler(HttpStatusErrorHandler):
    """
    Custom error handler for export errors specific to Mixpanel streams.

    This handler addresses:
    - A 400 status code with a "to_date cannot be later than today" message, indicating a potential timezone mismatch.
    - ConnectionResetError during response parsing, indicating the request should be retried.

    If the response does not match these specific cases, the handler defers to the parent class's implementation.

    Attributes:
        stream (HttpStream): The HTTP stream associated with this error handler.
    """

    def __init__(self, stream: HttpStream, **kwargs):  # type: ignore # noqa
        self.stream = stream
        super().__init__(**kwargs)

    def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
        if isinstance(response_or_exception, requests.Response):
            if response_or_exception.status_code == requests.codes.bad_request:
                if "to_date cannot be later than today" in response_or_exception.text:
                    self.stream._timezone_mismatch = True
                    message = (
                        "Your project timezone appears to be misconfigured. Please set it to the value defined in your Mixpanel project settings. "
                        "Stopping current stream sync."
                    )
                    return ErrorResolution(
                        response_action=ResponseAction.IGNORE,
                        failure_type=FailureType.config_error,
                        error_message=message,
                    )
                if "Unable to authenticate request" in response_or_exception.text:
                    message = (
                        "Your credentials might have expired. Please update your config with valid credentials."
                        f" See more details: {response_or_exception.text}"
                    )
                    return ErrorResolution(
                        response_action=ResponseAction.FAIL,
                        failure_type=FailureType.config_error,
                        error_message=message,
                    )
            if response_or_exception.status_code == 402:
                message = f"Unable to perform a request. Payment Required: {response_or_exception.json()['error']}"
                return ErrorResolution(
                    response_action=ResponseAction.FAIL,
                    failure_type=FailureType.transient_error,
                    error_message=message,
                )
            if hasattr(self.stream, "iter_dicts"):  # only the export streams define iter_dicts
                try:
                    # try to parse the response so a ConnectionResetError surfaces here and triggers a retry
                    self.stream.iter_dicts(response_or_exception.iter_lines(decode_unicode=True))
                except ConnectionResetError:
                    return ErrorResolution(
                        response_action=ResponseAction.RETRY,
                        failure_type=FailureType.transient_error,
                        error_message=f"Response status code: {response_or_exception.status_code}. Retrying...",
                    )
        return super().interpret_response(response_or_exception)
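
# Control-flow note (a sketch of the interplay, not new behavior): when the handler
# above sets `stream._timezone_mismatch = True` and IGNOREs the 400 response,
# DateSlicesMixin.stream_slices() stops yielding further slices and
# DateSlicesMixin.parse_response() returns [], so the sync winds down gracefully.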


class ExportSchema(MixpanelStream):
    """
    Export helper stream for dynamic schema extraction.

    :: reqs_per_hour_limit: int - set to 0 so that the rate-limit sleep is skipped
       entirely while the dynamic schema is generated
       (when `reqs_per_hour_limit = 0`, the limit logic is skipped).
    """

    primary_key: Optional[str] = None
    data_field: Optional[str] = None
    reqs_per_hour_limit: int = 0  # see the docstring

    def path(self, **kwargs) -> str:
        return "events/properties/top"

    def process_response(self, response: requests.Response, **kwargs) -> Iterable[str]:
        """
        response.json() example:
        {
            "$browser": {
                "count": 6
            },
            "$browser_version": {
                "count": 6
            },
            "$current_url": {
                "count": 6
            },
            "mp_lib": {
                "count": 6
            },
            "noninteraction": {
                "count": 6
            },
            "$event_name": {
                "count": 6
            },
            "$duration_s": {},
            "$event_count": {},
            "$origin_end": {},
            "$origin_start": {}
        }
        """
        records = response.json()
        for property_name in records:
            yield property_name

    def iter_dicts(self, lines):
        """
        The incoming stream has to be in JSON Lines format.
        Occasionally a single record is split across multiple lines for unknown reasons.
        We combine such split parts into one record only when the parts are adjacent.
        """
        parts = []
        for record_line in lines:
            if record_line == "terminated early":
                self.logger.warning(f"Couldn't fetch data from Export API. Response: {record_line}")
                return
            try:
                yield json.loads(record_line)
            except ValueError:
                parts.append(record_line)
            else:
                parts = []

            if len(parts) > 1:
                try:
                    yield json.loads("".join(parts))
                except ValueError:
                    pass
                else:
                    parts = []
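
    # Illustrative recombination (hypothetical lines): ['{"a": 1}', '{"b":', ' 2}']
    # yields {"a": 1} from the first line, buffers the unparsable '{"b":' fragment,
    # then joins it with ' 2}' on the next iteration and yields {"b": 2}.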


class Export(DateSlicesMixin, IncrementalMixpanelStream):
    """Export event data as it is received and stored within Mixpanel, complete with all event properties
    (including distinct_id) and the exact timestamp the event was fired.

    API Docs: https://developer.mixpanel.com/reference/export
    Endpoint: https://data.mixpanel.com/api/2.0/export

    Raw Export API Rate Limit (https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-API-Endpoints):
    A maximum of 100 concurrent queries,
    3 queries per second and 60 queries per hour.
    """

    primary_key: Optional[str] = None
    cursor_field: str = "time"

    transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)

    @property
    def url_base(self):
        prefix = "-eu" if self.region == "EU" else ""
        return f"https://data{prefix}.mixpanel.com/api/2.0/"

    def path(self, **kwargs) -> str:
        return "export"

    def get_error_handler(self) -> Optional[ErrorHandler]:
        return ExportErrorHandler(logger=self.logger, stream=self)

    def iter_dicts(self, lines):
        """
        The incoming stream has to be in JSON Lines format.
        Occasionally a single record is split across multiple lines for unknown reasons.
        We combine such split parts into one record only when the parts are adjacent.
        """
        parts = []
        for record_line in lines:
            if record_line == "terminated early":
                self.logger.warning(f"Couldn't fetch data from Export API. Response: {record_line}")
                return
            try:
                yield json.loads(record_line)
            except ValueError:
                parts.append(record_line)
            else:
                parts = []

            if len(parts) > 1:
                try:
                    yield json.loads("".join(parts))
                except ValueError:
                    pass
                else:
                    parts = []

    def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """The Export API returns a response in JSONL format where each line is a valid JSON object.

        Raw item example:
        {
            "event": "Viewed E-commerce Page",
            "properties": {
                "time": 1623860880,
                "distinct_id": "1d694fd9-31a5-4b99-9eef-ae63112063ed",
                "$browser": "Chrome",                                   -> will be renamed to "browser"
                "$browser_version": "91.0.4472.101",
                "$current_url": "https://unblockdata.com/solutions/e-commerce/",
                "$insert_id": "c5eed127-c747-59c8-a5ed-d766f48e39a4",
                "$mp_api_endpoint": "api.mixpanel.com",
                "mp_lib": "Segment: analytics-wordpress",
                "mp_processing_time_ms": 1623886083321,
                "noninteraction": true
            }
        }
        """

        # We prefer response.iter_lines() to response.text.splitlines() as the latter can misparse
        # text properties that embed line breaks.
        for record in self.iter_dicts(response.iter_lines(decode_unicode=True)):
            # transform the record into a flat dict structure
            item = {"event": record["event"]}
            properties = record["properties"]
            for result in transform_property_names(properties.keys()):
                # Convert all values to string (the default property type)
                # because the API does not provide property type information.
                item[result.transformed_name] = str(properties[result.source_name])

            # convert the timestamp into a datetime string
            item["time"] = pendulum.from_timestamp(int(item["time"]), tz="UTC").to_iso8601_string()

            yield item
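
    # Illustrative flattening (hypothetical record): {"event": "Clicked", "properties":
    # {"time": 1623860880, "$browser": "Chrome"}} becomes {"event": "Clicked",
    # "time": <ISO-8601 string for 2021-06-16T16:28:00 UTC, exact formatting per pendulum>,
    # "browser": "Chrome"}: "$"-prefixed names are renamed and values are stringified.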

    @cache
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        :return: A dict of the JSON schema representing this stream.

        The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
        Override as needed.
        """

        schema = super().get_json_schema()

        # Set whether to allow additional properties for the engage and export endpoints.
        # Event and Engage properties are dynamic and depend on the properties provided on upload,
        # when the Event or Engage (user/person) record was created.
        schema["additionalProperties"] = self.additional_properties

        # read the existing Export schema from the API
        schema_properties = ExportSchema(**self.get_stream_params()).read_records(sync_mode=SyncMode.full_refresh)
        for result in transform_property_names(schema_properties):
            # The schema does not provide the exact property type.
            # Event properties are strings ONLY (no other datatypes).
            # Reference: https://help.mixpanel.com/hc/en-us/articles/360001355266-Event-Properties#field-size-character-limits-for-event-properties
            schema["properties"][result.transformed_name] = {"type": ["null", "string"]}

        return schema

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        params = super().request_params(stream_state, stream_slice, next_page_token)
        # Filter additionally by timestamp, because the required from/to dates only allow filtering by date.
        cursor_param = stream_slice.get(self.cursor_field)
        if cursor_param:
            timestamp = int(pendulum.parse(cursor_param).timestamp())
            params["where"] = f'properties["$time"]>=datetime({timestamp})'
        return params
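
    # Illustrative `where` clause (hypothetical cursor): a slice carrying
    # "time": "2021-06-16T00:00:00Z" produces params including
    # where=properties["$time"]>=datetime(1623801600), filtering out events
    # earlier than the cursor inside the inclusive from_date.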

    def request_kwargs(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> Mapping[str, Any]:
        return {"stream": True}