1
0
mirror of synced 2025-12-25 02:09:19 -05:00

feat(source-mixpanel): Add Export Lookback Window (#55673)

This commit is contained in:
Anatolii Yatsuk
2025-03-14 17:49:33 +02:00
committed by GitHub
parent 641cf9b414
commit b103b461e6
7 changed files with 86 additions and 6 deletions

View File

@@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
dockerImageTag: 3.4.21
dockerImageTag: 3.5.0
dockerRepository: airbyte/source-mixpanel
documentationUrl: https://docs.airbyte.com/integrations/sources/mixpanel
githubIssueLabel: source-mixpanel

View File

@@ -3,7 +3,7 @@ requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.poetry]
version = "3.4.21"
version = "3.5.0"
name = "source-mixpanel"
description = "Source implementation for Mixpanel."
authors = ["Airbyte <contact@airbyte.io>"]

View File

@@ -83,6 +83,7 @@ class SourceMixpanel(YamlDeclarativeSource):
date_window_size,
project_id,
page_size,
export_lookback_window,
) = (
config.get("project_timezone", "US/Pacific"),
config.get("start_date"),
@@ -93,6 +94,7 @@ class SourceMixpanel(YamlDeclarativeSource):
config.get("date_window_size", 30),
config.get("credentials", dict()).get("project_id"),
config.get("page_size", 1000),
config.get("export_lookback_window", 0),
)
try:
project_timezone = pendulum.timezone(project_timezone)
@@ -109,6 +111,8 @@ class SourceMixpanel(YamlDeclarativeSource):
raise_config_error("Please provide a valid integer for the `Attribution window` parameter.")
if not isinstance(date_window_size, int) or date_window_size < 1:
raise_config_error("Please provide a valid integer for the `Date slicing window` parameter.")
if not isinstance(export_lookback_window, int) or export_lookback_window < 0:
raise_config_error("Please provide a valid integer for the `Export Lookback Window` parameter.")
auth = self.get_authenticator(config)
if isinstance(auth, TokenAuthenticatorBase64) and project_id:
@@ -126,5 +130,6 @@ class SourceMixpanel(YamlDeclarativeSource):
config["date_window_size"] = date_window_size
config["project_id"] = project_id
config["page_size"] = page_size
config["export_lookback_window"] = export_lookback_window
return config

View File

@@ -127,6 +127,14 @@
"type": "integer",
"minimum": 1,
"default": 1000
},
"export_lookback_window": {
"order": 10,
"title": "Export Lookback Window",
"description": "The number of seconds to look back from the last synced timestamp during incremental syncs of the Export stream. This ensures no data is missed due to delays in event recording. Default is 0 seconds. Must be a non-negative integer.",
"type": "integer",
"minimum": 0,
"default": 0
}
}
}

View File

@@ -59,6 +59,7 @@ class MixpanelStream(HttpStream, ABC):
end_date: Optional[Date] = None,
date_window_size: int = 30, # in days
attribution_window: int = 0, # in days
export_lookback_window: int = 0, # in seconds
select_properties_by_default: bool = True,
project_id: int = None,
reqs_per_hour_limit: int = DEFAULT_REQS_PER_HOUR_LIMIT,
@@ -68,6 +69,7 @@ class MixpanelStream(HttpStream, ABC):
self.end_date = end_date
self.date_window_size = date_window_size
self.attribution_window = attribution_window
self.export_lookback_window = export_lookback_window
self.additional_properties = select_properties_by_default
self.region = region
self.project_timezone = project_timezone
@@ -166,11 +168,14 @@ class DateSlicesMixin:
# Remove time part from state because API accept 'from_date' param in date format only ('YYYY-MM-DD')
# It also means that sync returns duplicated entries for the date from the state (date range is inclusive)
cursor_value = stream_state[self.cursor_field]
stream_state_date = pendulum.parse(stream_state[self.cursor_field]).date()
start_date = max(start_date, stream_state_date)
# This stream is only used for Export stream, so we use export_lookback_window here
cursor_value = (pendulum.parse(cursor_value) - timedelta(seconds=self.export_lookback_window)).to_iso8601_string()
stream_state_date = pendulum.parse(stream_state[self.cursor_field])
start_date = max(start_date, stream_state_date.date())
final_lookback_window = max(self.export_lookback_window, self.attribution_window * 24 * 60 * 60)
# move start_date back <attribution_window> days to sync data since that time as well
start_date = start_date - timedelta(days=self.attribution_window)
start_date = start_date - timedelta(seconds=final_lookback_window)
# end_date cannot be later than today
end_date = min(self.end_date, pendulum.today(tz=self.project_timezone).date())

View File

@@ -737,3 +737,63 @@ def test_export_iter_dicts(config):
assert list(stream.iter_dicts([record_string, record_string[:2], record_string[2:], record_string])) == [record, record, record]
# drop record parts because they are not standing nearby
assert list(stream.iter_dicts([record_string, record_string[:2], record_string, record_string[2:]])) == [record, record]
def test_export_stream_lookback_window(requests_mock, export_response, config_raw, mocker):
"""Test that export_lookback_window correctly adjusts the start date during incremental sync and verifies slice parameters"""
config_raw["export_lookback_window"] = 7200 # 1 hour lookback
config_raw["start_date"] = "2021-06-01T00:00:00Z"
config_raw["end_date"] = "2021-07-10T00:00:00Z"
stream = init_stream("export", config=config_raw)
# Mock get_json_schema to avoid actual schema fetching
mocker.patch.object(
Export,
"get_json_schema",
return_value={
"type": "object",
"properties": {
"event": {"type": "string"},
"time": {"type": "string"},
"distinct_id": {"type": "string"},
"insert_id": {"type": "string"},
},
},
)
# Mock response with two records at different times in JSONL format
export_response_multiple = (
b'{"event": "Viewed Page", "properties": {"time": 1623860880, "distinct_id": "user1", "$insert_id": "insert1"}}\n'
b'{"event": "Clicked Button", "properties": {"time": 1623864480, "distinct_id": "user2", "$insert_id": "insert2"}}'
)
requests_mock.register_uri(
"GET",
get_url_to_mock(stream),
content=export_response_multiple, # Use content directly for bytes
status_code=200,
)
# State with a timestamp 1 hour ago from the latest record
stream_state = {"time": "2021-06-16T16:28:00Z"}
stream_slices = list(stream.stream_slices(sync_mode=SyncMode.incremental, stream_state=stream_state))
assert len(stream_slices) > 0 # Ensure we have at least one slice
stream_slice = stream_slices[0]
# Verify slice parameters
expected_start = pendulum.parse("2021-06-16T14:28:00Z") # 16:28:00 - 2 hours due to lookback
expected_end = pendulum.parse("2021-07-10T00:00:00Z") # From config end_date
# Note: start_date might differ due to date_window_size slicing, adjust if needed
assert pendulum.parse(stream_slice["start_date"]) == pendulum.parse("2021-06-11T00:00:00Z") # Adjusted by attribution_window
assert pendulum.parse(stream_slice["end_date"]) == expected_end
assert pendulum.parse(stream_slice["time"]) == expected_start
# Read records and verify both are included due to lookback
records = list(stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice))
assert len(records) == 2
# Verify updated state is set to the latest record time
new_state = stream.get_updated_state(stream_state, records[-1])
assert new_state["time"] == "2021-06-16T17:28:00Z"

View File

@@ -20,7 +20,8 @@ To set up the Mixpanel source connector, you'll need a Mixpanel [Service Account
10. For **End Date**, enter the date in YYYY-MM-DD format.
11. For **Region**, enter the [region](https://help.mixpanel.com/hc/en-us/articles/360039135652-Data-Residency-in-EU) for your Mixpanel project.
12. For **Date slicing window**, enter the number of days to slice through data. If you encounter RAM usage issues due to a huge amount of data in each window, try using a lower value for this parameter.
13. Click **Set up source**.
13. For **Export Lookback Window**, enter the number of seconds to look back from the last synced timestamp during incremental syncs of the Export stream. This ensures no data is missed due to event recording delays. Default is 0 seconds.
14. Click **Set up source**.
## Supported sync modes
@@ -58,6 +59,7 @@ Syncing huge date windows may take longer due to Mixpanel's low API rate-limits
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.5.0 | 2025-03-10 | [55673](https://github.com/airbytehq/airbyte/pull/55673) | Add Export Lookback Window |
| 3.4.21 | 2025-03-06 | [55224](https://github.com/airbytehq/airbyte/pull/55224) | Migrate streams to latest endpoint |
| 3.4.20 | 2025-03-01 | [54769](https://github.com/airbytehq/airbyte/pull/54769) | Update dependencies |
| 3.4.19 | 2025-02-22 | [54319](https://github.com/airbytehq/airbyte/pull/54319) | Update dependencies |